[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-changelog] Refactor xenbus to break up the xenbus_lock and permit watches
# HG changeset patch # User kaf24@xxxxxxxxxxxxxxxxxxxx # Node ID 8016551fde9825fc82bfa4762f17b98e7519b823 # Parent ab93a9a46bd48f9a654b1fdc9caf4b7ae07f6a8b Refactor xenbus to break up the xenbus_lock and permit watches to fire concurrently with request/reply pairs. Remove watch_ack message: no longer needed. Signed-off-by: Keir Fraser <keir@xxxxxxxxxxxxx> diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c --- a/linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c Sun Oct 9 16:29:24 2005 +++ b/linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c Sun Oct 9 17:52:54 2005 @@ -1327,18 +1327,14 @@ .callback = handle_vcpu_hotplug_event }; -/* NB: Assumes xenbus_lock is held! */ static int setup_cpu_watcher(struct notifier_block *notifier, unsigned long event, void *data) { - int err = 0; - - BUG_ON(down_trylock(&xenbus_lock) == 0); + int err; + err = register_xenbus_watch(&cpu_watch); - - if (err) { + if (err) printk("Failed to register watch on /cpu\n"); - } return NOTIFY_DONE; } diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/arch/xen/kernel/reboot.c --- a/linux-2.6-xen-sparse/arch/xen/kernel/reboot.c Sun Oct 9 16:29:24 2005 +++ b/linux-2.6-xen-sparse/arch/xen/kernel/reboot.c Sun Oct 9 17:52:54 2005 @@ -360,9 +360,6 @@ static struct notifier_block xenstore_notifier; -/* Setup our watcher - NB: Assumes xenbus_lock is held! -*/ static int setup_shutdown_watcher(struct notifier_block *notifier, unsigned long event, void *data) @@ -371,8 +368,6 @@ #ifdef CONFIG_MAGIC_SYSRQ int err2 = 0; #endif - - BUG_ON(down_trylock(&xenbus_lock) == 0); err1 = register_xenbus_watch(&shutdown_watch); #ifdef CONFIG_MAGIC_SYSRQ diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c --- a/linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c Sun Oct 9 16:29:24 2005 +++ b/linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c Sun Oct 9 17:52:54 2005 @@ -370,16 +370,11 @@ } -/* Setup our watcher - NB: Assumes xenbus_lock is held! -*/ int balloon_init_watcher(struct notifier_block *notifier, unsigned long event, void *data) { int err; - - BUG_ON(down_trylock(&xenbus_lock) == 0); err = register_xenbus_watch(&target_watch); if (err) diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c Sun Oct 9 16:29:24 2005 +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c Sun Oct 9 17:52:54 2005 @@ -130,15 +130,10 @@ wait_event(xb_waitq, output_avail(out)); - /* Read, then check: not that we don't trust store. - * Hell, some of my best friends are daemons. But, - * in this post-911 world... */ + mb(); h = *out; - mb(); - if (!check_buffer(&h)) { - set_current_state(TASK_RUNNING); - return -EIO; /* ETERRORIST! */ - } + if (!check_buffer(&h)) + return -EIO; dst = get_output_chunk(&h, out->buf, &avail); if (avail > len) @@ -173,12 +168,11 @@ const char *src; wait_event(xb_waitq, xs_input_avail()); + + mb(); h = *in; - mb(); - if (!check_buffer(&h)) { - set_current_state(TASK_RUNNING); + if (!check_buffer(&h)) return -EIO; - } src = get_input_chunk(&h, in->buf, &avail); if (avail > len) @@ -195,10 +189,6 @@ notify_remote_via_evtchn(xen_start_info->store_evtchn); } - /* If we left something, wake watch thread to deal with it. */ - if (xs_input_avail()) - wake_up(&xb_waitq); - return 0; } diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c Sun Oct 9 16:29:24 2005 +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c Sun Oct 9 17:52:54 2005 @@ -46,85 +46,113 @@ #include <asm/hypervisor.h> struct xenbus_dev_data { - /* Are there bytes left to be read in this message? */ - int bytes_left; - /* Are we still waiting for the reply to a message we wrote? */ - int awaiting_reply; - /* Buffer for outgoing messages. */ + int in_transaction; + + /* Partial request. */ unsigned int len; union { struct xsd_sockmsg msg; char buffer[PAGE_SIZE]; } u; + + /* Response queue. */ +#define MASK_READ_IDX(idx) ((idx)&(PAGE_SIZE-1)) + char read_buffer[PAGE_SIZE]; + unsigned int read_cons, read_prod; + wait_queue_head_t read_waitq; }; static struct proc_dir_entry *xenbus_dev_intf; -/* Reply can be long (dir, getperm): don't buffer, just examine - * headers so we can discard rest if they die. */ static ssize_t xenbus_dev_read(struct file *filp, char __user *ubuf, size_t len, loff_t *ppos) { - struct xenbus_dev_data *data = filp->private_data; - struct xsd_sockmsg msg; - int err; - - /* Refill empty buffer? */ - if (data->bytes_left == 0) { - if (len < sizeof(msg)) - return -EINVAL; - - err = xb_read(&msg, sizeof(msg)); - if (err) - return err; - data->bytes_left = msg.len; - if (ubuf && copy_to_user(ubuf, &msg, sizeof(msg)) != 0) - return -EFAULT; - /* We can receive spurious XS_WATCH_EVENT messages. */ - if (msg.type != XS_WATCH_EVENT) - data->awaiting_reply = 0; - return sizeof(msg); + struct xenbus_dev_data *u = filp->private_data; + int i; + + if (wait_event_interruptible(u->read_waitq, + u->read_prod != u->read_cons)) + return -EINTR; + + for (i = 0; i < len; i++) { + if (u->read_cons == u->read_prod) + break; + put_user(u->read_buffer[MASK_READ_IDX(u->read_cons)], ubuf+i); + u->read_cons++; } - /* Don't read over next header, or over temporary buffer. */ - if (len > sizeof(data->u.buffer)) - len = sizeof(data->u.buffer); - if (len > data->bytes_left) - len = data->bytes_left; - - err = xb_read(data->u.buffer, len); - if (err) - return err; - - data->bytes_left -= len; - if (ubuf && copy_to_user(ubuf, data->u.buffer, len) != 0) - return -EFAULT; - return len; -} - -/* We do v. basic sanity checking so they don't screw up kernel later. */ + return i; +} + +static void queue_reply(struct xenbus_dev_data *u, + char *data, unsigned int len) +{ + int i; + + for (i = 0; i < len; i++, u->read_prod++) + u->read_buffer[MASK_READ_IDX(u->read_prod)] = data[i]; + + BUG_ON((u->read_prod - u->read_cons) > sizeof(u->read_buffer)); + + wake_up(&u->read_waitq); +} + static ssize_t xenbus_dev_write(struct file *filp, const char __user *ubuf, size_t len, loff_t *ppos) { - struct xenbus_dev_data *data = filp->private_data; - int err; - - /* We gather data in buffer until we're ready to send it. */ - if (len > data->len + sizeof(data->u)) + struct xenbus_dev_data *u = filp->private_data; + void *reply; + int err = 0; + + if ((len + u->len) > sizeof(u->u.buffer)) return -EINVAL; - if (copy_from_user(data->u.buffer + data->len, ubuf, len) != 0) + + if (copy_from_user(u->u.buffer + u->len, ubuf, len) != 0) return -EFAULT; - data->len += len; - if (data->len >= sizeof(data->u.msg) + data->u.msg.len) { - err = xb_write(data->u.buffer, data->len); - if (err) - return err; - data->len = 0; - data->awaiting_reply = 1; + + u->len += len; + if (u->len < (sizeof(u->u.msg) + u->u.msg.len)) + return len; + + switch (u->u.msg.type) { + case XS_TRANSACTION_START: + case XS_TRANSACTION_END: + case XS_DIRECTORY: + case XS_READ: + case XS_GET_PERMS: + case XS_RELEASE: + case XS_GET_DOMAIN_PATH: + case XS_WRITE: + case XS_MKDIR: + case XS_RM: + case XS_SET_PERMS: + reply = xenbus_dev_request_and_reply(&u->u.msg); + if (IS_ERR(reply)) + err = PTR_ERR(reply); + else { + if (u->u.msg.type == XS_TRANSACTION_START) + u->in_transaction = 1; + if (u->u.msg.type == XS_TRANSACTION_END) + u->in_transaction = 0; + queue_reply(u, (char *)&u->u.msg, sizeof(u->u.msg)); + queue_reply(u, (char *)reply, u->u.msg.len); + kfree(reply); + } + break; + + default: + err = -EINVAL; + break; } - return len; + + if (err == 0) { + u->len = 0; + err = len; + } + + return err; } static int xenbus_dev_open(struct inode *inode, struct file *filp) @@ -134,7 +162,6 @@ if (xen_start_info->store_evtchn == 0) return -ENOENT; - /* Don't try seeking. */ nonseekable_open(inode, filp); u = kmalloc(sizeof(*u), GFP_KERNEL); @@ -142,28 +169,21 @@ return -ENOMEM; memset(u, 0, sizeof(*u)); + init_waitqueue_head(&u->read_waitq); filp->private_data = u; - down(&xenbus_lock); - return 0; } static int xenbus_dev_release(struct inode *inode, struct file *filp) { - struct xenbus_dev_data *data = filp->private_data; - - /* Discard any unread replies. */ - while (data->bytes_left || data->awaiting_reply) - xenbus_dev_read(filp, NULL, sizeof(data->u.buffer), NULL); - - /* Harmless if no transaction in progress. */ - xenbus_transaction_end(1); - - up(&xenbus_lock); - - kfree(data); + struct xenbus_dev_data *u = filp->private_data; + + if (u->in_transaction) + xenbus_transaction_end(1); + + kfree(u); return 0; } diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c Sun Oct 9 16:29:24 2005 +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c Sun Oct 9 17:52:54 2005 @@ -43,6 +43,9 @@ static struct notifier_block *xenstore_chain; +/* Now used to protect xenbus probes against save/restore. */ +static DECLARE_MUTEX(xenbus_lock); + /* If something in array of ids matches this device, return it. */ static const struct xenbus_device_id * match_device(const struct xenbus_device_id *arr, struct xenbus_device *dev) @@ -625,12 +628,13 @@ down(&xenbus_lock); bus_for_each_dev(&xenbus_frontend.bus, NULL, NULL, suspend_dev); bus_for_each_dev(&xenbus_backend.bus, NULL, NULL, suspend_dev); + xs_suspend(); } void xenbus_resume(void) { xb_init_comms(); - reregister_xenbus_watches(); + xs_resume(); bus_for_each_dev(&xenbus_frontend.bus, NULL, NULL, resume_dev); bus_for_each_dev(&xenbus_backend.bus, NULL, NULL, resume_dev); up(&xenbus_lock); @@ -685,6 +689,7 @@ /* Notify others that xenstore is up */ notifier_call_chain(&xenstore_chain, 0, 0); up(&xenbus_lock); + return 0; } diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c --- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c Sun Oct 9 16:29:24 2005 +++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c Sun Oct 9 17:52:54 2005 @@ -42,11 +42,67 @@ #define streq(a, b) (strcmp((a), (b)) == 0) -static char printf_buffer[4096]; +struct xs_stored_msg { + struct xsd_sockmsg hdr; + + union { + /* Stored replies. */ + struct { + struct list_head list; + char *body; + } reply; + + /* Queued watch callbacks. */ + struct { + struct work_struct work; + struct xenbus_watch *handle; + char **vec; + unsigned int vec_size; + } watch; + } u; +}; + +struct xs_handle { + /* A list of replies. Currently only one will ever be outstanding. */ + struct list_head reply_list; + spinlock_t reply_lock; + wait_queue_head_t reply_waitq; + + /* One request at a time. */ + struct semaphore request_mutex; + + /* One transaction at a time. */ + struct semaphore transaction_mutex; + int transaction_pid; +}; + +static struct xs_handle xs_state; + static LIST_HEAD(watches); - -DECLARE_MUTEX(xenbus_lock); -EXPORT_SYMBOL(xenbus_lock); +static DEFINE_SPINLOCK(watches_lock); + +/* Can wait on !xs_resuming for suspend/resume cycle to complete. */ +static int xs_resuming; +static DECLARE_WAIT_QUEUE_HEAD(xs_resuming_waitq); + +static void request_mutex_acquire(void) +{ + /* + * We can't distinguish non-transactional from transactional + * requests right now. So temporarily acquire the transaction mutex + * if this task is outside transaction context. + */ + if (xs_state.transaction_pid != current->pid) + down(&xs_state.transaction_mutex); + down(&xs_state.request_mutex); +} + +static void request_mutex_release(void) +{ + up(&xs_state.request_mutex); + if (xs_state.transaction_pid != current->pid) + up(&xs_state.transaction_mutex); +} static int get_error(const char *errorstring) { @@ -65,29 +121,32 @@ static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len) { - struct xsd_sockmsg msg; - void *ret; - int err; - - err = xb_read(&msg, sizeof(msg)); - if (err) - return ERR_PTR(err); - - ret = kmalloc(msg.len + 1, GFP_KERNEL); - if (!ret) - return ERR_PTR(-ENOMEM); - - err = xb_read(ret, msg.len); - if (err) { - kfree(ret); - return ERR_PTR(err); - } - ((char*)ret)[msg.len] = '\0'; - - *type = msg.type; + struct xs_stored_msg *msg; + char *body; + + spin_lock(&xs_state.reply_lock); + + while (list_empty(&xs_state.reply_list)) { + spin_unlock(&xs_state.reply_lock); + wait_event(xs_state.reply_waitq, + !list_empty(&xs_state.reply_list)); + spin_lock(&xs_state.reply_lock); + } + + msg = list_entry(xs_state.reply_list.next, + struct xs_stored_msg, u.reply.list); + list_del(&msg->u.reply.list); + + spin_unlock(&xs_state.reply_lock); + + *type = msg->hdr.type; if (len) - *len = msg.len; - return ret; + *len = msg->hdr.len; + body = msg->u.reply.body; + + kfree(msg); + + return body; } /* Emergency write. */ @@ -98,10 +157,45 @@ msg.type = XS_DEBUG; msg.len = sizeof("print") + count + 1; + request_mutex_acquire(); xb_write(&msg, sizeof(msg)); xb_write("print", sizeof("print")); xb_write(str, count); xb_write("", 1); + request_mutex_release(); +} + +void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg) +{ + void *ret; + struct xsd_sockmsg req_msg = *msg; + int err; + + if (req_msg.type == XS_TRANSACTION_START) { + down(&xs_state.transaction_mutex); + xs_state.transaction_pid = current->pid; + } + + request_mutex_acquire(); + + err = xb_write(msg, sizeof(*msg) + msg->len); + if (err) { + msg->type = XS_ERROR; + ret = ERR_PTR(err); + } else { + ret = read_reply(&msg->type, &msg->len); + } + + request_mutex_release(); + + if ((msg->type == XS_TRANSACTION_END) || + ((req_msg.type == XS_TRANSACTION_START) && + (msg->type == XS_ERROR))) { + xs_state.transaction_pid = -1; + up(&xs_state.transaction_mutex); + } + + return ret; } /* Send message to xs, get kmalloc'ed reply. ERR_PTR() on error. */ @@ -115,31 +209,33 @@ unsigned int i; int err; - WARN_ON(down_trylock(&xenbus_lock) == 0); - msg.type = type; msg.len = 0; for (i = 0; i < num_vecs; i++) msg.len += iovec[i].iov_len; + request_mutex_acquire(); + err = xb_write(&msg, sizeof(msg)); - if (err) + if (err) { + up(&xs_state.request_mutex); return ERR_PTR(err); + } for (i = 0; i < num_vecs; i++) { err = xb_write(iovec[i].iov_base, iovec[i].iov_len);; - if (err) + if (err) { + request_mutex_release(); return ERR_PTR(err); - } - - /* Watches can have fired before reply comes: daemon detects - * and re-transmits, so we can ignore this. */ - do { - kfree(ret); - ret = read_reply(&msg.type, len); - if (IS_ERR(ret)) - return ret; - } while (msg.type == XS_WATCH_EVENT); + } + } + + ret = read_reply(&msg.type, len); + + request_mutex_release(); + + if (IS_ERR(ret)) + return ret; if (msg.type == XS_ERROR) { err = get_error(ret); @@ -187,8 +283,6 @@ { static char buffer[4096]; - BUG_ON(down_trylock(&xenbus_lock) == 0); - /* XXX FIXME: might not be correct if name == "" */ BUG_ON(strlen(dir) + strlen("/") + strlen(name) + 1 > sizeof(buffer)); strcpy(buffer, dir); @@ -207,7 +301,7 @@ *num = count_strings(strings, len); /* Transfer to one big alloc for easy freeing. */ - ret = kmalloc(*num * sizeof(char *) + len, GFP_ATOMIC); + ret = kmalloc(*num * sizeof(char *) + len, GFP_KERNEL); if (!ret) { kfree(strings); return ERR_PTR(-ENOMEM); @@ -298,7 +392,18 @@ */ int xenbus_transaction_start(void) { - return xs_error(xs_single(XS_TRANSACTION_START, "", NULL)); + int err; + + down(&xs_state.transaction_mutex); + xs_state.transaction_pid = current->pid; + + err = xs_error(xs_single(XS_TRANSACTION_START, "", NULL)); + if (err) { + xs_state.transaction_pid = -1; + up(&xs_state.transaction_mutex); + } + + return err; } EXPORT_SYMBOL(xenbus_transaction_start); @@ -308,12 +413,19 @@ int xenbus_transaction_end(int abort) { char abortstr[2]; + int err; if (abort) strcpy(abortstr, "F"); else strcpy(abortstr, "T"); - return xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL)); + + err = xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL)); + + xs_state.transaction_pid = -1; + up(&xs_state.transaction_mutex); + + return err; } EXPORT_SYMBOL(xenbus_transaction_end); @@ -344,14 +456,23 @@ { va_list ap; int ret; - - BUG_ON(down_trylock(&xenbus_lock) == 0); +#define PRINTF_BUFFER_SIZE 4096 + char *printf_buffer; + + printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_KERNEL); + if (printf_buffer == NULL) + return -ENOMEM; + va_start(ap, fmt); - ret = vsnprintf(printf_buffer, sizeof(printf_buffer), fmt, ap); + ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap); va_end(ap); - BUG_ON(ret > sizeof(printf_buffer)-1); - return xenbus_write(dir, node, printf_buffer); + BUG_ON(ret > PRINTF_BUFFER_SIZE-1); + ret = xenbus_write(dir, node, printf_buffer); + + kfree(printf_buffer); + + return ret; } EXPORT_SYMBOL(xenbus_printf); @@ -361,19 +482,28 @@ va_list ap; int ret; unsigned int len; - - BUG_ON(down_trylock(&xenbus_lock) == 0); + char *printf_buffer; + + printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_KERNEL); + if (printf_buffer == NULL) + goto fail; len = sprintf(printf_buffer, "%i ", -err); va_start(ap, fmt); - ret = vsnprintf(printf_buffer+len, sizeof(printf_buffer)-len, fmt, ap); + ret = vsnprintf(printf_buffer+len, PRINTF_BUFFER_SIZE-len, fmt, ap); va_end(ap); - BUG_ON(len + ret > sizeof(printf_buffer)-1); + BUG_ON(len + ret > PRINTF_BUFFER_SIZE-1); dev->has_error = 1; if (xenbus_write(dev->nodename, "error", printf_buffer) != 0) - printk("xenbus: failed to write error node for %s (%s)\n", - dev->nodename, printf_buffer); + goto fail; + + kfree(printf_buffer); + return; + + fail: + printk("xenbus: failed to write error node for %s (%s)\n", + dev->nodename, printf_buffer); } EXPORT_SYMBOL(xenbus_dev_error); @@ -432,26 +562,6 @@ return xs_error(xs_talkv(XS_WATCH, iov, ARRAY_SIZE(iov), NULL)); } -static char **xs_read_watch(unsigned int *num) -{ - enum xsd_sockmsg_type type; - char *strings; - unsigned int len; - - strings = read_reply(&type, &len); - if (IS_ERR(strings)) - return (char **)strings; - - BUG_ON(type != XS_WATCH_EVENT); - - return split(strings, len, num); -} - -static int xs_acknowledge_watch(const char *token) -{ - return xs_error(xs_single(XS_WATCH_ACK, token, NULL)); -} - static int xs_unwatch(const char *path, const char *token) { struct kvec iov[2]; @@ -464,7 +574,6 @@ return xs_error(xs_talkv(XS_UNWATCH, iov, ARRAY_SIZE(iov), NULL)); } -/* A little paranoia: we don't just trust token. */ static struct xenbus_watch *find_watch(const char *token) { struct xenbus_watch *i, *cmp; @@ -474,6 +583,7 @@ list_for_each_entry(i, &watches, list) if (i == cmp) return i; + return NULL; } @@ -485,11 +595,20 @@ int err; sprintf(token, "%lX", (long)watch); + + spin_lock(&watches_lock); BUG_ON(find_watch(token)); + spin_unlock(&watches_lock); err = xs_watch(watch->node, token); - if (!err) + + /* Ignore errors due to multiple registration. */ + if ((err == 0) || (err == -EEXIST)) { + spin_lock(&watches_lock); list_add(&watch->list, &watches); + spin_unlock(&watches_lock); + } + return err; } EXPORT_SYMBOL(register_xenbus_watch); @@ -500,77 +619,188 @@ int err; sprintf(token, "%lX", (long)watch); + + spin_lock(&watches_lock); BUG_ON(!find_watch(token)); + list_del(&watch->list); + spin_unlock(&watches_lock); + + /* Ensure xs_resume() is not in progress (see comments there). */ + wait_event(xs_resuming_waitq, !xs_resuming); err = xs_unwatch(watch->node, token); - list_del(&watch->list); - if (err) printk(KERN_WARNING "XENBUS Failed to release watch %s: %i\n", watch->node, err); + + /* Make sure watch is not in use. */ + flush_scheduled_work(); } EXPORT_SYMBOL(unregister_xenbus_watch); -/* Re-register callbacks to all watches. */ -void reregister_xenbus_watches(void) -{ +void xs_suspend(void) +{ + down(&xs_state.transaction_mutex); + down(&xs_state.request_mutex); +} + +void xs_resume(void) +{ + struct list_head *ent, *prev_ent = &watches; struct xenbus_watch *watch; char token[sizeof(watch) * 2 + 1]; - list_for_each_entry(watch, &watches, list) { - sprintf(token, "%lX", (long)watch); - xs_watch(watch->node, token); - } -} - -static int watch_thread(void *unused) -{ + /* Protect against concurrent unregistration and freeing of watches. */ + BUG_ON(xs_resuming); + xs_resuming = 1; + + up(&xs_state.request_mutex); + up(&xs_state.transaction_mutex); + + /* + * Iterate over the watch list re-registering each node. We must + * be careful about concurrent registrations and unregistrations. + * We search for the node immediately following the previously + * re-registered node. If we get no match then either we are done + * (previous node is last in list) or the node was unregistered, in + * which case we restart from the beginning of the list. + * register_xenbus_watch() + unregister_xenbus_watch() is safe because + * it will only ever move a watch node earlier in the list, so it + * cannot cause us to skip nodes. + */ for (;;) { - char **vec = NULL; - unsigned int num; - - wait_event(xb_waitq, xs_input_avail()); - - /* If this is a spurious wakeup caused by someone - * doing an op, they'll hold the lock and the buffer - * will be empty by the time we get there. - */ - down(&xenbus_lock); - if (xs_input_avail()) - vec = xs_read_watch(&num); - - if (vec && !IS_ERR(vec)) { - struct xenbus_watch *w; - int err; - - err = xs_acknowledge_watch(vec[XS_WATCH_TOKEN]); - if (err) - printk(KERN_WARNING "XENBUS ack %s fail %i\n", - vec[XS_WATCH_TOKEN], err); - w = find_watch(vec[XS_WATCH_TOKEN]); - BUG_ON(!w); - w->callback(w, (const char **)vec, num); - kfree(vec); - } else if (vec) - printk(KERN_WARNING "XENBUS xs_read_watch: %li\n", - PTR_ERR(vec)); - up(&xenbus_lock); + spin_lock(&watches_lock); + list_for_each(ent, &watches) + if (ent->prev == prev_ent) + break; + spin_unlock(&watches_lock); + + /* No match because prev_ent is at the end of the list? */ + if ((ent == &watches) && (watches.prev == prev_ent)) + break; /* We're done! */ + + if ((prev_ent = ent) != &watches) { + /* + * Safe even with watch_lock not held. We are saved by + * (xs_resumed==1) check in unregister_xenbus_watch. + */ + watch = list_entry(ent, struct xenbus_watch, list); + sprintf(token, "%lX", (long)watch); + xs_watch(watch->node, token); + } + } + + xs_resuming = 0; + wake_up(&xs_resuming_waitq); +} + +static void xenbus_fire_watch(void *arg) +{ + struct xs_stored_msg *msg = arg; + + msg->u.watch.handle->callback(msg->u.watch.handle, + (const char **)msg->u.watch.vec, + msg->u.watch.vec_size); + + kfree(msg->u.watch.vec); + kfree(msg); +} + +static int process_msg(void) +{ + struct xs_stored_msg *msg; + char *body; + int err; + + msg = kmalloc(sizeof(*msg), GFP_KERNEL); + if (msg == NULL) + return -ENOMEM; + + err = xb_read(&msg->hdr, sizeof(msg->hdr)); + if (err) { + kfree(msg); + return err; + } + + body = kmalloc(msg->hdr.len + 1, GFP_KERNEL); + if (body == NULL) { + kfree(msg); + return -ENOMEM; + } + + err = xb_read(body, msg->hdr.len); + if (err) { + kfree(body); + kfree(msg); + return err; + } + body[msg->hdr.len] = '\0'; + + if (msg->hdr.type == XS_WATCH_EVENT) { + INIT_WORK(&msg->u.watch.work, xenbus_fire_watch, msg); + + msg->u.watch.vec = split(body, msg->hdr.len, + &msg->u.watch.vec_size); + if (IS_ERR(msg->u.watch.vec)) { + kfree(msg); + return PTR_ERR(msg->u.watch.vec); + } + + spin_lock(&watches_lock); + msg->u.watch.handle = find_watch( + msg->u.watch.vec[XS_WATCH_TOKEN]); + if (msg->u.watch.handle != NULL) { + schedule_work(&msg->u.watch.work); + } else { + kfree(msg->u.watch.vec); + kfree(msg); + } + spin_unlock(&watches_lock); + } else { + msg->u.reply.body = body; + spin_lock(&xs_state.reply_lock); + list_add_tail(&msg->u.reply.list, &xs_state.reply_list); + spin_unlock(&xs_state.reply_lock); + wake_up(&xs_state.reply_waitq); + } + + return 0; +} + +static int read_thread(void *unused) +{ + int err; + + for (;;) { + err = process_msg(); + if (err) + printk(KERN_WARNING "XENBUS error %d while reading " + "message\n", err); } } int xs_init(void) { int err; - struct task_struct *watcher; + struct task_struct *reader; + + INIT_LIST_HEAD(&xs_state.reply_list); + spin_lock_init(&xs_state.reply_lock); + init_waitqueue_head(&xs_state.reply_waitq); + + init_MUTEX(&xs_state.request_mutex); + init_MUTEX(&xs_state.transaction_mutex); + xs_state.transaction_pid = -1; err = xb_init_comms(); if (err) return err; - watcher = kthread_run(watch_thread, NULL, "kxbwatch"); - if (IS_ERR(watcher)) - return PTR_ERR(watcher); + reader = kthread_run(read_thread, NULL, "xenbusd"); + if (IS_ERR(reader)) + return PTR_ERR(reader); + return 0; } diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/include/asm-xen/xenbus.h --- a/linux-2.6-xen-sparse/include/asm-xen/xenbus.h Sun Oct 9 16:29:24 2005 +++ b/linux-2.6-xen-sparse/include/asm-xen/xenbus.h Sun Oct 9 17:52:54 2005 @@ -78,10 +78,6 @@ int xenbus_register_backend(struct xenbus_driver *drv); void xenbus_unregister_driver(struct xenbus_driver *drv); -/* Caller must hold this lock to call these functions: it's also held - * across watch callbacks. */ -extern struct semaphore xenbus_lock; - char **xenbus_directory(const char *dir, const char *node, unsigned int *num); void *xenbus_read(const char *dir, const char *node, unsigned int *len); int xenbus_write(const char *dir, const char *node, const char *string); @@ -113,7 +109,11 @@ struct xenbus_watch { struct list_head list; + + /* Path being watched. */ char *node; + + /* Callback (executed in a process context with no locks held). */ void (*callback)(struct xenbus_watch *, const char **vec, unsigned int len); }; @@ -124,7 +124,11 @@ int register_xenbus_watch(struct xenbus_watch *watch); void unregister_xenbus_watch(struct xenbus_watch *watch); -void reregister_xenbus_watches(void); +void xs_suspend(void); +void xs_resume(void); + +/* Used by xenbus_dev to borrow kernel's store connection. */ +void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg); /* Called from xen core code. */ void xenbus_suspend(void); diff -r ab93a9a46bd4 -r 8016551fde98 tools/blktap/xenbus.c --- a/tools/blktap/xenbus.c Sun Oct 9 16:29:24 2005 +++ b/tools/blktap/xenbus.c Sun Oct 9 17:52:54 2005 @@ -260,10 +260,6 @@ node = res[XS_WATCH_PATH]; token = res[XS_WATCH_TOKEN]; - er = xs_acknowledge_watch(h, token); - if (er == 0) - warn("Couldn't acknowledge watch (%s)", token); - w = find_watch(token); if (!w) { diff -r ab93a9a46bd4 -r 8016551fde98 tools/console/daemon/io.c --- a/tools/console/daemon/io.c Sun Oct 9 16:29:24 2005 +++ b/tools/console/daemon/io.c Sun Oct 9 17:52:54 2005 @@ -505,7 +505,6 @@ domain_create_ring(dom); } - xs_acknowledge_watch(xs, vec[1]); free(vec); } diff -r ab93a9a46bd4 -r 8016551fde98 tools/python/xen/lowlevel/xs/xs.c --- a/tools/python/xen/lowlevel/xs/xs.c Sun Oct 9 16:29:24 2005 +++ b/tools/python/xen/lowlevel/xs/xs.c Sun Oct 9 17:52:54 2005 @@ -442,9 +442,6 @@ #define xspy_read_watch_doc "\n" \ "Read a watch notification.\n" \ - "The notification must be acknowledged by passing\n" \ - "the token to acknowledge_watch().\n" \ - " path [string]: xenstore path.\n" \ "\n" \ "Returns: [tuple] (path, token).\n" \ "Raises RuntimeError on error.\n" \ @@ -492,44 +489,6 @@ exit: if (xsval) free(xsval); - return val; -} - -#define xspy_acknowledge_watch_doc "\n" \ - "Acknowledge a watch notification that has been read.\n" \ - " token [string] : from the watch notification\n" \ - "\n" \ - "Returns None on success.\n" \ - "Raises RuntimeError on error.\n" \ - "\n" - -static PyObject *xspy_acknowledge_watch(PyObject *self, PyObject *args, - PyObject *kwds) -{ - static char *kwd_spec[] = { "token", NULL }; - static char *arg_spec = "O"; - PyObject *token; - char token_str[MAX_STRLEN(unsigned long) + 1]; - - struct xs_handle *xh = xshandle(self); - PyObject *val = NULL; - int xsval = 0; - - if (!xh) - goto exit; - if (!PyArg_ParseTupleAndKeywords(args, kwds, arg_spec, kwd_spec, &token)) - goto exit; - sprintf(token_str, "%li", (unsigned long)token); - Py_BEGIN_ALLOW_THREADS - xsval = xs_acknowledge_watch(xh, token_str); - Py_END_ALLOW_THREADS - if (!xsval) { - PyErr_SetFromErrno(PyExc_RuntimeError); - goto exit; - } - Py_INCREF(Py_None); - val = Py_None; - exit: return val; } @@ -833,7 +792,6 @@ XSPY_METH(set_permissions), XSPY_METH(watch), XSPY_METH(read_watch), - XSPY_METH(acknowledge_watch), XSPY_METH(unwatch), XSPY_METH(transaction_start), XSPY_METH(transaction_end), diff -r ab93a9a46bd4 -r 8016551fde98 tools/python/xen/xend/xenstore/xswatch.py --- a/tools/python/xen/xend/xenstore/xswatch.py Sun Oct 9 16:29:24 2005 +++ b/tools/python/xen/xend/xenstore/xswatch.py Sun Oct 9 17:52:54 2005 @@ -8,6 +8,7 @@ import select import threading from xen.lowlevel import xs +from xen.xend.xenstore.xsutil import xshandle class xswatch: @@ -27,10 +28,7 @@ if cls.watchThread: cls.xslock.release() return - # XXX: When we fix xenstored to have better watch semantics, - # this can change to shared xshandle(). Currently that would result - # in duplicate watch firings, thus failed extra xs.acknowledge_watch. - cls.xs = xs.open() + cls.xs = xshandle() cls.watchThread = threading.Thread(name="Watcher", target=cls.watchMain) cls.watchThread.setDaemon(True) @@ -43,11 +41,10 @@ while True: try: we = cls.xs.read_watch() - watch = we[1] - cls.xs.acknowledge_watch(watch) except RuntimeError, ex: print ex raise + watch = we[1] watch.fn(*watch.args, **watch.kwargs) watchMain = classmethod(watchMain) diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/07watch.test --- a/tools/xenstore/testsuite/07watch.test Sun Oct 9 16:29:24 2005 +++ b/tools/xenstore/testsuite/07watch.test Sun Oct 9 17:52:54 2005 @@ -5,7 +5,6 @@ 2 write /test contents2 expect 1:/test:token 1 waitwatch -1 ackwatch token 1 close # Check that reads don't set it off. @@ -22,15 +21,12 @@ 2 mkdir /dir/newdir expect 1:/dir/newdir:token 1 waitwatch -1 ackwatch token 2 setperm /dir/newdir 0 READ expect 1:/dir/newdir:token 1 waitwatch -1 ackwatch token 2 rm /dir/newdir expect 1:/dir/newdir:token 1 waitwatch -1 ackwatch token 1 close 2 close @@ -49,7 +45,6 @@ read /dir/test expect /dir/test:token waitwatch -ackwatch token close # watch priority test: all simultaneous @@ -59,13 +54,10 @@ write /dir/test contents expect 3:/dir/test:token3 3 waitwatch -3 ackwatch token3 expect 2:/dir/test:token2 2 waitwatch -2 ackwatch token2 expect 1:/dir/test:token1 1 waitwatch -1 ackwatch token1 1 close 2 close 3 close @@ -79,7 +71,6 @@ 2 close expect 1:/dir/test:token1 1 waitwatch -1 ackwatch token1 1 close # If one dies (without reading at all), the other should still get ack. @@ -89,7 +80,6 @@ 2 close expect 1:/dir/test:token1 1 waitwatch -1 ackwatch token1 1 close 2 close @@ -111,7 +101,6 @@ 2 unwatch /dir token2 expect 1:/dir/test:token1 1 waitwatch -1 ackwatch token1 1 close 2 close @@ -123,14 +112,12 @@ write /dir/test contents2 expect 1:/dir/test:token2 1 waitwatch -1 ackwatch token2 # check we only get notified once. 1 watch /test token 2 write /test contents2 expect 1:/test:token 1 waitwatch -1 ackwatch token expect 1: waitwatch failed: Connection timed out 1 waitwatch 1 close @@ -142,13 +129,10 @@ 2 write /test3 contents expect 1:/test1:token 1 waitwatch -1 ackwatch token expect 1:/test2:token 1 waitwatch -1 ackwatch token expect 1:/test3:token 1 waitwatch -1 ackwatch token 1 close # Creation of subpaths should be covered correctly. @@ -157,10 +141,8 @@ 2 write /test/subnode/subnode contents2 expect 1:/test/subnode:token 1 waitwatch -1 ackwatch token expect 1:/test/subnode/subnode:token 1 waitwatch -1 ackwatch token expect 1: waitwatch failed: Connection timed out 1 waitwatch 1 close @@ -171,7 +153,6 @@ 1 watchnoack / token2 0 expect 1:/test/subnode:token 1 waitwatch -1 ackwatch token expect 1:/:token2 1 waitwatch expect 1: waitwatch failed: Connection timed out @@ -183,7 +164,6 @@ 2 rm /test expect 1:/test/subnode:token 1 waitwatch -1 ackwatch token # Watch should not double-send after we ack, even if we did something in between. 1 watch /test2 token @@ -192,6 +172,5 @@ 1 waitwatch expect 1:contents2 1 read /test2/foo -1 ackwatch token expect 1: waitwatch failed: Connection timed out 1 waitwatch diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/08transaction.test --- a/tools/xenstore/testsuite/08transaction.test Sun Oct 9 16:29:24 2005 +++ b/tools/xenstore/testsuite/08transaction.test Sun Oct 9 17:52:54 2005 @@ -68,7 +68,6 @@ 2 commit expect 1:/test/dir/sub:token 1 waitwatch -1 ackwatch token 1 close # Rm inside transaction works like rm outside: children get notified. @@ -78,7 +77,6 @@ 2 commit expect 1:/test/dir/sub:token 1 waitwatch -1 ackwatch token 1 close # Multiple events from single transaction don't trigger assert @@ -89,8 +87,6 @@ 2 commit expect 1:/test/1:token 1 waitwatch -1 ackwatch token expect 1:/test/2:token 1 waitwatch -1 ackwatch token 1 close diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/10domain-homedir.test --- a/tools/xenstore/testsuite/10domain-homedir.test Sun Oct 9 16:29:24 2005 +++ b/tools/xenstore/testsuite/10domain-homedir.test Sun Oct 9 17:52:54 2005 @@ -16,4 +16,3 @@ write /home/foo/bar contents expect 1:foo/bar:token 1 waitwatch -1 ackwatch token diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/11domain-watch.test --- a/tools/xenstore/testsuite/11domain-watch.test Sun Oct 9 16:29:24 2005 +++ b/tools/xenstore/testsuite/11domain-watch.test Sun Oct 9 17:52:54 2005 @@ -10,7 +10,6 @@ write /test contents2 expect 1:/test:token 1 waitwatch -1 ackwatch token 1 unwatch /test token release 1 1 close @@ -25,7 +24,6 @@ 1 write /dir/test4 contents4 expect 1:/dir/test:token 1 waitwatch -1 ackwatch token release 1 1 close diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/12readonly.test --- a/tools/xenstore/testsuite/12readonly.test Sun Oct 9 16:29:24 2005 +++ b/tools/xenstore/testsuite/12readonly.test Sun Oct 9 17:52:54 2005 @@ -36,4 +36,3 @@ 1 write /test contents expect /test:token waitwatch -ackwatch token diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/13watch-ack.test --- a/tools/xenstore/testsuite/13watch-ack.test Sun Oct 9 16:29:24 2005 +++ b/tools/xenstore/testsuite/13watch-ack.test Sun Oct 9 17:52:54 2005 @@ -18,5 +18,4 @@ 1 waitwatch 3 write /test/1 contents1 4 write /test/3 contents3 -1 ackwatch token2 1 close diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xenstored_core.c --- a/tools/xenstore/xenstored_core.c Sun Oct 9 16:29:24 2005 +++ b/tools/xenstore/xenstored_core.c Sun Oct 9 17:52:54 2005 @@ -154,7 +154,6 @@ case XS_READ: return "READ"; case XS_GET_PERMS: return "GET_PERMS"; case XS_WATCH: return "WATCH"; - case XS_WATCH_ACK: return "WATCH_ACK"; case XS_UNWATCH: return "UNWATCH"; case XS_TRANSACTION_START: return "TRANSACTION_START"; case XS_TRANSACTION_END: return "TRANSACTION_END"; @@ -1103,10 +1102,6 @@ do_watch(conn, in); break; - case XS_WATCH_ACK: - do_watch_ack(conn, onearg(in)); - break; - case XS_UNWATCH: do_unwatch(conn, in); break; @@ -1167,11 +1162,6 @@ if (verbose) xprintf("Got message %s len %i from %p\n", sockmsg_string(type), conn->in->hdr.msg.len, conn); - - /* We might get a command while waiting for an ack: this means - * the other end discarded it: we will re-transmit. */ - if (type != XS_WATCH_ACK) - conn->waiting_for_ack = NULL; /* Careful: process_message may free connection. We detach * "in" beforehand and allocate the new buffer to avoid @@ -1266,7 +1256,6 @@ new->state = OK; new->out = new->waiting_reply = NULL; - new->waiting_for_ack = NULL; new->fd = -1; new->id = 0; new->domain = NULL; diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xenstored_core.h --- a/tools/xenstore/xenstored_core.h Sun Oct 9 16:29:24 2005 +++ b/tools/xenstore/xenstored_core.h Sun Oct 9 17:52:54 2005 @@ -70,9 +70,6 @@ /* Is this a read-only connection? */ bool can_write; - - /* Are we waiting for a watch event ack? */ - struct watch *waiting_for_ack; /* Buffered incoming data. */ struct buffered_data *in; diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xenstored_watch.c --- a/tools/xenstore/xenstored_watch.c Sun Oct 9 16:29:24 2005 +++ b/tools/xenstore/xenstored_watch.c Sun Oct 9 17:52:54 2005 @@ -69,18 +69,14 @@ if (conn->waiting_reply) { conn->out = conn->waiting_reply; conn->waiting_reply = NULL; - conn->waiting_for_ack = NULL; - return; - } - - /* If we're already waiting for ack, don't queue more. */ - if (conn->waiting_for_ack) - return; + return; + } list_for_each_entry(watch, &conn->watches, list) { event = list_top(&watch->events, struct watch_event, list); if (event) { - conn->waiting_for_ack = watch; + list_del(&event->list); + talloc_free(event); send_reply(conn,XS_WATCH_EVENT,event->data,event->len); break; } @@ -181,6 +177,15 @@ } } + /* Check for duplicates. */ + list_for_each_entry(watch, &conn->watches, list) { + if (streq(watch->node, vec[0]) && + streq(watch->token, vec[1])) { + send_error(conn, EEXIST); + return; + } + } + watch = talloc(conn, struct watch); watch->node = talloc_strdup(watch, vec[0]); watch->token = talloc_strdup(watch, vec[1]); @@ -200,37 +205,6 @@ add_event(conn, watch, watch->node); } -void do_watch_ack(struct connection *conn, const char *token) -{ - struct watch_event *event; - - if (!token) { - send_error(conn, EINVAL); - return; - } - - if (!conn->waiting_for_ack) { - send_error(conn, ENOENT); - return; - } - - if (!streq(conn->waiting_for_ack->token, token)) { - /* They're confused: this will cause us to send event again */ - conn->waiting_for_ack = NULL; - send_error(conn, EINVAL); - return; - } - - /* Remove event: after ack sent, core will call queue_next_event */ - event = list_top(&conn->waiting_for_ack->events, struct watch_event, - list); - list_del(&event->list); - talloc_free(event); - - conn->waiting_for_ack = NULL; - send_ack(conn, XS_WATCH_ACK); -} - void do_unwatch(struct connection *conn, struct buffered_data *in) { struct watch *watch; @@ -241,9 +215,6 @@ return; } - /* We don't need to worry if we're waiting for an ack for the - * watch we're deleting: conn->waiting_for_ack was reset by - * this command in consider_message anyway. */ node = canonicalize(conn, vec[0]); list_for_each_entry(watch, &conn->watches, list) { if (streq(watch->node, node) && streq(watch->token, vec[1])) { @@ -262,11 +233,6 @@ struct watch *watch; struct watch_event *event; - if (conn->waiting_for_ack) - printf(" waiting_for_ack for watch on %s token %s\n", - conn->waiting_for_ack->node, - conn->waiting_for_ack->token); - list_for_each_entry(watch, &conn->watches, list) { printf(" watch on %s token %s\n", watch->node, watch->token); diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xs.c --- a/tools/xenstore/xs.c Sun Oct 9 16:29:24 2005 +++ b/tools/xenstore/xs.c Sun Oct 9 17:52:54 2005 @@ -78,9 +78,29 @@ /* One transaction at a time. */ pthread_mutex_t transaction_mutex; + pthread_t transaction_pthread; }; static void *read_thread(void *arg); + +static void request_mutex_acquire(struct xs_handle *h) +{ + /* + * We can't distinguish non-transactional from transactional + * requests right now. So temporarily acquire the transaction mutex + * if this task is outside transaction context. + */ + if (h->transaction_pthread != pthread_self()) + pthread_mutex_lock(&h->transaction_mutex); + pthread_mutex_lock(&h->request_mutex); +} + +static void request_mutex_release(struct xs_handle *h) +{ + pthread_mutex_unlock(&h->request_mutex); + if (h->transaction_pthread != pthread_self()) + pthread_mutex_unlock(&h->transaction_mutex); +} int xs_fileno(struct xs_handle *h) { @@ -163,6 +183,7 @@ pthread_mutex_init(&h->request_mutex, NULL); pthread_mutex_init(&h->transaction_mutex, NULL); + h->transaction_pthread = -1; if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0) goto error; @@ -316,7 +337,7 @@ ignorepipe.sa_flags = 0; sigaction(SIGPIPE, &ignorepipe, &oldact); - pthread_mutex_lock(&h->request_mutex); + request_mutex_acquire(h); if (!xs_write_all(h->fd, &msg, sizeof(msg))) goto fail; @@ -329,7 +350,7 @@ if (!ret) goto fail; - pthread_mutex_unlock(&h->request_mutex); + request_mutex_release(h); sigaction(SIGPIPE, &oldact, NULL); if (msg.type == XS_ERROR) { @@ -350,7 +371,7 @@ fail: /* We're in a bad state, so close fd. */ saved_errno = errno; - pthread_mutex_unlock(&h->request_mutex); + request_mutex_release(h); sigaction(SIGPIPE, &oldact, NULL); close_fd: close(h->fd); @@ -593,15 +614,6 @@ return ret; } -/* Acknowledge watch on node. Watches must be acknowledged before - * any other watches can be read. - * Returns false on failure. - */ -bool xs_acknowledge_watch(struct xs_handle *h, const char *token) -{ - return xs_bool(xs_single(h, XS_WATCH_ACK, token, NULL)); -} - /* Remove a watch on a node. * Returns false on failure (no watch on that node). */ @@ -624,8 +636,18 @@ */ bool xs_transaction_start(struct xs_handle *h) { + bool rc; + pthread_mutex_lock(&h->transaction_mutex); - return xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL)); + h->transaction_pthread = pthread_self(); + + rc = xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL)); + if (!rc) { + h->transaction_pthread = -1; + pthread_mutex_unlock(&h->transaction_mutex); + } + + return rc; } /* End a transaction. @@ -645,6 +667,7 @@ rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL)); + h->transaction_pthread = -1; pthread_mutex_unlock(&h->transaction_mutex); return rc; diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xs.h --- a/tools/xenstore/xs.h Sun Oct 9 16:29:24 2005 +++ b/tools/xenstore/xs.h Sun Oct 9 17:52:54 2005 @@ -96,12 +96,6 @@ */ char **xs_read_watch(struct xs_handle *h, unsigned int *num); -/* Acknowledge watch on node. Watches must be acknowledged before - * any other watches can be read. - * Returns false on failure. - */ -bool xs_acknowledge_watch(struct xs_handle *h, const char *token); - /* Remove a watch on a node: implicitly acks any outstanding watch. * Returns false on failure (no watch on that node). */ diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xs_test.c --- a/tools/xenstore/xs_test.c Sun Oct 9 16:29:24 2005 +++ b/tools/xenstore/xs_test.c Sun Oct 9 17:52:54 2005 @@ -201,7 +201,6 @@ " watch <path> <token>\n" " watchnoack <path> <token>\n" " waitwatch\n" - " ackwatch <token>\n" " unwatch <path> <token>\n" " close\n" " start <node>\n" @@ -455,8 +454,6 @@ !streq(vec[XS_WATCH_PATH], node) || !streq(vec[XS_WATCH_TOKEN], token)) failed(handle); - if (!xs_acknowledge_watch(handles[handle], token)) - failed(handle); } } @@ -513,12 +510,6 @@ else output("%s:%s\n", vec[XS_WATCH_PATH], vec[XS_WATCH_TOKEN]); free(vec); -} - -static void do_ackwatch(unsigned int handle, const char *token) -{ - if (!xs_acknowledge_watch(handles[handle], token)) - failed(handle); } static void do_unwatch(unsigned int handle, const char *node, const char *token) @@ -746,8 +737,6 @@ do_watch(handle, arg(line, 1), arg(line, 2), false); else if (streq(command, "waitwatch")) do_waitwatch(handle); - else if (streq(command, "ackwatch")) - do_ackwatch(handle, arg(line, 1)); else if (streq(command, "unwatch")) do_unwatch(handle, arg(line, 1), arg(line, 2)); else if (streq(command, "close")) { diff -r ab93a9a46bd4 -r 8016551fde98 xen/include/public/io/xs_wire.h --- a/xen/include/public/io/xs_wire.h Sun Oct 9 16:29:24 2005 +++ b/xen/include/public/io/xs_wire.h Sun Oct 9 17:52:54 2005 @@ -35,11 +35,9 @@ XS_READ, XS_GET_PERMS, XS_WATCH, - XS_WATCH_ACK, XS_UNWATCH, XS_TRANSACTION_START, XS_TRANSACTION_END, - XS_OP_READ_ONLY = XS_TRANSACTION_END, XS_INTRODUCE, XS_RELEASE, XS_GET_DOMAIN_PATH, _______________________________________________ Xen-changelog mailing list Xen-changelog@xxxxxxxxxxxxxxxxxxx http://lists.xensource.com/xen-changelog
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |