[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-changelog] Refactor xenbus to break up the xenbus_lock and permit watches



# HG changeset patch
# User kaf24@xxxxxxxxxxxxxxxxxxxx
# Node ID 8016551fde9825fc82bfa4762f17b98e7519b823
# Parent  ab93a9a46bd48f9a654b1fdc9caf4b7ae07f6a8b
Refactor xenbus to break up the xenbus_lock and permit watches
to fire concurrently with request/reply pairs. Remove
watch_ack message: no longer needed.

Signed-off-by: Keir Fraser <keir@xxxxxxxxxxxxx>

diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c
--- a/linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c       Sun Oct  9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/arch/xen/i386/kernel/smpboot.c       Sun Oct  9 17:52:54 2005
@@ -1327,18 +1327,14 @@
        .callback = handle_vcpu_hotplug_event
 };
 
-/* NB: Assumes xenbus_lock is held! */
 static int setup_cpu_watcher(struct notifier_block *notifier,
                              unsigned long event, void *data)
 {
-       int err = 0;
-
-       BUG_ON(down_trylock(&xenbus_lock) == 0);
+       int err;
+
        err = register_xenbus_watch(&cpu_watch);
-
-       if (err) {
+       if (err)
                printk("Failed to register watch on /cpu\n");
-       }
 
        return NOTIFY_DONE;
 }
diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/arch/xen/kernel/reboot.c
--- a/linux-2.6-xen-sparse/arch/xen/kernel/reboot.c     Sun Oct  9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/arch/xen/kernel/reboot.c     Sun Oct  9 17:52:54 2005
@@ -360,9 +360,6 @@
 
 static struct notifier_block xenstore_notifier;
 
-/* Setup our watcher
-   NB: Assumes xenbus_lock is held!
-*/
 static int setup_shutdown_watcher(struct notifier_block *notifier,
                                   unsigned long event,
                                   void *data)
@@ -371,8 +368,6 @@
 #ifdef CONFIG_MAGIC_SYSRQ
        int err2 = 0;
 #endif
-
-       BUG_ON(down_trylock(&xenbus_lock) == 0);
 
        err1 = register_xenbus_watch(&shutdown_watch);
 #ifdef CONFIG_MAGIC_SYSRQ
diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c
--- a/linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c        Sun Oct  9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/balloon/balloon.c        Sun Oct  9 17:52:54 2005
@@ -370,16 +370,11 @@
     
 }
 
-/* Setup our watcher
-   NB: Assumes xenbus_lock is held!
-*/
 int balloon_init_watcher(struct notifier_block *notifier,
                          unsigned long event,
                          void *data)
 {
        int err;
-
-       BUG_ON(down_trylock(&xenbus_lock) == 0);
 
        err = register_xenbus_watch(&target_watch);
        if (err)
diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c    Sun Oct  9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_comms.c    Sun Oct  9 17:52:54 2005
@@ -130,15 +130,10 @@
 
                wait_event(xb_waitq, output_avail(out));
 
-               /* Read, then check: not that we don't trust store.
-                * Hell, some of my best friends are daemons.  But,
-                * in this post-911 world... */
+               mb();
                h = *out;
-               mb();
-               if (!check_buffer(&h)) {
-                       set_current_state(TASK_RUNNING);
-                       return -EIO; /* ETERRORIST! */
-               }
+               if (!check_buffer(&h))
+                       return -EIO;
 
                dst = get_output_chunk(&h, out->buf, &avail);
                if (avail > len)
@@ -173,12 +168,11 @@
                const char *src;
 
                wait_event(xb_waitq, xs_input_avail());
+
+               mb();
                h = *in;
-               mb();
-               if (!check_buffer(&h)) {
-                       set_current_state(TASK_RUNNING);
+               if (!check_buffer(&h))
                        return -EIO;
-               }
 
                src = get_input_chunk(&h, in->buf, &avail);
                if (avail > len)
@@ -195,10 +189,6 @@
                        notify_remote_via_evtchn(xen_start_info->store_evtchn);
        }
 
-       /* If we left something, wake watch thread to deal with it. */
-       if (xs_input_avail())
-               wake_up(&xb_waitq);
-
        return 0;
 }
 
diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c      Sun Oct  9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c      Sun Oct  9 17:52:54 2005
@@ -46,85 +46,113 @@
 #include <asm/hypervisor.h>
 
 struct xenbus_dev_data {
-       /* Are there bytes left to be read in this message? */
-       int bytes_left;
-       /* Are we still waiting for the reply to a message we wrote? */
-       int awaiting_reply;
-       /* Buffer for outgoing messages. */
+       int in_transaction;
+
+       /* Partial request. */
        unsigned int len;
        union {
                struct xsd_sockmsg msg;
                char buffer[PAGE_SIZE];
        } u;
+
+       /* Response queue. */
+#define MASK_READ_IDX(idx) ((idx)&(PAGE_SIZE-1))
+       char read_buffer[PAGE_SIZE];
+       unsigned int read_cons, read_prod;
+       wait_queue_head_t read_waitq;
 };
 
 static struct proc_dir_entry *xenbus_dev_intf;
 
-/* Reply can be long (dir, getperm): don't buffer, just examine
- * headers so we can discard rest if they die. */
 static ssize_t xenbus_dev_read(struct file *filp,
                               char __user *ubuf,
                               size_t len, loff_t *ppos)
 {
-       struct xenbus_dev_data *data = filp->private_data;
-       struct xsd_sockmsg msg;
-       int err;
-
-       /* Refill empty buffer? */
-       if (data->bytes_left == 0) {
-               if (len < sizeof(msg))
-                       return -EINVAL;
-
-               err = xb_read(&msg, sizeof(msg));
-               if (err)
-                       return err;
-               data->bytes_left = msg.len;
-               if (ubuf && copy_to_user(ubuf, &msg, sizeof(msg)) != 0)
-                       return -EFAULT;
-               /* We can receive spurious XS_WATCH_EVENT messages. */
-               if (msg.type != XS_WATCH_EVENT)
-                       data->awaiting_reply = 0;
-               return sizeof(msg);
+       struct xenbus_dev_data *u = filp->private_data;
+       int i;
+
+       if (wait_event_interruptible(u->read_waitq,
+                                    u->read_prod != u->read_cons))
+               return -EINTR;
+
+       for (i = 0; i < len; i++) {
+               if (u->read_cons == u->read_prod)
+                       break;
+               put_user(u->read_buffer[MASK_READ_IDX(u->read_cons)], ubuf+i);
+               u->read_cons++;
        }
 
-       /* Don't read over next header, or over temporary buffer. */
-       if (len > sizeof(data->u.buffer))
-               len = sizeof(data->u.buffer);
-       if (len > data->bytes_left)
-               len = data->bytes_left;
-
-       err = xb_read(data->u.buffer, len);
-       if (err)
-               return err;
-
-       data->bytes_left -= len;
-       if (ubuf && copy_to_user(ubuf, data->u.buffer, len) != 0)
-               return -EFAULT;
-       return len;
-}
-
-/* We do v. basic sanity checking so they don't screw up kernel later. */
+       return i;
+}
+
+static void queue_reply(struct xenbus_dev_data *u,
+                       char *data, unsigned int len)
+{
+       int i;
+
+       for (i = 0; i < len; i++, u->read_prod++)
+               u->read_buffer[MASK_READ_IDX(u->read_prod)] = data[i];
+
+       BUG_ON((u->read_prod - u->read_cons) > sizeof(u->read_buffer));
+
+       wake_up(&u->read_waitq);
+}
+
 static ssize_t xenbus_dev_write(struct file *filp,
                                const char __user *ubuf,
                                size_t len, loff_t *ppos)
 {
-       struct xenbus_dev_data *data = filp->private_data;
-       int err;
-
-       /* We gather data in buffer until we're ready to send it. */
-       if (len > data->len + sizeof(data->u))
+       struct xenbus_dev_data *u = filp->private_data;
+       void *reply;
+       int err = 0;
+
+       if ((len + u->len) > sizeof(u->u.buffer))
                return -EINVAL;
-       if (copy_from_user(data->u.buffer + data->len, ubuf, len) != 0)
+
+       if (copy_from_user(u->u.buffer + u->len, ubuf, len) != 0)
                return -EFAULT;
-       data->len += len;
-       if (data->len >= sizeof(data->u.msg) + data->u.msg.len) {
-               err = xb_write(data->u.buffer, data->len);
-               if (err)
-                       return err;
-               data->len = 0;
-               data->awaiting_reply = 1;
+
+       u->len += len;
+       if (u->len < (sizeof(u->u.msg) + u->u.msg.len))
+               return len;
+
+       switch (u->u.msg.type) {
+       case XS_TRANSACTION_START:
+       case XS_TRANSACTION_END:
+       case XS_DIRECTORY:
+       case XS_READ:
+       case XS_GET_PERMS:
+       case XS_RELEASE:
+       case XS_GET_DOMAIN_PATH:
+       case XS_WRITE:
+       case XS_MKDIR:
+       case XS_RM:
+       case XS_SET_PERMS:
+               reply = xenbus_dev_request_and_reply(&u->u.msg);
+               if (IS_ERR(reply))
+                       err = PTR_ERR(reply);
+               else {
+                       if (u->u.msg.type == XS_TRANSACTION_START)
+                               u->in_transaction = 1;
+                       if (u->u.msg.type == XS_TRANSACTION_END)
+                               u->in_transaction = 0;
+                       queue_reply(u, (char *)&u->u.msg, sizeof(u->u.msg));
+                       queue_reply(u, (char *)reply, u->u.msg.len);
+                       kfree(reply);
+               }
+               break;
+
+       default:
+               err = -EINVAL;
+               break;
        }
-       return len;
+
+       if (err == 0) {
+               u->len = 0;
+               err = len;
+       }
+
+       return err;
 }
 
 static int xenbus_dev_open(struct inode *inode, struct file *filp)
@@ -134,7 +162,6 @@
        if (xen_start_info->store_evtchn == 0)
                return -ENOENT;
 
-       /* Don't try seeking. */
        nonseekable_open(inode, filp);
 
        u = kmalloc(sizeof(*u), GFP_KERNEL);
@@ -142,28 +169,21 @@
                return -ENOMEM;
 
        memset(u, 0, sizeof(*u));
+       init_waitqueue_head(&u->read_waitq);
 
        filp->private_data = u;
 
-       down(&xenbus_lock);
-
        return 0;
 }
 
 static int xenbus_dev_release(struct inode *inode, struct file *filp)
 {
-       struct xenbus_dev_data *data = filp->private_data;
-
-       /* Discard any unread replies. */
-       while (data->bytes_left || data->awaiting_reply)
-               xenbus_dev_read(filp, NULL, sizeof(data->u.buffer), NULL);
-
-       /* Harmless if no transaction in progress. */
-       xenbus_transaction_end(1);
-
-       up(&xenbus_lock);
-
-       kfree(data);
+       struct xenbus_dev_data *u = filp->private_data;
+
+       if (u->in_transaction)
+               xenbus_transaction_end(1);
+
+       kfree(u);
 
        return 0;
 }
diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c    Sun Oct  9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_probe.c    Sun Oct  9 17:52:54 2005
@@ -43,6 +43,9 @@
 
 static struct notifier_block *xenstore_chain;
 
+/* Now used to protect xenbus probes against save/restore. */
+static DECLARE_MUTEX(xenbus_lock);
+
 /* If something in array of ids matches this device, return it. */
 static const struct xenbus_device_id *
 match_device(const struct xenbus_device_id *arr, struct xenbus_device *dev)
@@ -625,12 +628,13 @@
        down(&xenbus_lock);
        bus_for_each_dev(&xenbus_frontend.bus, NULL, NULL, suspend_dev);
        bus_for_each_dev(&xenbus_backend.bus, NULL, NULL, suspend_dev);
+       xs_suspend();
 }
 
 void xenbus_resume(void)
 {
        xb_init_comms();
-       reregister_xenbus_watches();
+       xs_resume();
        bus_for_each_dev(&xenbus_frontend.bus, NULL, NULL, resume_dev);
        bus_for_each_dev(&xenbus_backend.bus, NULL, NULL, resume_dev);
        up(&xenbus_lock);
@@ -685,6 +689,7 @@
        /* Notify others that xenstore is up */
        notifier_call_chain(&xenstore_chain, 0, 0);
        up(&xenbus_lock);
+
        return 0;
 }
 
diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c       Sun Oct  9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c       Sun Oct  9 17:52:54 2005
@@ -42,11 +42,67 @@
 
 #define streq(a, b) (strcmp((a), (b)) == 0)
 
-static char printf_buffer[4096];
+struct xs_stored_msg {
+       struct xsd_sockmsg hdr;
+
+       union {
+               /* Stored replies. */
+               struct {
+                       struct list_head list;
+                       char *body;
+               } reply;
+
+               /* Queued watch callbacks. */
+               struct {
+                       struct work_struct work;
+                       struct xenbus_watch *handle;
+                       char **vec;
+                       unsigned int vec_size;
+               } watch;
+       } u;
+};
+
+struct xs_handle {
+       /* A list of replies. Currently only one will ever be outstanding. */
+       struct list_head reply_list;
+       spinlock_t reply_lock;
+       wait_queue_head_t reply_waitq;
+
+       /* One request at a time. */
+       struct semaphore request_mutex;
+
+       /* One transaction at a time. */
+       struct semaphore transaction_mutex;
+       int transaction_pid;
+};
+
+static struct xs_handle xs_state;
+
 static LIST_HEAD(watches);
-
-DECLARE_MUTEX(xenbus_lock);
-EXPORT_SYMBOL(xenbus_lock);
+static DEFINE_SPINLOCK(watches_lock);
+
+/* Can wait on !xs_resuming for suspend/resume cycle to complete. */
+static int xs_resuming;
+static DECLARE_WAIT_QUEUE_HEAD(xs_resuming_waitq);
+
+static void request_mutex_acquire(void)
+{
+       /*
+        * We can't distinguish non-transactional from transactional
+        * requests right now. So temporarily acquire the transaction mutex
+        * if this task is outside transaction context.
+        */
+       if (xs_state.transaction_pid != current->pid)
+               down(&xs_state.transaction_mutex);
+       down(&xs_state.request_mutex);
+}
+
+static void request_mutex_release(void)
+{
+       up(&xs_state.request_mutex);
+       if (xs_state.transaction_pid != current->pid)
+               up(&xs_state.transaction_mutex);
+}
 
 static int get_error(const char *errorstring)
 {
@@ -65,29 +121,32 @@
 
 static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
 {
-       struct xsd_sockmsg msg;
-       void *ret;
-       int err;
-
-       err = xb_read(&msg, sizeof(msg));
-       if (err)
-               return ERR_PTR(err);
-
-       ret = kmalloc(msg.len + 1, GFP_KERNEL);
-       if (!ret)
-               return ERR_PTR(-ENOMEM);
-
-       err = xb_read(ret, msg.len);
-       if (err) {
-               kfree(ret);
-               return ERR_PTR(err);
-       }
-       ((char*)ret)[msg.len] = '\0';
-
-       *type = msg.type;
+       struct xs_stored_msg *msg;
+       char *body;
+
+       spin_lock(&xs_state.reply_lock);
+
+       while (list_empty(&xs_state.reply_list)) {
+               spin_unlock(&xs_state.reply_lock);
+               wait_event(xs_state.reply_waitq,
+                          !list_empty(&xs_state.reply_list));
+               spin_lock(&xs_state.reply_lock);
+       }
+
+       msg = list_entry(xs_state.reply_list.next,
+                        struct xs_stored_msg, u.reply.list);
+       list_del(&msg->u.reply.list);
+
+       spin_unlock(&xs_state.reply_lock);
+
+       *type = msg->hdr.type;
        if (len)
-               *len = msg.len;
-       return ret;
+               *len = msg->hdr.len;
+       body = msg->u.reply.body;
+
+       kfree(msg);
+
+       return body;
 }
 
 /* Emergency write. */
@@ -98,10 +157,45 @@
        msg.type = XS_DEBUG;
        msg.len = sizeof("print") + count + 1;
 
+       request_mutex_acquire();
        xb_write(&msg, sizeof(msg));
        xb_write("print", sizeof("print"));
        xb_write(str, count);
        xb_write("", 1);
+       request_mutex_release();
+}
+
+void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg)
+{
+       void *ret;
+       struct xsd_sockmsg req_msg = *msg;
+       int err;
+
+       if (req_msg.type == XS_TRANSACTION_START) {
+               down(&xs_state.transaction_mutex);
+               xs_state.transaction_pid = current->pid;
+       }
+
+       request_mutex_acquire();
+
+       err = xb_write(msg, sizeof(*msg) + msg->len);
+       if (err) {
+               msg->type = XS_ERROR;
+               ret = ERR_PTR(err);
+       } else {
+               ret = read_reply(&msg->type, &msg->len);
+       }
+
+       request_mutex_release();
+
+       if ((msg->type == XS_TRANSACTION_END) ||
+           ((req_msg.type == XS_TRANSACTION_START) &&
+            (msg->type == XS_ERROR))) {
+               xs_state.transaction_pid = -1;
+               up(&xs_state.transaction_mutex);
+       }
+
+       return ret;
 }
 
 /* Send message to xs, get kmalloc'ed reply.  ERR_PTR() on error. */
@@ -115,31 +209,33 @@
        unsigned int i;
        int err;
 
-       WARN_ON(down_trylock(&xenbus_lock) == 0);
-
        msg.type = type;
        msg.len = 0;
        for (i = 0; i < num_vecs; i++)
                msg.len += iovec[i].iov_len;
 
+       request_mutex_acquire();
+
        err = xb_write(&msg, sizeof(msg));
-       if (err)
+       if (err) {
+               up(&xs_state.request_mutex);
                return ERR_PTR(err);
+       }
 
        for (i = 0; i < num_vecs; i++) {
                err = xb_write(iovec[i].iov_base, iovec[i].iov_len);;
-               if (err)
+               if (err) {
+                       request_mutex_release();
                        return ERR_PTR(err);
-       }
-
-       /* Watches can have fired before reply comes: daemon detects
-        * and re-transmits, so we can ignore this. */
-       do {
-               kfree(ret);
-               ret = read_reply(&msg.type, len);
-               if (IS_ERR(ret))
-                       return ret;
-       } while (msg.type == XS_WATCH_EVENT);
+               }
+       }
+
+       ret = read_reply(&msg.type, len);
+
+       request_mutex_release();
+
+       if (IS_ERR(ret))
+               return ret;
 
        if (msg.type == XS_ERROR) {
                err = get_error(ret);
@@ -187,8 +283,6 @@
 {
        static char buffer[4096];
 
-       BUG_ON(down_trylock(&xenbus_lock) == 0);
-       /* XXX FIXME: might not be correct if name == "" */
        BUG_ON(strlen(dir) + strlen("/") + strlen(name) + 1 > sizeof(buffer));
 
        strcpy(buffer, dir);
@@ -207,7 +301,7 @@
        *num = count_strings(strings, len);
 
        /* Transfer to one big alloc for easy freeing. */
-       ret = kmalloc(*num * sizeof(char *) + len, GFP_ATOMIC);
+       ret = kmalloc(*num * sizeof(char *) + len, GFP_KERNEL);
        if (!ret) {
                kfree(strings);
                return ERR_PTR(-ENOMEM);
@@ -298,7 +392,18 @@
  */
 int xenbus_transaction_start(void)
 {
-       return xs_error(xs_single(XS_TRANSACTION_START, "", NULL));
+       int err;
+
+       down(&xs_state.transaction_mutex);
+       xs_state.transaction_pid = current->pid;
+
+       err = xs_error(xs_single(XS_TRANSACTION_START, "", NULL));
+       if (err) {
+               xs_state.transaction_pid = -1;
+               up(&xs_state.transaction_mutex);
+       }
+
+       return err;
 }
 EXPORT_SYMBOL(xenbus_transaction_start);
 
@@ -308,12 +413,19 @@
 int xenbus_transaction_end(int abort)
 {
        char abortstr[2];
+       int err;
 
        if (abort)
                strcpy(abortstr, "F");
        else
                strcpy(abortstr, "T");
-       return xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL));
+
+       err = xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL));
+
+       xs_state.transaction_pid = -1;
+       up(&xs_state.transaction_mutex);
+
+       return err;
 }
 EXPORT_SYMBOL(xenbus_transaction_end);
 
@@ -344,14 +456,23 @@
 {
        va_list ap;
        int ret;
-
-       BUG_ON(down_trylock(&xenbus_lock) == 0);
+#define PRINTF_BUFFER_SIZE 4096
+       char *printf_buffer;
+
+       printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_KERNEL);
+       if (printf_buffer == NULL)
+               return -ENOMEM;
+
        va_start(ap, fmt);
-       ret = vsnprintf(printf_buffer, sizeof(printf_buffer), fmt, ap);
+       ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap);
        va_end(ap);
 
-       BUG_ON(ret > sizeof(printf_buffer)-1);
-       return xenbus_write(dir, node, printf_buffer);
+       BUG_ON(ret > PRINTF_BUFFER_SIZE-1);
+       ret = xenbus_write(dir, node, printf_buffer);
+
+       kfree(printf_buffer);
+
+       return ret;
 }
 EXPORT_SYMBOL(xenbus_printf);
 
@@ -361,19 +482,28 @@
        va_list ap;
        int ret;
        unsigned int len;
-
-       BUG_ON(down_trylock(&xenbus_lock) == 0);
+       char *printf_buffer;
+
+       printf_buffer = kmalloc(PRINTF_BUFFER_SIZE, GFP_KERNEL);
+       if (printf_buffer == NULL)
+               goto fail;
 
        len = sprintf(printf_buffer, "%i ", -err);
        va_start(ap, fmt);
-       ret = vsnprintf(printf_buffer+len, sizeof(printf_buffer)-len, fmt, ap);
+       ret = vsnprintf(printf_buffer+len, PRINTF_BUFFER_SIZE-len, fmt, ap);
        va_end(ap);
 
-       BUG_ON(len + ret > sizeof(printf_buffer)-1);
+       BUG_ON(len + ret > PRINTF_BUFFER_SIZE-1);
        dev->has_error = 1;
        if (xenbus_write(dev->nodename, "error", printf_buffer) != 0)
-               printk("xenbus: failed to write error node for %s (%s)\n",
-                      dev->nodename, printf_buffer);
+               goto fail;
+
+       kfree(printf_buffer);
+       return;
+
+ fail:
+       printk("xenbus: failed to write error node for %s (%s)\n",
+              dev->nodename, printf_buffer);
 }
 EXPORT_SYMBOL(xenbus_dev_error);
 
@@ -432,26 +562,6 @@
        return xs_error(xs_talkv(XS_WATCH, iov, ARRAY_SIZE(iov), NULL));
 }
 
-static char **xs_read_watch(unsigned int *num)
-{
-       enum xsd_sockmsg_type type;
-       char *strings;
-       unsigned int len;
-
-       strings = read_reply(&type, &len);
-       if (IS_ERR(strings))
-               return (char **)strings;
-
-       BUG_ON(type != XS_WATCH_EVENT);
-
-       return split(strings, len, num);
-}
-
-static int xs_acknowledge_watch(const char *token)
-{
-       return xs_error(xs_single(XS_WATCH_ACK, token, NULL));
-}
-
 static int xs_unwatch(const char *path, const char *token)
 {
        struct kvec iov[2];
@@ -464,7 +574,6 @@
        return xs_error(xs_talkv(XS_UNWATCH, iov, ARRAY_SIZE(iov), NULL));
 }
 
-/* A little paranoia: we don't just trust token. */
 static struct xenbus_watch *find_watch(const char *token)
 {
        struct xenbus_watch *i, *cmp;
@@ -474,6 +583,7 @@
        list_for_each_entry(i, &watches, list)
                if (i == cmp)
                        return i;
+
        return NULL;
 }
 
@@ -485,11 +595,20 @@
        int err;
 
        sprintf(token, "%lX", (long)watch);
+
+       spin_lock(&watches_lock);
        BUG_ON(find_watch(token));
+       spin_unlock(&watches_lock);
 
        err = xs_watch(watch->node, token);
-       if (!err)
+
+       /* Ignore errors due to multiple registration. */
+       if ((err == 0) || (err == -EEXIST)) {
+               spin_lock(&watches_lock);
                list_add(&watch->list, &watches);
+               spin_unlock(&watches_lock);
+       }
+
        return err;
 }
 EXPORT_SYMBOL(register_xenbus_watch);
@@ -500,77 +619,188 @@
        int err;
 
        sprintf(token, "%lX", (long)watch);
+
+       spin_lock(&watches_lock);
        BUG_ON(!find_watch(token));
+       list_del(&watch->list);
+       spin_unlock(&watches_lock);
+
+       /* Ensure xs_resume() is not in progress (see comments there). */
+       wait_event(xs_resuming_waitq, !xs_resuming);
 
        err = xs_unwatch(watch->node, token);
-       list_del(&watch->list);
-
        if (err)
                printk(KERN_WARNING
                       "XENBUS Failed to release watch %s: %i\n",
                       watch->node, err);
+
+       /* Make sure watch is not in use. */
+       flush_scheduled_work();
 }
 EXPORT_SYMBOL(unregister_xenbus_watch);
 
-/* Re-register callbacks to all watches. */
-void reregister_xenbus_watches(void)
-{
+void xs_suspend(void)
+{
+       down(&xs_state.transaction_mutex);
+       down(&xs_state.request_mutex);
+}
+
+void xs_resume(void)
+{
+       struct list_head *ent, *prev_ent = &watches;
        struct xenbus_watch *watch;
        char token[sizeof(watch) * 2 + 1];
 
-       list_for_each_entry(watch, &watches, list) {
-               sprintf(token, "%lX", (long)watch);
-               xs_watch(watch->node, token);
-       }
-}
-
-static int watch_thread(void *unused)
-{
+       /* Protect against concurrent unregistration and freeing of watches. */
+       BUG_ON(xs_resuming);
+       xs_resuming = 1;
+
+       up(&xs_state.request_mutex);
+       up(&xs_state.transaction_mutex);
+
+       /*
+        * Iterate over the watch list re-registering each node. We must
+        * be careful about concurrent registrations and unregistrations.
+        * We search for the node immediately following the previously
+        * re-registered node. If we get no match then either we are done
+        * (previous node is last in list) or the node was unregistered, in
+        * which case we restart from the beginning of the list.
+        * register_xenbus_watch() + unregister_xenbus_watch() is safe because
+        * it will only ever move a watch node earlier in the list, so it
+        * cannot cause us to skip nodes.
+        */
        for (;;) {
-               char **vec = NULL;
-               unsigned int num;
-
-               wait_event(xb_waitq, xs_input_avail());
-
-               /* If this is a spurious wakeup caused by someone
-                * doing an op, they'll hold the lock and the buffer
-                * will be empty by the time we get there.               
-                */
-               down(&xenbus_lock);
-               if (xs_input_avail())
-                       vec = xs_read_watch(&num);
-
-               if (vec && !IS_ERR(vec)) {
-                       struct xenbus_watch *w;
-                       int err;
-
-                       err = xs_acknowledge_watch(vec[XS_WATCH_TOKEN]);
-                       if (err)
-                               printk(KERN_WARNING "XENBUS ack %s fail %i\n",
-                                      vec[XS_WATCH_TOKEN], err);
-                       w = find_watch(vec[XS_WATCH_TOKEN]);
-                       BUG_ON(!w);
-                       w->callback(w, (const char **)vec, num);
-                       kfree(vec);
-               } else if (vec)
-                       printk(KERN_WARNING "XENBUS xs_read_watch: %li\n",
-                              PTR_ERR(vec));
-               up(&xenbus_lock);
+               spin_lock(&watches_lock);
+               list_for_each(ent, &watches)
+                       if (ent->prev == prev_ent)
+                               break;
+               spin_unlock(&watches_lock);
+
+               /* No match because prev_ent is at the end of the list? */
+               if ((ent == &watches) && (watches.prev == prev_ent))
+                        break; /* We're done! */
+
+               if ((prev_ent = ent) != &watches) {
+                       /*
+                        * Safe even with watch_lock not held. We are saved by
+                        * (xs_resumed==1) check in unregister_xenbus_watch.
+                        */
+                       watch = list_entry(ent, struct xenbus_watch, list);
+                       sprintf(token, "%lX", (long)watch);
+                       xs_watch(watch->node, token);
+               }
+       }
+
+       xs_resuming = 0;
+       wake_up(&xs_resuming_waitq);
+}
+
+static void xenbus_fire_watch(void *arg)
+{
+       struct xs_stored_msg *msg = arg;
+
+       msg->u.watch.handle->callback(msg->u.watch.handle,
+                                     (const char **)msg->u.watch.vec,
+                                     msg->u.watch.vec_size);
+
+       kfree(msg->u.watch.vec);
+       kfree(msg);
+}
+
+static int process_msg(void)
+{
+       struct xs_stored_msg *msg;
+       char *body;
+       int err;
+
+       msg = kmalloc(sizeof(*msg), GFP_KERNEL);
+       if (msg == NULL)
+               return -ENOMEM;
+
+       err = xb_read(&msg->hdr, sizeof(msg->hdr));
+       if (err) {
+               kfree(msg);
+               return err;
+       }
+
+       body = kmalloc(msg->hdr.len + 1, GFP_KERNEL);
+       if (body == NULL) {
+               kfree(msg);
+               return -ENOMEM;
+       }
+
+       err = xb_read(body, msg->hdr.len);
+       if (err) {
+               kfree(body);
+               kfree(msg);
+               return err;
+       }
+       body[msg->hdr.len] = '\0';
+
+       if (msg->hdr.type == XS_WATCH_EVENT) {
+               INIT_WORK(&msg->u.watch.work, xenbus_fire_watch, msg);
+
+               msg->u.watch.vec = split(body, msg->hdr.len,
+                                        &msg->u.watch.vec_size);
+               if (IS_ERR(msg->u.watch.vec)) {
+                       kfree(msg);
+                       return PTR_ERR(msg->u.watch.vec);
+               }
+
+               spin_lock(&watches_lock);
+               msg->u.watch.handle = find_watch(
+                       msg->u.watch.vec[XS_WATCH_TOKEN]);
+               if (msg->u.watch.handle != NULL) {
+                       schedule_work(&msg->u.watch.work);
+               } else {
+                       kfree(msg->u.watch.vec);
+                       kfree(msg);
+               }
+               spin_unlock(&watches_lock);
+       } else {
+               msg->u.reply.body = body;
+               spin_lock(&xs_state.reply_lock);
+               list_add_tail(&msg->u.reply.list, &xs_state.reply_list);
+               spin_unlock(&xs_state.reply_lock);
+               wake_up(&xs_state.reply_waitq);
+       }
+
+       return 0;
+}
+
+static int read_thread(void *unused)
+{
+       int err;
+
+       for (;;) {
+               err = process_msg();
+               if (err)
+                       printk(KERN_WARNING "XENBUS error %d while reading "
+                              "message\n", err);
        }
 }
 
 int xs_init(void)
 {
        int err;
-       struct task_struct *watcher;
+       struct task_struct *reader;
+
+       INIT_LIST_HEAD(&xs_state.reply_list);
+       spin_lock_init(&xs_state.reply_lock);
+       init_waitqueue_head(&xs_state.reply_waitq);
+
+       init_MUTEX(&xs_state.request_mutex);
+       init_MUTEX(&xs_state.transaction_mutex);
+       xs_state.transaction_pid = -1;
 
        err = xb_init_comms();
        if (err)
                return err;
        
-       watcher = kthread_run(watch_thread, NULL, "kxbwatch");
-       if (IS_ERR(watcher))
-               return PTR_ERR(watcher);
+       reader = kthread_run(read_thread, NULL, "xenbusd");
+       if (IS_ERR(reader))
+               return PTR_ERR(reader);
+
        return 0;
 }
 
diff -r ab93a9a46bd4 -r 8016551fde98 linux-2.6-xen-sparse/include/asm-xen/xenbus.h
--- a/linux-2.6-xen-sparse/include/asm-xen/xenbus.h     Sun Oct  9 16:29:24 2005
+++ b/linux-2.6-xen-sparse/include/asm-xen/xenbus.h     Sun Oct  9 17:52:54 2005
@@ -78,10 +78,6 @@
 int xenbus_register_backend(struct xenbus_driver *drv);
 void xenbus_unregister_driver(struct xenbus_driver *drv);
 
-/* Caller must hold this lock to call these functions: it's also held
- * across watch callbacks. */
-extern struct semaphore xenbus_lock;
-
 char **xenbus_directory(const char *dir, const char *node, unsigned int *num);
 void *xenbus_read(const char *dir, const char *node, unsigned int *len);
 int xenbus_write(const char *dir, const char *node, const char *string);
@@ -113,7 +109,11 @@
 struct xenbus_watch
 {
        struct list_head list;
+
+       /* Path being watched. */
        char *node;
+
+       /* Callback (executed in a process context with no locks held). */
        void (*callback)(struct xenbus_watch *,
                         const char **vec, unsigned int len);
 };
@@ -124,7 +124,11 @@
 
 int register_xenbus_watch(struct xenbus_watch *watch);
 void unregister_xenbus_watch(struct xenbus_watch *watch);
-void reregister_xenbus_watches(void);
+void xs_suspend(void);
+void xs_resume(void);
+
+/* Used by xenbus_dev to borrow kernel's store connection. */
+void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg);
 
 /* Called from xen core code. */
 void xenbus_suspend(void);
diff -r ab93a9a46bd4 -r 8016551fde98 tools/blktap/xenbus.c
--- a/tools/blktap/xenbus.c     Sun Oct  9 16:29:24 2005
+++ b/tools/blktap/xenbus.c     Sun Oct  9 17:52:54 2005
@@ -260,10 +260,6 @@
     node  = res[XS_WATCH_PATH];
     token = res[XS_WATCH_TOKEN];
 
-    er = xs_acknowledge_watch(h, token);
-    if (er == 0)
-        warn("Couldn't acknowledge watch (%s)", token);
-
     w = find_watch(token);
     if (!w)
     {
diff -r ab93a9a46bd4 -r 8016551fde98 tools/console/daemon/io.c
--- a/tools/console/daemon/io.c Sun Oct  9 16:29:24 2005
+++ b/tools/console/daemon/io.c Sun Oct  9 17:52:54 2005
@@ -505,7 +505,6 @@
                        domain_create_ring(dom);
        }
 
-       xs_acknowledge_watch(xs, vec[1]);
        free(vec);
 }
 
diff -r ab93a9a46bd4 -r 8016551fde98 tools/python/xen/lowlevel/xs/xs.c
--- a/tools/python/xen/lowlevel/xs/xs.c Sun Oct  9 16:29:24 2005
+++ b/tools/python/xen/lowlevel/xs/xs.c Sun Oct  9 17:52:54 2005
@@ -442,9 +442,6 @@
 
 #define xspy_read_watch_doc "\n"                               \
        "Read a watch notification.\n"                          \
-       "The notification must be acknowledged by passing\n"    \
-       "the token to acknowledge_watch().\n"                   \
-       " path [string]: xenstore path.\n"                      \
        "\n"                                                    \
        "Returns: [tuple] (path, token).\n"                     \
        "Raises RuntimeError on error.\n"                       \
@@ -492,44 +489,6 @@
  exit:
     if (xsval)
         free(xsval);
-    return val;
-}
-
-#define xspy_acknowledge_watch_doc "\n"                                        \
-       "Acknowledge a watch notification that has been read.\n"        \
-       " token [string] : from the watch notification\n"               \
-       "\n"                                                            \
-       "Returns None on success.\n"                                    \
-       "Raises RuntimeError on error.\n"                               \
-       "\n"
-
-static PyObject *xspy_acknowledge_watch(PyObject *self, PyObject *args,
-                                        PyObject *kwds)
-{
-    static char *kwd_spec[] = { "token", NULL };
-    static char *arg_spec = "O";
-    PyObject *token;
-    char token_str[MAX_STRLEN(unsigned long) + 1];
-
-    struct xs_handle *xh = xshandle(self);
-    PyObject *val = NULL;
-    int xsval = 0;
-
-    if (!xh)
-        goto exit;
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, arg_spec, kwd_spec, &token))
-        goto exit;
-    sprintf(token_str, "%li", (unsigned long)token);
-    Py_BEGIN_ALLOW_THREADS
-    xsval = xs_acknowledge_watch(xh, token_str);
-    Py_END_ALLOW_THREADS
-    if (!xsval) {
-        PyErr_SetFromErrno(PyExc_RuntimeError);
-        goto exit;
-    }
-    Py_INCREF(Py_None);
-    val = Py_None;
- exit:
     return val;
 }
 
@@ -833,7 +792,6 @@
      XSPY_METH(set_permissions),
      XSPY_METH(watch),
      XSPY_METH(read_watch),
-     XSPY_METH(acknowledge_watch),
      XSPY_METH(unwatch),
      XSPY_METH(transaction_start),
      XSPY_METH(transaction_end),
diff -r ab93a9a46bd4 -r 8016551fde98 tools/python/xen/xend/xenstore/xswatch.py
--- a/tools/python/xen/xend/xenstore/xswatch.py Sun Oct  9 16:29:24 2005
+++ b/tools/python/xen/xend/xenstore/xswatch.py Sun Oct  9 17:52:54 2005
@@ -8,6 +8,7 @@
 import select
 import threading
 from xen.lowlevel import xs
+from xen.xend.xenstore.xsutil import xshandle
 
 class xswatch:
 
@@ -27,10 +28,7 @@
         if cls.watchThread:
             cls.xslock.release()
             return
-        # XXX: When we fix xenstored to have better watch semantics,
-        # this can change to shared xshandle(). Currently that would result
-        # in duplicate watch firings, thus failed extra xs.acknowledge_watch.
-        cls.xs = xs.open()
+        cls.xs = xshandle()
         cls.watchThread = threading.Thread(name="Watcher",
                                            target=cls.watchMain)
         cls.watchThread.setDaemon(True)
@@ -43,11 +41,10 @@
         while True:
             try:
                 we = cls.xs.read_watch()
-                watch = we[1]
-                cls.xs.acknowledge_watch(watch)
             except RuntimeError, ex:
                 print ex
                 raise
+            watch = we[1]
             watch.fn(*watch.args, **watch.kwargs)
 
     watchMain = classmethod(watchMain)
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/07watch.test
--- a/tools/xenstore/testsuite/07watch.test     Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/testsuite/07watch.test     Sun Oct  9 17:52:54 2005
@@ -5,7 +5,6 @@
 2 write /test contents2
 expect 1:/test:token
 1 waitwatch
-1 ackwatch token
 1 close
 
 # Check that reads don't set it off.
@@ -22,15 +21,12 @@
 2 mkdir /dir/newdir
 expect 1:/dir/newdir:token
 1 waitwatch
-1 ackwatch token
 2 setperm /dir/newdir 0 READ
 expect 1:/dir/newdir:token
 1 waitwatch
-1 ackwatch token
 2 rm /dir/newdir
 expect 1:/dir/newdir:token
 1 waitwatch
-1 ackwatch token
 1 close
 2 close
 
@@ -49,7 +45,6 @@
 read /dir/test
 expect /dir/test:token
 waitwatch
-ackwatch token
 close
 
 # watch priority test: all simultaneous
@@ -59,13 +54,10 @@
 write /dir/test contents
 expect 3:/dir/test:token3
 3 waitwatch
-3 ackwatch token3
 expect 2:/dir/test:token2
 2 waitwatch
-2 ackwatch token2
 expect 1:/dir/test:token1
 1 waitwatch
-1 ackwatch token1
 1 close
 2 close
 3 close
@@ -79,7 +71,6 @@
 2 close
 expect 1:/dir/test:token1
 1 waitwatch
-1 ackwatch token1
 1 close
 
 # If one dies (without reading at all), the other should still get ack.
@@ -89,7 +80,6 @@
 2 close
 expect 1:/dir/test:token1
 1 waitwatch
-1 ackwatch token1
 1 close
 2 close
 
@@ -111,7 +101,6 @@
 2 unwatch /dir token2
 expect 1:/dir/test:token1
 1 waitwatch
-1 ackwatch token1
 1 close
 2 close
 
@@ -123,14 +112,12 @@
 write /dir/test contents2
 expect 1:/dir/test:token2
 1 waitwatch
-1 ackwatch token2
 
 # check we only get notified once.
 1 watch /test token
 2 write /test contents2
 expect 1:/test:token
 1 waitwatch
-1 ackwatch token
 expect 1: waitwatch failed: Connection timed out
 1 waitwatch
 1 close
@@ -142,13 +129,10 @@
 2 write /test3 contents
 expect 1:/test1:token
 1 waitwatch
-1 ackwatch token
 expect 1:/test2:token
 1 waitwatch
-1 ackwatch token
 expect 1:/test3:token
 1 waitwatch
-1 ackwatch token
 1 close
 
 # Creation of subpaths should be covered correctly.
@@ -157,10 +141,8 @@
 2 write /test/subnode/subnode contents2
 expect 1:/test/subnode:token
 1 waitwatch
-1 ackwatch token
 expect 1:/test/subnode/subnode:token
 1 waitwatch
-1 ackwatch token
 expect 1: waitwatch failed: Connection timed out
 1 waitwatch
 1 close
@@ -171,7 +153,6 @@
 1 watchnoack / token2 0
 expect 1:/test/subnode:token
 1 waitwatch
-1 ackwatch token
 expect 1:/:token2
 1 waitwatch
 expect 1: waitwatch failed: Connection timed out
@@ -183,7 +164,6 @@
 2 rm /test
 expect 1:/test/subnode:token
 1 waitwatch
-1 ackwatch token
 
-# Watch should not double-send after we ack, even if we did something in between.
+# Watch should not double-send, even if we did something in between.
 1 watch /test2 token
@@ -192,6 +172,5 @@
 1 waitwatch
 expect 1:contents2
 1 read /test2/foo
-1 ackwatch token
 expect 1: waitwatch failed: Connection timed out
 1 waitwatch
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/08transaction.test
--- a/tools/xenstore/testsuite/08transaction.test       Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/testsuite/08transaction.test       Sun Oct  9 17:52:54 2005
@@ -68,7 +68,6 @@
 2 commit
 expect 1:/test/dir/sub:token
 1 waitwatch
-1 ackwatch token
 1 close
 
 # Rm inside transaction works like rm outside: children get notified.
@@ -78,7 +77,6 @@
 2 commit
 expect 1:/test/dir/sub:token
 1 waitwatch
-1 ackwatch token
 1 close
 
 # Multiple events from single transaction don't trigger assert
@@ -89,8 +87,6 @@
 2 commit
 expect 1:/test/1:token
 1 waitwatch
-1 ackwatch token
 expect 1:/test/2:token
 1 waitwatch
-1 ackwatch token
 1 close
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/10domain-homedir.test
--- a/tools/xenstore/testsuite/10domain-homedir.test    Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/testsuite/10domain-homedir.test    Sun Oct  9 17:52:54 2005
@@ -16,4 +16,3 @@
 write /home/foo/bar contents
 expect 1:foo/bar:token
 1 waitwatch
-1 ackwatch token
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/11domain-watch.test
--- a/tools/xenstore/testsuite/11domain-watch.test      Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/testsuite/11domain-watch.test      Sun Oct  9 17:52:54 2005
@@ -10,7 +10,6 @@
 write /test contents2
 expect 1:/test:token
 1 waitwatch
-1 ackwatch token
 1 unwatch /test token
 release 1
 1 close
@@ -25,7 +24,6 @@
 1 write /dir/test4 contents4
 expect 1:/dir/test:token
 1 waitwatch
-1 ackwatch token
 release 1
 1 close
 
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/12readonly.test
--- a/tools/xenstore/testsuite/12readonly.test  Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/testsuite/12readonly.test  Sun Oct  9 17:52:54 2005
@@ -36,4 +36,3 @@
 1 write /test contents
 expect /test:token
 waitwatch
-ackwatch token
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/testsuite/13watch-ack.test
--- a/tools/xenstore/testsuite/13watch-ack.test Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/testsuite/13watch-ack.test Sun Oct  9 17:52:54 2005
@@ -18,5 +18,4 @@
 1 waitwatch
 3 write /test/1 contents1
 4 write /test/3 contents3
-1 ackwatch token2
 1 close
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xenstored_core.c
--- a/tools/xenstore/xenstored_core.c   Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/xenstored_core.c   Sun Oct  9 17:52:54 2005
@@ -154,7 +154,6 @@
        case XS_READ: return "READ";
        case XS_GET_PERMS: return "GET_PERMS";
        case XS_WATCH: return "WATCH";
-       case XS_WATCH_ACK: return "WATCH_ACK";
        case XS_UNWATCH: return "UNWATCH";
        case XS_TRANSACTION_START: return "TRANSACTION_START";
        case XS_TRANSACTION_END: return "TRANSACTION_END";
@@ -1103,10 +1102,6 @@
                do_watch(conn, in);
                break;
 
-       case XS_WATCH_ACK:
-               do_watch_ack(conn, onearg(in));
-               break;
-
        case XS_UNWATCH:
                do_unwatch(conn, in);
                break;
@@ -1167,11 +1162,6 @@
        if (verbose)
                xprintf("Got message %s len %i from %p\n",
                        sockmsg_string(type), conn->in->hdr.msg.len, conn);
-
-       /* We might get a command while waiting for an ack: this means
-        * the other end discarded it: we will re-transmit. */
-       if (type != XS_WATCH_ACK)
-               conn->waiting_for_ack = NULL;
 
        /* Careful: process_message may free connection.  We detach
         * "in" beforehand and allocate the new buffer to avoid
@@ -1266,7 +1256,6 @@
 
        new->state = OK;
        new->out = new->waiting_reply = NULL;
-       new->waiting_for_ack = NULL;
        new->fd = -1;
        new->id = 0;
        new->domain = NULL;
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xenstored_core.h
--- a/tools/xenstore/xenstored_core.h   Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/xenstored_core.h   Sun Oct  9 17:52:54 2005
@@ -70,9 +70,6 @@
 
        /* Is this a read-only connection? */
        bool can_write;
-
-       /* Are we waiting for a watch event ack? */
-       struct watch *waiting_for_ack;
 
        /* Buffered incoming data. */
        struct buffered_data *in;
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xenstored_watch.c
--- a/tools/xenstore/xenstored_watch.c  Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/xenstored_watch.c  Sun Oct  9 17:52:54 2005
@@ -69,18 +69,14 @@
        if (conn->waiting_reply) {
                conn->out = conn->waiting_reply;
                conn->waiting_reply = NULL;
-               conn->waiting_for_ack = NULL;
-               return;
-       }
-
-       /* If we're already waiting for ack, don't queue more. */
-       if (conn->waiting_for_ack)
-               return;
+               return;
+       }
 
        list_for_each_entry(watch, &conn->watches, list) {
                event = list_top(&watch->events, struct watch_event, list);
                if (event) {
-                       conn->waiting_for_ack = watch;
                        send_reply(conn,XS_WATCH_EVENT,event->data,event->len);
+                       list_del(&event->list);
+                       talloc_free(event); /* after last use above */
                        break;
                }
@@ -181,6 +177,15 @@
                }
        }
 
+       /* Check for duplicates. */
+       list_for_each_entry(watch, &conn->watches, list) {
+               if (streq(watch->node, vec[0]) &&
+                    streq(watch->token, vec[1])) {
+                       send_error(conn, EEXIST);
+                       return;
+               }
+       }
+
        watch = talloc(conn, struct watch);
        watch->node = talloc_strdup(watch, vec[0]);
        watch->token = talloc_strdup(watch, vec[1]);
@@ -200,37 +205,6 @@
        add_event(conn, watch, watch->node);
 }
 
-void do_watch_ack(struct connection *conn, const char *token)
-{
-       struct watch_event *event;
-
-       if (!token) {
-               send_error(conn, EINVAL);
-               return;
-       }
-
-       if (!conn->waiting_for_ack) {
-               send_error(conn, ENOENT);
-               return;
-       }
-
-       if (!streq(conn->waiting_for_ack->token, token)) {
-               /* They're confused: this will cause us to send event again */
-               conn->waiting_for_ack = NULL;
-               send_error(conn, EINVAL);
-               return;
-       }
-
-       /* Remove event: after ack sent, core will call queue_next_event */
-       event = list_top(&conn->waiting_for_ack->events, struct watch_event,
-                        list);
-       list_del(&event->list);
-       talloc_free(event);
-
-       conn->waiting_for_ack = NULL;
-       send_ack(conn, XS_WATCH_ACK);
-}
-
 void do_unwatch(struct connection *conn, struct buffered_data *in)
 {
        struct watch *watch;
@@ -241,9 +215,6 @@
                return;
        }
 
-       /* We don't need to worry if we're waiting for an ack for the
-        * watch we're deleting: conn->waiting_for_ack was reset by
-        * this command in consider_message anyway. */
        node = canonicalize(conn, vec[0]);
        list_for_each_entry(watch, &conn->watches, list) {
                if (streq(watch->node, node) && streq(watch->token, vec[1])) {
@@ -262,11 +233,6 @@
        struct watch *watch;
        struct watch_event *event;
 
-       if (conn->waiting_for_ack)
-               printf("    waiting_for_ack for watch on %s token %s\n",
-                      conn->waiting_for_ack->node,
-                      conn->waiting_for_ack->token);
-
        list_for_each_entry(watch, &conn->watches, list) {
                printf("    watch on %s token %s\n",
                       watch->node, watch->token);
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xs.c
--- a/tools/xenstore/xs.c       Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/xs.c       Sun Oct  9 17:52:54 2005
@@ -78,9 +78,29 @@
 
        /* One transaction at a time. */
        pthread_mutex_t transaction_mutex;
+       pthread_t transaction_pthread;
 };
 
 static void *read_thread(void *arg);
+
+static void request_mutex_acquire(struct xs_handle *h)
+{
+       /*
+        * We can't distinguish non-transactional from transactional
+        * requests right now. So temporarily acquire the transaction mutex
+        * if this task is outside transaction context.
+        */
+       if (h->transaction_pthread != pthread_self())
+               pthread_mutex_lock(&h->transaction_mutex);
+       pthread_mutex_lock(&h->request_mutex);
+}
+
+static void request_mutex_release(struct xs_handle *h)
+{
+       pthread_mutex_unlock(&h->request_mutex);
+       if (h->transaction_pthread != pthread_self())
+               pthread_mutex_unlock(&h->transaction_mutex);
+}
 
 int xs_fileno(struct xs_handle *h)
 {
@@ -163,6 +183,7 @@
 
        pthread_mutex_init(&h->request_mutex, NULL);
        pthread_mutex_init(&h->transaction_mutex, NULL);
+       h->transaction_pthread = -1;
 
        if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0)
                goto error;
@@ -316,7 +337,7 @@
        ignorepipe.sa_flags = 0;
        sigaction(SIGPIPE, &ignorepipe, &oldact);
 
-       pthread_mutex_lock(&h->request_mutex);
+       request_mutex_acquire(h);
 
        if (!xs_write_all(h->fd, &msg, sizeof(msg)))
                goto fail;
@@ -329,7 +350,7 @@
        if (!ret)
                goto fail;
 
-       pthread_mutex_unlock(&h->request_mutex);
+       request_mutex_release(h);
 
        sigaction(SIGPIPE, &oldact, NULL);
        if (msg.type == XS_ERROR) {
@@ -350,7 +371,7 @@
 fail:
        /* We're in a bad state, so close fd. */
        saved_errno = errno;
-       pthread_mutex_unlock(&h->request_mutex);
+       request_mutex_release(h);
        sigaction(SIGPIPE, &oldact, NULL);
 close_fd:
        close(h->fd);
@@ -593,15 +614,6 @@
        return ret;
 }
 
-/* Acknowledge watch on node.  Watches must be acknowledged before
- * any other watches can be read.
- * Returns false on failure.
- */
-bool xs_acknowledge_watch(struct xs_handle *h, const char *token)
-{
-       return xs_bool(xs_single(h, XS_WATCH_ACK, token, NULL));
-}
-
 /* Remove a watch on a node.
  * Returns false on failure (no watch on that node).
  */
@@ -624,8 +636,18 @@
  */
 bool xs_transaction_start(struct xs_handle *h)
 {
+       bool rc;
+
        pthread_mutex_lock(&h->transaction_mutex);
-       return xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
+       h->transaction_pthread = pthread_self();
+
+       rc = xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
+       if (!rc) {
+               h->transaction_pthread = -1;
+               pthread_mutex_unlock(&h->transaction_mutex);
+       }
+
+       return rc;
 }
 
 /* End a transaction.
@@ -645,6 +667,7 @@
        
        rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
 
+       h->transaction_pthread = -1;
        pthread_mutex_unlock(&h->transaction_mutex);
 
        return rc;
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xs.h
--- a/tools/xenstore/xs.h       Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/xs.h       Sun Oct  9 17:52:54 2005
@@ -96,12 +96,6 @@
  */
 char **xs_read_watch(struct xs_handle *h, unsigned int *num);
 
-/* Acknowledge watch on node.  Watches must be acknowledged before
- * any other watches can be read.
- * Returns false on failure.
- */
-bool xs_acknowledge_watch(struct xs_handle *h, const char *token);
-
-/* Remove a watch on a node: implicitly acks any outstanding watch.
+/* Remove a watch on a node.
  * Returns false on failure (no watch on that node).
  */
diff -r ab93a9a46bd4 -r 8016551fde98 tools/xenstore/xs_test.c
--- a/tools/xenstore/xs_test.c  Sun Oct  9 16:29:24 2005
+++ b/tools/xenstore/xs_test.c  Sun Oct  9 17:52:54 2005
@@ -201,7 +201,6 @@
             "  watch <path> <token>\n"
             "  watchnoack <path> <token>\n"
             "  waitwatch\n"
-            "  ackwatch <token>\n"
             "  unwatch <path> <token>\n"
             "  close\n"
             "  start <node>\n"
@@ -455,8 +454,6 @@
                    !streq(vec[XS_WATCH_PATH], node) ||
                    !streq(vec[XS_WATCH_TOKEN], token))
                        failed(handle);
-               if (!xs_acknowledge_watch(handles[handle], token))
-                       failed(handle);
        }
 }
 
@@ -513,12 +510,6 @@
        else
                output("%s:%s\n", vec[XS_WATCH_PATH], vec[XS_WATCH_TOKEN]);
        free(vec);
-}
-
-static void do_ackwatch(unsigned int handle, const char *token)
-{
-       if (!xs_acknowledge_watch(handles[handle], token))
-               failed(handle);
 }
 
 static void do_unwatch(unsigned int handle, const char *node, const char *token)
@@ -746,8 +737,6 @@
                do_watch(handle, arg(line, 1), arg(line, 2), false);
        else if (streq(command, "waitwatch"))
                do_waitwatch(handle);
-       else if (streq(command, "ackwatch"))
-               do_ackwatch(handle, arg(line, 1));
        else if (streq(command, "unwatch"))
                do_unwatch(handle, arg(line, 1), arg(line, 2));
        else if (streq(command, "close")) {
diff -r ab93a9a46bd4 -r 8016551fde98 xen/include/public/io/xs_wire.h
--- a/xen/include/public/io/xs_wire.h   Sun Oct  9 16:29:24 2005
+++ b/xen/include/public/io/xs_wire.h   Sun Oct  9 17:52:54 2005
@@ -35,11 +35,9 @@
        XS_READ,
        XS_GET_PERMS,
        XS_WATCH,
-       XS_WATCH_ACK,
        XS_UNWATCH,
        XS_TRANSACTION_START,
        XS_TRANSACTION_END,
-       XS_OP_READ_ONLY = XS_TRANSACTION_END,
        XS_INTRODUCE,
        XS_RELEASE,
        XS_GET_DOMAIN_PATH,

_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.