[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-changelog] Change watches: operations block until everyone has acked.
# HG changeset patch # User cl349@xxxxxxxxxxxxxxxxxxxx # Node ID 4e833037159dd09a62ce10011592f9f67005a7e4 # Parent 997b2b07b96df5ddc1e382460c11ba59e11ee970 Change watches: operations block until everyone has acked. Watch events are no longer sent to self Watches no longer take a priority async and asyncwait commands for xs_test, now we need to continue despite blocking ops. Print test name at end of verbose run on failure. Use --trace-file arg to xenstored when testing Signed-off-by: Rusty Russel <rusty@xxxxxxxxxxxxxxx> Signed-off-by: Christian Limpach <Christian.Limpach@xxxxxxxxxxxx> diff -r 997b2b07b96d -r 4e833037159d tools/xenstore/TODO --- a/tools/xenstore/TODO Tue Jul 26 15:13:56 2005 +++ b/tools/xenstore/TODO Tue Jul 26 15:20:09 2005 @@ -2,8 +2,9 @@ are omissions of important but necessary things. It is up to the reader to fill in the blanks. -- Remove calls to system() from daemon - Timeout failed watch responses -- Dynamic nodes +- Dynamic/supply nodes - Persistant storage of introductions, watches and transactions, so daemon can restart - Remove assumption that rename doesn't fail +- Multi-root transactions, for setting up front and back ends at same time. + diff -r 997b2b07b96d -r 4e833037159d tools/xenstore/testsuite/07watch.sh --- a/tools/xenstore/testsuite/07watch.sh Tue Jul 26 15:13:56 2005 +++ b/tools/xenstore/testsuite/07watch.sh Tue Jul 26 15:20:09 2005 @@ -3,45 +3,52 @@ # Watch something, write to it, check watch has fired. [ "`echo -e 'write /test create contents' | ./xs_test 2>&1`" = "" ] -[ "`echo -e '1 watch /test token 100 -2 write /test create contents2 +[ "`echo -e '1 watch /test token +2 async write /test create contents2 1 waitwatch 1 ackwatch token' | ./xs_test 2>&1`" = "1:/test:token" ] # Check that reads don't set it off. -[ "`echo -e '1 watch /test token 100 +[ "`echo -e '1 watch /test token 2 read /test 1 waitwatch' | ./xs_test 2>&1`" = "2:contents2 1:waitwatch timeout" ] # mkdir, setperm and rm should (also tests watching dirs) [ "`echo -e 'mkdir /dir' | ./xs_test 2>&1`" = "" ] -[ "`echo -e '1 watch /dir token 100 -2 mkdir /dir/newdir +[ "`echo -e '1 watch /dir token +2 async mkdir /dir/newdir 1 waitwatch 1 ackwatch token -2 setperm /dir/newdir 0 READ +asyncwait +2 async setperm /dir/newdir 0 READ 1 waitwatch 1 ackwatch token -2 rm /dir/newdir +asyncwait +2 async rm /dir/newdir 1 waitwatch 1 ackwatch token' | ./xs_test 2>&1`" = "1:/dir/newdir:token 1:/dir/newdir:token 1:/dir/newdir:token" ] +# We don't get a watch from our own commands. +[ "`echo -e 'watch /dir token +mkdir /dir/newdir +waitwatch' | ./xs_test 2>&1`" = "waitwatch timeout" ] + # ignore watches while doing commands, should work. -[ "`echo -e 'watch /dir token 100 -write /dir/test create contents +[ "`echo -e 'watch /dir token +1 async write /dir/test create contents read /dir/test waitwatch ackwatch token' | ./xs_test 2>&1`" = "contents /dir/test:token" ] -# watch priority /test. -[ "`echo -e '1 watch /dir token1 1 -3 watch /dir token3 3 -2 watch /dir token2 2 -write /dir/test create contents +# watch priority test: all simultaneous +[ "`echo -e '1 watch /dir token1 +3 watch /dir token3 +2 watch /dir token2 +async write /dir/test create contents 3 waitwatch 3 ackwatch token3 2 waitwatch @@ -52,9 +59,9 @@ 1:/dir/test:token1" ] # If one dies (without acking), the other should still get ack. -[ "`echo -e '1 watch /dir token1 0 -2 watch /dir token2 1 -write /dir/test create contents +[ "`echo -e '1 watch /dir token1 +2 watch /dir token2 +async write /dir/test create contents 2 waitwatch 2 close 1 waitwatch @@ -62,51 +69,52 @@ 1:/dir/test:token1" ] # If one dies (without reading at all), the other should still get ack. -[ "`echo -e '1 watch /dir token1 0 -2 watch /dir token2 1 -write /dir/test create contents +[ "`echo -e '1 watch /dir token1 +2 watch /dir token2 +async write /dir/test create contents 2 close 1 waitwatch 1 ackwatch token1' | ./xs_test 2>&1`" = "1:/dir/test:token1" ] # unwatch -[ "`echo -e '1 watch /dir token1 0 +[ "`echo -e '1 watch /dir token1 1 unwatch /dir token1 -1 watch /dir token2 0 -2 write /dir/test2 create contents +1 watch /dir token2 +2 async write /dir/test2 create contents 1 waitwatch 1 unwatch /dir token2' | ./xs_test 2>&1`" = "1:/dir/test2:token2" ] # unwatch while watch pending. Next watcher gets the event. -[ "`echo -e '1 watch /dir token1 0 -2 watch /dir token2 1 -write /dir/test create contents +[ "`echo -e '1 watch /dir token1 +2 watch /dir token2 +async write /dir/test create contents 2 unwatch /dir token2 1 waitwatch 1 ackwatch token1' | ./xs_test 2>&1`" = "1:/dir/test:token1" ] # unwatch while watch pending. Should clear this so we get next event. -[ "`echo -e '1 watch /dir token1 0 -write /dir/test create contents +[ "`echo -e '1 watch /dir token1 +async write /dir/test create contents 1 unwatch /dir token1 -1 watch /dir/test token2 0 -write /dir/test none contents2 +1 watch /dir/test token2 +asyncwait +async write /dir/test none contents2 1 waitwatch 1 ackwatch token2' | ./xs_test 2>&1`" = "1:/dir/test:token2" ] # check we only get notified once. -[ "`echo -e '1 watch /test token 100 -2 write /test create contents2 +[ "`echo -e '1 watch /test token +2 async write /test create contents2 1 waitwatch 1 ackwatch token 1 waitwatch' | ./xs_test 2>&1`" = "1:/test:token 1:waitwatch timeout" ] # watches are queued in order. -[ "`echo -e '1 watch / token 100 -2 write /test1 create contents -2 write /test2 create contents -2 write /test3 create contents +[ "`echo -e '1 watch / token +async 2 write /test1 create contents +async 2 write /test2 create contents +async 2 write /test3 create contents 1 waitwatch 1 ackwatch token 1 waitwatch @@ -117,9 +125,9 @@ 1:/test3:token" ] # Creation of subpaths should be covered correctly. -[ "`echo -e '1 watch / token 100 -2 write /test/subnode create contents2 -2 write /test/subnode/subnode create contents2 +[ "`echo -e '1 watch / token +2 async write /test/subnode create contents2 +2 async write /test/subnode/subnode create contents2 1 waitwatch 1 ackwatch token 1 waitwatch @@ -129,23 +137,23 @@ 1:waitwatch timeout" ] # Watch event must have happened before we registered interest. -[ "`echo -e '1 watch / token 100 -2 write /test/subnode create contents2 -2 watch / token2 0 +[ "`echo -e '1 watch / token +2 async write /test/subnode create contents2 +1 watch / token2 0 1 waitwatch 1 ackwatch token -2 waitwatch' | ./xs_test 2>&1`" = "1:/test/subnode:token -2:waitwatch timeout" ] +1 waitwatch' | ./xs_test 2>&1`" = "1:/test/subnode:token +1:waitwatch timeout" ] # Rm fires notification on child. -[ "`echo -e '1 watch /test/subnode token 100 -2 rm /test +[ "`echo -e '1 watch /test/subnode token +2 async rm /test 1 waitwatch 1 ackwatch token' | ./xs_test 2>&1`" = "1:/test/subnode:token" ] # Watch should not double-send after we ack, even if we did something in between. -[ "`echo -e '1 watch /test2 token 100 -2 write /test2/foo create contents2 +[ "`echo -e '1 watch /test2 token +2 async write /test2/foo create contents2 1 waitwatch 1 read /test2/foo 1 ackwatch token diff -r 997b2b07b96d -r 4e833037159d tools/xenstore/testsuite/08transaction.sh --- a/tools/xenstore/testsuite/08transaction.sh Tue Jul 26 15:13:56 2005 +++ b/tools/xenstore/testsuite/08transaction.sh Tue Jul 26 15:20:09 2005 @@ -45,37 +45,37 @@ sleep 1 rm /test/entry1 commit -dir /test' | ./xs_test`" = "" ] +dir /test' | ./xs_test --no-timeout`" = "" ] # ... as long as noone is waiting. [ "`echo -e '1 start /test 2 mkdir /test/dir 1 mkdir /test/dir 1 dir /test -1 commit' | ./xs_test 2>&1`" = "1:dir +1 commit' | ./xs_test --no-timeout 2>&1`" = "1:dir FATAL: 1: commit: Connection timed out" ] # Events inside transactions don't trigger watches until (successful) commit. -[ "`echo -e '1 watch /test token 100 +[ "`echo -e '1 watch /test token 2 start /test 2 mkdir /test/dir/sub 1 waitwatch' | ./xs_test 2>&1`" = "1:waitwatch timeout" ] -[ "`echo -e '1 watch /test token 100 +[ "`echo -e '1 watch /test token 2 start /test 2 mkdir /test/dir/sub 2 abort 1 waitwatch' | ./xs_test 2>&1`" = "1:waitwatch timeout" ] -[ "`echo -e '1 watch /test token 100 +[ "`echo -e '1 watch /test token 2 start /test 2 mkdir /test/dir/sub -2 commit +2 async commit 1 waitwatch 1 ackwatch token' | ./xs_test 2>&1`" = "1:/test/dir/sub:token" ] # Rm inside transaction works like rm outside: children get notified. -[ "`echo -e '1 watch /test/dir/sub token 100 +[ "`echo -e '1 watch /test/dir/sub token 2 start /test 2 rm /test/dir -2 commit +2 async commit 1 waitwatch 1 ackwatch token' | ./xs_test 2>&1`" = "1:/test/dir/sub:token" ] diff -r 997b2b07b96d -r 4e833037159d tools/xenstore/testsuite/10domain-homedir.sh --- a/tools/xenstore/testsuite/10domain-homedir.sh Tue Jul 26 15:13:56 2005 +++ b/tools/xenstore/testsuite/10domain-homedir.sh Tue Jul 26 15:20:09 2005 @@ -13,8 +13,8 @@ # Place a watch using a relative path: expect relative answer. [ "`echo 'introduce 1 100 7 /home 1 mkdir foo -1 watch foo token 0 -write /home/foo/bar create contents +1 watch foo token +async write /home/foo/bar create contents 1 waitwatch 1 ackwatch token' | ./xs_test 2>&1`" = "handle is 1 1:foo/bar:token" ] diff -r 997b2b07b96d -r 4e833037159d tools/xenstore/testsuite/11domain-watch.sh --- a/tools/xenstore/testsuite/11domain-watch.sh Tue Jul 26 15:13:56 2005 +++ b/tools/xenstore/testsuite/11domain-watch.sh Tue Jul 26 15:20:09 2005 @@ -6,42 +6,46 @@ [ "`echo -e 'mkdir /dir' | ./xs_test 2>&1`" = "" ] [ "`echo -e 'introduce 1 100 7 /my/home -1 watch /test token 100 -write /test create contents2 +1 watch /test token +async write /test create contents2 1 waitwatch 1 ackwatch token 1 unwatch /test token +asyncwait release 1' | ./xs_test 2>&1`" = "handle is 1 1:/test:token" ] # ignore watches while doing commands, should work. [ "`echo -e 'introduce 1 100 7 /my/home -1 watch /dir token 100 -1 write /dir/test create contents -1 read /dir/test +1 watch /dir token +async write /dir/test create contents +1 write /dir/test2 create contents2 +1 write /dir/test3 create contents3 +1 write /dir/test4 create contents4 1 waitwatch 1 ackwatch token +asyncwait release 1' | ./xs_test 2>&1`" = "handle is 1 -1:contents 1:/dir/test:token" ] # unwatch [ "`echo -e 'introduce 1 100 7 /my/home -1 watch /dir token1 0 +1 watch /dir token1 1 unwatch /dir token1 -1 watch /dir token2 0 -2 write /dir/test2 create contents +1 watch /dir token2 +async 2 write /dir/test2 create contents 1 waitwatch 1 unwatch /dir token2 +asyncwait release 1' | ./xs_test 2>&1`" = "handle is 1 1:/dir/test2:token2" ] # unwatch while watch pending. [ "`echo -e 'introduce 1 100 7 /my/home introduce 2 101 8 /my/secondhome -1 watch /dir token1 0 -2 watch /dir token2 1 -write /dir/test create contents +1 watch /dir token1 +2 watch /dir token2 +3 async write /dir/test create contents 2 unwatch /dir token2 1 waitwatch 1 ackwatch token1 diff -r 997b2b07b96d -r 4e833037159d tools/xenstore/testsuite/12readonly.sh --- a/tools/xenstore/testsuite/12readonly.sh Tue Jul 26 15:13:56 2005 +++ b/tools/xenstore/testsuite/12readonly.sh Tue Jul 26 15:20:09 2005 @@ -9,7 +9,7 @@ [ "`echo 'read /test getperm /test -watch /test token 0 +watch /test token unwatch /test token start / commit @@ -27,7 +27,7 @@ # Check that watches work like normal. set -m -[ "`echo 'watch / token 0 +[ "`echo 'watch / token waitwatch ackwatch token' | ./xs_test --readonly 2>&1`" = "/test:token" ] & @@ -36,6 +36,3 @@ echo Readonly wait test failed: $? exit 1 fi - - - diff -r 997b2b07b96d -r 4e833037159d tools/xenstore/testsuite/13watch-ack.sh --- a/tools/xenstore/testsuite/13watch-ack.sh Tue Jul 26 15:13:56 2005 +++ b/tools/xenstore/testsuite/13watch-ack.sh Tue Jul 26 15:20:09 2005 @@ -15,8 +15,9 @@ [ "`echo '1 watch /test/1 token1 0 1 watch /test/2 token2 0 1 watch /test/3 token3 0 -2 write /test/2 create contents2 +2 async write /test/2 create contents2 1 waitwatch -2 write /test/1 create contents1 -2 write /test/3 create contents3 -1 ackwatch token2' | ./xs_test 2>&1`" = "1:/test/2:token2" ] +3 async write /test/1 create contents1 +4 async write /test/3 create contents3 +1 ackwatch token2 +1 close' | ./xs_test 2>&1`" = "1:/test/2:token2" ] diff -r 997b2b07b96d -r 4e833037159d tools/xenstore/testsuite/test.sh --- a/tools/xenstore/testsuite/test.sh Tue Jul 26 15:13:56 2005 +++ b/tools/xenstore/testsuite/test.sh Tue Jul 26 15:20:09 2005 @@ -9,7 +9,7 @@ mkdir $XENSTORED_ROOTDIR # Weird failures with this. if type valgrind >/dev/null 2>&1; then - valgrind -q --logfile-fd=3 ./xenstored_test --output-pid --no-fork 3>testsuite/tmp/vgout > /tmp/pid 2> testsuite/tmp/xenstored_errors & + valgrind -q --logfile-fd=3 ./xenstored_test --output-pid --trace-file=testsuite/tmp/trace --no-fork 3>testsuite/tmp/vgout > /tmp/pid 2> testsuite/tmp/xenstored_errors & while [ ! -s /tmp/pid ]; do sleep 0; done PID=`cat /tmp/pid` rm /tmp/pid @@ -38,7 +38,9 @@ echo Test $f passed... else echo Test $f failed, running verbosely... - run_test $f -x + run_test $f -x || true + # That will have filled the screen, repeat message. + echo Test $f failed exit 1 fi done diff -r 997b2b07b96d -r 4e833037159d tools/xenstore/xenstored_core.c --- a/tools/xenstore/xenstored_core.c Tue Jul 26 15:13:56 2005 +++ b/tools/xenstore/xenstored_core.c Tue Jul 26 15:20:09 2005 @@ -51,7 +51,7 @@ #include "xenstored_domain.h" static bool verbose; -static LIST_HEAD(connections); +LIST_HEAD(connections); static int tracefd = -1; #ifdef TESTING @@ -959,8 +959,11 @@ } add_change_node(conn->transaction, node, false); + if (fire_watches(conn, node, false)) { + conn->watch_ack = XS_WRITE; + return; + } send_ack(conn, XS_WRITE); - fire_watches(conn->transaction, node, false); } static void do_mkdir(struct connection *conn, const char *node) @@ -985,8 +988,11 @@ } add_change_node(conn->transaction, node, false); + if (fire_watches(conn, node, false)) { + conn->watch_ack = XS_MKDIR; + return; + } send_ack(conn, XS_MKDIR); - fire_watches(conn->transaction, node, false); } static void do_rm(struct connection *conn, const char *node) @@ -1023,8 +1029,11 @@ } add_change_node(conn->transaction, node, true); + if (fire_watches(conn, node, true)) { + conn->watch_ack = XS_RM; + return; + } send_ack(conn, XS_RM); - fire_watches(conn->transaction, node, true); } static void do_get_perms(struct connection *conn, const char *node) @@ -1095,8 +1104,11 @@ } add_change_node(conn->transaction, node, false); + if (fire_watches(conn, node, false)) { + conn->watch_ack = XS_SET_PERMS; + return; + } send_ack(conn, XS_SET_PERMS); - fire_watches(conn->transaction, node, false); } /* Process "in" for conn: "in" will vanish after this conversation, so @@ -1321,14 +1333,23 @@ struct connection *i, *tmp; list_for_each_entry_safe(i, tmp, &connections, list) { - if (i->state == OK) - continue; - - if (!transaction_covering_node(i->blocked_by)) { - talloc_free(i->blocked_by); - i->blocked_by = NULL; - i->state = OK; - consider_message(i); + switch (i->state) { + case BLOCKED: + if (!transaction_covering_node(i->blocked_by)) { + talloc_free(i->blocked_by); + i->blocked_by = NULL; + i->state = OK; + consider_message(i); + } + break; + case WATCHED: + if (i->watches_unacked == 0) { + i->state = OK; + send_ack(i, i->watch_ack); + } + break; + case OK: + break; } } @@ -1351,6 +1372,8 @@ new->state = OK; new->blocked_by = NULL; + new->watch_ack = XS_ERROR; + new->watches_unacked = 0; new->out = new->waiting_reply = NULL; new->fd = -1; new->id = 0; @@ -1359,6 +1382,7 @@ new->write = write; new->read = read; new->can_write = true; + INIT_LIST_HEAD(&new->watches); talloc_set_fail_handler(out_of_mem, &talloc_fail); if (setjmp(talloc_fail)) { @@ -1430,13 +1454,12 @@ printf(" state = %s\n", i->state == OK ? "OK" : i->state == BLOCKED ? "BLOCKED" + : i->state == WATCHED ? "WATCHED" : "INVALID"); if (i->id) printf(" id = %i\n", i->id); if (i->blocked_by) printf(" blocked on = %s\n", i->blocked_by); - if (i->waiting_for_ack) - printf(" waiting_for_ack TRUE\n"); if (!i->in->inhdr || i->in->used) printf(" got %i bytes of %s\n", i->in->used, i->in->inhdr ? "header" : "data"); diff -r 997b2b07b96d -r 4e833037159d tools/xenstore/xenstored_core.h --- a/tools/xenstore/xenstored_core.h Tue Jul 26 15:13:56 2005 +++ b/tools/xenstore/xenstored_core.h Tue Jul 26 15:20:09 2005 @@ -51,6 +51,8 @@ { /* Blocked by transaction. */ BLOCKED, + /* Waiting for watchers to ack event we caused */ + WATCHED, /* Completed */ OK, }; @@ -70,6 +72,12 @@ /* Node we are waiting for (if state == BLOCKED) */ char *blocked_by; + + /* Are we waiting for watches to be acked from an event we caused? */ + unsigned int watches_unacked; + + /* Type of ack to send once watches fired. */ + enum xsd_sockmsg_type watch_ack; /* Is this a read-only connection? */ bool can_write; @@ -92,10 +100,14 @@ /* The domain I'm associated with, if any. */ struct domain *domain; + /* My watches. */ + struct list_head watches; + /* Methods for communicating over this connection: write can be NULL */ connwritefn_t *write; connreadfn_t *read; }; +extern struct list_head connections; /* Return length of string (including nul) at this offset. */ unsigned int get_string(const struct buffered_data *data, diff -r 997b2b07b96d -r 4e833037159d tools/xenstore/xenstored_transaction.c --- a/tools/xenstore/xenstored_transaction.c Tue Jul 26 15:13:56 2005 +++ b/tools/xenstore/xenstored_transaction.c Tue Jul 26 15:20:09 2005 @@ -288,7 +288,6 @@ static bool commit_transaction(struct transaction *trans) { char *tmp, *dir; - struct changed_node *i; /* Move: orig -> .old, repl -> orig. Cleanup deletes .old. */ dir = node_dir_outside_transaction(trans->node); @@ -301,15 +300,15 @@ trans->divert, dir); trans->divert = tmp; - - /* Fire off the watches for everything that changed. */ - list_for_each_entry(i, &trans->changes, list) - fire_watches(NULL, i->node, i->recurse); return true; } void do_transaction_end(struct connection *conn, const char *arg) { + struct changed_node *i; + struct transaction *trans; + bool fired = false; + if (!arg || (!streq(arg, "T") && !streq(arg, "F"))) { send_error(conn, EINVAL); return; @@ -320,24 +319,30 @@ return; } + /* Set to NULL so fire_watches sends events. */ + trans = conn->transaction; + conn->transaction = NULL; + /* Attach transaction to arg for auto-cleanup */ + talloc_steal(arg, trans); + if (streq(arg, "T")) { - if (conn->transaction->destined_to_fail) { + if (trans->destined_to_fail) { send_error(conn, ETIMEDOUT); - goto failed; + return; } - if (!commit_transaction(conn->transaction)) { + if (!commit_transaction(trans)) { send_error(conn, errno); - goto failed; + return; } - } - - talloc_free(conn->transaction); - conn->transaction = NULL; - send_ack(conn, XS_TRANSACTION_END); - return; - -failed: - talloc_free(conn->transaction); - conn->transaction = NULL; -} - + + /* Fire off the watches for everything that changed. */ + list_for_each_entry(i, &trans->changes, list) + fired |= fire_watches(conn, i->node, i->recurse); + } + + if (fired) + conn->watch_ack = XS_TRANSACTION_END; + else + send_ack(conn, XS_TRANSACTION_END); +} + diff -r 997b2b07b96d -r 4e833037159d tools/xenstore/xenstored_watch.c --- a/tools/xenstore/xenstored_watch.c Tue Jul 26 15:13:56 2005 +++ b/tools/xenstore/xenstored_watch.c Tue Jul 26 15:20:09 2005 @@ -33,69 +33,39 @@ #include "xenstored_domain.h" /* FIXME: time out unacked watches. */ - -/* We create this if anyone is interested "node", then we pass it from - * watch to watch as each connection acks it. - */ struct watch_event { - /* The watch we are firing for (watch->events) */ + /* The events on this watch. */ struct list_head list; - /* Watches we need to fire for (watches[0]->events == this). */ - struct watch **watches; - unsigned int num_watches; - - struct timeval timeout; - - /* Name of node which changed. */ - char *node; - - /* For remove, we trigger on all the children of this node too. */ - bool recurse; + /* Data to send (node\0token\0). */ + unsigned int len; + char *data; + + /* Connection which caused watch event (which we are blocking) */ + struct connection *cause; }; struct watch { + /* Watches on this connection */ struct list_head list; - unsigned int priority; /* Current outstanding events applying to this watch. */ struct list_head events; /* Is this relative to connnection's implicit path? */ - bool relative; + const char *relative_path; char *token; char *node; - struct connection *conn; }; -static LIST_HEAD(watches); - -static struct watch_event *get_first_event(struct connection *conn) -{ - struct watch *watch; - struct watch_event *event; - - /* Find first watch with an event. */ - list_for_each_entry(watch, &watches, list) { - if (watch->conn != conn) - continue; - - event = list_top(&watch->events, struct watch_event, list); - if (event) - return event; - } - return NULL; -} /* Look through our watches: if any of them have an event, queue it. */ void queue_next_event(struct connection *conn) { struct watch_event *event; - const char *node; - char *buffer; - unsigned int len; + struct watch *watch; /* We had a reply queued already? Send it: other end will * discard watch. */ @@ -110,170 +80,93 @@ if (conn->waiting_for_ack) return; - event = get_first_event(conn); - if (!event) - return; - - /* If we decide to cancel, we will reset this. */ - conn->waiting_for_ack = event->watches[0]; - - /* If we deleted /foo and they're watching /foo/bar, that's what we - * tell them has changed. */ - if (!is_child(event->node, event->watches[0]->node)) { - assert(event->recurse); - node = event->watches[0]->node; - } else - node = event->node; - - /* If watch placed using relative path, give them relative answer. */ - if (event->watches[0]->relative) { - node += strlen(get_implicit_path(conn)); - if (node[0] == '/') /* Could be "". */ + list_for_each_entry(watch, &conn->watches, list) { + event = list_top(&watch->events, struct watch_event, list); + if (event) { + conn->waiting_for_ack = watch; + send_reply(conn,XS_WATCH_EVENT,event->data,event->len); + break; + } + } +} + +static int destroy_watch_event(void *_event) +{ + struct watch_event *event = _event; + + trace_destroy(event, "watch_event"); + assert(event->cause->watches_unacked != 0); + /* If it hits zero, will unblock in unblock_connections. */ + event->cause->watches_unacked--; + return 0; +} + +static void add_event(struct connection *cause, struct watch *watch, + const char *node) +{ + struct watch_event *event; + + if (watch->relative_path) { + node += strlen(watch->relative_path); + if (*node == '/') /* Could be "" */ node++; } - /* Create reply from path and token */ - len = strlen(node) + 1 + strlen(event->watches[0]->token) + 1; - buffer = talloc_array(conn, char, len); - strcpy(buffer, node); - strcpy(buffer+strlen(node)+1, event->watches[0]->token); - send_reply(conn, XS_WATCH_EVENT, buffer, len); - talloc_free(buffer); -} - -static struct watch **find_watches(const char *node, bool recurse, - unsigned int *num) -{ - struct watch *i; - struct watch **ret = NULL; - - *num = 0; - - /* We include children too if this is an rm. */ - list_for_each_entry(i, &watches, list) { - if (is_child(node, i->node) || - (recurse && is_child(i->node, node))) { - (*num)++; - ret = talloc_realloc(node, ret, struct watch *, *num); - ret[*num - 1] = i; - } - } - return ret; + event = talloc(watch, struct watch_event); + event->len = strlen(node) + 1 + strlen(watch->token) + 1; + event->data = talloc_array(event, char, event->len); + strcpy(event->data, node); + strcpy(event->data + strlen(node) + 1, watch->token); + event->cause = cause; + cause->watches_unacked++; + talloc_set_destructor(event, destroy_watch_event); + list_add_tail(&event->list, &watch->events); + trace_create(event, "watch_event"); } /* FIXME: we fail to fire on out of memory. Should drop connections. */ -void fire_watches(struct transaction *trans, const char *node, bool recurse) -{ - struct watch **watches; - struct watch_event *event; - unsigned int num_watches; +bool fire_watches(struct connection *conn, const char *node, bool recurse) +{ + struct connection *i; + struct watch *watch; /* During transactions, don't fire watches. */ - if (trans) - return; - - watches = find_watches(node, recurse, &num_watches); - if (!watches) - return; - - /* Create and fill in info about event. */ - event = talloc(talloc_autofree_context(), struct watch_event); - event->node = talloc_strdup(event, node); - - /* Tie event to this watch. */ - event->watches = watches; - talloc_steal(event, watches); - event->num_watches = num_watches; - event->recurse = recurse; - list_add_tail(&event->list, &watches[0]->events); - - /* Warn if not finished after thirty seconds. */ - gettimeofday(&event->timeout, NULL); - event->timeout.tv_sec += 30; - - /* If connection not doing anything, queue this. */ - if (!watches[0]->conn->out) - queue_next_event(watches[0]->conn); -} - -/* We're done with this event: see if anyone else wants it. */ -static void move_event_onwards(struct watch_event *event) -{ - list_del(&event->list); - - event->num_watches--; - event->watches++; - if (!event->num_watches) { - talloc_free(event); - return; - } - - list_add_tail(&event->list, &event->watches[0]->events); - - /* If connection not doing anything, queue this. */ - if (!event->watches[0]->conn->out) - queue_next_event(event->watches[0]->conn); -} - -static void remove_watch_from_events(struct watch *dying_watch) -{ - struct watch *watch; - struct watch_event *event; - unsigned int i; - - list_for_each_entry(watch, &watches, list) { - list_for_each_entry(event, &watch->events, list) { - for (i = 0; i < event->num_watches; i++) { - if (event->watches[i] != dying_watch) - continue; - - assert(i != 0); - memmove(event->watches+i, - event->watches+i+1, - (event->num_watches - (i+1)) - * sizeof(struct watch *)); - event->num_watches--; - } - } - } + if (conn->transaction) + return false; + + assert(conn->state == OK); + + /* Create an event for each watch. Don't send to self. */ + list_for_each_entry(i, &connections, list) { + if (i == conn) + continue; + + list_for_each_entry(watch, &i->watches, list) { + if (is_child(node, watch->node)) + add_event(conn, watch, node); + else if (recurse && is_child(watch->node, node)) + add_event(conn, watch, watch->node); + else + continue; + conn->state = WATCHED; + /* If connection not doing anything, queue this. */ + if (!i->out) + queue_next_event(i); + } + } + return conn->state == WATCHED; } static int destroy_watch(void *_watch) { - struct watch *watch = _watch; - struct watch_event *event; - - /* If we have pending events, pass them on to others. */ - while ((event = list_top(&watch->events, struct watch_event, list))) - move_event_onwards(event); - - /* Remove from global list. */ - list_del(&watch->list); - - /* Other events which match this watch must be cleared. */ - remove_watch_from_events(watch); - - trace_destroy(watch, "watch"); + trace_destroy(_watch, "watch"); return 0; } -/* We keep watches in priority order. */ -static void insert_watch(struct watch *watch) -{ - struct watch *i; - - list_for_each_entry(i, &watches, list) { - if (i->priority <= watch->priority) { - list_add_tail(&watch->list, &i->list); - return; - } - } - - list_add_tail(&watch->list, &watches); -} - void shortest_watch_ack_timeout(struct timeval *tv) { + (void)tv; +#if 0 /* FIXME */ struct watch *watch; list_for_each_entry(watch, &watches, list) { @@ -285,10 +178,12 @@ *tv = i->timeout; } } +#endif } void check_watch_ack_timeout(void) { +#if 0 struct watch *watch; struct timeval now; @@ -308,12 +203,13 @@ } } } +#endif } void do_watch(struct connection *conn, struct buffered_data *in) { struct watch *watch; - char *vec[3]; + char *vec[2]; bool relative; if (get_strings(in, vec, ARRAY_SIZE(vec)) != ARRAY_SIZE(vec)) { @@ -331,14 +227,16 @@ watch = talloc(conn, struct watch); watch->node = talloc_strdup(watch, vec[0]); watch->token = talloc_strdup(watch, vec[1]); - watch->conn = conn; - watch->priority = strtoul(vec[2], NULL, 0); - watch->relative = relative; + if (relative) + watch->relative_path = get_implicit_path(conn); + else + watch->relative_path = NULL; + INIT_LIST_HEAD(&watch->events); - insert_watch(watch); + list_add_tail(&watch->list, &conn->watches); + trace_create(watch, "watch"); talloc_set_destructor(watch, destroy_watch); - trace_create(watch, "watch"); send_ack(conn, XS_WATCH); } @@ -356,9 +254,6 @@ return; } - event = list_top(&conn->waiting_for_ack->events, - struct watch_event, list); - assert(event->watches[0] == conn->waiting_for_ack); if (!streq(conn->waiting_for_ack->token, token)) { /* They're confused: this will cause us to send event again */ conn->waiting_for_ack = NULL; @@ -366,7 +261,12 @@ return; } - move_event_onwards(event); + /* Remove event: after ack sent, core will call queue_next_event */ + event = list_top(&conn->waiting_for_ack->events, struct watch_event, + list); + list_del(&event->list); + talloc_free(event); + conn->waiting_for_ack = NULL; send_ack(conn, XS_WATCH_ACK); } @@ -385,11 +285,9 @@ * watch we're deleting: conn->waiting_for_ack was reset by * this command in consider_message anyway. */ node = canonicalize(conn, vec[0]); - list_for_each_entry(watch, &watches, list) { - if (watch->conn != conn) - continue; - + list_for_each_entry(watch, &conn->watches, list) { if (streq(watch->node, node) && streq(watch->token, vec[1])) { + list_del(&watch->list); talloc_free(watch); send_ack(conn, XS_UNWATCH); return; @@ -404,15 +302,16 @@ struct watch *watch; struct watch_event *event; - /* Find first watch with an event. */ - list_for_each_entry(watch, &watches, list) { - if (watch->conn != conn) - continue; - - printf(" watch on %s token %s prio %i\n", - watch->node, watch->token, watch->priority); + if (conn->waiting_for_ack) + printf(" waiting_for_ack for watch on %s token %s\n", + conn->waiting_for_ack->node, + conn->waiting_for_ack->token); + + list_for_each_entry(watch, &conn->watches, list) { + printf(" watch on %s token %s\n", + watch->node, watch->token); list_for_each_entry(event, &watch->events, list) - printf(" event: %s\n", event->node); + printf(" event: %s\n", event->data); } } #endif diff -r 997b2b07b96d -r 4e833037159d tools/xenstore/xenstored_watch.h --- a/tools/xenstore/xenstored_watch.h Tue Jul 26 15:13:56 2005 +++ b/tools/xenstore/xenstored_watch.h Tue Jul 26 15:20:09 2005 @@ -32,8 +32,10 @@ /* Look through our watches: if any of them have an event, queue it. */ void queue_next_event(struct connection *conn); -/* Fire all watches: recurse means all the children are effected (ie. rm) */ -void fire_watches(struct transaction *trans, const char *node, bool recurse); +/* Fire all watches: recurse means all the children are effected (ie. rm). + * Returns true if there were any, meaning connection has to wait. + */ +bool fire_watches(struct connection *conn, const char *node, bool recurse); /* Find shortest timeout: if any, reduce tv (may already be set). */ void shortest_watch_ack_timeout(struct timeval *tv); diff -r 997b2b07b96d -r 4e833037159d tools/xenstore/xs.c --- a/tools/xenstore/xs.c Tue Jul 26 15:13:56 2005 +++ b/tools/xenstore/xs.c Tue Jul 26 15:20:09 2005 @@ -401,22 +401,16 @@ /* Watch a node for changes (poll on fd to detect, or call read_watch()). * When the node (or any child) changes, fd will become readable. * Token is returned when watch is read, to allow matching. - * Priority indicates order if multiple watchers: higher is first. * Returns false on failure. */ -bool xs_watch(struct xs_handle *h, const char *path, const char *token, - unsigned int priority) -{ - char prio[MAX_STRLEN(priority)]; - struct iovec iov[3]; - - sprintf(prio, "%u", priority); +bool xs_watch(struct xs_handle *h, const char *path, const char *token) +{ + struct iovec iov[2]; + iov[0].iov_base = (void *)path; iov[0].iov_len = strlen(path) + 1; iov[1].iov_base = (void *)token; iov[1].iov_len = strlen(token) + 1; - iov[2].iov_base = prio; - iov[2].iov_len = strlen(prio) + 1; return xs_bool(xs_talkv(h, XS_WATCH, iov, ARRAY_SIZE(iov), NULL)); } diff -r 997b2b07b96d -r 4e833037159d tools/xenstore/xs.h --- a/tools/xenstore/xs.h Tue Jul 26 15:13:56 2005 +++ b/tools/xenstore/xs.h Tue Jul 26 15:20:09 2005 @@ -82,11 +82,9 @@ /* Watch a node for changes (poll on fd to detect, or call read_watch()). * When the node (or any child) changes, fd will become readable. * Token is returned when watch is read, to allow matching. - * Priority indicates order if multiple watchers: higher is first. * Returns false on failure. */ -bool xs_watch(struct xs_handle *h, const char *path, const char *token, - unsigned int priority); +bool xs_watch(struct xs_handle *h, const char *path, const char *token); /* Return the FD to poll on to see if a watch has fired. */ int xs_fileno(struct xs_handle *h); diff -r 997b2b07b96d -r 4e833037159d tools/xenstore/xs_test.c --- a/tools/xenstore/xs_test.c Tue Jul 26 15:13:56 2005 +++ b/tools/xenstore/xs_test.c Tue Jul 26 15:20:09 2005 @@ -20,6 +20,7 @@ #include <stdio.h> #include <stdlib.h> #include <sys/types.h> +#include <sys/wait.h> #include <sys/stat.h> #include <fcntl.h> #include <signal.h> @@ -33,6 +34,10 @@ #define XSTEST static struct xs_handle *handles[10] = { NULL }; +static unsigned int children; + +static bool timeout = true; +static bool readonly = false; struct ringbuf_head { @@ -173,7 +178,9 @@ " getperm <path>\n" " setperm <path> <id> <flags> ...\n" " shutdown\n" - " watch <path> <token> <prio>\n" + " watch <path> <token>\n" + " async <command>...\n" + " asyncwait\n" " waitwatch\n" " ackwatch <token>\n" " unwatch <path> <token>\n" @@ -186,22 +193,34 @@ " dump\n"); } +static int argpos(const char *line, unsigned int num) +{ + unsigned int i, len = 0, off = 0; + + for (i = 0; i <= num; i++) { + off += len; + off += strspn(line + off, " \t\n"); + len = strcspn(line + off, " \t\n"); + if (!len) + return off; + } + return off; +} + static char *arg(char *line, unsigned int num) { static char *args[10]; - unsigned int i, len = 0; - - for (i = 0; i <= num; i++) { - line += len; - line += strspn(line, " \t\n"); - len = strcspn(line, " \t\n"); - if (!len) - barf("Can't get arg %u", num); - } + unsigned int off, len; + + off = argpos(line, num); + len = strcspn(line + off, " \t\n"); + + if (!len) + barf("Can't get arg %u", num); free(args[num]); args[num] = malloc(len + 1); - memcpy(args[num], line, len); + memcpy(args[num], line+off, len); args[num][len] = '\0'; return args[num]; } @@ -360,10 +379,9 @@ failed(handle); } -static void do_watch(unsigned int handle, const char *node, const char *token, - const char *pri) -{ - if (!xs_watch(handles[handle], node, token, atoi(pri))) +static void do_watch(unsigned int handle, const char *node, const char *token) +{ + if (!xs_watch(handles[handle], node, token)) failed(handle); } @@ -386,6 +404,47 @@ { if (!xs_acknowledge_watch(handles[handle], token)) failed(handle); +} + +/* Async wait for watch on handle */ +static void do_command(unsigned int default_handle, char *line); +static void do_async(unsigned int handle, char *line) +{ + int child; + unsigned int i; + children++; + if ((child = fork()) != 0) + return; + + /* Don't keep other handles open in parent. */ + for (i = 0; i < ARRAY_SIZE(handles); i++) { + if (handles[i] && i != handle) { + xs_daemon_close(handles[i]); + handles[i] = NULL; + } + } + + do_command(handle, line + argpos(line, 1)); + exit(0); +} + +static void do_asyncwait(unsigned int handle) +{ + int status; + + if (handle) + barf("handle has no meaning with asyncwait"); + + if (children == 0) + barf("No children to wait for!"); + + if (waitpid(0, &status, 0) > 0) { + if (!WIFEXITED(status)) + barf("async died"); + if (WEXITSTATUS(status)) + exit(WEXITSTATUS(status)); + } + children--; } static void do_unwatch(unsigned int handle, const char *node, const char *token) @@ -533,23 +592,106 @@ free(subdirs); } +static int handle; + +static void alarmed(int sig __attribute__((unused))) +{ + if (handle) { + char handlename[10]; + sprintf(handlename, "%u:", handle); + write(STDOUT_FILENO, handlename, strlen(handlename)); + } + write(STDOUT_FILENO, command, strlen(command)); + write(STDOUT_FILENO, " timeout\n", strlen(" timeout\n")); + exit(1); +} + +static void do_command(unsigned int default_handle, char *line) +{ + char *endp; + + if (strspn(line, " \n") == strlen(line)) + return; + if (strstarts(line, "#")) + return; + + handle = strtoul(line, &endp, 10); + if (endp != line) + memmove(line, endp+1, strlen(endp)); + else + handle = default_handle; + + if (!handles[handle]) { + if (readonly) + handles[handle] = xs_daemon_open_readonly(); + else + handles[handle] = xs_daemon_open(); + if (!handles[handle]) + barf_perror("Opening connection to daemon"); + } + command = arg(line, 0); + + if (timeout) + alarm(5); + + if (streq(command, "dir")) + do_dir(handle, arg(line, 1)); + else if (streq(command, "read")) + do_read(handle, arg(line, 1)); + else if (streq(command, "write")) + do_write(handle, + arg(line, 1), arg(line, 2), arg(line, 3)); + else if (streq(command, "setid")) + do_setid(handle, arg(line, 1)); + else if (streq(command, "mkdir")) + do_mkdir(handle, arg(line, 1)); + else if (streq(command, "rm")) + do_rm(handle, arg(line, 1)); + else if (streq(command, "getperm")) + do_getperm(handle, arg(line, 1)); + else if (streq(command, "setperm")) + do_setperm(handle, arg(line, 1), line); + else if (streq(command, "shutdown")) + do_shutdown(handle); + else if (streq(command, "watch")) + do_watch(handle, arg(line, 1), arg(line, 2)); + else if (streq(command, "waitwatch")) + do_waitwatch(handle); + else if (streq(command, "async")) + do_async(handle, line); + else if (streq(command, "asyncwait")) + do_asyncwait(handle); + else if (streq(command, "ackwatch")) + do_ackwatch(handle, arg(line, 1)); + else if (streq(command, "unwatch")) + do_unwatch(handle, arg(line, 1), arg(line, 2)); + else if (streq(command, "close")) { + xs_daemon_close(handles[handle]); + handles[handle] = NULL; + } else if (streq(command, "start")) + do_start(handle, arg(line, 1)); + else if (streq(command, "commit")) + do_end(handle, false); + else if (streq(command, "abort")) + do_end(handle, true); + else if (streq(command, "introduce")) + do_introduce(handle, arg(line, 1), arg(line, 2), + arg(line, 3), arg(line, 4)); + else if (streq(command, "release")) + do_release(handle, arg(line, 1)); + else if (streq(command, "dump")) + dump(handle); + else if (streq(command, "sleep")) + sleep(atoi(arg(line, 1))); + else + barf("Unknown command %s", command); + fflush(stdout); + alarm(0); +} + int main(int argc, char *argv[]) { char line[1024]; - bool readonly = false, timeout = true; - int handle; - - static void alarmed(int sig __attribute__((unused))) - { - if (handle) { - char handlename[10]; - sprintf(handlename, "%u:", handle); - write(STDOUT_FILENO, handlename, strlen(handlename)); - } - write(STDOUT_FILENO, command, strlen(command)); - write(STDOUT_FILENO, " timeout\n", strlen(" timeout\n")); - exit(1); - } if (argc > 1 && streq(argv[1], "--readonly")) { readonly = true; @@ -557,7 +699,7 @@ argv++; } - if (argc > 1 && streq(argv[1], "--notimeout")) { + if (argc > 1 && streq(argv[1], "--no-timeout")) { timeout = false; argc--; argv++; @@ -570,81 +712,10 @@ ringbuf_datasize = getpagesize() / 2 - sizeof(struct ringbuf_head); signal(SIGALRM, alarmed); - while (fgets(line, sizeof(line), stdin)) { - char *endp; - - if (strspn(line, " \n") == strlen(line)) - continue; - if (strstarts(line, "#")) - continue; - - handle = strtoul(line, &endp, 10); - if (endp != line) - memmove(line, endp+1, strlen(endp)); - else - handle = 0; - - if (!handles[handle]) { - if (readonly) - handles[handle] = xs_daemon_open_readonly(); - else - handles[handle] = xs_daemon_open(); - if (!handles[handle]) - barf_perror("Opening connection to daemon"); - } - command = arg(line, 0); - - if (timeout) - alarm(5); - if (streq(command, "dir")) - do_dir(handle, arg(line, 1)); - else if (streq(command, "read")) - do_read(handle, arg(line, 1)); - else if (streq(command, "write")) - do_write(handle, - arg(line, 1), arg(line, 2), arg(line, 3)); - else if (streq(command, "setid")) - do_setid(handle, arg(line, 1)); - else if (streq(command, "mkdir")) - do_mkdir(handle, arg(line, 1)); - else if (streq(command, "rm")) - do_rm(handle, arg(line, 1)); - else if (streq(command, "getperm")) - do_getperm(handle, arg(line, 1)); - else if (streq(command, "setperm")) - do_setperm(handle, arg(line, 1), line); - else if (streq(command, "shutdown")) - do_shutdown(handle); - else if (streq(command, "watch")) - do_watch(handle, arg(line, 1), arg(line, 2), arg(line, 3)); - else if (streq(command, "waitwatch")) - do_waitwatch(handle); - else if (streq(command, "ackwatch")) - do_ackwatch(handle, arg(line, 1)); - else if (streq(command, "unwatch")) - do_unwatch(handle, arg(line, 1), arg(line, 2)); - else if (streq(command, "close")) { - xs_daemon_close(handles[handle]); - handles[handle] = NULL; - } else if (streq(command, "start")) - do_start(handle, arg(line, 1)); - else if (streq(command, "commit")) - do_end(handle, false); - else if (streq(command, "abort")) - do_end(handle, true); - else if (streq(command, "introduce")) - do_introduce(handle, arg(line, 1), arg(line, 2), - arg(line, 3), arg(line, 4)); - else if (streq(command, "release")) - do_release(handle, arg(line, 1)); - else if (streq(command, "dump")) - dump(handle); - else if (streq(command, "sleep")) - sleep(atoi(arg(line, 1))); - else - barf("Unknown command %s", command); - fflush(stdout); - alarm(0); - } + while (fgets(line, sizeof(line), stdin)) + do_command(0, line); + + while (children) + do_asyncwait(0); return 0; } _______________________________________________ Xen-changelog mailing list Xen-changelog@xxxxxxxxxxxxxxxxxxx http://lists.xensource.com/xen-changelog
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |