
Re: [Xen-devel] [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses



On 01/06/2017 10:05 AM, Juergen Gross wrote:
> Handling of multiple concurrent Xenstore accesses through xenbus driver
> either from the kernel or user land is rather lame today: xenbus is
> capable to have one access active only at one point of time.
>
> Rewrite xenbus to handle multiple requests concurrently by making use
> of the request id of the Xenstore protocol. This requires to:
>
> - Instead of blocking inside xb_read() when trying to read data from
>   the xenstore ring buffer do so only in the main loop of
>   xenbus_thread().
>
> - Instead of doing writes to the xenstore ring buffer in the context of
>   the caller just queue the request and do the write in the dedicated
>   xenbus thread.
>
> - Instead of just forwarding the request id specified by the caller of
>   xenbus to xenstore use a xenbus internal unique request id. This will
>   allow multiple outstanding requests.
>
> - Modify the locking scheme in order to allow multiple requests being
>   active in parallel.
>
> - Instead of waiting for the reply of a user's xenstore request after
>   writing the request to the xenstore ring buffer return directly to
>   the caller and do the waiting in the read path.
>
> Additionally signal handling was optimized by avoiding waking up the
> xenbus thread or sending an event to Xenstore in case the addressed
> entity is known to be running already.
>
> As a result communication with Xenstore is sped up by a factor of up
> to 5: depending on the request type (read or write) and the amount of
> data transferred the gain was at least 20% (small reads) and went up to
> a factor of 5 for large writes.
>
> In the end some more rough edges of xenbus have been smoothed:
>
> - Handling of memory shortage when reading from xenstore ring buffer in
>   the xenbus driver was not optimal: it was busy looping and issuing a
>   warning in each loop.
>
> - In case of xenstore not running in dom0 but in a stubdom we end up
>   with two xenbus threads running as the initialization of xenbus in
>   dom0 expecting a local xenstored will be redone later when connecting
>   to the xenstore domain. Up to now this was no problem as locking
>   would prevent the two xenbus threads interfering with each other, but
>   this was just a waste of kernel resources.
>
> - An out of memory situation while writing to or reading from the
>   xenstore ring buffer no longer will lead to a possible loss of
>   synchronization with xenstore.
>
> - The user read and write part are now interruptible by signals.
>
> Signed-off-by: Juergen Gross <jgross@xxxxxxxx>
> ---
> I'm aware that the changes are quite large. I thought about sending a
> version split into multiple patches, but a lot of lines would have been
> touched by more than one patch. I still have the multiple patch variant
> lying around - this patch is split into 11 smaller ones. While all
> steps of this larger series is operational some steps are not optimal
> as they are even slower than the original version of xenbus.
>
> Nevertheless I can send the large series if there are requests for it.

I will comment only on the xenbus_comms.c changes for now, since otherwise
I am afraid it may be difficult to keep track of the conversation.


> diff --git a/drivers/xen/xenbus/xenbus_comms.c b/drivers/xen/xenbus/xenbus_comms.c
> index c21ec02..fa054ca 100644
> --- a/drivers/xen/xenbus/xenbus_comms.c
> +++ b/drivers/xen/xenbus/xenbus_comms.c
> @@ -34,6 +34,7 @@
>  
>  #include <linux/wait.h>
>  #include <linux/interrupt.h>
> +#include <linux/kthread.h>
>  #include <linux/sched.h>
>  #include <linux/err.h>
>  #include <xen/xenbus.h>
> @@ -42,11 +43,40 @@
>  #include <xen/page.h>
>  #include "xenbus.h"
>  
> +struct xs_thread_state_write {
> +     struct xb_req_data *req;
> +     int idx;
> +     unsigned int used;

"written" or "sent"?

> +};
> +
> +struct xs_thread_state_read {
> +     struct xsd_sockmsg msg;
> +     char *body;
> +     union {
> +             void *alloc;
> +             struct xs_watch_event *watch;
> +     };
> +     bool in_msg;
> +     bool in_hdr;

It may be better to keep track of which state we are in using a bitmap.
Otherwise it is easy to lose track of one or the other.
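
To illustrate what I mean, here is a rough userspace sketch. It uses a
single enum rather than a literal bitmap, and the names (rd_phase,
rd_state, rd_advance) are made up for this example; none of them exist in
the patch. The point is only that "idle", "in header" and "in body" become
one value instead of two booleans that have to be kept consistent.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical replacement for the in_msg/in_hdr pair: one phase value. */
enum rd_phase {
	RD_IDLE,	/* no message in progress (in_msg == false) */
	RD_HDR,		/* reading the header (in_msg && in_hdr) */
	RD_BODY,	/* reading the body (in_msg && !in_hdr) */
};

struct rd_state {
	enum rd_phase phase;
	unsigned int received;	/* bytes of the current part read so far */
};

/*
 * Account for 'got' newly read bytes of a part that is 'want' bytes long.
 * Returns true and moves to the next phase once the part is complete,
 * false if more data is still needed.
 */
static bool rd_advance(struct rd_state *st, unsigned int got,
		       unsigned int want)
{
	assert(st->phase != RD_IDLE);

	st->received += got;
	assert(st->received <= want);
	if (st->received != want)
		return false;

	st->received = 0;
	st->phase = (st->phase == RD_HDR) ? RD_BODY : RD_IDLE;
	return true;
}
```

With something like this, process_msg() would switch on the phase, and an
impossible combination such as "in_hdr set but in_msg clear" cannot be
expressed at all.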

> +     unsigned int used;

"read" or "received"?

> +};

Both of these are private to process_msg()/process_writes(), so perhaps
they can be declared in those routines' scopes.

> +
> +/* A list of replies. Currently only one will ever be outstanding. */
> +LIST_HEAD(xs_reply_list);
> +
> +/* A list of write requests. */
> +LIST_HEAD(xb_write_list);
> +DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
> +DEFINE_MUTEX(xb_write_mutex);
> +
> +/* Protect xenbus reader thread against save/restore. */
> +DEFINE_MUTEX(xs_response_mutex);
> +
>  static int xenbus_irq;
> +static struct task_struct *xenbus_task;
>  
>  static DECLARE_WORK(probe_work, xenbus_probe);
>  
> -static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
>  
>  static irqreturn_t wake_waiting(int irq, void *unused)
>  {
> @@ -84,30 +114,31 @@ static const void *get_input_chunk(XENSTORE_RING_IDX cons,
>       return buf + MASK_XENSTORE_IDX(cons);
>  }
>  
> +static int xb_data_to_write(void)
> +{
> +     struct xenstore_domain_interface *intf = xen_store_interface;
> +
> +     return (intf->req_prod - intf->req_cons) != XENSTORE_RING_SIZE &&
> +             !list_empty(&xb_write_list);
> +}
> +
>  /**
>   * xb_write - low level write
>   * @data: buffer to send
>   * @len: length of buffer
>   *
> - * Returns 0 on success, error otherwise.
> + * Returns number of bytes written or -err.
>   */
> -int xb_write(const void *data, unsigned len)
> +static int xb_write(const void *data, unsigned int len)
>  {
>       struct xenstore_domain_interface *intf = xen_store_interface;
>       XENSTORE_RING_IDX cons, prod;
> -     int rc;
> +     unsigned int bytes = 0;
>  
>       while (len != 0) {
>               void *dst;
>               unsigned int avail;
>  
> -             rc = wait_event_interruptible(
> -                     xb_waitq,
> -                     (intf->req_prod - intf->req_cons) !=
> -                     XENSTORE_RING_SIZE);
> -             if (rc < 0)
> -                     return rc;
> -
>               /* Read indexes, then verify. */
>               cons = intf->req_cons;
>               prod = intf->req_prod;
> @@ -115,59 +146,57 @@ int xb_write(const void *data, unsigned len)
>                       intf->req_cons = intf->req_prod = 0;
>                       return -EIO;
>               }
> -
> -             dst = get_output_chunk(cons, prod, intf->req, &avail);
> -             if (avail == 0)
> -                     continue;
> -             if (avail > len)
> -                     avail = len;
> +             if (!xb_data_to_write())
> +                     return bytes;
>  
>               /* Must write data /after/ reading the consumer index. */
>               virt_mb();
>  
> +             dst = get_output_chunk(cons, prod, intf->req, &avail);
> +             if (avail == 0)
> +                     continue;

Should we continue the loop here or return? We are waiting for the
reader to get stuff off the ring.
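
For comparison, this is roughly what the "return" variant would look like
on a toy ring. It is a simplified userspace sketch, not the xenbus code:
RING_SIZE, struct ring, ring_out_chunk() and ring_write() are invented for
the example, and the memory barriers and event channel notification are
left out on purpose.

```c
#include <assert.h>
#include <string.h>

#define RING_SIZE 8u	/* power of two, stand-in for XENSTORE_RING_SIZE */
#define MASK_IDX(i) ((i) & (RING_SIZE - 1))

struct ring {
	char buf[RING_SIZE];
	unsigned int cons, prod;	/* free-running indexes */
};

/* Largest contiguous chunk that can be written at 'prod'. */
static unsigned int ring_out_chunk(const struct ring *r)
{
	unsigned int to_end = RING_SIZE - MASK_IDX(r->prod);
	unsigned int space;

	assert(r->prod - r->cons <= RING_SIZE);
	space = RING_SIZE - (r->prod - r->cons);

	return to_end < space ? to_end : space;
}

/* Copy as much as fits; return bytes written instead of spinning. */
static unsigned int ring_write(struct ring *r, const char *data,
			       unsigned int len)
{
	unsigned int bytes = 0;

	while (len != 0) {
		unsigned int avail = ring_out_chunk(r);

		if (avail == 0)
			return bytes;	/* ring full: let the caller wait */
		if (avail > len)
			avail = len;

		memcpy(r->buf + MASK_IDX(r->prod), data, avail);
		data += avail;
		len -= avail;
		bytes += avail;
		r->prod += avail;
	}

	return bytes;
}
```

The caller already copes with a short write (it keeps state.used and comes
back later), so returning here would leave the waiting to the main loop of
the thread.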


> +             if (avail > len)
> +                     avail = len;
> +
>               memcpy(dst, data, avail);
>               data += avail;
>               len -= avail;
> +             bytes += avail;
>  
>               /* Other side must not see new producer until data is there. */
>               virt_wmb();
>               intf->req_prod += avail;
>  
>               /* Implies mb(): other side will see the updated producer. */
> -             notify_remote_via_evtchn(xen_store_evtchn);
> +             if (prod <= intf->req_cons)
> +                     notify_remote_via_evtchn(xen_store_evtchn);
>       }
>  
> -     return 0;
> +     return bytes;
>  }
>  
> -int xb_data_to_read(void)
> +static int xb_data_to_read(void)
>  {
>       struct xenstore_domain_interface *intf = xen_store_interface;
>       return (intf->rsp_cons != intf->rsp_prod);
>  }
>  
> -int xb_wait_for_data_to_read(void)
> -{
> -     return wait_event_interruptible(xb_waitq, xb_data_to_read());
> -}
> -
> -int xb_read(void *data, unsigned len)
> +static int xb_read(void *data, unsigned int len)
>  {
>       struct xenstore_domain_interface *intf = xen_store_interface;
>       XENSTORE_RING_IDX cons, prod;
> -     int rc;
> +     unsigned int bytes = 0;
>  
>       while (len != 0) {
>               unsigned int avail;
>               const char *src;
>  
> -             rc = xb_wait_for_data_to_read();
> -             if (rc < 0)
> -                     return rc;
> -
>               /* Read indexes, then verify. */
>               cons = intf->rsp_cons;
>               prod = intf->rsp_prod;
> +             if (cons == prod)
> +                     return bytes;
> +
>               if (!check_indexes(cons, prod)) {
>                       intf->rsp_cons = intf->rsp_prod = 0;
>                       return -EIO;
> @@ -185,17 +214,229 @@ int xb_read(void *data, unsigned len)
>               memcpy(data, src, avail);
>               data += avail;
>               len -= avail;
> +             bytes += avail;
>  
>               /* Other side must not see free space until we've copied out */
>               virt_mb();
>               intf->rsp_cons += avail;
>  
> -             pr_debug("Finished read of %i bytes (%i to go)\n", avail, len);
> -
>               /* Implies mb(): other side will see the updated consumer. */
> -             notify_remote_via_evtchn(xen_store_evtchn);
> +             if (intf->rsp_prod - cons >= XENSTORE_RING_SIZE)
> +                     notify_remote_via_evtchn(xen_store_evtchn);
>       }
>  
> +     return bytes;
> +}
> +
> +static int process_msg(void)
> +{
> +     static struct xs_thread_state_read state;
> +     struct xb_req_data *req;
> +     int err;
> +     unsigned int len;
> +
> +     if (!state.in_msg) {
> +             state.in_msg = true;
> +             state.in_hdr = true;
> +             state.used = 0;
> +
> +             /*
> +              * We must disallow save/restore while reading a message.
> +              * A partial read across s/r leaves us out of sync with
> +              * xenstored.
> +              */
> +             mutex_lock(&xs_response_mutex);
> +
> +             if (!xb_data_to_read()) {
> +                     /* We raced with save/restore: pending data 'gone'. */
> +                     mutex_unlock(&xs_response_mutex);
> +                     state.in_msg = false;
> +                     return 0;
> +             }
> +     }
> +
> +     if (state.in_hdr) {
> +             if (state.used != sizeof(state.msg)) {
> +                     err = xb_read((void *)&state.msg + state.used,
> +                                   sizeof(state.msg) - state.used);
> +                     if (err < 0)
> +                             goto out;
> +                     state.used += err;
> +                     if (state.used != sizeof(state.msg))
> +                             return 0;

Would it be possible to do the locking in the caller? I understand that
you are trying to hold the lock across multiple invocations of this
function, but it feels somewhat counter-intuitive and bug-prone.

If that's not possible then please at least add a comment explaining the
locking scheme.

> +                     if (state.msg.len > XENSTORE_PAYLOAD_MAX) {
> +                             err = -EINVAL;
> +                             goto out;
> +                     }
> +             }
> +
> +             len = state.msg.len + 1;
> +             if (state.msg.type == XS_WATCH_EVENT)
> +                     len += sizeof(*state.watch);
> +
> +             state.alloc = kmalloc(len, GFP_NOIO | __GFP_HIGH);

Why can't you kmalloc() to state.body only when type != XS_WATCH_EVENT?

> +             if (!state.alloc)
> +                     return -ENOMEM;
> +
> +             if (state.msg.type == XS_WATCH_EVENT)
> +                     state.body = state.watch->body;
> +             else
> +                     state.body = state.alloc;
> +             state.in_hdr = false;
> +             state.used = 0;
> +     }
> +
> +     err = xb_read(state.body + state.used, state.msg.len - state.used);
> +     if (err < 0)
> +             goto out;
> +
> +     state.used += err;
> +     if (state.used != state.msg.len)
> +             return 0;
> +
> +     state.body[state.msg.len] = '\0';
> +
> +     if (state.msg.type == XS_WATCH_EVENT) {
> +             state.watch->len = state.msg.len;
> +             err = xs_watch_msg(state.watch);
> +     } else {
> +             err = -ENOENT;
> +             mutex_lock(&xb_write_mutex);
> +             list_for_each_entry(req, &xs_reply_list, list) {
> +                     if (req->msg.req_id == state.msg.req_id) {
> +                             if (req->state == xb_req_state_wait_reply) {
> +                                     req->msg.type = state.msg.type;
> +                                     req->msg.len = state.msg.len;
> +                                     req->body = state.body;
> +                                     req->state = xb_req_state_got_reply;
> +                                     list_del(&req->list);
> +                                     req->cb(req);
> +                             } else {
> +                                     list_del(&req->list);
> +                                     kfree(req);
> +                             }
> +                             err = 0;
> +                             break;
> +                     }
> +             }
> +             mutex_unlock(&xb_write_mutex);
> +             if (err)
> +                     goto out;
> +     }
> +
> +     mutex_unlock(&xs_response_mutex);
> +
> +     state.in_msg = false;
> +     state.alloc = NULL;
> +     return err;
> +
> + out:
> +     mutex_unlock(&xs_response_mutex);
> +     state.in_msg = false;
> +     kfree(state.alloc);
> +     state.alloc = NULL;
> +     return err;
> +}
> +
> +static int process_writes(void)
> +{
> +     static struct xs_thread_state_write state;
> +     void *base;
> +     unsigned int len;
> +     int err = 0;
> +
> +     if (!xb_data_to_write())
> +             return 0;
> +
> +     mutex_lock(&xb_write_mutex);
> +
> +     if (!state.req) {
> +             state.req = list_first_entry(&xb_write_list,
> +                                          struct xb_req_data, list);
> +             state.idx = -1;
> +             state.used = 0;
> +     }
> +
> +     if (state.req->state == xb_req_state_aborted)
> +             goto out_err;
> +
> +     while (state.idx < state.req->num_vecs) {
> +             if (state.idx < 0) {
> +                     base = &state.req->msg;
> +                     len = sizeof(state.req->msg);
> +             } else {
> +                     base = state.req->vec[state.idx].iov_base;
> +                     len = state.req->vec[state.idx].iov_len;
> +             }
> +             err = xb_write(base + state.used, len - state.used);
> +             if (err < 0)
> +                     goto out_err;
> +             state.used += err;
> +             if (state.used != len)
> +                     goto out;
> +
> +             state.idx++;
> +             state.used = 0;
> +     }
> +
> +     /*
> +      * You would expect the following to be racy, but as the response is
> +      * being read by our thread there is no risk of req being freed
> +      * under our feet.
> +      */

I don't think I understand this (and it's missing a "so" or something
like that between "thread" and "there"). If this is not racy, why are we
doing this under xb_write_mutex?

> +     list_del(&state.req->list);
> +     state.req->state = xb_req_state_wait_reply;
> +     list_add_tail(&state.req->list, &xs_reply_list);
> +     state.req = NULL;
> +
> + out:
> +     mutex_unlock(&xb_write_mutex);
> +
> +     return 0;
> +
> + out_err:
> +     state.req->msg.type = XS_ERROR;
> +     state.req->err = err;

You don't seem to need this for xb_req_state_aborted since you are
freeing state.req. OTOH, why shouldn't aborted requests generate an
error reply as well?


> +     list_del(&state.req->list);
> +     if (state.req->state == xb_req_state_aborted)
> +             kfree(state.req);
> +     else {
> +             state.req->state = xb_req_state_got_reply;
> +             wake_up(&state.req->wq);
> +     }
> +
> +     mutex_unlock(&xb_write_mutex);
> +
> +     state.req = NULL;
> +
> +     return err;
> +}
> +
> +static int xb_thread_work(void)
> +{
> +     return xb_data_to_read() || xb_data_to_write();
> +}
> +
> +static int xenbus_thread(void *unused)
> +{
> +     int err;
> +
> +     while (!kthread_should_stop()) {
> +             if (wait_event_interruptible(xb_waitq, xb_thread_work()))
> +                     continue;
> +
> +             err = process_msg();
> +             if (err == -ENOMEM)
> +                     schedule();
> +             else if (err)
> +                     pr_warn("error %d while reading message\n", err);
> +
> +             err = process_writes();
> +             if (err)
> +                     pr_warn("error %d while writing message\n", err);

Is there a chance that errors are persistent and you then spam the log?
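
In the kernel the simplest answer is probably pr_warn_ratelimited(). If
you would rather report each distinct failure once, something like the
following userspace sketch is what I have in mind. The names (err_log,
log_err_once) are hypothetical and only serve to show the idea of warning
when the error changes instead of on every loop iteration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

struct err_log {
	int last_err;			/* last error reported, 0 if none */
	unsigned int suppressed;	/* repeats swallowed since then */
};

/*
 * Report 'err' only if it differs from the previously reported one.
 * A successful pass (err == 0) re-arms the warning.
 * Returns true if a message was printed.
 */
static bool log_err_once(struct err_log *l, const char *what, int err)
{
	assert(what != NULL);

	if (err == 0) {
		l->last_err = 0;
		l->suppressed = 0;
		return false;
	}
	if (err == l->last_err) {
		l->suppressed++;
		return false;
	}

	fprintf(stderr, "error %d while %s\n", err, what);
	l->last_err = err;
	l->suppressed = 0;
	return true;
}
```

Either way, a persistent -EIO from a broken ring would then produce one
line rather than one line per wakeup.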


-boris

> +     }
> +
> +     xenbus_task = NULL;
>       return 0;
>  }
>  
> @@ -223,6 +464,7 @@ int xb_init_comms(void)
>               rebind_evtchn_irq(xen_store_evtchn, xenbus_irq);
>       } else {
>               int err;
> +
>               err = bind_evtchn_to_irqhandler(xen_store_evtchn, wake_waiting,
>                                               0, "xenbus", &xb_waitq);
>               if (err < 0) {
> @@ -231,6 +473,13 @@ int xb_init_comms(void)
>               }
>  
>               xenbus_irq = err;
> +
> +             if (!xenbus_task) {
> +                     xenbus_task = kthread_run(xenbus_thread, NULL,
> +                                               "xenbus");
> +                     if (IS_ERR(xenbus_task))
> +                             return PTR_ERR(xenbus_task);
> +             }
>       }
>  
>       return 0;