[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-changelog] Make libxenstore thread-safe. It also spawns an internal thread to read messages from the comms channel.



# HG changeset patch
# User kaf24@xxxxxxxxxxxxxxxxxxxx
# Node ID 2144de6eabcc7fc6272a8ca088008ef92c05aa6b
# Parent  e69413dca6844c87885121a7360fa7c2b11cdea9
Make libxenstore thread-safe. It also spawns an internal
thread to read messages from the comms channel.

Signed-off-by: Keir Fraser <keir@xxxxxxxxxxxxx>

diff -r e69413dca684 -r 2144de6eabcc tools/python/xen/lowlevel/xs/xs.c
--- a/tools/python/xen/lowlevel/xs/xs.c Sat Oct  8 09:22:01 2005
+++ b/tools/python/xen/lowlevel/xs/xs.c Sat Oct  8 18:19:27 2005
@@ -775,39 +775,6 @@
     return val;
 }
 
-#define xspy_shutdown_doc "\n"                 \
-       "Shutdown the xenstore daemon.\n"       \
-       "\n"                                    \
-       "Returns None on success.\n"            \
-       "Raises RuntimeError on error.\n"       \
-       "\n"
-
-static PyObject *xspy_shutdown(PyObject *self, PyObject *args, PyObject *kwds)
-{
-    static char *kwd_spec[] = { NULL };
-    static char *arg_spec = "";
-
-    struct xs_handle *xh = xshandle(self);
-    PyObject *val = NULL;
-    int xsval = 0;
-
-    if (!xh)
-        goto exit;
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, arg_spec, kwd_spec))
-        goto exit;
-    Py_BEGIN_ALLOW_THREADS
-    xsval = xs_shutdown(xh);
-    Py_END_ALLOW_THREADS
-    if (!xsval) {
-        PyErr_SetFromErrno(PyExc_RuntimeError);
-        goto exit;
-    }
-    Py_INCREF(Py_None);
-    val = Py_None;
- exit:
-    return val;
-}
-
 #define xspy_get_domain_path_doc "\n"                  \
        "Return store path of domain.\n"                \
        " domid [int]: domain id\n"                     \
@@ -846,28 +813,6 @@
     }
     val = PyString_FromString(xsval);
     free(xsval);
- exit:
-    return val;
-}
-
-#define xspy_fileno_doc "\n"                                   \
-       "Get the file descriptor of the xenstore socket.\n"     \
-       "Allows an xs object to be passed to select().\n"       \
-       "\n"                                                    \
-       "Returns: [int] file descriptor.\n"                     \
-       "\n"
-
-static PyObject *xspy_fileno(PyObject *self, PyObject *args, PyObject *kwds)
-{
-    static char *kwd_spec[] = { NULL };
-    static char *arg_spec = "";
-
-    struct xs_handle *xh = xshandle(self);
-    PyObject *val = NULL;
-
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, arg_spec, kwd_spec))
-        goto exit;
-    val = PyInt_FromLong((xh ? xs_fileno(xh) : -1));
  exit:
     return val;
 }
@@ -895,9 +840,7 @@
      XSPY_METH(introduce_domain),
      XSPY_METH(release_domain),
      XSPY_METH(close),
-     XSPY_METH(shutdown),
      XSPY_METH(get_domain_path),
-     XSPY_METH(fileno),
      { /* Terminator. */ },
 };
 
diff -r e69413dca684 -r 2144de6eabcc tools/python/xen/xend/xenstore/xsutil.py
--- a/tools/python/xen/xend/xenstore/xsutil.py  Sat Oct  8 09:22:01 2005
+++ b/tools/python/xen/xend/xenstore/xsutil.py  Sat Oct  8 18:19:27 2005
@@ -7,14 +7,17 @@
 import threading
 from xen.lowlevel import xs
 
-handles = {}
+xs_lock = threading.Lock()
+xs_handle = None
 
-# XXX need to g/c handles from dead threads
 def xshandle():
-    if not handles.has_key(threading.currentThread()):
-        handles[threading.currentThread()] = xs.open()
-    return handles[threading.currentThread()]
-
+    global xs_handle, xs_lock
+    if not xs_handle:
+        xs_lock.acquire()
+        if not xs_handle:
+            xs_handle = xs.open()
+        xs_lock.release()
+    return xs_handle
 
 def IntroduceDomain(domid, page, port, path):
     return xshandle().introduce_domain(domid, page, port, path)
diff -r e69413dca684 -r 2144de6eabcc tools/python/xen/xend/xenstore/xswatch.py
--- a/tools/python/xen/xend/xenstore/xswatch.py Sat Oct  8 09:22:01 2005
+++ b/tools/python/xen/xend/xenstore/xswatch.py Sat Oct  8 18:19:27 2005
@@ -12,7 +12,6 @@
 class xswatch:
 
     watchThread = None
-    threadcond = threading.Condition()
     xs = None
     xslock = threading.Lock()
     
@@ -21,43 +20,31 @@
         self.args = args
         self.kwargs = kwargs
         xswatch.watchStart()
-        xswatch.xslock.acquire()
         xswatch.xs.watch(path, self)
-        xswatch.xslock.release()
 
     def watchStart(cls):
-        cls.threadcond.acquire()
+        cls.xslock.acquire()
         if cls.watchThread:
-            cls.threadcond.release()
+            cls.xslock.release()
             return
+        # XXX: When we fix xenstored to have better watch semantics,
+        # this can change to shared xshandle(). Currently that would result
+        # in duplicate watch firings, thus failed extra xs.acknowledge_watch.
+        cls.xs = xs.open()
         cls.watchThread = threading.Thread(name="Watcher",
                                            target=cls.watchMain)
         cls.watchThread.setDaemon(True)
         cls.watchThread.start()
-        while cls.xs == None:
-            cls.threadcond.wait()
-        cls.threadcond.release()
+        cls.xslock.release()
 
     watchStart = classmethod(watchStart)
 
     def watchMain(cls):
-        cls.threadcond.acquire()
-        cls.xs = xs.open()
-        cls.threadcond.notifyAll()
-        cls.threadcond.release()
         while True:
             try:
-                (fd, _1, _2) = select.select([ cls.xs ], [], [])
-                cls.xslock.acquire()
-                # reconfirm ready to read with lock
-                (fd, _1, _2) = select.select([ cls.xs ], [], [], 0.001)
-                if not cls.xs in fd:
-                    cls.xslock.release()
-                    continue
                 we = cls.xs.read_watch()
                 watch = we[1]
                 cls.xs.acknowledge_watch(watch)
-                cls.xslock.release()
             except RuntimeError, ex:
                 print ex
                 raise
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/Makefile
--- a/tools/xenstore/Makefile   Sat Oct  8 09:22:01 2005
+++ b/tools/xenstore/Makefile   Sat Oct  8 18:19:27 2005
@@ -34,7 +34,6 @@
 xenstored: xenstored_core.o xenstored_watch.o xenstored_domain.o xenstored_transaction.o xs_lib.o talloc.o utils.o tdb.o
        $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -lxenctrl -o $@
 
-$(CLIENTS): libxenstore.so
 $(CLIENTS): xenstore-%: xenstore_%.o
        $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -lxenctrl -L. -lxenstore -o $@
 
@@ -47,6 +46,7 @@
 xs_tdb_dump: xs_tdb_dump.o utils.o tdb.o talloc.o
        $(LINK.o) $^ $(LOADLIBES) $(LDLIBS) -o $@
 
+xs_test xs_random xs_stress xs_crashme: LDFLAGS+=-lpthread
 xs_test: xs_test.o xs_lib.o utils.o
 xs_random: xs_random.o xs_test_lib.o xs_lib.o talloc.o utils.o
 xs_stress: xs_stress.o xs_test_lib.o xs_lib.o talloc.o utils.o
@@ -69,7 +69,7 @@
        $(COMPILE.c) -o $@ $<
 
 libxenstore.so: xs.opic xs_lib.opic
-       $(CC) $(CFLAGS) $(LDFLAGS) -Wl,-soname -Wl,libxenstore.so -shared -o $@ $^
+       $(CC) $(CFLAGS) $(LDFLAGS) -Wl,-soname -Wl,libxenstore.so -shared -o $@ $^ -lpthread
 
 clean: testsuite-clean
        rm -f *.o *.opic *.so
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/testsuite/12readonly.test
--- a/tools/xenstore/testsuite/12readonly.test  Sat Oct  8 09:22:01 2005
+++ b/tools/xenstore/testsuite/12readonly.test  Sat Oct  8 18:19:27 2005
@@ -27,8 +27,6 @@
 setperm /test 100 NONE
 expect setperm failed: Permission denied
 setperm /test 100 NONE
-expect shutdown failed: Permission denied
-shutdown
 expect introduce failed: Permission denied
 introduce 1 100 7 /home
 
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/testsuite/test.sh
--- a/tools/xenstore/testsuite/test.sh  Sat Oct  8 09:22:01 2005
+++ b/tools/xenstore/testsuite/test.sh  Sat Oct  8 18:19:27 2005
@@ -23,7 +23,8 @@
            cat testsuite/tmp/xenstored_errors
            return 1
        fi
-       echo shutdown | ./xs_test
+       kill $PID
+       sleep 1
        return 0
     else
        # In case daemon is wedged.
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/xenstored_core.c
--- a/tools/xenstore/xenstored_core.c   Sat Oct  8 09:22:01 2005
+++ b/tools/xenstore/xenstored_core.c   Sat Oct  8 18:19:27 2005
@@ -150,7 +150,6 @@
 {
        switch (type) {
        case XS_DEBUG: return "DEBUG";
-       case XS_SHUTDOWN: return "SHUTDOWN";
        case XS_DIRECTORY: return "DIRECTORY";
        case XS_READ: return "READ";
        case XS_GET_PERMS: return "GET_PERMS";
@@ -1082,17 +1081,6 @@
        case XS_SET_PERMS:
                do_set_perms(conn, in);
                break;
-
-       case XS_SHUTDOWN:
-               /* FIXME: Implement gentle shutdown too. */
-               /* Only tools can do this. */
-               if (conn->id != 0 || !conn->can_write) {
-                       send_error(conn, EACCES);
-                       break;
-               }
-               send_ack(conn, XS_SHUTDOWN);
-               /* Everything hangs off auto-free context, freed at exit. */
-               exit(0);
 
        case XS_DEBUG:
                if (streq(in->buffer, "print"))
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/xs.c
--- a/tools/xenstore/xs.c       Sat Oct  8 09:22:01 2005
+++ b/tools/xenstore/xs.c       Sat Oct  8 18:19:27 2005
@@ -32,82 +32,153 @@
 #include <stdint.h>
 #include <errno.h>
 #include <sys/ioctl.h>
+#include <pthread.h>
 #include "xs.h"
+#include "list.h"
 #include "utils.h"
 
-struct xs_handle
-{
+struct xs_stored_msg {
+       struct list_head list;
+       struct xsd_sockmsg hdr;
+       char *body;
+};
+
+struct xs_handle {
+       /* Communications channel to xenstore daemon. */
        int fd;
+
+       /*
+         * A read thread which pulls messages off the comms channel and
+         * signals waiters.
+         */
+       pthread_t read_thr;
+
+       /*
+         * A list of fired watch messages, protected by a mutex. Users can
+         * wait on the conditional variable until a watch is pending.
+         */
+       struct list_head watch_list;
+       pthread_mutex_t watch_mutex;
+       pthread_cond_t watch_condvar;
+
+       /* Clients can select() on this pipe to wait for a watch to fire. */
+       int watch_pipe[2];
+
+       /*
+         * A list of replies. Currently only one will ever be outstanding
+         * because we serialise requests. The requester can wait on the
+         * conditional variable for its response.
+         */
+       struct list_head reply_list;
+       pthread_mutex_t reply_mutex;
+       pthread_cond_t reply_condvar;
+
+       /* One request at a time. */
+       pthread_mutex_t request_mutex;
+
+       /* One transaction at a time. */
+       pthread_mutex_t transaction_mutex;
 };
 
-/* Get the socket from the store daemon handle.
- */
+static void *read_thread(void *arg);
+
 int xs_fileno(struct xs_handle *h)
 {
-       return h->fd;
-}
-
-static struct xs_handle *get_socket(const char *connect_to)
+       char c = 0;
+
+       pthread_mutex_lock(&h->watch_mutex);
+
+       if ((h->watch_pipe[0] == -1) && (pipe(h->watch_pipe) != -1)) {
+               /* Kick things off if the watch list is already non-empty. */
+               if (!list_empty(&h->watch_list))
+                       while (write(h->watch_pipe[1], &c, 1) != 1)
+                               continue;
+       }
+
+       pthread_mutex_unlock(&h->watch_mutex);
+
+       return h->watch_pipe[0];
+}
+
+static int get_socket(const char *connect_to)
 {
        struct sockaddr_un addr;
        int sock, saved_errno;
-       struct xs_handle *h = NULL;
 
        sock = socket(PF_UNIX, SOCK_STREAM, 0);
        if (sock < 0)
-               return NULL;
+               return -1;
 
        addr.sun_family = AF_UNIX;
        strcpy(addr.sun_path, connect_to);
 
-       if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == 0) {
-               h = malloc(sizeof(*h));
-               if (h) {
-                       h->fd = sock;
-                       return h;
-               }
-       }
-
+       if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
+               saved_errno = errno;
+               close(sock);
+               errno = saved_errno;
+               return -1;
+       }
+
+       return sock;
+}
+
+static int get_dev(const char *connect_to)
+{
+       return open(connect_to, O_RDWR);
+}
+
+static struct xs_handle *get_handle(const char *connect_to)
+{
+       struct stat buf;
+       struct xs_handle *h = NULL;
+       int fd = -1, saved_errno;
+
+       if (stat(connect_to, &buf) != 0)
+               goto error;
+
+       if (S_ISSOCK(buf.st_mode))
+               fd = get_socket(connect_to);
+       else
+               fd = get_dev(connect_to);
+
+       if (fd == -1)
+               goto error;
+
+       h = malloc(sizeof(*h));
+       if (h == NULL)
+               goto error;
+
+       h->fd = fd;
+
+       /* Watch pipe is allocated on demand in xs_fileno(). */
+       h->watch_pipe[0] = h->watch_pipe[1] = -1;
+
+       INIT_LIST_HEAD(&h->watch_list);
+       pthread_mutex_init(&h->watch_mutex, NULL);
+       pthread_cond_init(&h->watch_condvar, NULL);
+
+       INIT_LIST_HEAD(&h->reply_list);
+       pthread_mutex_init(&h->reply_mutex, NULL);
+       pthread_cond_init(&h->reply_condvar, NULL);
+
+       pthread_mutex_init(&h->request_mutex, NULL);
+       pthread_mutex_init(&h->transaction_mutex, NULL);
+
+       if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0)
+               goto error;
+
+       return h;
+
+ error:
        saved_errno = errno;
-       close(sock);
+       if (h != NULL)
+               free(h);
+       if (fd != -1)
+               close(fd);
        errno = saved_errno;
        return NULL;
 }
 
-static struct xs_handle *get_dev(const char *connect_to)
-{
-       int fd, saved_errno;
-       struct xs_handle *h;
-
-       fd = open(connect_to, O_RDWR);
-       if (fd < 0)
-               return NULL;
-
-       h = malloc(sizeof(*h));
-       if (h) {
-               h->fd = fd;
-               return h;
-       }
-
-       saved_errno = errno;
-       close(fd);
-       errno = saved_errno;
-       return NULL;
-}
-
-static struct xs_handle *get_handle(const char *connect_to)
-{
-       struct stat buf;
-
-       if (stat(connect_to, &buf) != 0)
-               return NULL;
-
-       if (S_ISSOCK(buf.st_mode))
-               return get_socket(connect_to);
-       else
-               return get_dev(connect_to);
-}
-
 struct xs_handle *xs_daemon_open(void)
 {
        return get_handle(xs_daemon_socket());
@@ -125,8 +196,39 @@
 
 void xs_daemon_close(struct xs_handle *h)
 {
-       if (h->fd >= 0)
-               close(h->fd);
+       struct xs_stored_msg *msg, *tmsg;
+
+       pthread_mutex_lock(&h->transaction_mutex);
+       pthread_mutex_lock(&h->request_mutex);
+       pthread_mutex_lock(&h->reply_mutex);
+       pthread_mutex_lock(&h->watch_mutex);
+
+       /* XXX FIXME: May leak an unpublished message buffer. */
+       pthread_cancel(h->read_thr);
+       pthread_join(h->read_thr, NULL);
+
+       list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) {
+               free(msg->body);
+               free(msg);
+       }
+
+       list_for_each_entry_safe(msg, tmsg, &h->watch_list, list) {
+               free(msg->body);
+               free(msg);
+       }
+
+       pthread_mutex_unlock(&h->transaction_mutex);
+       pthread_mutex_unlock(&h->request_mutex);
+       pthread_mutex_unlock(&h->reply_mutex);
+       pthread_mutex_unlock(&h->watch_mutex);
+
+       if (h->watch_pipe[0] != -1) {
+               close(h->watch_pipe[0]);
+               close(h->watch_pipe[1]);
+       }
+
+       close(h->fd);
+
        free(h);
 }
 
@@ -169,31 +271,28 @@
 }
 
 /* Adds extra nul terminator, because we generally (always?) hold strings. */
-static void *read_reply(int fd, enum xsd_sockmsg_type *type, unsigned int *len)
-{
-       struct xsd_sockmsg msg;
-       void *ret;
-       int saved_errno;
-
-       if (!read_all(fd, &msg, sizeof(msg)))
-               return NULL;
-
-       ret = malloc(msg.len + 1);
-       if (!ret)
-               return NULL;
-
-       if (!read_all(fd, ret, msg.len)) {
-               saved_errno = errno;
-               free(ret);
-               errno = saved_errno;
-               return NULL;
-       }
-
-       *type = msg.type;
+static void *read_reply(
+       struct xs_handle *h, enum xsd_sockmsg_type *type, unsigned int *len)
+{
+       struct xs_stored_msg *msg;
+       char *body;
+
+       pthread_mutex_lock(&h->reply_mutex);
+       while (list_empty(&h->reply_list))
+               pthread_cond_wait(&h->reply_condvar, &h->reply_mutex);
+       msg = list_top(&h->reply_list, struct xs_stored_msg, list);
+       list_del(&msg->list);
+       assert(list_empty(&h->reply_list));
+       pthread_mutex_unlock(&h->reply_mutex);
+
+       *type = msg->hdr.type;
        if (len)
-               *len = msg.len;
-       ((char *)ret)[msg.len] = '\0';
-       return ret;
+               *len = msg->hdr.len;
+       body = msg->body;
+
+       free(msg);
+
+       return body;
 }
 
 /* Send message to xs, get malloc'ed reply.  NULL and set errno on error. */
@@ -217,6 +316,8 @@
        ignorepipe.sa_flags = 0;
        sigaction(SIGPIPE, &ignorepipe, &oldact);
 
+       pthread_mutex_lock(&h->request_mutex);
+
        if (!xs_write_all(h->fd, &msg, sizeof(msg)))
                goto fail;
 
@@ -224,14 +325,11 @@
                if (!xs_write_all(h->fd, iovec[i].iov_base, iovec[i].iov_len))
                        goto fail;
 
-       /* Watches can have fired before reply comes: daemon detects
-        * and re-transmits, so we can ignore this. */
-       do {
-               free(ret);
-               ret = read_reply(h->fd, &msg.type, len);
-               if (!ret)
-                       goto fail;
-       } while (msg.type == XS_WATCH_EVENT);
+       ret = read_reply(h, &msg.type, len);
+       if (!ret)
+               goto fail;
+
+       pthread_mutex_unlock(&h->request_mutex);
 
        sigaction(SIGPIPE, &oldact, NULL);
        if (msg.type == XS_ERROR) {
@@ -252,6 +350,7 @@
 fail:
        /* We're in a bad state, so close fd. */
        saved_errno = errno;
+       pthread_mutex_unlock(&h->request_mutex);
        sigaction(SIGPIPE, &oldact, NULL);
 close_fd:
        close(h->fd);
@@ -449,39 +548,45 @@
  */
 char **xs_read_watch(struct xs_handle *h, unsigned int *num)
 {
-       struct xsd_sockmsg msg;
-       char **ret;
-       char *strings;
+       struct xs_stored_msg *msg;
+       char **ret, *strings, c = 0;
        unsigned int num_strings, i;
 
-       if (!read_all(h->fd, &msg, sizeof(msg)))
-               return NULL;
-
-       assert(msg.type == XS_WATCH_EVENT);
-       strings = malloc(msg.len);
-       if (!strings)
-               return NULL;
-
-       if (!read_all(h->fd, strings, msg.len)) {
-               free_no_errno(strings);
-               return NULL;
-       }
-
-       num_strings = xs_count_strings(strings, msg.len);
-
-       ret = malloc(sizeof(char*) * num_strings + msg.len);
+       pthread_mutex_lock(&h->watch_mutex);
+
+       /* Wait on the condition variable for a watch to fire. */
+       while (list_empty(&h->watch_list))
+               pthread_cond_wait(&h->watch_condvar, &h->watch_mutex);
+       msg = list_top(&h->watch_list, struct xs_stored_msg, list);
+       list_del(&msg->list);
+
+       /* Clear the pipe token if there are no more pending watches. */
+       if (list_empty(&h->watch_list) && (h->watch_pipe[0] != -1))
+               while (read(h->watch_pipe[0], &c, 1) != 1)
+                       continue;
+
+       pthread_mutex_unlock(&h->watch_mutex);
+
+       assert(msg->hdr.type == XS_WATCH_EVENT);
+
+       strings     = msg->body;
+       num_strings = xs_count_strings(strings, msg->hdr.len);
+
+       ret = malloc(sizeof(char*) * num_strings + msg->hdr.len);
        if (!ret) {
                free_no_errno(strings);
+               free_no_errno(msg);
                return NULL;
        }
 
        ret[0] = (char *)(ret + num_strings);
-       memcpy(ret[0], strings, msg.len);
+       memcpy(ret[0], strings, msg->hdr.len);
+
        free(strings);
-
-       for (i = 1; i < num_strings; i++) {
+       free(msg);
+
+       for (i = 1; i < num_strings; i++)
                ret[i] = ret[i - 1] + strlen(ret[i - 1]) + 1;
-       }
 
        *num = num_strings;
 
@@ -519,6 +624,7 @@
  */
 bool xs_transaction_start(struct xs_handle *h)
 {
+       pthread_mutex_lock(&h->transaction_mutex);
        return xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
 }
 
@@ -530,12 +636,18 @@
 bool xs_transaction_end(struct xs_handle *h, bool abort)
 {
        char abortstr[2];
+       bool rc;
 
        if (abort)
                strcpy(abortstr, "F");
        else
                strcpy(abortstr, "T");
-       return xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
+       
+       rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
+
+       pthread_mutex_unlock(&h->transaction_mutex);
+
+       return rc;
 }
 
 /* Introduce a new domain.
@@ -584,18 +696,6 @@
        return xs_single(h, XS_GET_DOMAIN_PATH, domid_str, NULL);
 }
 
-bool xs_shutdown(struct xs_handle *h)
-{
-       bool ret = xs_bool(xs_single(h, XS_SHUTDOWN, "", NULL));
-       if (ret) {
-               char c;
-               /* Wait for it to actually shutdown. */
-               while ((read(h->fd, &c, 1) < 0) && (errno == EINTR))
-                       continue;
-       }
-       return ret;
-}
-
 /* Only useful for DEBUG versions */
 char *xs_debug_command(struct xs_handle *h, const char *cmd,
                       void *data, unsigned int len)
@@ -609,3 +709,75 @@
 
        return xs_talkv(h, XS_DEBUG, iov, ARRAY_SIZE(iov), NULL);
 }
+
+static void *read_thread(void *arg)
+{
+       struct xs_handle *h = arg;
+       struct xs_stored_msg *msg = NULL;
+       char *body = NULL;
+
+       for (;;) {
+               msg = NULL;
+               body = NULL;
+
+               /* Allocate message structure and read the message header. */
+               msg = malloc(sizeof(*msg));
+               if (msg == NULL)
+                       goto error;
+               if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
+                       goto error;
+
+               /* Allocate and read the message body. */
+               body = msg->body = malloc(msg->hdr.len + 1);
+               if (body == NULL)
+                       goto error;
+               if (!read_all(h->fd, body, msg->hdr.len))
+                       goto error;
+               body[msg->hdr.len] = '\0';
+
+               if (msg->hdr.type == XS_WATCH_EVENT) {
+                       pthread_mutex_lock(&h->watch_mutex);
+
+                       /* Kick users out of their select() loop. */
+                       if (list_empty(&h->watch_list) &&
+                           (h->watch_pipe[1] != -1))
+                               while (write(h->watch_pipe[1], body, 1) != 1)
+                                       continue;
+
+                       list_add_tail(&msg->list, &h->watch_list);
+                       pthread_cond_signal(&h->watch_condvar);
+
+                       pthread_mutex_unlock(&h->watch_mutex);
+               } else {
+                       pthread_mutex_lock(&h->reply_mutex);
+
+                       /* There should only ever be one response pending! */
+                       if (!list_empty(&h->reply_list)) {
+                               pthread_mutex_unlock(&h->reply_mutex);
+                               goto error;
+                       }
+
+                       list_add_tail(&msg->list, &h->reply_list);
+                       pthread_cond_signal(&h->reply_condvar);
+
+                       pthread_mutex_unlock(&h->reply_mutex);
+               }
+       }
+
+ error:
+       if (body != NULL)
+               free(body);
+       if (msg != NULL)
+               free(msg);
+       return NULL;
+}
+
+/*
+ * Local variables:
+ *  c-file-style: "linux"
+ *  indent-tabs-mode: t
+ *  c-indent-level: 8
+ *  c-basic-offset: 8
+ *  tab-width: 8
+ * End:
+ */
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/xs.h
--- a/tools/xenstore/xs.h       Sat Oct  8 09:22:01 2005
+++ b/tools/xenstore/xs.h       Sat Oct  8 18:19:27 2005
@@ -141,7 +141,4 @@
 char *xs_debug_command(struct xs_handle *h, const char *cmd,
                       void *data, unsigned int len);
 
-/* Shut down the daemon. */
-bool xs_shutdown(struct xs_handle *h);
-
 #endif /* _XS_H */
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/xs_random.c
--- a/tools/xenstore/xs_random.c        Sat Oct  8 09:22:01 2005
+++ b/tools/xenstore/xs_random.c        Sat Oct  8 18:19:27 2005
@@ -879,20 +879,11 @@
 static void cleanup_xs_ops(void)
 {
        char *cmd;
+
        if (daemon_pid) {
-               struct xs_handle *h;
-               h = xs_daemon_open();
-               if (h) {
-                       if (xs_shutdown(h)) {
-                               waitpid(daemon_pid, NULL, 0);
-                               daemon_pid = 0;
-                       }
-                       xs_daemon_close(h);
-               }
-               if (daemon_pid) {
-                       kill(daemon_pid, SIGTERM);
-                       waitpid(daemon_pid, NULL, 0);
-               }
+               kill(daemon_pid, SIGTERM);
+               waitpid(daemon_pid, NULL, 0);
+               daemon_pid = 0;
        }
        
        cmd = talloc_asprintf(NULL, "rm -rf testsuite/tmp/*");
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/xs_test.c
--- a/tools/xenstore/xs_test.c  Sat Oct  8 09:22:01 2005
+++ b/tools/xenstore/xs_test.c  Sat Oct  8 18:19:27 2005
@@ -198,7 +198,6 @@
             "  rm <path>\n"
             "  getperm <path>\n"
             "  setperm <path> <id> <flags> ...\n"
-            "  shutdown\n"
             "  watch <path> <token>\n"
             "  watchnoack <path> <token>\n"
             "  waitwatch\n"
@@ -214,8 +213,6 @@
             "  notimeout\n"
             "  readonly\n"
             "  readwrite\n"
-            "  noackwrite <path> <value>...\n"
-            "  readack\n"
             "  dump\n");
 }
 
@@ -353,37 +350,6 @@
 {
        if (!xs_write(handles[handle], path, data, strlen(data)))
                failed(handle);
-}
-
-static void do_noackwrite(unsigned int handle,
-                         char *path, char *data)
-{
-       struct xsd_sockmsg msg;
-
-       msg.len = strlen(path) + 1 + strlen(data);
-       msg.type = XS_WRITE;
-       if (!write_all_choice(handles[handle]->fd, &msg, sizeof(msg)))
-               failed(handle);
-       if (!write_all_choice(handles[handle]->fd, path, strlen(path) + 1))
-               failed(handle);
-       if (!write_all_choice(handles[handle]->fd, data, strlen(data)))
-               failed(handle);
-       /* Do not wait for ack. */
-}
-
-static void do_readack(unsigned int handle)
-{
-       enum xsd_sockmsg_type type;
-       char *ret = NULL;
-
-       /* Watches can have fired before reply comes: daemon detects
-        * and re-transmits, so we can ignore this. */
-       do {
-               free(ret);
-               ret = read_reply(handles[handle]->fd, &type, NULL);
-               if (!ret)
-                       failed(handle);
-       } while (type == XS_WATCH_EVENT);
 }
 
 static void do_setid(unsigned int handle, char *id)
@@ -472,12 +438,6 @@
        }
 
        if (!xs_set_permissions(handles[handle], path, perms, i))
-               failed(handle);
-}
-
-static void do_shutdown(unsigned int handle)
-{
-       if (!xs_shutdown(handles[handle]))
                failed(handle);
 }
 
@@ -780,8 +740,6 @@
                do_getperm(handle, arg(line, 1));
        else if (streq(command, "setperm"))
                do_setperm(handle, arg(line, 1), line);
-       else if (streq(command, "shutdown"))
-               do_shutdown(handle);
        else if (streq(command, "watch"))
                do_watch(handle, arg(line, 1), arg(line, 2), true);
        else if (streq(command, "watchnoack"))
@@ -823,11 +781,7 @@
                readonly = false;
                xs_daemon_close(handles[handle]);
                handles[handle] = NULL;
-       } else if (streq(command, "noackwrite"))
-               do_noackwrite(handle, arg(line,1), arg(line,2));
-       else if (streq(command, "readack"))
-               do_readack(handle);
-       else
+       } else
                barf("Unknown command %s", command);
        fflush(stdout);
        disarm_timeout();
diff -r e69413dca684 -r 2144de6eabcc xen/include/public/io/xs_wire.h
--- a/xen/include/public/io/xs_wire.h   Sat Oct  8 09:22:01 2005
+++ b/xen/include/public/io/xs_wire.h   Sat Oct  8 18:19:27 2005
@@ -31,7 +31,6 @@
 enum xsd_sockmsg_type
 {
        XS_DEBUG,
-       XS_SHUTDOWN,
        XS_DIRECTORY,
        XS_READ,
        XS_GET_PERMS,
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/testsuite/15nowait.test
--- a/tools/xenstore/testsuite/15nowait.test    Sat Oct  8 09:22:01 2005
+++ /dev/null   Sat Oct  8 18:19:27 2005
@@ -1,25 +0,0 @@
-# If we don't wait for an ack, we can crash daemon as it never expects to be
-# sending out two replies on top of each other.
-noackwrite /1 1
-noackwrite /2 2
-noackwrite /3 3
-noackwrite /4 4
-noackwrite /5 5
-readack
-readack
-readack
-readack
-readack
-
-expect handle is 1
-introduce 1 100 7 /my/home
-1 noackwrite /1 1
-1 noackwrite /2 2
-1 noackwrite /3 3
-1 noackwrite /4 4
-1 noackwrite /5 5
-1 readack
-1 readack
-1 readack
-1 readack
-1 readack
diff -r e69413dca684 -r 2144de6eabcc tools/xenstore/testsuite/16block-watch-crash.test
--- a/tools/xenstore/testsuite/16block-watch-crash.test Sat Oct  8 09:22:01 2005
+++ /dev/null   Sat Oct  8 18:19:27 2005
@@ -1,14 +0,0 @@
-# Test case where blocked connection gets sent watch.
-
-# FIXME: We no longer block connections 
-# mkdir /test
-# watch /test token
-# 1 start
-# # This will block on above
-# noackwrite /test/entry contents
-# 1 write /test/entry2 contents
-# 1 commit
-# readack
-# expect /test/entry2:token
-# waitwatch
-# ackwatch token

_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.