[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-changelog] xenstored now supports multiple concurrent transactions per connection



# HG changeset patch
# User kaf24@xxxxxxxxxxxxxxxxxxxx
# Node ID 015f8ae8127649f0c69904fd063ca45d304d4e0c
# Parent  1ac39c7a043541cfa94655f0e9ab98d4503c29a2
xenstored now supports multiple concurrent transactions per
connection, as well as interleaving of transactional and
non-transactional accesses. A transaction identifier is added
to the xsd_sockmsg header structure (0 means 'not in the context
of a transaction'). The user and kernel xs interfaces accept
a pointer to a transaction handle where appropriate. Currently
the handle is cast directly to an integer identifier in the
client library and kernel driver, but passing a pointer leaves
room to keep extra dynamic client-side state in the future if
we need to.

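The transaction mutex has been removed. It is replaced by a
read-write semaphore (suspend_mutex): starting a transaction and
registering or unregistering a watch take it in shared mode, and
it is acquired for exclusive access only during suspend/resume,
to ensure there are no in-progress transactions.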

Signed-off-by: Keir Fraser <keir@xxxxxxxxxxxxx>

diff -r 1ac39c7a0435 -r 015f8ae81276 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c      Mon Oct 10 13:46:53 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_dev.c      Mon Oct 10 14:38:01 2005
@@ -45,8 +45,14 @@
 #include <asm-xen/xen_proc.h>
 #include <asm/hypervisor.h>
 
+struct xenbus_dev_transaction {
+       struct list_head list;
+       struct xenbus_transaction *handle;
+};
+
 struct xenbus_dev_data {
-       int in_transaction;
+       /* In-progress transaction. */
+       struct list_head transactions;
 
        /* Partial request. */
        unsigned int len;
@@ -103,6 +109,7 @@
                                size_t len, loff_t *ppos)
 {
        struct xenbus_dev_data *u = filp->private_data;
+       struct xenbus_dev_transaction *trans;
        void *reply;
        int err = 0;
 
@@ -129,13 +136,24 @@
        case XS_RM:
        case XS_SET_PERMS:
                reply = xenbus_dev_request_and_reply(&u->u.msg);
-               if (IS_ERR(reply))
+               if (IS_ERR(reply)) {
                        err = PTR_ERR(reply);
-               else {
-                       if (u->u.msg.type == XS_TRANSACTION_START)
-                               u->in_transaction = 1;
-                       if (u->u.msg.type == XS_TRANSACTION_END)
-                               u->in_transaction = 0;
+               } else {
+                       if (u->u.msg.type == XS_TRANSACTION_START) {
+                               trans = kmalloc(sizeof(*trans), GFP_KERNEL);
+                               trans->handle = (struct xenbus_transaction *)
+                                       simple_strtoul(reply, NULL, 0);
+                               list_add(&trans->list, &u->transactions);
+                       } else if (u->u.msg.type == XS_TRANSACTION_END) {
+                               list_for_each_entry(trans, &u->transactions,
+                                                   list)
+                                       if ((unsigned long)trans->handle ==
+                                           (unsigned long)u->u.msg.tx_id)
+                                               break;
+                               BUG_ON(&trans->list == &u->transactions);
+                               list_del(&trans->list);
+                               kfree(trans);
+                       }
                        queue_reply(u, (char *)&u->u.msg, sizeof(u->u.msg));
                        queue_reply(u, (char *)reply, u->u.msg.len);
                        kfree(reply);
@@ -169,6 +187,7 @@
                return -ENOMEM;
 
        memset(u, 0, sizeof(*u));
+       INIT_LIST_HEAD(&u->transactions);
        init_waitqueue_head(&u->read_waitq);
 
        filp->private_data = u;
@@ -179,9 +198,13 @@
 static int xenbus_dev_release(struct inode *inode, struct file *filp)
 {
        struct xenbus_dev_data *u = filp->private_data;
-
-       if (u->in_transaction)
-               xenbus_transaction_end((struct xenbus_transaction *)1, 1);
+       struct xenbus_dev_transaction *trans, *tmp;
+
+       list_for_each_entry_safe(trans, tmp, &u->transactions, list) {
+               xenbus_transaction_end(trans->handle, 1);
+               list_del(&trans->list);
+               kfree(trans);
+       }
 
        kfree(u);
 
diff -r 1ac39c7a0435 -r 015f8ae81276 linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c
--- a/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c       Mon Oct 10 13:46:53 2005
+++ b/linux-2.6-xen-sparse/drivers/xen/xenbus/xenbus_xs.c       Mon Oct 10 14:38:01 2005
@@ -71,38 +71,14 @@
        /* One request at a time. */
        struct semaphore request_mutex;
 
-       /* One transaction at a time. */
-       struct semaphore transaction_mutex;
-       int transaction_pid;
+       /* Protect transactions against save/restore. */
+       struct rw_semaphore suspend_mutex;
 };
 
 static struct xs_handle xs_state;
 
 static LIST_HEAD(watches);
 static DEFINE_SPINLOCK(watches_lock);
-
-/* Can wait on !xs_resuming for suspend/resume cycle to complete. */
-static int xs_resuming;
-static DECLARE_WAIT_QUEUE_HEAD(xs_resuming_waitq);
-
-static void request_mutex_acquire(void)
-{
-       /*
-        * We can't distinguish non-transactional from transactional
-        * requests right now. So temporarily acquire the transaction mutex
-        * if this task is outside transaction context.
-        */
-       if (xs_state.transaction_pid != current->pid)
-               down(&xs_state.transaction_mutex);
-       down(&xs_state.request_mutex);
-}
-
-static void request_mutex_release(void)
-{
-       up(&xs_state.request_mutex);
-       if (xs_state.transaction_pid != current->pid)
-               up(&xs_state.transaction_mutex);
-}
 
 static int get_error(const char *errorstring)
 {
@@ -152,17 +128,17 @@
 /* Emergency write. */
 void xenbus_debug_write(const char *str, unsigned int count)
 {
-       struct xsd_sockmsg msg;
+       struct xsd_sockmsg msg = { 0 };
 
        msg.type = XS_DEBUG;
        msg.len = sizeof("print") + count + 1;
 
-       request_mutex_acquire();
+       down(&xs_state.request_mutex);
        xb_write(&msg, sizeof(msg));
        xb_write("print", sizeof("print"));
        xb_write(str, count);
        xb_write("", 1);
-       request_mutex_release();
+       up(&xs_state.request_mutex);
 }
 
 void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg)
@@ -171,12 +147,10 @@
        struct xsd_sockmsg req_msg = *msg;
        int err;
 
-       if (req_msg.type == XS_TRANSACTION_START) {
-               down(&xs_state.transaction_mutex);
-               xs_state.transaction_pid = current->pid;
-       }
-
-       request_mutex_acquire();
+       if (req_msg.type == XS_TRANSACTION_START)
+               down_read(&xs_state.suspend_mutex);
+
+       down(&xs_state.request_mutex);
 
        err = xb_write(msg, sizeof(*msg) + msg->len);
        if (err) {
@@ -186,20 +160,19 @@
                ret = read_reply(&msg->type, &msg->len);
        }
 
-       request_mutex_release();
+       up(&xs_state.request_mutex);
 
        if ((msg->type == XS_TRANSACTION_END) ||
            ((req_msg.type == XS_TRANSACTION_START) &&
-            (msg->type == XS_ERROR))) {
-               xs_state.transaction_pid = -1;
-               up(&xs_state.transaction_mutex);
-       }
+            (msg->type == XS_ERROR)))
+               up_read(&xs_state.suspend_mutex);
 
        return ret;
 }
 
 /* Send message to xs, get kmalloc'ed reply.  ERR_PTR() on error. */
-static void *xs_talkv(enum xsd_sockmsg_type type,
+static void *xs_talkv(struct xenbus_transaction *t,
+                     enum xsd_sockmsg_type type,
                      const struct kvec *iovec,
                      unsigned int num_vecs,
                      unsigned int *len)
@@ -209,12 +182,13 @@
        unsigned int i;
        int err;
 
+       msg.tx_id = (u32)(unsigned long)t;
        msg.type = type;
        msg.len = 0;
        for (i = 0; i < num_vecs; i++)
                msg.len += iovec[i].iov_len;
 
-       request_mutex_acquire();
+       down(&xs_state.request_mutex);
 
        err = xb_write(&msg, sizeof(msg));
        if (err) {
@@ -225,14 +199,14 @@
        for (i = 0; i < num_vecs; i++) {
                err = xb_write(iovec[i].iov_base, iovec[i].iov_len);;
                if (err) {
-                       request_mutex_release();
+                       up(&xs_state.request_mutex);
                        return ERR_PTR(err);
                }
        }
 
        ret = read_reply(&msg.type, len);
 
-       request_mutex_release();
+       up(&xs_state.request_mutex);
 
        if (IS_ERR(ret))
                return ret;
@@ -248,14 +222,16 @@
 }
 
 /* Simplified version of xs_talkv: single message. */
-static void *xs_single(enum xsd_sockmsg_type type,
-                      const char *string, unsigned int *len)
+static void *xs_single(struct xenbus_transaction *t,
+                      enum xsd_sockmsg_type type,
+                      const char *string,
+                      unsigned int *len)
 {
        struct kvec iovec;
 
        iovec.iov_base = (void *)string;
        iovec.iov_len = strlen(string) + 1;
-       return xs_talkv(type, &iovec, 1, len);
+       return xs_talkv(t, type, &iovec, 1, len);
 }
 
 /* Many commands only need an ack, don't care what it says. */
@@ -322,7 +298,7 @@
        char *strings;
        unsigned int len;
 
-       strings = xs_single(XS_DIRECTORY, join(dir, node), &len);
+       strings = xs_single(t, XS_DIRECTORY, join(dir, node), &len);
        if (IS_ERR(strings))
                return (char **)strings;
 
@@ -352,7 +328,7 @@
 void *xenbus_read(struct xenbus_transaction *t,
                  const char *dir, const char *node, unsigned int *len)
 {
-       return xs_single(XS_READ, join(dir, node), len);
+       return xs_single(t, XS_READ, join(dir, node), len);
 }
 EXPORT_SYMBOL(xenbus_read);
 
@@ -372,7 +348,7 @@
        iovec[1].iov_base = (void *)string;
        iovec[1].iov_len = strlen(string);
 
-       return xs_error(xs_talkv(XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
+       return xs_error(xs_talkv(t, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
 }
 EXPORT_SYMBOL(xenbus_write);
 
@@ -380,14 +356,14 @@
 int xenbus_mkdir(struct xenbus_transaction *t,
                 const char *dir, const char *node)
 {
-       return xs_error(xs_single(XS_MKDIR, join(dir, node), NULL));
+       return xs_error(xs_single(t, XS_MKDIR, join(dir, node), NULL));
 }
 EXPORT_SYMBOL(xenbus_mkdir);
 
 /* Destroy a file or directory (directories must be empty). */
 int xenbus_rm(struct xenbus_transaction *t, const char *dir, const char *node)
 {
-       return xs_error(xs_single(XS_RM, join(dir, node), NULL));
+       return xs_error(xs_single(t, XS_RM, join(dir, node), NULL));
 }
 EXPORT_SYMBOL(xenbus_rm);
 
@@ -396,18 +372,21 @@
  */
 struct xenbus_transaction *xenbus_transaction_start(void)
 {
-       int err;
-
-       down(&xs_state.transaction_mutex);
-       xs_state.transaction_pid = current->pid;
-
-       err = xs_error(xs_single(XS_TRANSACTION_START, "", NULL));
-       if (err) {
-               xs_state.transaction_pid = -1;
-               up(&xs_state.transaction_mutex);
-       }
-
-       return err ? ERR_PTR(err) : (struct xenbus_transaction *)1;
+       char *id_str;
+       unsigned long id;
+
+       down_read(&xs_state.suspend_mutex);
+
+       id_str = xs_single(NULL, XS_TRANSACTION_START, "", NULL);
+       if (IS_ERR(id_str)) {
+               up_read(&xs_state.suspend_mutex);
+               return (struct xenbus_transaction *)id_str;
+       }
+
+       id = simple_strtoul(id_str, NULL, 0);
+       kfree(id_str);
+
+       return (struct xenbus_transaction *)id;
 }
 EXPORT_SYMBOL(xenbus_transaction_start);
 
@@ -419,17 +398,14 @@
        char abortstr[2];
        int err;
 
-       BUG_ON(t == NULL);
-
        if (abort)
                strcpy(abortstr, "F");
        else
                strcpy(abortstr, "T");
 
-       err = xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL));
-
-       xs_state.transaction_pid = -1;
-       up(&xs_state.transaction_mutex);
+       err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL));
+
+       up_read(&xs_state.suspend_mutex);
 
        return err;
 }
@@ -567,7 +543,8 @@
        iov[1].iov_base = (void *)token;
        iov[1].iov_len = strlen(token) + 1;
 
-       return xs_error(xs_talkv(XS_WATCH, iov, ARRAY_SIZE(iov), NULL));
+       return xs_error(xs_talkv(NULL, XS_WATCH, iov,
+                                ARRAY_SIZE(iov), NULL));
 }
 
 static int xs_unwatch(const char *path, const char *token)
@@ -579,7 +556,8 @@
        iov[1].iov_base = (char *)token;
        iov[1].iov_len = strlen(token) + 1;
 
-       return xs_error(xs_talkv(XS_UNWATCH, iov, ARRAY_SIZE(iov), NULL));
+       return xs_error(xs_talkv(NULL, XS_UNWATCH, iov,
+                                ARRAY_SIZE(iov), NULL));
 }
 
 static struct xenbus_watch *find_watch(const char *token)
@@ -603,6 +581,8 @@
        int err;
 
        sprintf(token, "%lX", (long)watch);
+
+       down_read(&xs_state.suspend_mutex);
 
        spin_lock(&watches_lock);
        BUG_ON(find_watch(token));
@@ -617,6 +597,8 @@
                spin_unlock(&watches_lock);
        }
 
+       up_read(&xs_state.suspend_mutex);
+
        return err;
 }
 EXPORT_SYMBOL(register_xenbus_watch);
@@ -627,14 +609,13 @@
        int err;
 
        sprintf(token, "%lX", (long)watch);
+
+       down_read(&xs_state.suspend_mutex);
 
        spin_lock(&watches_lock);
        BUG_ON(!find_watch(token));
        list_del(&watch->list);
        spin_unlock(&watches_lock);
-
-       /* Ensure xs_resume() is not in progress (see comments there). */
-       wait_event(xs_resuming_waitq, !xs_resuming);
 
        err = xs_unwatch(watch->node, token);
        if (err)
@@ -642,6 +623,8 @@
                       "XENBUS Failed to release watch %s: %i\n",
                       watch->node, err);
 
+       up_read(&xs_state.suspend_mutex);
+
        /* Make sure watch is not in use. */
        flush_scheduled_work();
 }
@@ -649,58 +632,24 @@
 
 void xs_suspend(void)
 {
-       down(&xs_state.transaction_mutex);
+       down_write(&xs_state.suspend_mutex);
        down(&xs_state.request_mutex);
 }
 
 void xs_resume(void)
 {
-       struct list_head *ent, *prev_ent = &watches;
        struct xenbus_watch *watch;
        char token[sizeof(watch) * 2 + 1];
 
-       /* Protect against concurrent unregistration and freeing of watches. */
-       BUG_ON(xs_resuming);
-       xs_resuming = 1;
-
        up(&xs_state.request_mutex);
-       up(&xs_state.transaction_mutex);
-
-       /*
-        * Iterate over the watch list re-registering each node. We must
-        * be careful about concurrent registrations and unregistrations.
-        * We search for the node immediately following the previously
-        * re-registered node. If we get no match then either we are done
-        * (previous node is last in list) or the node was unregistered, in
-        * which case we restart from the beginning of the list.
-        * register_xenbus_watch() + unregister_xenbus_watch() is safe because
-        * it will only ever move a watch node earlier in the list, so it
-        * cannot cause us to skip nodes.
-        */
-       for (;;) {
-               spin_lock(&watches_lock);
-               list_for_each(ent, &watches)
-                       if (ent->prev == prev_ent)
-                               break;
-               spin_unlock(&watches_lock);
-
-               /* No match because prev_ent is at the end of the list? */
-               if ((ent == &watches) && (watches.prev == prev_ent))
-                        break; /* We're done! */
-
-               if ((prev_ent = ent) != &watches) {
-                       /*
-                        * Safe even with watch_lock not held. We are saved by
-                        * (xs_resumed==1) check in unregister_xenbus_watch.
-                        */
-                       watch = list_entry(ent, struct xenbus_watch, list);
-                       sprintf(token, "%lX", (long)watch);
-                       xs_watch(watch->node, token);
-               }
-       }
-
-       xs_resuming = 0;
-       wake_up(&xs_resuming_waitq);
+
+       /* No need for watches_lock: the suspend_mutex is sufficient. */
+       list_for_each_entry(watch, &watches, list) {
+               sprintf(token, "%lX", (long)watch);
+               xs_watch(watch->node, token);
+       }
+
+       up_write(&xs_state.suspend_mutex);
 }
 
 static void xenbus_fire_watch(void *arg)
@@ -801,8 +750,7 @@
        init_waitqueue_head(&xs_state.reply_waitq);
 
        init_MUTEX(&xs_state.request_mutex);
-       init_MUTEX(&xs_state.transaction_mutex);
-       xs_state.transaction_pid = -1;
+       init_rwsem(&xs_state.suspend_mutex);
 
        /* Initialize the shared memory rings to talk to xenstored */
        err = xb_init_comms();
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/utils.h
--- a/tools/xenstore/utils.h    Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/utils.h    Mon Oct 10 14:38:01 2005
@@ -55,4 +55,34 @@
 #define dprintf(_fmt, _args...) ((void)0)
 #endif
 
+/*
+ * Mux errno values onto returned pointers.
+ */
+
+static inline void *ERR_PTR(long error)
+{
+       return (void *)error;
+}
+
+static inline long PTR_ERR(const void *ptr)
+{
+       return (long)ptr;
+}
+
+static inline long IS_ERR(const void *ptr)
+{
+       return ((unsigned long)ptr > (unsigned long)-1000L);
+}
+
+
 #endif /* _UTILS_H */
+
+/*
+ * Local variables:
+ *  c-file-style: "linux"
+ *  indent-tabs-mode: t
+ *  c-indent-level: 8
+ *  c-basic-offset: 8
+ *  tab-width: 8
+ * End:
+ */
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xenstore_client.c
--- a/tools/xenstore/xenstore_client.c  Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xenstore_client.c  Mon Oct 10 14:38:01 2005
@@ -34,14 +34,10 @@
     struct xs_handle *xsh;
     struct xs_transaction_handle *xth;
     bool success;
-    int ret = 0;
+    int ret = 0, socket = 0;
 #if defined(CLIENT_read) || defined(CLIENT_list)
     int prefix = 0;
 #endif
-
-    xsh = xs_domain_open();
-    if (xsh == NULL)
-       err(1, "xs_domain_open");
 
     while (1) {
        int c, index = 0;
@@ -50,10 +46,11 @@
 #if defined(CLIENT_read) || defined(CLIENT_list)
            {"prefix", 0, 0, 'p'},
 #endif
+            {"socket", 0, 0, 's'},
            {0, 0, 0, 0}
        };
 
-       c = getopt_long(argc, argv, "h"
+       c = getopt_long(argc, argv, "hs"
 #if defined(CLIENT_read) || defined(CLIENT_list)
                        "p"
 #endif
@@ -65,6 +62,9 @@
        case 'h':
            usage(argv[0]);
            /* NOTREACHED */
+        case 's':
+            socket = 1;
+            break;
 #if defined(CLIENT_read) || defined(CLIENT_list)
        case 'p':
            prefix = 1;
@@ -83,6 +83,10 @@
        /* NOTREACHED */
     }
 #endif
+
+    xsh = socket ? xs_daemon_open() : xs_domain_open();
+    if (xsh == NULL)
+       err(1, socket ? "xs_daemon_open" : "xs_domain_open");
 
   again:
     xth = xs_transaction_start(xsh);
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xenstored_core.c
--- a/tools/xenstore/xenstored_core.c   Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xenstored_core.c   Mon Oct 10 14:38:01 2005
@@ -238,46 +238,47 @@
 static bool write_messages(struct connection *conn)
 {
        int ret;
-       struct buffered_data *out, *tmp;
-
-       list_for_each_entry_safe(out, tmp, &conn->out_list, list) {
-               if (out->inhdr) {
-                       if (verbose)
-                               xprintf("Writing msg %s (%s) out to %p\n",
-                                       sockmsg_string(out->hdr.msg.type),
-                                       out->buffer, conn);
-                       ret = conn->write(conn, out->hdr.raw + out->used,
-                                         sizeof(out->hdr) - out->used);
-                       if (ret < 0)
-                               return false;
-
-                       out->used += ret;
-                       if (out->used < sizeof(out->hdr))
-                               return true;
-
-                       out->inhdr = false;
-                       out->used = 0;
-
-                       /* Second write might block if non-zero. */
-                       if (out->hdr.msg.len && !conn->domain)
-                               return true;
-               }
-
-               ret = conn->write(conn, out->buffer + out->used,
-                                 out->hdr.msg.len - out->used);
-
+       struct buffered_data *out;
+
+       out = list_top(&conn->out_list, struct buffered_data, list);
+       if (out == NULL)
+               return true;
+
+       if (out->inhdr) {
+               if (verbose)
+                       xprintf("Writing msg %s (%s) out to %p\n",
+                               sockmsg_string(out->hdr.msg.type),
+                               out->buffer, conn);
+               ret = conn->write(conn, out->hdr.raw + out->used,
+                                 sizeof(out->hdr) - out->used);
                if (ret < 0)
                        return false;
 
                out->used += ret;
-               if (out->used != out->hdr.msg.len)
+               if (out->used < sizeof(out->hdr))
                        return true;
 
-               trace_io(conn, "OUT", out);
-
-               list_del(&out->list);
-               talloc_free(out);
-       }
+               out->inhdr = false;
+               out->used = 0;
+
+               /* Second write might block if non-zero. */
+               if (out->hdr.msg.len && !conn->domain)
+                       return true;
+       }
+
+       ret = conn->write(conn, out->buffer + out->used,
+                         out->hdr.msg.len - out->used);
+       if (ret < 0)
+               return false;
+
+       out->used += ret;
+       if (out->used != out->hdr.msg.len)
+               return true;
+
+       trace_io(conn, "OUT", out);
+
+       list_del(&out->list);
+       talloc_free(out);
 
        return true;
 }
@@ -1042,6 +1043,17 @@
  */
 static void process_message(struct connection *conn, struct buffered_data *in)
 {
+       struct transaction *trans;
+
+       trans = transaction_lookup(conn, in->hdr.msg.tx_id);
+       if (IS_ERR(trans)) {
+               send_error(conn, -PTR_ERR(trans));
+               return;
+       }
+
+       assert(conn->transaction == NULL);
+       conn->transaction = trans;
+
        switch (in->hdr.msg.type) {
        case XS_DIRECTORY:
                send_directory(conn, onearg(in));
@@ -1116,11 +1128,13 @@
                do_get_domain_path(conn, onearg(in));
                break;
 
-       case XS_WATCH_EVENT:
        default:
                eprintf("Client unknown operation %i", in->hdr.msg.type);
                send_error(conn, ENOSYS);
-       }
+               break;
+       }
+
+       conn->transaction = NULL;
 }
 
 static int out_of_mem(void *data)
@@ -1239,15 +1253,14 @@
        if (!new)
                return NULL;
 
+       memset(new, 0, sizeof(*new));
        new->fd = -1;
-       new->id = 0;
-       new->domain = NULL;
-       new->transaction = NULL;
        new->write = write;
        new->read = read;
        new->can_write = true;
        INIT_LIST_HEAD(&new->out_list);
        INIT_LIST_HEAD(&new->watches);
+       INIT_LIST_HEAD(&new->transaction_list);
 
        talloc_set_fail_handler(out_of_mem, &talloc_fail);
        if (setjmp(talloc_fail)) {
@@ -1410,6 +1423,7 @@
 
 
 static struct option options[] = {
+       { "no-domain-init", 0, NULL, 'D' },
        { "pid-file", 1, NULL, 'F' },
        { "no-fork", 0, NULL, 'N' },
        { "output-pid", 0, NULL, 'P' },
@@ -1424,11 +1438,15 @@
        fd_set inset, outset;
        bool dofork = true;
        bool outputpid = false;
+       bool no_domain_init = false;
        const char *pidfile = NULL;
 
-       while ((opt = getopt_long(argc, argv, "F:NPT:V", options,
+       while ((opt = getopt_long(argc, argv, "DF:NPT:V", options,
                                  NULL)) != -1) {
                switch (opt) {
+               case 'D':
+                       no_domain_init = true;
+                       break;
                case 'F':
                        pidfile = optarg;
                        break;
@@ -1501,7 +1519,8 @@
        setup_structure();
 
        /* Listen to hypervisor. */
-       event_fd = domain_init();
+       if (!no_domain_init)
+               event_fd = domain_init();
 
        /* Restore existing connections. */
        restore_existing_connections();
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xenstored_core.h
--- a/tools/xenstore/xenstored_core.h   Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xenstored_core.h   Mon Oct 10 14:38:01 2005
@@ -71,8 +71,12 @@
        /* Buffered output data */
        struct list_head out_list;
 
-       /* My transaction, if any. */
+       /* Transaction context for current request (NULL if none). */
        struct transaction *transaction;
+
+       /* List of in-progress transactions. */
+       struct list_head transaction_list;
+       u32 next_transaction_id;
 
        /* The domain I'm associated with, if any. */
        struct domain *domain;
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xenstored_transaction.c
--- a/tools/xenstore/xenstored_transaction.c    Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xenstored_transaction.c    Mon Oct 10 14:38:01 2005
@@ -37,7 +37,7 @@
 
 struct changed_node
 {
-       /* The list within this transaction. */
+       /* List of all changed nodes in the context of this transaction. */
        struct list_head list;
 
        /* The name of the node. */
@@ -49,14 +49,14 @@
 
 struct transaction
 {
-       /* Global list of transactions. */
+       /* List of all transactions active on this connection. */
        struct list_head list;
+
+       /* Connection-local identifier for this transaction. */
+       u32 id;
 
        /* Generation when transaction started. */
        unsigned int generation;
-
-       /* My owner (conn->transaction == me). */
-       struct connection *conn;
 
        /* TDB to work on, and filename */
        TDB_CONTEXT *tdb;
@@ -65,7 +65,7 @@
        /* List of changed nodes. */
        struct list_head changes;
 };
-static LIST_HEAD(transactions);
+
 static unsigned int generation;
 
 /* Return tdb context to use for this connection. */
@@ -100,7 +100,6 @@
 {
        struct transaction *trans = _transaction;
 
-       list_del(&trans->list);
        trace_destroy(trans, "transaction");
        if (trans->tdb)
                tdb_close(trans->tdb);
@@ -108,10 +107,26 @@
        return 0;
 }
 
+struct transaction *transaction_lookup(struct connection *conn, u32 id)
+{
+       struct transaction *trans;
+
+       if (id == 0)
+               return NULL;
+
+       list_for_each_entry(trans, &conn->transaction_list, list)
+               if (trans->id == id)
+                       return trans;
+
+       return ERR_PTR(-ENOENT);
+}
+
 void do_transaction_start(struct connection *conn, struct buffered_data *in)
 {
-       struct transaction *trans;
-
+       struct transaction *trans, *exists;
+       char id_str[20];
+
+       /* We don't support nested transactions. */
        if (conn->transaction) {
                send_error(conn, EBUSY);
                return;
@@ -120,7 +135,6 @@
        /* Attach transaction to input for autofree until it's complete */
        trans = talloc(in, struct transaction);
        INIT_LIST_HEAD(&trans->changes);
-       trans->conn = conn;
        trans->generation = generation;
        trans->tdb_name = talloc_asprintf(trans, "%s.%p",
                                          xs_daemon_tdb(), trans);
@@ -132,11 +146,19 @@
        /* Make it close if we go away. */
        talloc_steal(trans, trans->tdb);
 
+       /* Pick an unused transaction identifier. */
+       do {
+               trans->id = conn->next_transaction_id;
+               exists = transaction_lookup(conn, conn->next_transaction_id++);
+       } while (!IS_ERR(exists));
+
        /* Now we own it. */
-       conn->transaction = talloc_steal(conn, trans);
-       list_add_tail(&trans->list, &transactions);
+       list_add_tail(&trans->list, &conn->transaction_list);
+       talloc_steal(conn, trans);
        talloc_set_destructor(trans, destroy_transaction);
-       send_ack(conn, XS_TRANSACTION_START);
+
+       sprintf(id_str, "%u", trans->id);
+       send_reply(conn, XS_TRANSACTION_START, id_str, strlen(id_str)+1);
 }
 
 void do_transaction_end(struct connection *conn, const char *arg)
@@ -149,13 +171,13 @@
                return;
        }
 
-       if (!conn->transaction) {
+       if ((trans = conn->transaction) == NULL) {
                send_error(conn, ENOENT);
                return;
        }
 
-       trans = conn->transaction;
        conn->transaction = NULL;
+       list_del(&trans->list);
 
        /* Attach transaction to arg for auto-cleanup */
        talloc_steal(arg, trans);
@@ -181,3 +203,12 @@
        send_ack(conn, XS_TRANSACTION_END);
 }
 
+/*
+ * Local variables:
+ *  c-file-style: "linux"
+ *  indent-tabs-mode: t
+ *  c-indent-level: 8
+ *  c-basic-offset: 8
+ *  tab-width: 8
+ * End:
+ */
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xenstored_transaction.h
--- a/tools/xenstore/xenstored_transaction.h    Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xenstored_transaction.h    Mon Oct 10 14:38:01 2005
@@ -25,10 +25,11 @@
 void do_transaction_start(struct connection *conn, struct buffered_data *node);
 void do_transaction_end(struct connection *conn, const char *arg);
 
-bool transaction_block(struct connection *conn);
+struct transaction *transaction_lookup(struct connection *conn, u32 id);
 
 /* This node was changed: can fail and longjmp. */
-void add_change_node(struct transaction *trans, const char *node, bool recurse);
+void add_change_node(struct transaction *trans, const char *node,
+                     bool recurse);
 
 /* Return tdb context to use for this connection. */
 TDB_CONTEXT *tdb_transaction_context(struct transaction *trans);
diff -r 1ac39c7a0435 -r 015f8ae81276 tools/xenstore/xs.c
--- a/tools/xenstore/xs.c       Mon Oct 10 13:46:53 2005
+++ b/tools/xenstore/xs.c       Mon Oct 10 14:38:01 2005
@@ -75,36 +75,9 @@
 
        /* One request at a time. */
        pthread_mutex_t request_mutex;
-
-       /* One transaction at a time. */
-       pthread_mutex_t transaction_mutex;
-       pthread_t transaction_pthread;
 };
 
-struct xs_transaction_handle {
-       int id;
-};
-
 static void *read_thread(void *arg);
-
-static void request_mutex_acquire(struct xs_handle *h)
-{
-       /*
-        * We can't distinguish non-transactional from transactional
-        * requests right now. So temporarily acquire the transaction mutex
-        * if this task is outside transaction context.
-        */
-       if (h->transaction_pthread != pthread_self())
-               pthread_mutex_lock(&h->transaction_mutex);
-       pthread_mutex_lock(&h->request_mutex);
-}
-
-static void request_mutex_release(struct xs_handle *h)
-{
-       pthread_mutex_unlock(&h->request_mutex);
-       if (h->transaction_pthread != pthread_self())
-               pthread_mutex_unlock(&h->transaction_mutex);
-}
 
 int xs_fileno(struct xs_handle *h)
 {
@@ -186,8 +159,6 @@
        pthread_cond_init(&h->reply_condvar, NULL);
 
        pthread_mutex_init(&h->request_mutex, NULL);
-       pthread_mutex_init(&h->transaction_mutex, NULL);
-       h->transaction_pthread = -1;
 
        if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0)
                goto error;
@@ -223,7 +194,6 @@
 {
        struct xs_stored_msg *msg, *tmsg;
 
-       pthread_mutex_lock(&h->transaction_mutex);
        pthread_mutex_lock(&h->request_mutex);
        pthread_mutex_lock(&h->reply_mutex);
        pthread_mutex_lock(&h->watch_mutex);
@@ -242,7 +212,6 @@
                free(msg);
        }
 
-       pthread_mutex_unlock(&h->transaction_mutex);
        pthread_mutex_unlock(&h->request_mutex);
        pthread_mutex_unlock(&h->reply_mutex);
        pthread_mutex_unlock(&h->watch_mutex);
@@ -321,8 +290,10 @@
 }
 
 /* Send message to xs, get malloc'ed reply.  NULL and set errno on error. */
-static void *xs_talkv(struct xs_handle *h, enum xsd_sockmsg_type type,
-                     const struct iovec *iovec, unsigned int num_vecs,
+static void *xs_talkv(struct xs_handle *h, struct xs_transaction_handle *t,
+                     enum xsd_sockmsg_type type,
+                     const struct iovec *iovec,
+                     unsigned int num_vecs,
                      unsigned int *len)
 {
        struct xsd_sockmsg msg;
@@ -331,6 +302,7 @@
        unsigned int i;
        struct sigaction ignorepipe, oldact;
 
+       msg.tx_id = (u32)(unsigned long)t;
        msg.type = type;
        msg.len = 0;
        for (i = 0; i < num_vecs; i++)
@@ -341,7 +313,7 @@
        ignorepipe.sa_flags = 0;
        sigaction(SIGPIPE, &ignorepipe, &oldact);
 
-       request_mutex_acquire(h);
+       pthread_mutex_lock(&h->request_mutex);
 
        if (!xs_write_all(h->fd, &msg, sizeof(msg)))
                goto fail;
@@ -354,7 +326,7 @@
        if (!ret)
                goto fail;
 
-       request_mutex_release(h);
+       pthread_mutex_unlock(&h->request_mutex);
 
        sigaction(SIGPIPE, &oldact, NULL);
        if (msg.type == XS_ERROR) {
@@ -375,7 +347,7 @@
 fail:
        /* We're in a bad state, so close fd. */
        saved_errno = errno;
-       request_mutex_release(h);
+       pthread_mutex_unlock(&h->request_mutex);
        sigaction(SIGPIPE, &oldact, NULL);
 close_fd:
        close(h->fd);
@@ -393,14 +365,16 @@
 }
 
 /* Simplified version of xs_talkv: single message. */
-static void *xs_single(struct xs_handle *h, enum xsd_sockmsg_type type,
-                      const char *string, unsigned int *len)
+static void *xs_single(struct xs_handle *h, struct xs_transaction_handle *t,
+                      enum xsd_sockmsg_type type,
+                      const char *string,
+                      unsigned int *len)
 {
        struct iovec iovec;
 
        iovec.iov_base = (void *)string;
        iovec.iov_len = strlen(string) + 1;
-       return xs_talkv(h, type, &iovec, 1, len);
+       return xs_talkv(h, t, type, &iovec, 1, len);
 }
 
 static bool xs_bool(char *reply)
@@ -417,7 +391,7 @@
        char *strings, *p, **ret;
        unsigned int len;
 
-       strings = xs_single(h, XS_DIRECTORY, path, &len);
+       strings = xs_single(h, t, XS_DIRECTORY, path, &len);
        if (!strings)
                return NULL;
 
@@ -446,7 +420,7 @@
 void *xs_read(struct xs_handle *h, struct xs_transaction_handle *t,
              const char *path, unsigned int *len)
 {
-       return xs_single(h, XS_READ, path, len);
+       return xs_single(h, t, XS_READ, path, len);
 }
 
 /* Write the value of a single file.
@@ -462,7 +436,8 @@
        iovec[1].iov_base = (void *)data;
        iovec[1].iov_len = len;
 
-       return xs_bool(xs_talkv(h, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL));
+       return xs_bool(xs_talkv(h, t, XS_WRITE, iovec,
+                               ARRAY_SIZE(iovec), NULL));
 }
 
 /* Create a new directory.
@@ -471,7 +446,7 @@
 bool xs_mkdir(struct xs_handle *h, struct xs_transaction_handle *t,
              const char *path)
 {
-       return xs_bool(xs_single(h, XS_MKDIR, path, NULL));
+       return xs_bool(xs_single(h, t, XS_MKDIR, path, NULL));
 }
 
 /* Destroy a file or directory (directories must be empty).
@@ -480,7 +455,7 @@
 bool xs_rm(struct xs_handle *h, struct xs_transaction_handle *t,
           const char *path)
 {
-       return xs_bool(xs_single(h, XS_RM, path, NULL));
+       return xs_bool(xs_single(h, t, XS_RM, path, NULL));
 }
 
 /* Get permissions of node (first element is owner).
@@ -494,7 +469,7 @@
        unsigned int len;
        struct xs_permissions *ret;
 
-       strings = xs_single(h, XS_GET_PERMS, path, &len);
+       strings = xs_single(h, t, XS_GET_PERMS, path, &len);
        if (!strings)
                return NULL;
 
@@ -544,7 +519,7 @@
                        goto unwind;
        }
 
-       if (!xs_bool(xs_talkv(h, XS_SET_PERMS, iov, 1+num_perms, NULL)))
+       if (!xs_bool(xs_talkv(h, t, XS_SET_PERMS, iov, 1+num_perms, NULL)))
                goto unwind;
        for (i = 0; i < num_perms; i++)
                free(iov[i+1].iov_base);
@@ -571,7 +546,8 @@
        iov[1].iov_base = (void *)token;
        iov[1].iov_len = strlen(token) + 1;
 
-       return xs_bool(xs_talkv(h, XS_WATCH, iov, ARRAY_SIZE(iov), NULL));
+       return xs_bool(xs_talkv(h, NULL, XS_WATCH, iov,
+                               ARRAY_SIZE(iov), NULL));
 }
 
 /* Find out what node change was on (will block if nothing pending).
@@ -637,7 +613,8 @@
        iov[1].iov_base = (char *)token;
        iov[1].iov_len = strlen(token) + 1;
 
-       return xs_bool(xs_talkv(h, XS_UNWATCH, iov, ARRAY_SIZE(iov), NULL));
+       return xs_bool(xs_talkv(h, NULL, XS_UNWATCH, iov,
+                               ARRAY_SIZE(iov), NULL));
 }
 
 /* Start a transaction: changes by others will not be seen during this
@@ -647,18 +624,17 @@
  */
 struct xs_transaction_handle *xs_transaction_start(struct xs_handle *h)
 {
-       bool rc;
-
-       pthread_mutex_lock(&h->transaction_mutex);
-       h->transaction_pthread = pthread_self();
-
-       rc = xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
-       if (!rc) {
-               h->transaction_pthread = -1;
-               pthread_mutex_unlock(&h->transaction_mutex);
-       }
-
-       return (struct xs_transaction_handle *)rc;
+       char *id_str;
+       unsigned long id;
+
+       id_str = xs_single(h, NULL, XS_TRANSACTION_START, "", NULL);
+       if (id_str == NULL)
+               return NULL;
+
+       id = strtoul(id_str, NULL, 0);
+       free(id_str);
+
+       return (struct xs_transaction_handle *)id;
 }
 
 /* End a transaction.
@@ -670,22 +646,13 @@
                        bool abort)
 {
        char abortstr[2];
-       bool rc;
-
-       if (t == NULL)
-               return -EINVAL;
 
        if (abort)
                strcpy(abortstr, "F");
        else
                strcpy(abortstr, "T");
        
-       rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
-
-       h->transaction_pthread = -1;
-       pthread_mutex_unlock(&h->transaction_mutex);
-
-       return rc;
+       return xs_bool(xs_single(h, t, XS_TRANSACTION_END, abortstr, NULL));
 }
 
 /* Introduce a new domain.
@@ -713,7 +680,8 @@
        iov[3].iov_base = (char *)path;
        iov[3].iov_len = strlen(path) + 1;
 
-       return xs_bool(xs_talkv(h, XS_INTRODUCE, iov, ARRAY_SIZE(iov), NULL));
+       return xs_bool(xs_talkv(h, NULL, XS_INTRODUCE, iov,
+                               ARRAY_SIZE(iov), NULL));
 }
 
 bool xs_release_domain(struct xs_handle *h, domid_t domid)
@@ -722,7 +690,7 @@
 
        sprintf(domid_str, "%u", domid);
 
-       return xs_bool(xs_single(h, XS_RELEASE, domid_str, NULL));
+       return xs_bool(xs_single(h, NULL, XS_RELEASE, domid_str, NULL));
 }
 
 char *xs_get_domain_path(struct xs_handle *h, domid_t domid)
@@ -731,7 +699,7 @@
 
        sprintf(domid_str, "%u", domid);
 
-       return xs_single(h, XS_GET_DOMAIN_PATH, domid_str, NULL);
+       return xs_single(h, NULL, XS_GET_DOMAIN_PATH, domid_str, NULL);
 }
 
 /* Only useful for DEBUG versions */
@@ -745,7 +713,8 @@
        iov[1].iov_base = data;
        iov[1].iov_len = len;
 
-       return xs_talkv(h, XS_DEBUG, iov, ARRAY_SIZE(iov), NULL);
+       return xs_talkv(h, NULL, XS_DEBUG, iov,
+                       ARRAY_SIZE(iov), NULL);
 }
 
 static void *read_thread(void *arg)

_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog


 


Rackspace

Lists.xenproject.org is hosted with Rackspace, monitoring our
servers 24x7x365 and backed by Rackspace's Fanatical Support®.