[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-tools] [PATCH 3/3] libxenstore: reconnect on daemon restart
# HG changeset patch # User Rusty Russell <rusty@xxxxxxxxxxxxxxx> # Node ID 6bf9c27fa43b4da8d3be9b65e1bebd0385223487 # Parent 703e0e8e7efce364f62a79b209af0d48195b6825 Handle xenstored restarting in libxenstore Store the filename, watches and transaction ID in the handle, and on read/write errors reopen the connection and try to re-establish the watches and the transaction transparently to the client. Add a reconnecting flag to the handle to avoid recursion if something goes badly wrong. Add a committing flag: xs_transaction_end is the only operation which isn't simply repeatable. If we can't re-establish the transaction while committing, we know it has been successfully committed. Add a --restart test to xs_random which kills the daemon partway through writes or reads. Signed-off-by: Rusty Russell <rusty@xxxxxxxxxxxxxxx> diff -r 703e0e8e7efc -r 6bf9c27fa43b tools/xenstore/Makefile --- a/tools/xenstore/Makefile Mon Sep 26 10:53:13 2005 +++ b/tools/xenstore/Makefile Mon Sep 26 11:26:45 2005 @@ -49,7 +49,7 @@ $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -o $@ xs_test: xs_test.o xs_lib.o utils.o -xs_random: xs_random.o xs_test_lib.o xs_lib.o talloc.o utils.o +xs_random: xs_random.o xs_lib.o talloc.o utils.o xs_stress: xs_stress.o xs_test_lib.o xs_lib.o talloc.o utils.o xs_crashme: xs_crashme.o xs_lib.o talloc.o utils.o @@ -84,9 +84,12 @@ print-end: @echo -check: print-dir testsuite-fast randomcheck-fast print-end +check: $(XEN_LIBXC)/xen print-dir testsuite-fast randomcheck-fast print-end -fullcheck: testsuite-run randomcheck stresstest +fullcheck: $(XEN_LIBXC)/xen testsuite-run randomcheck stresstest + +$(XEN_LIBXC)/xen: + make -C $(XEN_LIBXC) mk-symlinks $(TESTDIR): mkdir $@ @@ -107,6 +110,7 @@ $(TESTENV) ./xs_random --simple --fast /tmp/xs_random 200000 $(RANDSEED) && echo $(TESTENV) ./xs_random --fast /tmp/xs_random 100000 $(RANDSEED) && echo # $(TESTENV) ./xs_random --fail /tmp/xs_random 10000 $(RANDSEED) + $(TESTENV) ./xs_random --restart /tmp/xs_random 500 $(RANDSEED) crashme: xs_crashme xenstored_test $(TESTDIR) rm -rf $(TESTDIR)/store 
$(TESTDIR)/transactions /tmp/xs_crashme.vglog* /tmp/trace diff -r 703e0e8e7efc -r 6bf9c27fa43b tools/xenstore/xs.c --- a/tools/xenstore/xs.c Mon Sep 26 10:53:13 2005 +++ b/tools/xenstore/xs.c Mon Sep 26 11:26:45 2005 @@ -36,12 +36,39 @@ #include "xenstored.h" #include "xs_lib.h" #include "utils.h" +#include "list.h" + +struct xs_watch +{ + struct list_head list; + char *path, *token; +}; struct xs_handle { int fd; + const char *connect_to; + struct list_head watches; + char *transid; + bool committing; + bool reconnecting; }; +/* free(), but don't change errno. */ +static void free_no_errno(void *p) +{ + int saved_errno = errno; + free(p); + errno = saved_errno; +} + +static void close_no_errno(int fd) +{ + int saved_errno = errno; + close(fd); + errno = saved_errno; +} + /* Get the socket from the store daemon handle. */ int xs_fileno(struct xs_handle *h) @@ -49,60 +76,36 @@ return h->fd; } -static struct xs_handle *get_socket(const char *connect_to) +static int get_socket(const char *connect_to) { struct sockaddr_un addr; - int sock, saved_errno; - struct xs_handle *h = NULL; + int sock; sock = socket(PF_UNIX, SOCK_STREAM, 0); if (sock < 0) - return NULL; + return sock; addr.sun_family = AF_UNIX; strcpy(addr.sun_path, connect_to); - if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == 0) { - h = malloc(sizeof(*h)); - if (h) { - h->fd = sock; - return h; - } - } - - saved_errno = errno; - close(sock); - errno = saved_errno; - return NULL; -} - -static struct xs_handle *get_dev(const char *connect_to) -{ - int fd, saved_errno; - struct xs_handle *h; - - fd = open(connect_to, O_RDWR); - if (fd < 0) - return NULL; - - h = malloc(sizeof(*h)); - if (h) { - h->fd = fd; - return h; - } - - saved_errno = errno; - close(fd); - errno = saved_errno; - return NULL; -} - -static struct xs_handle *get_handle(const char *connect_to) + if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == 0) + return sock; + + close_no_errno(sock); + return -1; +} + +static int 
get_dev(const char *connect_to) +{ + return open(connect_to, O_RDWR); +} + +static int get_handle(const char *connect_to) { struct stat buf; if (stat(connect_to, &buf) != 0) - return NULL; + return -1; if (S_ISSOCK(buf.st_mode)) return get_socket(connect_to); @@ -110,19 +113,71 @@ return get_dev(connect_to); } +static bool restore_watches(struct xs_handle *h); +static bool restore_transaction(struct xs_handle *h); + +static bool recover(struct xs_handle *h) +{ + return restore_watches(h) && restore_transaction(h); +} + +/* Don't fail easily: give 30 seconds. */ +#ifndef RECONNECT_TIME +#define RECONNECT_TIME 30 +#endif + +/* Recovery: daemon can restart, so we reconnect. */ +static bool reconnect(struct xs_handle *h) +{ + unsigned int i; + + if (h->reconnecting) + return 0; + + h->reconnecting = true; + close(h->fd); + + for (i = 0; i < RECONNECT_TIME; i++) { + h->fd = get_handle(h->connect_to); + if (h->fd >= 0 && recover(h)) { + h->reconnecting = false; + return true; + } + close_no_errno(h->fd); + sleep(1); + } + return false; +} + +static struct xs_handle *xs_open(const char *connect_to) +{ + struct xs_handle *h = malloc(sizeof(*h)); + h->fd = -1; + h->reconnecting = false; + h->connect_to = connect_to; + INIT_LIST_HEAD(&h->watches); + h->transid = NULL; + h->committing = false; + if (!reconnect(h)) { + free_no_errno(h); + h = NULL; + } + return h; +} + struct xs_handle *xs_daemon_open(void) { - return get_handle(xs_daemon_socket()); + return xs_open(xs_daemon_socket()); } struct xs_handle *xs_daemon_open_readonly(void) { - return get_handle(xs_daemon_socket_ro()); + return xs_open(xs_daemon_socket_ro()); } struct xs_handle *xs_domain_open(void) { - return get_handle(xs_domain_dev()); + return xs_open(xs_domain_dev()); } void xs_daemon_close(struct xs_handle *h) @@ -175,7 +230,6 @@ { struct xsd_sockmsg msg; void *ret; - int saved_errno; if (!read_all(fd, &msg, sizeof(msg))) return NULL; @@ -185,9 +239,7 @@ return NULL; if (!read_all(fd, ret, msg.len)) { - 
saved_errno = errno; - free(ret); - errno = saved_errno; + free_no_errno(ret); return NULL; } @@ -209,22 +261,23 @@ unsigned int i; struct sigaction ignorepipe, oldact; + ignorepipe.sa_handler = SIG_IGN; + sigemptyset(&ignorepipe.sa_mask); + ignorepipe.sa_flags = 0; + sigaction(SIGPIPE, &ignorepipe, &oldact); + +again: msg.type = type; msg.len = 0; for (i = 0; i < num_vecs; i++) msg.len += iovec[i].iov_len; - ignorepipe.sa_handler = SIG_IGN; - sigemptyset(&ignorepipe.sa_mask); - ignorepipe.sa_flags = 0; - sigaction(SIGPIPE, &ignorepipe, &oldact); - if (!xs_write_all(h->fd, &msg, sizeof(msg))) - goto fail; + goto reconnect; for (i = 0; i < num_vecs; i++) if (!xs_write_all(h->fd, iovec[i].iov_base, iovec[i].iov_len)) - goto fail; + goto reconnect; /* Watches can have fired before reply comes: daemon detects * and re-transmits, so we can ignore this. */ @@ -232,42 +285,30 @@ free(ret); ret = read_reply(h->fd, &msg.type, len); if (!ret) - goto fail; + goto reconnect; } while (msg.type == XS_WATCH_EVENT); sigaction(SIGPIPE, &oldact, NULL); if (msg.type == XS_ERROR) { - saved_errno = get_error(ret); - free(ret); - errno = saved_errno; + errno = get_error(ret); + free_no_errno(ret); return NULL; } if (msg.type != type) { free(ret); - saved_errno = EBADF; - goto close_fd; - + errno = EBADF; + goto reconnect; } return ret; -fail: - /* We're in a bad state, so close fd. */ +reconnect: + /* All operations are idempotent, so no harm in retrying. */ saved_errno = errno; - sigaction(SIGPIPE, &oldact, NULL); -close_fd: - close(h->fd); - h->fd = -1; + if (reconnect(h)) + goto again; errno = saved_errno; return NULL; -} - -/* free(), but don't change errno. */ -static void free_no_errno(void *p) -{ - int saved_errno = errno; - free(p); - errno = saved_errno; } /* Simplified version of xs_talkv: single message. 
*/ @@ -428,6 +469,27 @@ return false; } +static bool one_watch(struct xs_handle *h, struct xs_watch *w) +{ + struct iovec iov[2]; + + iov[0].iov_base = w->path; + iov[0].iov_len = strlen(w->path) + 1; + iov[1].iov_base = w->token; + iov[1].iov_len = strlen(w->token) + 1; + return xs_bool(xs_talkv(h, XS_WATCH, iov, ARRAY_SIZE(iov), NULL)); +} + +static bool restore_watches(struct xs_handle *h) +{ + struct xs_watch *i; + + list_for_each_entry(i, &h->watches, list) + if (!one_watch(h, i)) + return false; + return true; +} + /* Watch a node for changes (poll on fd to detect, or call read_watch()). * When the node (or any child) changes, fd will become readable. * Token is returned when watch is read, to allow matching. @@ -435,14 +497,23 @@ */ bool xs_watch(struct xs_handle *h, const char *path, const char *token) { - struct iovec iov[2]; - - iov[0].iov_base = (void *)path; - iov[0].iov_len = strlen(path) + 1; - iov[1].iov_base = (void *)token; - iov[1].iov_len = strlen(token) + 1; - - return xs_bool(xs_talkv(h, XS_WATCH, iov, ARRAY_SIZE(iov), NULL)); + struct xs_watch *watch; + + watch = malloc(sizeof (*watch) + strlen(path)+1 + strlen(token)+1); + if (!watch) + return false; + + watch->path = (char *)(watch + 1); + strcpy(watch->path, path); + watch->token = watch->path + strlen(path) + 1; + strcpy(watch->token, token); + + if (one_watch(h, watch)) { + list_add(&watch->list, &h->watches); + return true; + } + free_no_errno(watch); + return false; } /* Find out what node change was on (will block if nothing pending). 
@@ -453,9 +524,11 @@ { struct xsd_sockmsg msg; char **ret; - + int saved_errno; + +again: if (!read_all(h->fd, &msg, sizeof(msg))) - return NULL; + goto reconnect; assert(msg.type == XS_WATCH_EVENT); ret = malloc(sizeof(char *)*2 + msg.len); @@ -465,10 +538,17 @@ ret[0] = (char *)(ret + 2); if (!read_all(h->fd, ret[0], msg.len)) { free_no_errno(ret); - return NULL; + goto reconnect; } ret[1] = ret[0] + strlen(ret[0]) + 1; return ret; + +reconnect: + saved_errno = errno; + if (reconnect(h)) + goto again; + errno = saved_errno; + return NULL; } /* Acknowledge watch on node. Watches must be acknowledged before @@ -486,13 +566,36 @@ bool xs_unwatch(struct xs_handle *h, const char *path, const char *token) { struct iovec iov[2]; - - iov[0].iov_base = (char *)path; - iov[0].iov_len = strlen(path) + 1; - iov[1].iov_base = (char *)token; - iov[1].iov_len = strlen(token) + 1; - - return xs_bool(xs_talkv(h, XS_UNWATCH, iov, ARRAY_SIZE(iov), NULL)); + struct xs_watch *i; + + list_for_each_entry(i, &h->watches, list) { + if (!streq(i->path, path) || !streq(i->token, token)) + continue; + + iov[0].iov_base = i->path; + iov[0].iov_len = strlen(i->path) + 1; + iov[1].iov_base = i->token; + iov[1].iov_len = strlen(i->token) + 1; + + if (!xs_bool(xs_talkv(h,XS_UNWATCH,iov,ARRAY_SIZE(iov),NULL))) + return false; + list_del(&i->list); + free(i); + return true; + } + errno = ENOENT; + return false; +} + +static bool restore_transaction(struct xs_handle *h) +{ + if (!h->transid) + return true; + + if (xs_bool(xs_single(h, XS_TRANSACTION_START, h->transid, NULL))) + return true; + /* If we're committing and no longer exists, we succeeded. 
*/ + return h->committing; } /* Start a transaction: changes by others will not be seen during this @@ -502,23 +605,47 @@ */ bool xs_transaction_start(struct xs_handle *h) { - return xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL)); + char *transid; + + transid = xs_single(h, XS_TRANSACTION_START, "", NULL); + if (!transid) + return false; + h->transid = transid; + return true; } /* End a transaction. * If abandon is true, transaction is discarded instead of committed. - * Returns false on failure, which indicates an error: transactions will - * not fail spuriously. + * Returns false on failure: if errno == EAGAIN, you have to restart + * transaction. */ bool xs_transaction_end(struct xs_handle *h, bool abort) { char abortstr[2]; + bool ret; if (abort) strcpy(abortstr, "F"); else strcpy(abortstr, "T"); - return xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL)); + + if (!h->transid) { + errno = ENOENT; + return false; + } + + h->committing = true; + ret = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL)); + + /* On re-transmission, we can get ENOENT if first one succeeded, + * otherwise we never should. */ + if (!ret && errno == ENOENT) + ret = true; + free_no_errno(h->transid); + h->transid = NULL; + h->committing = false; + + return ret; } /* Introduce a new domain. diff -r 703e0e8e7efc -r 6bf9c27fa43b tools/xenstore/xs_random.c --- a/tools/xenstore/xs_random.c Mon Sep 26 10:53:13 2005 +++ b/tools/xenstore/xs_random.c Mon Sep 26 11:26:45 2005 @@ -17,6 +17,47 @@ #include "talloc.h" #include "utils.h" +static bool fail_once = false; +static unsigned int fail_len; + +static void restart_daemon(void); + +/* Override xs functions so we can fail at "random" times. 
*/ +#define XSTEST +static bool read_all(int fd, void *data, unsigned int len); +static bool read_all_choice(int fd, void *data, unsigned int len) +{ + if (fail_once) { + if (len > fail_len) { + fail_once = false; + restart_daemon(); + errno = EBADF; + return false; + } + fail_len -= len; + } + return read_all(fd, data, len); +} + +static bool write_all_choice(int fd, const void *data, unsigned int len) +{ + if (fail_once) { + if (len > fail_len) { + fail_once = false; + restart_daemon(); + errno = EPIPE; + return false; + } + fail_len -= len; + } + return xs_write_all(fd, data, len); +} + +/* Fail fast, please. */ +#define RECONNECT_TIME 2 + +#include "xs.c" + struct ops { char *name; @@ -53,6 +94,7 @@ { const char *base; char *transact_base; + bool fail_transaction; }; static void convert_to_dir(const char *dirname) @@ -389,6 +431,17 @@ goto success; } + if (info->fail_transaction) { + cmd = talloc_asprintf(NULL, "rm -rf %s", info->transact_base); + do_command(cmd); + info->fail_transaction = false; + talloc_free(cmd); + talloc_free(info->transact_base); + info->transact_base = NULL; + errno = EAGAIN; + return false; + } + old = talloc_asprintf(NULL, "rm -rf %s", info->base); do_command(old); talloc_free(old); @@ -410,6 +463,7 @@ info->base = dir; info->transact_base = NULL; + info->fail_transaction = false; return info; } @@ -962,6 +1016,14 @@ setup_file_ops(dir); setup_xs_ops(); }; + +static void restart_daemon(void) +{ + kill(daemon_pid, SIGKILL); + waitpid(daemon_pid, NULL, 0); + + setup_xs_ops(); +} struct simple_data { @@ -1553,24 +1615,179 @@ reduce_problem(try + 1, try_fail, &data); } +struct restart_data +{ + unsigned int seed; + bool print_progress; + bool fast; + const char *dir; +}; + +/* Restart: try with randomly restarting daemon. 
*/ +static unsigned int try_restart(const bool *trymap, + unsigned int number, + bool verbose, + void *_data) +{ + void *fileh, *xsh; + bool transact = false; + struct ops *fail; + struct diff_data *data = _data; + unsigned int i, print; + int st; + + cleanup(data->dir); + setup(data->dir); + + fileh = file_handle(data->dir); + xsh = xs_handle(data->dir); + + print = number / 76; + if (!print) + print = 1; + + for (i = 0; i < number; i++) { + char *file, *xs; + + if (data->print_progress) { + if (i % print == 0) { + printf("."); + fflush(stdout); + } + } + if (trymap && !trymap[i]) + continue; + + fail_once = true; + st = i+data->seed; + fail_len = get_randomness(&st) % 100; + + if (verbose) + printf("XS: "); + xs = do_next_op(&xs_ops, xsh, i+data->seed, verbose); + if (verbose) + printf("-> %.*s\n", (int)(strchr(xs, '/') - xs), xs); + + /* Due to restart, transaction can return EAGAIN. */ + if (streq(xs, "FAILED:Resource temporarily unavailable")) { + ((struct file_ops_info *)fileh)->fail_transaction + = true; + transact = false; + } + + if (verbose) + printf("FILE: "); + + file = do_next_op(&file_ops, fileh, i+data->seed, verbose); + if (verbose) + printf("-> %.*s\n", + (int)(strchr(file, '/') - file), file); + + if (!streq(file, xs)) + goto out; + + if (strstarts(file, "OK:START-TRANSACT:")) + transact = true; + else if (streq(file, "OK:STOP-TRANSACT")) + transact = false; + + talloc_free(file); + talloc_free(xs); + + if (data->fast) + continue; + + /* In case it didn't fail yet, we don't want that now! 
*/ + fail_once = false; + fail = NULL; + if (!ops_equal(&xs_ops, xsh, &file_ops, fileh, "/", &fail)) { + if (fail) + barf("%s failed during test\n", fail->name); + if (verbose) + printf("Trees differ:\nXS:%s\nFILE%s\n", + dump(&xs_ops, xsh), + dump(&file_ops, fileh)); + goto out; + } + + if (transact) { + void *fileh_pre = file_handle(data->dir); + void *xsh_pre = xs_handle(data->dir); + + fail = NULL; + if (!ops_equal(&xs_ops, xsh_pre, &file_ops, fileh_pre, + "/", &fail)) { + if (fail) + barf("%s failed during transact\n", + fail->name); + + xs_daemon_close(xsh_pre); + talloc_free(fileh_pre); + goto out; + } + xs_daemon_close(xsh_pre); + talloc_free(fileh_pre); + } + } + + fail = NULL; + if (data->fast) + if (!ops_equal(&xs_ops, xsh, &file_ops, fileh, "/", &fail)) + barf("Final result not the same: try without --fast"); +out: + file_ops.close(fileh); + xs_ops.close(xsh); + return i; +} + +/* Restart random test: compare results against file backend, daemon + * randomly restarting. */ +static void restart_test(const char *dir, + unsigned int iters, unsigned int seed, bool fast, + bool verbose) +{ + struct restart_data data; + unsigned int try; + + data.seed = seed; + data.print_progress = !verbose; + data.fast = fast; + data.dir = dir; + + try = try_restart(NULL, iters, verbose, &data); + if (try == iters) { + cleanup_xs_ops(); + exit(0); + } + printf("Failed on iteration %u of seed %u\n", try + 1, seed); + data.print_progress = false; + reduce_problem(try + 1, try_restart, &data); +} + int main(int argc, char *argv[]) { bool verbose = false; - bool simple = false; bool fast = false; - bool fail = false; + void (*test)(const char *, unsigned, unsigned, bool, bool) = diff_test; if (argv[1] && streq(argv[1], "--fail")) { - fail = true; + test = fail_test; argv++; argc--; } if (argv[1] && streq(argv[1], "--simple")) { - simple = true; + test = simple_test; argv++; argc--; } + + if (argv[1] && streq(argv[1], "--restart")) { + test = restart_test; + argv++; + argc--; + 
} + if (argv[1] && streq(argv[1], "--fast")) { fast = true; @@ -1585,15 +1802,9 @@ } if (argc != 4) - barf("Usage: xs_random [--fail|--simple] [--fast] [--verbose] <directory> <iterations> <seed>"); + barf("Usage: xs_random [--fail|--simple|--restart] [--fast] [--verbose] <directory> <iterations> <seed>"); talloc_enable_null_tracking(); - - if (fail) - fail_test(argv[1], atoi(argv[2]), atoi(argv[3]), fast, verbose); - else if (simple) - simple_test(argv[1], atoi(argv[2]), atoi(argv[3]), fast, verbose); - else - diff_test(argv[1], atoi(argv[2]), atoi(argv[3]), fast, verbose); + test(argv[1], atoi(argv[2]), atoi(argv[3]), fast, verbose); exit(2); } diff -r 703e0e8e7efc -r 6bf9c27fa43b tools/xenstore/xs_test.c --- a/tools/xenstore/xs_test.c Mon Sep 26 10:53:13 2005 +++ b/tools/xenstore/xs_test.c Mon Sep 26 11:26:45 2005 @@ -373,7 +373,7 @@ static void do_readack(unsigned int handle) { - enum xsd_sockmsg_type type; + enum xsd_sockmsg_type type = XS_ERROR; /* Stupid gcc. */ char *ret = NULL; /* Watches can have fired before reply comes: daemon detects @@ -615,6 +615,11 @@ /* Create new handle. */ handles[i] = new(struct xs_handle); handles[i]->fd = -2; + handles[i]->reconnecting = false; + handles[i]->connect_to = NULL; + INIT_LIST_HEAD(&handles[i]->watches); + handles[i]->transid = NULL; + handles[i]->committing = false; /* Read in daemon pid. */ daemon_pid = *(int *)((void *)out + 32); -- A bad analogy is like a leaky screwdriver -- Richard Braakman _______________________________________________ Xen-tools mailing list Xen-tools@xxxxxxxxxxxxxxxxxxx http://lists.xensource.com/xen-tools
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |