[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Xen-devel] [PATCH 07/11] mini-os/xenbus: Unify watch and reply queues



Ian Jackson, le Fri 20 Jun 2014 20:04:46 +0100, a écrit :
> We are going to want to provide an interface to xenbus which does not
> reorder messages for a particular user.  In particular, the reply to a
> watch or unwatch should not be reordered with respect to watch events.
> 
> To this end we arrange that both replies and watches use the same kind
> of queue inside the xenbus driver.  Currently this queue type is only
> exposed outside the xenbus driver for use with watches, as before.
> 
> Important functional changes in this patch include:
> 
> * There is a separate scheduler wait queue for each reply queue,
>   rather than one for all watches and one for each outstanding reply.
>   This wait queue lives in the reply queue struct.
> 
> * There are abstracted-away internal functions for removing (and,
>   indeed, awaiting) events.  xenbus_wait_for_watch_return becomes a
>   trivial wrapper around await_event.
> 
> * Handling of the replies to requests is formalised, using the new
>   queues.  Now a single reply queue might be used for multiple
>   requests (although there are no callers that do this).
> 
> Other changes are:
> 
> * The latent bug in xenbus_msg_reply, which assumed no spurious
>   wakeups, is gone.
> 
> * The "in_use" flag in the request array can be done away with, since
>   we can use the reply_queue pointer value instead.
> 
> * The callers of allocate_xenbus_id (currently, only
>   xenbus_msg_reply), have to initialise a xenbus_event_queue and
>   provide it to allocate_xenbus_id.
> 
> * Abolished the xenbus_watch_queue waitq in favour of the waitq inside
>   the xenbus_default_watch_events event queue.
> 
> * Abolished a duplicate assignment to in_use in release_xenbus_id.
> 
> Signed-off-by: Ian Jackson <Ian.Jackson@xxxxxxxxxxxxx>

Acked-by: Samuel Thibault <samuel.thibault@xxxxxxxxxxxx>

> ---
>  include/mini-os/xenbus.h |    3 ++
>  xen/xenbus/xenbus.c      |   82 
> +++++++++++++++++++++++++++++-----------------
>  2 files changed, 55 insertions(+), 30 deletions(-)
> 
> diff --git a/include/mini-os/xenbus.h b/include/mini-os/xenbus.h
> index abf8765..7e70de0 100644
> --- a/include/mini-os/xenbus.h
> +++ b/include/mini-os/xenbus.h
> @@ -2,6 +2,8 @@
>  #define MINIOS_XENBUS_H__
>  
>  #include <xen/io/xenbus.h>
> +#include <mini-os/sched.h>
> +#include <mini-os/waittypes.h>
>  #include <mini-os/queue.h>
>  
>  typedef unsigned long xenbus_transaction_t;
> @@ -30,6 +32,7 @@ struct xenbus_event {
>  };
>  struct xenbus_event_queue {
>      MINIOS_STAILQ_HEAD(, xenbus_event) events;
> +    struct wait_queue_head waitq;
>  };
>  
>  void xenbus_event_queue_init(struct xenbus_event_queue *queue);
> diff --git a/xen/xenbus/xenbus.c b/xen/xenbus/xenbus.c
> index 947b5c8..d2e59b3 100644
> --- a/xen/xenbus/xenbus.c
> +++ b/xen/xenbus/xenbus.c
> @@ -46,7 +46,6 @@
>  static struct xenstore_domain_interface *xenstore_buf;
>  static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
>  static spinlock_t xb_lock = SPIN_LOCK_UNLOCKED; /* protects xenbus req ring 
> */
> -DECLARE_WAIT_QUEUE_HEAD(xenbus_watch_queue);
>  
>  struct xenbus_event_queue xenbus_default_watch_queue;
>  struct watch {
> @@ -57,8 +56,8 @@ struct watch {
>  static MINIOS_LIST_HEAD(, watch) watches;
>  struct xenbus_req_info 
>  {
> -    int in_use:1;
> -    struct wait_queue_head waitq;
> +    struct xenbus_event_queue *reply_queue; /* non-0 iff in use */
> +    struct xenbus_event *for_queue;
>      void *reply;
>  };
>  
> @@ -66,6 +65,39 @@ struct xenbus_req_info
>  void xenbus_event_queue_init(struct xenbus_event_queue *queue)
>  {
>      MINIOS_STAILQ_INIT(&queue->events);
> +    init_waitqueue_head(&queue->waitq);
> +}
> +
> +static struct xenbus_event *remove_event(struct xenbus_event_queue *queue)
> +{
> +    struct xenbus_event *event;
> +
> +    event = MINIOS_STAILQ_FIRST(&queue->events);
> +    if (!event)
> +        goto out;
> +    MINIOS_STAILQ_REMOVE_HEAD(&queue->events, entry);
> +
> + out:
> +    return event;
> +}
> +
> +static void queue_event(struct xenbus_event_queue *queue,
> +                        struct xenbus_event *event)
> +{
> +    MINIOS_STAILQ_INSERT_TAIL(&queue->events, event, entry);
> +    wake_up(&queue->waitq);
> +}
> +
> +static struct xenbus_event *await_event(struct xenbus_event_queue *queue)
> +{
> +    struct xenbus_event *event;
> +    DEFINE_WAIT(w);
> +    while (!(event = remove_event(queue))) {
> +        add_waiter(w, queue->waitq);
> +        schedule();
> +    }
> +    remove_waiter(w, queue->waitq);
> +    return event;
>  }
>  
>  
> @@ -89,15 +121,9 @@ static void memcpy_from_ring(const void *Ring,
>  char **xenbus_wait_for_watch_return(struct xenbus_event_queue *queue)
>  {
>      struct xenbus_event *event;
> -    DEFINE_WAIT(w);
>      if (!queue)
>          queue = &xenbus_default_watch_queue;
> -    while (!(event = MINIOS_STAILQ_FIRST(&queue->events))) {
> -        add_waiter(w, xenbus_watch_queue);
> -        schedule();
> -    }
> -    remove_waiter(w, xenbus_watch_queue);
> -    MINIOS_STAILQ_REMOVE_HEAD(&queue->events, entry);
> +    event = await_event(queue);
>      return &event->path;
>  }
>  
> @@ -256,8 +282,7 @@ static void xenbus_thread_func(void *ign)
>                      }
>  
>                  if (events) {
> -                    MINIOS_STAILQ_INSERT_TAIL(&events->events, event, entry);
> -                    wake_up(&xenbus_watch_queue);
> +                    queue_event(events, event);
>                  } else {
>                      printk("unexpected watch token %s\n", event->token);
>                      free(event);
> @@ -272,7 +297,8 @@ static void xenbus_thread_func(void *ign)
>                      MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
>                      msg.len + sizeof(msg));
>                  xenstore_buf->rsp_cons += msg.len + sizeof(msg);
> -                wake_up(&req_info[msg.req_id].waitq);
> +                queue_event(req_info[msg.req_id].reply_queue,
> +                            req_info[msg.req_id].for_queue);
>              }
>          }
>      }
> @@ -291,11 +317,10 @@ static DECLARE_WAIT_QUEUE_HEAD(req_wq);
>  /* Release a xenbus identifier */
>  static void release_xenbus_id(int id)
>  {
> -    BUG_ON(!req_info[id].in_use);
> +    BUG_ON(!req_info[id].reply_queue);
>      spin_lock(&req_lock);
> -    req_info[id].in_use = 0;
> +    req_info[id].reply_queue = 0;
>      nr_live_reqs--;
> -    req_info[id].in_use = 0;
>      if (nr_live_reqs == NR_REQS - 1)
>          wake_up(&req_wq);
>      spin_unlock(&req_lock);
> @@ -303,7 +328,8 @@ static void release_xenbus_id(int id)
>  
>  /* Allocate an identifier for a xenbus request.  Blocks if none are
>     available. */
> -static int allocate_xenbus_id(void)
> +static int allocate_xenbus_id(struct xenbus_event_queue *reply_queue,
> +                              struct xenbus_event *for_queue)
>  {
>      static int probe;
>      int o_probe;
> @@ -320,16 +346,16 @@ static int allocate_xenbus_id(void)
>      o_probe = probe;
>      for (;;) 
>      {
> -        if (!req_info[o_probe].in_use)
> +        if (!req_info[o_probe].reply_queue)
>              break;
>          o_probe = (o_probe + 1) % NR_REQS;
>          BUG_ON(o_probe == probe);
>      }
>      nr_live_reqs++;
> -    req_info[o_probe].in_use = 1;
> +    req_info[o_probe].reply_queue = reply_queue;
> +    req_info[o_probe].for_queue = for_queue;
>      probe = (o_probe + 1) % NR_REQS;
>      spin_unlock(&req_lock);
> -    init_waitqueue_head(&req_info[o_probe].waitq);
>  
>      return o_probe;
>  }
> @@ -448,22 +474,18 @@ xenbus_msg_reply(int type,
>                int nr_reqs)
>  {
>      int id;
> -    DEFINE_WAIT(w);
>      struct xsd_sockmsg *rep;
> +    struct xenbus_event_queue queue;
> +    struct xenbus_event event_buf;
>  
> -    /*
> -     * XXX: should use a predicate loop instead of blindly trusting
> -     * that $someone didn't wake us up
> -     */
> +    xenbus_event_queue_init(&queue);
>  
> -    id = allocate_xenbus_id();
> -    add_waiter(w, req_info[id].waitq);
> +    id = allocate_xenbus_id(&queue,&event_buf);
>  
>      xb_write(type, id, trans, io, nr_reqs);
>  
> -    schedule();
> -    remove_waiter(w, req_info[id].waitq);
> -    wake(current);
> +    struct xenbus_event *event = await_event(&queue);
> +    BUG_ON(event != &event_buf);
>  
>      rep = req_info[id].reply;
>      BUG_ON(rep->req_id != id);
> -- 
> 1.7.10.4
> 
> 
> _______________________________________________
> Xen-devel mailing list
> Xen-devel@xxxxxxxxxxxxx
> http://lists.xen.org/xen-devel
> 

-- 
Samuel
 La carte réseau fournie par cybercable (sn2000) ne va-t-elle que sur
 bus isa ou peut-on aussi la mettre sur bus PCI.
 Merci de m'éclairer.
 -+- JP in le Neuneu Pète un Câble : Une carte dans chaque port -+-

_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.