[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-changelog] Simplify reply logic in xenstored. Maintain a linked list



# HG changeset patch
# User kaf24@xxxxxxxxxxxxxxxxxxxx
# Node ID b67873a9e3acd8cee39ed68ea3924add7f7a26df
# Parent  f1e8d5f641057018677e7282c3358835a749bec8
Simplify reply logic in xenstored. Maintain a linked list
of pending replies that are sent out in order.

For now, we still only read new requests from a connection when its
reply list is empty, although there is no good reason for this restriction.
Another interesting point is that (on my test machine)
hotplug block-device setup fails if xenstored_client connects to
xenstored via the Unix domain socket rather than through the
kernel. This points to user/kernel races that are currently
masked ('fixed') by the extra serialisation of the in-kernel
mutexes, and it definitely needs investigating.

Signed-off-by: Keir Fraser <keir@xxxxxxxxxxxxx>

diff -r f1e8d5f64105 -r b67873a9e3ac tools/xenstore/xenstored_core.c
--- a/tools/xenstore/xenstored_core.c   Sun Oct  9 20:29:10 2005
+++ b/tools/xenstore/xenstored_core.c   Sun Oct  9 22:53:03 2005
@@ -235,52 +235,50 @@
        talloc_free(str);
 }
 
-static bool write_message(struct connection *conn)
+static bool write_messages(struct connection *conn)
 {
        int ret;
-       struct buffered_data *out = conn->out;
-
-       if (out->inhdr) {
-               if (verbose)
-                       xprintf("Writing msg %s (%s) out to %p\n",
-                               sockmsg_string(out->hdr.msg.type),
-                               out->buffer, conn);
-               ret = conn->write(conn, out->hdr.raw + out->used,
-                                 sizeof(out->hdr) - out->used);
+       struct buffered_data *out, *tmp;
+
+       list_for_each_entry_safe(out, tmp, &conn->out_list, list) {
+               if (out->inhdr) {
+                       if (verbose)
+                               xprintf("Writing msg %s (%s) out to %p\n",
+                                       sockmsg_string(out->hdr.msg.type),
+                                       out->buffer, conn);
+                       ret = conn->write(conn, out->hdr.raw + out->used,
+                                         sizeof(out->hdr) - out->used);
+                       if (ret < 0)
+                               return false;
+
+                       out->used += ret;
+                       if (out->used < sizeof(out->hdr))
+                               return true;
+
+                       out->inhdr = false;
+                       out->used = 0;
+
+                       /* Second write might block if non-zero. */
+                       if (out->hdr.msg.len && !conn->domain)
+                               return true;
+               }
+
+               ret = conn->write(conn, out->buffer + out->used,
+                                 out->hdr.msg.len - out->used);
+
                if (ret < 0)
                        return false;
 
                out->used += ret;
-               if (out->used < sizeof(out->hdr))
+               if (out->used != out->hdr.msg.len)
                        return true;
 
-               out->inhdr = false;
-               out->used = 0;
-
-               /* Second write might block if non-zero. */
-               if (out->hdr.msg.len && !conn->domain)
-                       return true;
-       }
-
-       ret = conn->write(conn, out->buffer + out->used,
-                         out->hdr.msg.len - out->used);
-
-       if (ret < 0)
-               return false;
-
-       out->used += ret;
-       if (out->used != out->hdr.msg.len)
-               return true;
-
-       trace_io(conn, "OUT", out);
-       conn->out = NULL;
-       talloc_free(out);
-
-       queue_next_event(conn);
-
-       /* No longer busy? */
-       if (!conn->out)
-               conn->state = OK;
+               trace_io(conn, "OUT", out);
+
+               list_del(&out->list);
+               talloc_free(out);
+       }
+
        return true;
 }
 
@@ -297,9 +295,9 @@
                FD_SET(conn->fd, &set);
                none.tv_sec = none.tv_usec = 0;
 
-               while (conn->out
+               while (!list_empty(&conn->out_list)
                       && select(conn->fd+1, NULL, &set, NULL, &none) == 1)
-                       if (!write_message(conn))
+                       if (!write_messages(conn))
                                break;
                close(conn->fd);
        }
@@ -326,9 +324,9 @@
        list_for_each_entry(i, &connections, list) {
                if (i->domain)
                        continue;
-               if (i->state == OK)
+               if (list_empty(&i->out_list))
                        FD_SET(i->fd, inset);
-               if (i->out)
+               if (!list_empty(&i->out_list))
                        FD_SET(i->fd, outset);
                if (i->fd > max)
                        max = i->fd;
@@ -594,14 +592,7 @@
        bdata->hdr.msg.len = len;
        memcpy(bdata->buffer, data, len);
 
-       /* There might be an event going out now.  Queue behind it. */
-       if (conn->out) {
-               assert(conn->out->hdr.msg.type == XS_WATCH_EVENT);
-               assert(!conn->waiting_reply);
-               conn->waiting_reply = bdata;
-       } else
-               conn->out = bdata;
-       conn->state = BUSY;
+       list_add_tail(&bdata->list, &conn->out_list);
 }
 
 /* Some routines (write, mkdir, etc) just need a non-error return */
@@ -1148,8 +1139,6 @@
        enum xsd_sockmsg_type volatile type = conn->in->hdr.msg.type;
        jmp_buf talloc_fail;
 
-       assert(conn->state == OK);
-
        /* For simplicity, we kill the connection on OOM. */
        talloc_set_fail_handler(out_of_mem, &talloc_fail);
        if (setjmp(talloc_fail)) {
@@ -1186,10 +1175,7 @@
 static void handle_input(struct connection *conn)
 {
        int bytes;
-       struct buffered_data *in;
-
-       assert(conn->state == OK);
-       in = conn->in;
+       struct buffered_data *in = conn->in;
 
        /* Not finished header yet? */
        if (in->inhdr) {
@@ -1237,7 +1223,7 @@
 
 static void handle_output(struct connection *conn)
 {
-       if (!write_message(conn))
+       if (!write_messages(conn))
                talloc_free(conn);
 }
 
@@ -1254,8 +1240,6 @@
        if (!new)
                return NULL;
 
-       new->state = OK;
-       new->out = new->waiting_reply = NULL;
        new->fd = -1;
        new->id = 0;
        new->domain = NULL;
@@ -1263,6 +1247,7 @@
        new->write = write;
        new->read = read;
        new->can_write = true;
+       INIT_LIST_HEAD(&new->out_list);
        INIT_LIST_HEAD(&new->watches);
 
        talloc_set_fail_handler(out_of_mem, &talloc_fail);
@@ -1317,23 +1302,17 @@
        list_for_each_entry(i, &connections, list) {
                printf("Connection %p:\n", i);
                printf("    state = %s\n",
-                      i->state == OK ? "OK"
-                      : i->state == BUSY ? "BUSY"
-                      : "INVALID");
+                      list_empty(&i->out_list) ? "OK" : "BUSY");
                if (i->id)
                        printf("    id = %i\n", i->id);
                if (!i->in->inhdr || i->in->used)
                        printf("    got %i bytes of %s\n",
                               i->in->used, i->in->inhdr ? "header" : "data");
+#if 0
                if (i->out)
                        printf("    sending message %s (%s) out\n",
                               sockmsg_string(i->out->hdr.msg.type),
                               i->out->buffer);
-               if (i->waiting_reply)
-                       printf("    ... and behind is queued %s (%s)\n",
-                              sockmsg_string(i->waiting_reply->hdr.msg.type),
-                              i->waiting_reply->buffer);
-#if 0
                if (i->transaction)
                        dump_transaction(i);
                if (i->domain)
@@ -1604,3 +1583,13 @@
                max = initialize_set(&inset, &outset, *sock, *ro_sock);
        }
 }
+
+/*
+ * Local variables:
+ *  c-file-style: "linux"
+ *  indent-tabs-mode: t
+ *  c-indent-level: 8
+ *  c-basic-offset: 8
+ *  tab-width: 8
+ * End:
+ */
diff -r f1e8d5f64105 -r b67873a9e3ac tools/xenstore/xenstored_core.h
--- a/tools/xenstore/xenstored_core.h   Sun Oct  9 20:29:10 2005
+++ b/tools/xenstore/xenstored_core.h   Sun Oct  9 22:53:03 2005
@@ -31,14 +31,19 @@
 
 struct buffered_data
 {
+       struct list_head list;
+
        /* Are we still doing the header? */
        bool inhdr;
+
        /* How far are we? */
        unsigned int used;
+
        union {
                struct xsd_sockmsg msg;
                char raw[sizeof(struct xsd_sockmsg)];
        } hdr;
+
        /* The actual data. */
        char *buffer;
 };
@@ -47,14 +52,6 @@
 typedef int connwritefn_t(struct connection *, const void *, unsigned int);
 typedef int connreadfn_t(struct connection *, void *, unsigned int);
 
-enum state
-{
-       /* Doing action, not listening */
-       BUSY,
-       /* Completed */
-       OK,
-};
-
 struct connection
 {
        struct list_head list;
@@ -62,11 +59,8 @@
        /* The file descriptor we came in on. */
        int fd;
 
-       /* Who am I?  0 for socket connections. */
+       /* Who am I? 0 for socket connections. */
        domid_t id;
-
-       /* Blocked on transaction?  Busy? */
-       enum state state;
 
        /* Is this a read-only connection? */
        bool can_write;
@@ -75,10 +69,7 @@
        struct buffered_data *in;
 
        /* Buffered output data */
-       struct buffered_data *out;
-
-       /* If we had a watch fire outgoing when we needed to reply... */
-       struct buffered_data *waiting_reply;
+       struct list_head out_list;
 
        /* My transaction, if any. */
        struct transaction *transaction;
@@ -172,3 +163,13 @@
 extern int event_fd;
 
 #endif /* _XENSTORED_CORE_H */
+
+/*
+ * Local variables:
+ *  c-file-style: "linux"
+ *  indent-tabs-mode: t
+ *  c-indent-level: 8
+ *  c-basic-offset: 8
+ *  tab-width: 8
+ * End:
+ */
diff -r f1e8d5f64105 -r b67873a9e3ac tools/xenstore/xenstored_domain.c
--- a/tools/xenstore/xenstored_domain.c Sun Oct  9 20:29:10 2005
+++ b/tools/xenstore/xenstored_domain.c Sun Oct  9 22:53:03 2005
@@ -276,12 +276,14 @@
 
 bool domain_can_read(struct connection *conn)
 {
-       return conn->state == OK && buffer_has_input(conn->domain->input);
+       return (list_empty(&conn->out_list) &&
+                buffer_has_input(conn->domain->input));
 }
 
 bool domain_can_write(struct connection *conn)
 {
-       return conn->out && buffer_has_output_room(conn->domain->output);
+       return (!list_empty(&conn->out_list) &&
+                buffer_has_output_room(conn->domain->output));
 }
 
 static struct domain *new_domain(void *context, domid_t domid,
diff -r f1e8d5f64105 -r b67873a9e3ac tools/xenstore/xenstored_watch.c
--- a/tools/xenstore/xenstored_watch.c  Sun Oct  9 20:29:10 2005
+++ b/tools/xenstore/xenstored_watch.c  Sun Oct  9 22:53:03 2005
@@ -32,17 +32,6 @@
 #include "xenstored_test.h"
 #include "xenstored_domain.h"
 
-/* FIXME: time out unacked watches. */
-struct watch_event
-{
-       /* The events on this watch. */
-       struct list_head list;
-
-       /* Data to send (node\0token\0). */
-       unsigned int len;
-       char *data;
-};
-
 struct watch
 {
        /* Watches on this connection */
@@ -58,50 +47,17 @@
        char *node;
 };
 
-/* Look through our watches: if any of them have an event, queue it. */
-void queue_next_event(struct connection *conn)
-{
-       struct watch_event *event;
-       struct watch *watch;
-
-       /* We had a reply queued already?  Send it: other end will
-        * discard watch. */
-       if (conn->waiting_reply) {
-               conn->out = conn->waiting_reply;
-               conn->waiting_reply = NULL;
-               return;
-       }
-
-       list_for_each_entry(watch, &conn->watches, list) {
-               event = list_top(&watch->events, struct watch_event, list);
-               if (event) {
-                       list_del(&event->list);
-                       talloc_free(event);
-                       send_reply(conn,XS_WATCH_EVENT,event->data,event->len);
-                       break;
-               }
-       }
-}
-
-static int destroy_watch_event(void *_event)
-{
-       struct watch_event *event = _event;
-
-       trace_destroy(event, "watch_event");
-       return 0;
-}
-
 static void add_event(struct connection *conn,
                      struct watch *watch,
                      const char *name)
 {
-       struct watch_event *event;
+       /* Data to send (node\0token\0). */
+       unsigned int len;
+       char *data;
 
        if (!check_event_node(name)) {
                /* Can this conn load node, or see that it doesn't exist? */
-               struct node *node;
-
-               node = get_node(conn, name, XS_PERM_READ);
+               struct node *node = get_node(conn, name, XS_PERM_READ);
                if (!node && errno != ENOENT)
                        return;
        }
@@ -112,14 +68,12 @@
                        name++;
        }
 
-       event = talloc(watch, struct watch_event);
-       event->len = strlen(name) + 1 + strlen(watch->token) + 1;
-       event->data = talloc_array(event, char, event->len);
-       strcpy(event->data, name);
-       strcpy(event->data + strlen(name) + 1, watch->token);
-       talloc_set_destructor(event, destroy_watch_event);
-       list_add_tail(&event->list, &watch->events);
-       trace_create(event, "watch_event");
+       len = strlen(name) + 1 + strlen(watch->token) + 1;
+       data = talloc_array(watch, char, len);
+       strcpy(data, name);
+       strcpy(data + strlen(name) + 1, watch->token);
+        send_reply(conn, XS_WATCH_EVENT, data, len);
+       talloc_free(data);
 }
 
 /* FIXME: we fail to fire on out of memory.  Should drop connections. */
@@ -139,11 +93,6 @@
                                add_event(i, watch, name);
                        else if (recurse && is_child(watch->node, name))
                                add_event(i, watch, watch->node);
-                       else
-                               continue;
-                       /* If connection not doing anything, queue this. */
-                       if (i->state == OK)
-                               queue_next_event(i);
                }
        }
 }
@@ -231,13 +180,19 @@
 void dump_watches(struct connection *conn)
 {
        struct watch *watch;
-       struct watch_event *event;
 
-       list_for_each_entry(watch, &conn->watches, list) {
+       list_for_each_entry(watch, &conn->watches, list)
                printf("    watch on %s token %s\n",
                       watch->node, watch->token);
-               list_for_each_entry(event, &watch->events, list)
-                       printf("        event: %s\n", event->data);
-       }
 }
 #endif
+
+/*
+ * Local variables:
+ *  c-file-style: "linux"
+ *  indent-tabs-mode: t
+ *  c-indent-level: 8
+ *  c-basic-offset: 8
+ *  tab-width: 8
+ * End:
+ */
diff -r f1e8d5f64105 -r b67873a9e3ac tools/xenstore/xenstored_watch.h
--- a/tools/xenstore/xenstored_watch.h  Sun Oct  9 20:29:10 2005
+++ b/tools/xenstore/xenstored_watch.h  Sun Oct  9 22:53:03 2005
@@ -23,17 +23,9 @@
 #include "xenstored_core.h"
 
 void do_watch(struct connection *conn, struct buffered_data *in);
-void do_watch_ack(struct connection *conn, const char *token);
 void do_unwatch(struct connection *conn, struct buffered_data *in);
 
-/* Is this a watch event message for this connection? */
-bool is_watch_event(struct connection *conn, struct buffered_data *out);
-
-/* Look through our watches: if any of them have an event, queue it. */
-void queue_next_event(struct connection *conn);
-
-/* Fire all watches: recurse means all the children are affected (ie. rm).
- */
+/* Fire all watches: recurse means all the children are affected (ie. rm). */
 void fire_watches(struct connection *conn, const char *name, bool recurse);
 
 void dump_watches(struct connection *conn);
diff -r f1e8d5f64105 -r b67873a9e3ac xen/include/public/io/xs_wire.h
--- a/xen/include/public/io/xs_wire.h   Sun Oct  9 20:29:10 2005
+++ b/xen/include/public/io/xs_wire.h   Sun Oct  9 22:53:03 2005
@@ -30,23 +30,23 @@
 
 enum xsd_sockmsg_type
 {
-       XS_DEBUG,
-       XS_DIRECTORY,
-       XS_READ,
-       XS_GET_PERMS,
-       XS_WATCH,
-       XS_UNWATCH,
-       XS_TRANSACTION_START,
-       XS_TRANSACTION_END,
-       XS_INTRODUCE,
-       XS_RELEASE,
-       XS_GET_DOMAIN_PATH,
-       XS_WRITE,
-       XS_MKDIR,
-       XS_RM,
-       XS_SET_PERMS,
-       XS_WATCH_EVENT,
-       XS_ERROR,
+    XS_DEBUG,
+    XS_DIRECTORY,
+    XS_READ,
+    XS_GET_PERMS,
+    XS_WATCH,
+    XS_UNWATCH,
+    XS_TRANSACTION_START,
+    XS_TRANSACTION_END,
+    XS_INTRODUCE,
+    XS_RELEASE,
+    XS_GET_DOMAIN_PATH,
+    XS_WRITE,
+    XS_MKDIR,
+    XS_RM,
+    XS_SET_PERMS,
+    XS_WATCH_EVENT,
+    XS_ERROR,
 };
 
 #define XS_WRITE_NONE "NONE"
@@ -56,38 +56,40 @@
 /* We hand errors as strings, for portability. */
 struct xsd_errors
 {
-       int errnum;
-       const char *errstring;
+    int errnum;
+    const char *errstring;
 };
 #define XSD_ERROR(x) { x, #x }
 static struct xsd_errors xsd_errors[] __attribute__((unused)) = {
-       XSD_ERROR(EINVAL),
-       XSD_ERROR(EACCES),
-       XSD_ERROR(EEXIST),
-       XSD_ERROR(EISDIR),
-       XSD_ERROR(ENOENT),
-       XSD_ERROR(ENOMEM),
-       XSD_ERROR(ENOSPC),
-       XSD_ERROR(EIO),
-       XSD_ERROR(ENOTEMPTY),
-       XSD_ERROR(ENOSYS),
-       XSD_ERROR(EROFS),
-       XSD_ERROR(EBUSY),
-       XSD_ERROR(EAGAIN),
-       XSD_ERROR(EISCONN),
+    XSD_ERROR(EINVAL),
+    XSD_ERROR(EACCES),
+    XSD_ERROR(EEXIST),
+    XSD_ERROR(EISDIR),
+    XSD_ERROR(ENOENT),
+    XSD_ERROR(ENOMEM),
+    XSD_ERROR(ENOSPC),
+    XSD_ERROR(EIO),
+    XSD_ERROR(ENOTEMPTY),
+    XSD_ERROR(ENOSYS),
+    XSD_ERROR(EROFS),
+    XSD_ERROR(EBUSY),
+    XSD_ERROR(EAGAIN),
+    XSD_ERROR(EISCONN),
 };
 struct xsd_sockmsg
 {
-       u32 type;
-       u32 len;                /* Length of data following this. */
+    u32 type;  /* XS_??? */
+    u32 req_id;/* Request identifier, echoed in daemon's response.  */
+    u32 tx_id; /* Transaction id (0 if not related to a transaction). */
+    u32 len;   /* Length of data following this. */
 
-       /* Generally followed by nul-terminated string(s). */
+    /* Generally followed by nul-terminated string(s). */
 };
 
 enum xs_watch_type
 {
-       XS_WATCH_PATH = 0,
-       XS_WATCH_TOKEN,
+    XS_WATCH_PATH = 0,
+    XS_WATCH_TOKEN,
 };
 
 #endif /* _XS_WIRE_H */

_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.