[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-changelog] [xen-unstable] libxl: Permit multithreaded event waiting



# HG changeset patch
# User Ian Jackson <ian.jackson@xxxxxxxxxxxxx>
# Date 1327683683 0
# Node ID cd4bff9d40507cfea619a679f524fafac79a8226
# Parent  d503bdfaba2326b6cc11b45feb660954cc8d9b88
libxl: Permit multithreaded event waiting

Previously, the context would be locked whenever we were waiting in
libxl's own call to poll (waiting for operating system events).

This would mean that multiple simultaneous calls to libxl_event_wait
in different threads with different parameters would not work
properly.

If we simply unlock the context, it would be possible for another
thread to discover the occurrence of the event we were waiting for,
without us even waking up, and we would remain in poll.  So we need a
way to wake up other threads: a pipe, one for each thread in poll.

We also need to move some variables from globals in the ctx to be
per-polling-thread.

Signed-off-by: Ian Jackson <ian.jackson@xxxxxxxxxxxxx>
Acked-by: Ian Campbell <ian.campbell@xxxxxxxxxx>
Committed-by: Ian Jackson <Ian.Jackson@xxxxxxxxxxxxx>
---


diff -r d503bdfaba23 -r cd4bff9d4050 tools/libxl/libxl.c
--- a/tools/libxl/libxl.c       Fri Jan 27 17:01:23 2012 +0000
+++ b/tools/libxl/libxl.c       Fri Jan 27 17:01:23 2012 +0000
@@ -49,8 +49,9 @@
 
     ctx->osevent_hooks = 0;
 
-    ctx->fd_polls = 0;
-    ctx->fd_rindex = 0;
+    LIBXL_LIST_INIT(&ctx->pollers_event);
+    LIBXL_LIST_INIT(&ctx->pollers_idle);
+
     LIBXL_LIST_INIT(&ctx->efds);
     LIBXL_TAILQ_INIT(&ctx->etimes);
 
@@ -61,6 +62,9 @@
     LIBXL_TAILQ_INIT(&ctx->death_list);
     libxl__ev_xswatch_init(&ctx->death_watch);
 
+    rc = libxl__poller_init(ctx, &ctx->poller_app);
+    if (rc) goto out;
+
     if ( stat(XENSTORE_PID_FILE, &stat_buf) != 0 ) {
         LIBXL__LOG_ERRNO(ctx, LIBXL__LOG_ERROR, "Is xenstore daemon running?\n"
                      "failed to stat %s", XENSTORE_PID_FILE);
@@ -135,8 +139,14 @@
     libxl_version_info_dispose(&ctx->version_info);
     if (ctx->xsh) xs_daemon_close(ctx->xsh);
 
-    free(ctx->fd_polls);
-    free(ctx->fd_rindex);
+    libxl__poller_dispose(&ctx->poller_app);
+    assert(LIBXL_LIST_EMPTY(&ctx->pollers_event));
+    libxl__poller *poller, *poller_tmp;
+    LIBXL_LIST_FOREACH_SAFE(poller, &ctx->pollers_idle, entry, poller_tmp) {
+        libxl__poller_dispose(poller);
+        free(poller);
+    }
+
     free(ctx->watch_slots);
 
     discard_events(&ctx->occurred);
diff -r d503bdfaba23 -r cd4bff9d4050 tools/libxl/libxl_event.c
--- a/tools/libxl/libxl_event.c Fri Jan 27 17:01:23 2012 +0000
+++ b/tools/libxl/libxl_event.c Fri Jan 27 17:01:23 2012 +0000
@@ -510,9 +510,9 @@
  * osevent poll
  */
 
-static int beforepoll_internal(libxl__gc *gc, int *nfds_io,
-                               struct pollfd *fds, int *timeout_upd,
-                               struct timeval now)
+static int beforepoll_internal(libxl__gc *gc, libxl__poller *poller,
+                               int *nfds_io, struct pollfd *fds,
+                               int *timeout_upd, struct timeval now)
 {
     libxl__ev_fd *efd;
     int rc;
@@ -534,7 +534,7 @@
          * not to mess with fd_rindex.
          */
 
-        int maxfd = 0;
+        int maxfd = poller->wakeup_pipe[0] + 1;
         LIBXL_LIST_FOREACH(efd, &CTX->efds, entry) {
             if (!efd->events)
                 continue;
@@ -542,30 +542,39 @@
                 maxfd = efd->fd + 1;
         }
         /* make sure our array is as big as *nfds_io */
-        if (CTX->fd_rindex_allocd < maxfd) {
+        if (poller->fd_rindex_allocd < maxfd) {
             assert(maxfd < INT_MAX / sizeof(int) / 2);
-            int *newarray = realloc(CTX->fd_rindex, sizeof(int) * maxfd);
+            int *newarray = realloc(poller->fd_rindex, sizeof(int) * maxfd);
             if (!newarray) { rc = ERROR_NOMEM; goto out; }
-            memset(newarray + CTX->fd_rindex_allocd, 0,
-                   sizeof(int) * (maxfd - CTX->fd_rindex_allocd));
-            CTX->fd_rindex = newarray;
-            CTX->fd_rindex_allocd = maxfd;
+            memset(newarray + poller->fd_rindex_allocd, 0,
+                   sizeof(int) * (maxfd - poller->fd_rindex_allocd));
+            poller->fd_rindex = newarray;
+            poller->fd_rindex_allocd = maxfd;
         }
     }
 
     int used = 0;
-    LIBXL_LIST_FOREACH(efd, &CTX->efds, entry) {
-        if (!efd->events)
-            continue;
-        if (used < *nfds_io) {
-            fds[used].fd = efd->fd;
-            fds[used].events = efd->events;
-            fds[used].revents = 0;
-            assert(efd->fd < CTX->fd_rindex_allocd);
-            CTX->fd_rindex[efd->fd] = used;
-        }
-        used++;
-    }
+
+#define REQUIRE_FD(req_fd, req_events, efd) do{                 \
+        if ((req_events)) {                                     \
+            if (used < *nfds_io) {                              \
+                fds[used].fd = (req_fd);                        \
+                fds[used].events = (req_events);                \
+                fds[used].revents = 0;                          \
+                assert((req_fd) < poller->fd_rindex_allocd);    \
+                poller->fd_rindex[(req_fd)] = used;             \
+            }                                                   \
+            used++;                                             \
+        }                                                       \
+    }while(0)
+
+    LIBXL_LIST_FOREACH(efd, &CTX->efds, entry)
+        REQUIRE_FD(efd->fd, efd->events, efd);
+
+    REQUIRE_FD(poller->wakeup_pipe[0], POLLIN, 0);
+
+#undef REQUIRE_FD
+
     rc = used <= *nfds_io ? 0 : ERROR_BUFFERFULL;
 
     *nfds_io = used;
@@ -599,22 +608,23 @@
 {
     EGC_INIT(ctx);
     CTX_LOCK;
-    int rc = beforepoll_internal(gc, nfds_io, fds, timeout_upd, now);
+    int rc = beforepoll_internal(gc, &ctx->poller_app,
+                                 nfds_io, fds, timeout_upd, now);
     CTX_UNLOCK;
     EGC_FREE;
     return rc;
 }
 
-static int afterpoll_check_fd(libxl_ctx *ctx,
+static int afterpoll_check_fd(libxl__poller *poller,
                               const struct pollfd *fds, int nfds,
                               int fd, int events)
     /* returns mask of events which were requested and occurred */
 {
-    if (fd >= ctx->fd_rindex_allocd)
+    if (fd >= poller->fd_rindex_allocd)
         /* added after we went into poll, have to try again */
         return 0;
 
-    int slot = ctx->fd_rindex[fd];
+    int slot = poller->fd_rindex[fd];
 
     if (slot >= nfds)
         /* stale slot entry; again, added afterwards */
@@ -630,22 +640,31 @@
     return revents;
 }
 
-static void afterpoll_internal(libxl__egc *egc,
+static void afterpoll_internal(libxl__egc *egc, libxl__poller *poller,
                                int nfds, const struct pollfd *fds,
                                struct timeval now)
 {
     EGC_GC;
     libxl__ev_fd *efd;
 
+
     LIBXL_LIST_FOREACH(efd, &CTX->efds, entry) {
         if (!efd->events)
             continue;
 
-        int revents = afterpoll_check_fd(CTX,fds,nfds, efd->fd,efd->events);
+        int revents = afterpoll_check_fd(poller,fds,nfds, efd->fd,efd->events);
         if (revents)
             efd->func(egc, efd, efd->fd, efd->events, revents);
     }
 
+    if (afterpoll_check_fd(poller,fds,nfds, poller->wakeup_pipe[0],POLLIN)) {
+        char buf[256];
+        int r = read(poller->wakeup_pipe[0], buf, sizeof(buf));
+        if (r < 0)
+            if (errno != EINTR && errno != EWOULDBLOCK)
+                LIBXL__EVENT_DISASTER(egc, "read wakeup", errno, 0);
+    }
+
     for (;;) {
         libxl__ev_time *etime = LIBXL_TAILQ_FIRST(&CTX->etimes);
         if (!etime)
@@ -667,7 +686,7 @@
 {
     EGC_INIT(ctx);
     CTX_LOCK;
-    afterpoll_internal(egc, nfds, fds, now);
+    afterpoll_internal(egc, &ctx->poller_app, nfds, fds, now);
     CTX_UNLOCK;
     EGC_FREE;
 }
@@ -790,7 +809,10 @@
         LIBXL_TAILQ_INSERT_TAIL(&egc->occurred_for_callback, event, link);
         return;
     } else {
+        libxl__poller *poller;
         LIBXL_TAILQ_INSERT_TAIL(&CTX->occurred, event, link);
+        LIBXL_LIST_FOREACH(poller, &CTX->pollers_event, entry)
+            libxl__poller_wakeup(egc, poller);
     }
 }
 
@@ -858,7 +880,94 @@
     return rc;
 }
 
-static int eventloop_iteration(libxl__egc *egc) {
+/*
+ * Manipulation of pollers
+ */
+
+int libxl__poller_init(libxl_ctx *ctx, libxl__poller *p)
+{
+    int r, rc;
+    p->fd_polls = 0;
+    p->fd_rindex = 0;
+
+    r = pipe(p->wakeup_pipe);
+    if (r) {
+        LIBXL__LOG_ERRNO(ctx, LIBXL__LOG_ERROR, "cannot create poller pipe");
+        rc = ERROR_FAIL;
+        goto out;
+    }
+
+    rc = libxl_fd_set_nonblock(ctx, p->wakeup_pipe[0], 1);
+    if (rc) goto out;
+
+    rc = libxl_fd_set_nonblock(ctx, p->wakeup_pipe[1], 1);
+    if (rc) goto out;
+
+    return 0;
+
+ out:
+    libxl__poller_dispose(p);
+    return rc;
+}
+
+void libxl__poller_dispose(libxl__poller *p)
+{
+    if (p->wakeup_pipe[1] > 0) close(p->wakeup_pipe[1]);
+    if (p->wakeup_pipe[0] > 0) close(p->wakeup_pipe[0]);
+    free(p->fd_polls);
+    free(p->fd_rindex);
+}
+
+libxl__poller *libxl__poller_get(libxl_ctx *ctx)
+{
+    /* must be called with ctx locked */
+    int rc;
+
+    libxl__poller *p = LIBXL_LIST_FIRST(&ctx->pollers_idle);
+    if (p)
+        return p;
+
+    p = malloc(sizeof(*p));
+    if (!p) {
+        LIBXL__LOG_ERRNO(ctx, LIBXL__LOG_ERROR, "cannot allocate poller");
+        return 0;
+    }
+    memset(p, 0, sizeof(*p));
+
+    rc = libxl__poller_init(ctx, p);
+    if (rc) return NULL;
+
+    return p;
+}
+
+void libxl__poller_put(libxl_ctx *ctx, libxl__poller *p)
+{
+    LIBXL_LIST_INSERT_HEAD(&ctx->pollers_idle, p, entry);
+}
+
+void libxl__poller_wakeup(libxl__egc *egc, libxl__poller *p)
+{
+    static const char buf[1] = "";
+
+    for (;;) {
+        int r = write(p->wakeup_pipe[1], buf, 1);
+        if (r==1) return;
+        assert(r==-1);
+        if (errno == EINTR) continue;
+        if (errno == EWOULDBLOCK) return;
+        LIBXL__EVENT_DISASTER(egc, "cannot poke watch pipe", errno, 0);
+        return;
+    }
+}
+
+/*
+ * Main event loop iteration
+ */
+
+static int eventloop_iteration(libxl__egc *egc, libxl__poller *poller) {
+    /* The CTX must be locked EXACTLY ONCE so that this function
+     * can unlock it when it polls.
+     */
     EGC_GC;
     int rc;
     struct timeval now;
@@ -871,23 +980,27 @@
     int timeout;
 
     for (;;) {
-        int nfds = CTX->fd_polls_allocd;
+        int nfds = poller->fd_polls_allocd;
         timeout = -1;
-        rc = beforepoll_internal(gc, &nfds, CTX->fd_polls, &timeout, now);
+        rc = beforepoll_internal(gc, poller, &nfds, poller->fd_polls,
+                                 &timeout, now);
         if (!rc) break;
         if (rc != ERROR_BUFFERFULL) goto out;
 
         struct pollfd *newarray =
             (nfds > INT_MAX / sizeof(struct pollfd) / 2) ? 0 :
-            realloc(CTX->fd_polls, sizeof(*newarray) * nfds);
+            realloc(poller->fd_polls, sizeof(*newarray) * nfds);
 
         if (!newarray) { rc = ERROR_NOMEM; goto out; }
 
-        CTX->fd_polls = newarray;
-        CTX->fd_polls_allocd = nfds;
+        poller->fd_polls = newarray;
+        poller->fd_polls_allocd = nfds;
     }
 
-    rc = poll(CTX->fd_polls, CTX->fd_polls_allocd, timeout);
+    CTX_UNLOCK;
+    rc = poll(poller->fd_polls, poller->fd_polls_allocd, timeout);
+    CTX_LOCK;
+
     if (rc < 0) {
         if (errno == EINTR)
             return 0; /* will go round again if caller requires */
@@ -900,7 +1013,8 @@
     rc = libxl__gettimeofday(gc, &now);
     if (rc) goto out;
 
-    afterpoll_internal(egc, CTX->fd_polls_allocd, CTX->fd_polls, now);
+    afterpoll_internal(egc, poller,
+                       poller->fd_polls_allocd, poller->fd_polls, now);
 
     CTX_UNLOCK;
 
@@ -914,15 +1028,19 @@
                      libxl_event_predicate *pred, void *pred_user)
 {
     int rc;
+    libxl__poller *poller = NULL;
 
     EGC_INIT(ctx);
     CTX_LOCK;
 
+    poller = libxl__poller_get(ctx);
+    if (!poller) { rc = ERROR_FAIL; goto out; }
+
     for (;;) {
         rc = event_check_internal(egc, event_r, typemask, pred, pred_user);
         if (rc != ERROR_NOT_READY) goto out;
 
-        rc = eventloop_iteration(egc);
+        rc = eventloop_iteration(egc, poller);
         if (rc) goto out;
 
         /* we unlock and cleanup the egc each time we go through this loop,
@@ -936,6 +1054,8 @@
     }
 
  out:
+    libxl__poller_put(ctx, poller);
+
     CTX_UNLOCK;
     EGC_FREE;
     return rc;
diff -r d503bdfaba23 -r cd4bff9d4050 tools/libxl/libxl_internal.h
--- a/tools/libxl/libxl_internal.h      Fri Jan 27 17:01:23 2012 +0000
+++ b/tools/libxl/libxl_internal.h      Fri Jan 27 17:01:23 2012 +0000
@@ -207,6 +207,33 @@
 _hidden void
 libxl__evdisable_disk_eject(libxl__gc*, libxl_evgen_disk_eject*);
 
+typedef struct libxl__poller libxl__poller;
+struct libxl__poller {
+    /*
+     * These are used to allow other threads to wake up a thread which
+     * may be stuck in poll, because whatever it was waiting for
+     * hadn't happened yet.  Threads which generate events will write
+     * a byte to each pipe.  A thread which is waiting will empty its
+     * own pipe, and put its poller on the pollers_event list, before
+     * releasing the ctx lock and going into poll; when it comes out
+     * of poll it will take the poller off the pollers_event list.
+     *
+     * When a thread is done with a poller it should put it onto
+     * pollers_idle, where it can be reused later.
+     *
+     * The "poller_app" is never idle, but is sometimes on
+     * pollers_event.
+     */
+    LIBXL_LIST_ENTRY(libxl__poller) entry;
+
+    struct pollfd *fd_polls;
+    int fd_polls_allocd;
+
+    int fd_rindex_allocd;
+    int *fd_rindex; /* see libxl_osevent_beforepoll */
+
+    int wakeup_pipe[2]; /* 0 means no fd allocated */
+};
 
 struct libxl__ctx {
     xentoollog_logger *lg;
@@ -237,10 +264,9 @@
       /* See the comment for OSEVENT_HOOK_INTERN in libxl_event.c
        * for restrictions on the use of the osevent fields. */
 
-    struct pollfd *fd_polls;
-    int fd_polls_allocd;
-    int fd_rindex_allocd;
-    int *fd_rindex; /* see libxl_osevent_beforepoll */
+    libxl__poller poller_app; /* libxl_osevent_beforepoll and _afterpoll */
+    LIBXL_LIST_HEAD(, libxl__poller) pollers_event, pollers_idle;
+
     LIBXL_LIST_HEAD(, libxl__ev_fd) efds;
     LIBXL_TAILQ_HEAD(, libxl__ev_time) etimes;
 
@@ -526,6 +552,22 @@
     libxl__event_disaster(egc, msg, errnoval, type, __FILE__,__LINE__,__func__)
 
 
+/* Fills in, or disposes of, the resources held by, a poller whose
+ * space the caller has allocated.  ctx must be locked. */
+int libxl__poller_init(libxl_ctx *ctx, libxl__poller *p);
+void libxl__poller_dispose(libxl__poller *p);
+
+/* Obtain a fresh poller from malloc or the idle list, and put it
+ * away again afterwards.  _get can fail, returning NULL.
+ * ctx must be locked. */
+libxl__poller *libxl__poller_get(libxl_ctx *ctx);
+void libxl__poller_put(libxl_ctx *ctx, libxl__poller *p);
+
+/* Notifies whoever is polling using p that they should wake up.
+ * ctx must be locked. */
+void libxl__poller_wakeup(libxl__egc *egc, libxl__poller *p);
+
+
 /* from xl_dom */
 _hidden libxl_domain_type libxl__domain_type(libxl__gc *gc, uint32_t domid);
 _hidden int libxl__domain_shutdown_reason(libxl__gc *gc, uint32_t domid);

_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.