[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-changelog] Xenstore client library spawns a reader thread the first time a watch is registered
# HG changeset patch # User kaf24@xxxxxxxxxxxxxxxxxxxx # Node ID 46bd7564125d7e91832d132979bd7e4b3af27b08 # Parent 5cca372aec0554f150e54d0ab30c30a06a2ad4b5 Xenstore client library spawns a reader thread the first time a watch is registered. Before this it is fine for caller threads to read the comms channel directly as no async messages will be received. This avoids various user tools needlessly creating three threads where one will do the job. Signed-off-by: Keir Fraser <keir@xxxxxxxxxxxxx> diff -r 5cca372aec05 -r 46bd7564125d tools/xenstore/xs.c --- a/tools/xenstore/xs.c Tue Oct 11 11:39:03 2005 +++ b/tools/xenstore/xs.c Tue Oct 11 12:02:59 2005 @@ -52,6 +52,7 @@ * signals waiters. */ pthread_t read_thr; + int read_thr_exists; /* * A list of fired watch messages, protected by a mutex. Users can @@ -77,6 +78,7 @@ pthread_mutex_t request_mutex; }; +static int read_message(struct xs_handle *h); static void *read_thread(void *arg); int xs_fileno(struct xs_handle *h) @@ -131,7 +133,7 @@ int fd = -1, saved_errno; if (stat(connect_to, &buf) != 0) - goto error; + return NULL; if (S_ISSOCK(buf.st_mode)) fd = get_socket(connect_to); @@ -139,11 +141,17 @@ fd = get_dev(connect_to); if (fd == -1) - goto error; + return NULL; h = malloc(sizeof(*h)); - if (h == NULL) - goto error; + if (h == NULL) { + saved_errno = errno; + close(fd); + errno = saved_errno; + return NULL; + } + + memset(h, 0, sizeof(*h)); h->fd = fd; @@ -160,19 +168,7 @@ pthread_mutex_init(&h->request_mutex, NULL); - if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0) - goto error; - return h; - - error: - saved_errno = errno; - if (h != NULL) - free(h); - if (fd != -1) - close(fd); - errno = saved_errno; - return NULL; } struct xs_handle *xs_daemon_open(void) @@ -198,9 +194,11 @@ pthread_mutex_lock(&h->reply_mutex); pthread_mutex_lock(&h->watch_mutex); - /* XXX FIXME: May leak an unpublished message buffer. 
*/ - pthread_cancel(h->read_thr); - pthread_join(h->read_thr, NULL); + if (h->read_thr_exists) { + /* XXX FIXME: May leak an unpublished message buffer. */ + pthread_cancel(h->read_thr); + pthread_join(h->read_thr, NULL); + } list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) { free(msg->body); @@ -271,6 +269,10 @@ struct xs_stored_msg *msg; char *body; + /* Read from comms channel ourselves if there is no reader thread. */ + if (!h->read_thr_exists && (read_message(h) == -1)) + return NULL; + pthread_mutex_lock(&h->reply_mutex); while (list_empty(&h->reply_list)) pthread_cond_wait(&h->reply_condvar, &h->reply_mutex); @@ -541,6 +543,17 @@ { struct iovec iov[2]; + /* We dynamically create a reader thread on demand. */ + pthread_mutex_lock(&h->request_mutex); + if (!h->read_thr_exists) { + if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0) { + pthread_mutex_unlock(&h->request_mutex); + return false; + } + h->read_thr_exists = 1; + } + pthread_mutex_unlock(&h->request_mutex); + iov[0].iov_base = (void *)path; iov[0].iov_len = strlen(path) + 1; iov[1].iov_base = (void *)token; @@ -717,65 +730,72 @@ ARRAY_SIZE(iov), NULL); } -static void *read_thread(void *arg) -{ - struct xs_handle *h = arg; +static int read_message(struct xs_handle *h) +{ struct xs_stored_msg *msg = NULL; char *body = NULL; - - for (;;) { - msg = NULL; - body = NULL; - - /* Allocate message structure and read the message header. */ - msg = malloc(sizeof(*msg)); - if (msg == NULL) + int saved_errno; + + /* Allocate message structure and read the message header. */ + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto error; + if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr))) + goto error; + + /* Allocate and read the message body. 
*/ + body = msg->body = malloc(msg->hdr.len + 1); + if (body == NULL) + goto error; + if (!read_all(h->fd, body, msg->hdr.len)) + goto error; + body[msg->hdr.len] = '\0'; + + if (msg->hdr.type == XS_WATCH_EVENT) { + pthread_mutex_lock(&h->watch_mutex); + + /* Kick users out of their select() loop. */ + if (list_empty(&h->watch_list) && + (h->watch_pipe[1] != -1)) + while (write(h->watch_pipe[1], body, 1) != 1) + continue; + + list_add_tail(&msg->list, &h->watch_list); + pthread_cond_signal(&h->watch_condvar); + + pthread_mutex_unlock(&h->watch_mutex); + } else { + pthread_mutex_lock(&h->reply_mutex); + + /* There should only ever be one response pending! */ + if (!list_empty(&h->reply_list)) { + pthread_mutex_unlock(&h->reply_mutex); goto error; - if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr))) - goto error; - - /* Allocate and read the message body. */ - body = msg->body = malloc(msg->hdr.len + 1); - if (body == NULL) - goto error; - if (!read_all(h->fd, body, msg->hdr.len)) - goto error; - body[msg->hdr.len] = '\0'; - - if (msg->hdr.type == XS_WATCH_EVENT) { - pthread_mutex_lock(&h->watch_mutex); - - /* Kick users out of their select() loop. */ - if (list_empty(&h->watch_list) && - (h->watch_pipe[1] != -1)) - while (write(h->watch_pipe[1], body, 1) != 1) - continue; - - list_add_tail(&msg->list, &h->watch_list); - pthread_cond_signal(&h->watch_condvar); - - pthread_mutex_unlock(&h->watch_mutex); - } else { - pthread_mutex_lock(&h->reply_mutex); - - /* There should only ever be one response pending! 
*/ - if (!list_empty(&h->reply_list)) { - pthread_mutex_unlock(&h->reply_mutex); - goto error; - } - - list_add_tail(&msg->list, &h->reply_list); - pthread_cond_signal(&h->reply_condvar); - - pthread_mutex_unlock(&h->reply_mutex); } - } + + list_add_tail(&msg->list, &h->reply_list); + pthread_cond_signal(&h->reply_condvar); + + pthread_mutex_unlock(&h->reply_mutex); + } + + return 0; error: - if (body != NULL) - free(body); - if (msg != NULL) - free(msg); + saved_errno = errno; + free(msg); + free(body); + errno = saved_errno; + return -1; +} + +static void *read_thread(void *arg) +{ + struct xs_handle *h = arg; + + while (read_message(h) != -1) + continue; + return NULL; } _______________________________________________ Xen-changelog mailing list Xen-changelog@xxxxxxxxxxxxxxxxxxx http://lists.xensource.com/xen-changelog
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |