[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-changelog] Xenstore client library spawns a reader thread the first



# HG changeset patch
# User kaf24@xxxxxxxxxxxxxxxxxxxx
# Node ID 46bd7564125d7e91832d132979bd7e4b3af27b08
# Parent  5cca372aec0554f150e54d0ab30c30a06a2ad4b5
Xenstore client library spawns a reader thread the first
time a watch is registered. Before this it is fine for
caller threads to read the comms channel directly as no
async messages will be received.

This avoids various user tools needlessly creating three
threads where one will do the job.

Signed-off-by: Keir Fraser <keir@xxxxxxxxxxxxx>

diff -r 5cca372aec05 -r 46bd7564125d tools/xenstore/xs.c
--- a/tools/xenstore/xs.c       Tue Oct 11 11:39:03 2005
+++ b/tools/xenstore/xs.c       Tue Oct 11 12:02:59 2005
@@ -52,6 +52,7 @@
          * signals waiters.
          */
        pthread_t read_thr;
+       int read_thr_exists;
 
        /*
          * A list of fired watch messages, protected by a mutex. Users can
@@ -77,6 +78,7 @@
        pthread_mutex_t request_mutex;
 };
 
+static int read_message(struct xs_handle *h);
 static void *read_thread(void *arg);
 
 int xs_fileno(struct xs_handle *h)
@@ -131,7 +133,7 @@
        int fd = -1, saved_errno;
 
        if (stat(connect_to, &buf) != 0)
-               goto error;
+               return NULL;
 
        if (S_ISSOCK(buf.st_mode))
                fd = get_socket(connect_to);
@@ -139,11 +141,17 @@
                fd = get_dev(connect_to);
 
        if (fd == -1)
-               goto error;
+               return NULL;
 
        h = malloc(sizeof(*h));
-       if (h == NULL)
-               goto error;
+       if (h == NULL) {
+               saved_errno = errno;
+               close(fd);
+               errno = saved_errno;
+               return NULL;
+       }
+
+       memset(h, 0, sizeof(*h));
 
        h->fd = fd;
 
@@ -160,19 +168,7 @@
 
        pthread_mutex_init(&h->request_mutex, NULL);
 
-       if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0)
-               goto error;
-
        return h;
-
- error:
-       saved_errno = errno;
-       if (h != NULL)
-               free(h);
-       if (fd != -1)
-               close(fd);
-       errno = saved_errno;
-       return NULL;
 }
 
 struct xs_handle *xs_daemon_open(void)
@@ -198,9 +194,11 @@
        pthread_mutex_lock(&h->reply_mutex);
        pthread_mutex_lock(&h->watch_mutex);
 
-       /* XXX FIXME: May leak an unpublished message buffer. */
-       pthread_cancel(h->read_thr);
-       pthread_join(h->read_thr, NULL);
+       if (h->read_thr_exists) {
+               /* XXX FIXME: May leak an unpublished message buffer. */
+               pthread_cancel(h->read_thr);
+               pthread_join(h->read_thr, NULL);
+       }
 
        list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) {
                free(msg->body);
@@ -271,6 +269,10 @@
        struct xs_stored_msg *msg;
        char *body;
 
+       /* Read from comms channel ourselves if there is no reader thread. */
+       if (!h->read_thr_exists && (read_message(h) == -1))
+               return NULL;
+
        pthread_mutex_lock(&h->reply_mutex);
        while (list_empty(&h->reply_list))
                pthread_cond_wait(&h->reply_condvar, &h->reply_mutex);
@@ -541,6 +543,17 @@
 {
        struct iovec iov[2];
 
+       /* We dynamically create a reader thread on demand. */
+       pthread_mutex_lock(&h->request_mutex);
+       if (!h->read_thr_exists) {
+               if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0) {
+                       pthread_mutex_unlock(&h->request_mutex);
+                       return false;
+               }
+               h->read_thr_exists = 1;
+       }
+       pthread_mutex_unlock(&h->request_mutex);
+
        iov[0].iov_base = (void *)path;
        iov[0].iov_len = strlen(path) + 1;
        iov[1].iov_base = (void *)token;
@@ -717,65 +730,72 @@
                        ARRAY_SIZE(iov), NULL);
 }
 
-static void *read_thread(void *arg)
-{
-       struct xs_handle *h = arg;
+static int read_message(struct xs_handle *h)
+{
        struct xs_stored_msg *msg = NULL;
        char *body = NULL;
-
-       for (;;) {
-               msg = NULL;
-               body = NULL;
-
-               /* Allocate message structure and read the message header. */
-               msg = malloc(sizeof(*msg));
-               if (msg == NULL)
+       int saved_errno;
+
+       /* Allocate message structure and read the message header. */
+       msg = malloc(sizeof(*msg));
+       if (msg == NULL)
+               goto error;
+       if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
+               goto error;
+
+       /* Allocate and read the message body. */
+       body = msg->body = malloc(msg->hdr.len + 1);
+       if (body == NULL)
+               goto error;
+       if (!read_all(h->fd, body, msg->hdr.len))
+               goto error;
+       body[msg->hdr.len] = '\0';
+
+       if (msg->hdr.type == XS_WATCH_EVENT) {
+               pthread_mutex_lock(&h->watch_mutex);
+
+               /* Kick users out of their select() loop. */
+               if (list_empty(&h->watch_list) &&
+                   (h->watch_pipe[1] != -1))
+                       while (write(h->watch_pipe[1], body, 1) != 1)
+                               continue;
+
+               list_add_tail(&msg->list, &h->watch_list);
+               pthread_cond_signal(&h->watch_condvar);
+
+               pthread_mutex_unlock(&h->watch_mutex);
+       } else {
+               pthread_mutex_lock(&h->reply_mutex);
+
+               /* There should only ever be one response pending! */
+               if (!list_empty(&h->reply_list)) {
+                       pthread_mutex_unlock(&h->reply_mutex);
                        goto error;
-               if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
-                       goto error;
-
-               /* Allocate and read the message body. */
-               body = msg->body = malloc(msg->hdr.len + 1);
-               if (body == NULL)
-                       goto error;
-               if (!read_all(h->fd, body, msg->hdr.len))
-                       goto error;
-               body[msg->hdr.len] = '\0';
-
-               if (msg->hdr.type == XS_WATCH_EVENT) {
-                       pthread_mutex_lock(&h->watch_mutex);
-
-                       /* Kick users out of their select() loop. */
-                       if (list_empty(&h->watch_list) &&
-                           (h->watch_pipe[1] != -1))
-                               while (write(h->watch_pipe[1], body, 1) != 1)
-                                       continue;
-
-                       list_add_tail(&msg->list, &h->watch_list);
-                       pthread_cond_signal(&h->watch_condvar);
-
-                       pthread_mutex_unlock(&h->watch_mutex);
-               } else {
-                       pthread_mutex_lock(&h->reply_mutex);
-
-                       /* There should only ever be one response pending! */
-                       if (!list_empty(&h->reply_list)) {
-                               pthread_mutex_unlock(&h->reply_mutex);
-                               goto error;
-                       }
-
-                       list_add_tail(&msg->list, &h->reply_list);
-                       pthread_cond_signal(&h->reply_condvar);
-
-                       pthread_mutex_unlock(&h->reply_mutex);
                }
-       }
+
+               list_add_tail(&msg->list, &h->reply_list);
+               pthread_cond_signal(&h->reply_condvar);
+
+               pthread_mutex_unlock(&h->reply_mutex);
+       }
+
+       return 0;
 
  error:
-       if (body != NULL)
-               free(body);
-       if (msg != NULL)
-               free(msg);
+       saved_errno = errno;
+       free(msg);
+       free(body);
+       errno = saved_errno;
+       return -1;
+}
+
+static void *read_thread(void *arg)
+{
+       struct xs_handle *h = arg;
+
+       while (read_message(h) != -1)
+               continue;
+
        return NULL;
 }
 

_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.