[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] Re: [Xen-devel] [PATCH 3/3] xen: optimize xenbus driver for multiple concurrent xenstore accesses
On 01/06/2017 10:05 AM, Juergen Gross wrote: > Handling of multiple concurrent Xenstore accesses through xenbus driver > either from the kernel or user land is rather lame today: xenbus is > capable to have one access active only at one point of time. > > Rewrite xenbus to handle multiple requests concurrently by making use > of the request id of the Xenstore protocol. This requires to: > > - Instead of blocking inside xb_read() when trying to read data from > the xenstore ring buffer do so only in the main loop of > xenbus_thread(). > > - Instead of doing writes to the xenstore ring buffer in the context of > the caller just queue the request and do the write in the dedicated > xenbus thread. > > - Instead of just forwarding the request id specified by the caller of > xenbus to xenstore use a xenbus internal unique request id. This will > allow multiple outstanding requests. > > - Modify the locking scheme in order to allow multiple requests being > active in parallel. > > - Instead of waiting for the reply of a user's xenstore request after > writing the request to the xenstore ring buffer return directly to > the caller and do the waiting in the read path. > > Additionally signal handling was optimized by avoiding waking up the > xenbus thread or sending an event to Xenstore in case the addressed > entity is known to be running already. > > As a result communication with Xenstore is sped up by a factor of up > to 5: depending on the request type (read or write) and the amount of > data transferred the gain was at least 20% (small reads) and went up to > a factor of 5 for large writes. > > In the end some more rough edges of xenbus have been smoothed: > > - Handling of memory shortage when reading from xenstore ring buffer in > the xenbus driver was not optimal: it was busy looping and issuing a > warning in each loop. > > - In case of xenstore not running in dom0 but in a stubdom we end up > with two xenbus threads running as the initialization of xenbus in > dom0 expecting a local xenstored will be redone later when connecting > to the xenstore domain. Up to now this was no problem as locking > would prevent the two xenbus threads interfering with each other, but > this was just a waste of kernel resources. > > - An out of memory situation while writing to or reading from the > xenstore ring buffer no longer will lead to a possible loss of > synchronization with xenstore. > > - The user read and write part are now interruptible by signals. > > Signed-off-by: Juergen Gross <jgross@xxxxxxxx> > --- > I'm aware that the changes are quite large. I thought about sending a > version split into multiple patches, but a lot of lines would have been > touched by more than one patch. I still have the multiple patch variant > lying around - this patch is split into 11 smaller ones. While all > steps of this larger series is operational some steps are not optimal > as they are even slower than the original version of xenbus. > > Nevertheless I can send the large series if there are requests for it. I will comment only on xen_comms changes for now since otherwise I am afraid it may be difficult to keep track of conversation. > diff --git a/drivers/xen/xenbus/xenbus_comms.c > b/drivers/xen/xenbus/xenbus_comms.c > index c21ec02..fa054ca 100644 > --- a/drivers/xen/xenbus/xenbus_comms.c > +++ b/drivers/xen/xenbus/xenbus_comms.c > @@ -34,6 +34,7 @@ > > #include <linux/wait.h> > #include <linux/interrupt.h> > +#include <linux/kthread.h> > #include <linux/sched.h> > #include <linux/err.h> > #include <xen/xenbus.h> > @@ -42,11 +43,40 @@ > #include <xen/page.h> > #include "xenbus.h" > > +struct xs_thread_state_write { > + struct xb_req_data *req; > + int idx; > + unsigned int used; "written" or "sent"? > +}; > + > +struct xs_thread_state_read { > + struct xsd_sockmsg msg; > + char *body; > + union { > + void *alloc; > + struct xs_watch_event *watch; > + }; > + bool in_msg; > + bool in_hdr; It may be better to keep track of which state we are in using a bitmap. Otherwise it easy to lose track of one or the other. > + unsigned int used; "read" or"received"? > +}; Both of these are private to process_msg/process_write so perhaps they can be declared in those routines' scopes. > + > +/* A list of replies. Currently only one will ever be outstanding. */ > +LIST_HEAD(xs_reply_list); > + > +/* A list of write requests. */ > +LIST_HEAD(xb_write_list); > +DECLARE_WAIT_QUEUE_HEAD(xb_waitq); > +DEFINE_MUTEX(xb_write_mutex); > + > +/* Protect xenbus reader thread against save/restore. */ > +DEFINE_MUTEX(xs_response_mutex); > + > static int xenbus_irq; > +static struct task_struct *xenbus_task; > > static DECLARE_WORK(probe_work, xenbus_probe); > > -static DECLARE_WAIT_QUEUE_HEAD(xb_waitq); > > static irqreturn_t wake_waiting(int irq, void *unused) > { > @@ -84,30 +114,31 @@ static const void *get_input_chunk(XENSTORE_RING_IDX > cons, > return buf + MASK_XENSTORE_IDX(cons); > } > > +static int xb_data_to_write(void) > +{ > + struct xenstore_domain_interface *intf = xen_store_interface; > + > + return (intf->req_prod - intf->req_cons) != XENSTORE_RING_SIZE && > + !list_empty(&xb_write_list); > +} > + > /** > * xb_write - low level write > * @data: buffer to send > * @len: length of buffer > * > - * Returns 0 on success, error otherwise. > + * Returns number of bytes written or -err. > */ > -int xb_write(const void *data, unsigned len) > +static int xb_write(const void *data, unsigned int len) > { > struct xenstore_domain_interface *intf = xen_store_interface; > XENSTORE_RING_IDX cons, prod; > - int rc; > + unsigned int bytes = 0; > > while (len != 0) { > void *dst; > unsigned int avail; > > - rc = wait_event_interruptible( > - xb_waitq, > - (intf->req_prod - intf->req_cons) != > - XENSTORE_RING_SIZE); > - if (rc < 0) > - return rc; > - > /* Read indexes, then verify. */ > cons = intf->req_cons; > prod = intf->req_prod; > @@ -115,59 +146,57 @@ int xb_write(const void *data, unsigned len) > intf->req_cons = intf->req_prod = 0; > return -EIO; > } > - > - dst = get_output_chunk(cons, prod, intf->req, &avail); > - if (avail == 0) > - continue; > - if (avail > len) > - avail = len; > + if (!xb_data_to_write()) > + return bytes; > > /* Must write data /after/ reading the consumer index. */ > virt_mb(); > > + dst = get_output_chunk(cons, prod, intf->req, &avail); > + if (avail == 0) > + continue; Should we continue the loop here or return? We are waiting for the reader to get stuff off the ring. > + if (avail > len) > + avail = len; > + > memcpy(dst, data, avail); > data += avail; > len -= avail; > + bytes += avail; > > /* Other side must not see new producer until data is there. */ > virt_wmb(); > intf->req_prod += avail; > > /* Implies mb(): other side will see the updated producer. */ > - notify_remote_via_evtchn(xen_store_evtchn); > + if (prod <= intf->req_cons) > + notify_remote_via_evtchn(xen_store_evtchn); > } > > - return 0; > + return bytes; > } > > -int xb_data_to_read(void) > +static int xb_data_to_read(void) > { > struct xenstore_domain_interface *intf = xen_store_interface; > return (intf->rsp_cons != intf->rsp_prod); > } > > -int xb_wait_for_data_to_read(void) > -{ > - return wait_event_interruptible(xb_waitq, xb_data_to_read()); > -} > - > -int xb_read(void *data, unsigned len) > +static int xb_read(void *data, unsigned int len) > { > struct xenstore_domain_interface *intf = xen_store_interface; > XENSTORE_RING_IDX cons, prod; > - int rc; > + unsigned int bytes = 0; > > while (len != 0) { > unsigned int avail; > const char *src; > > - rc = xb_wait_for_data_to_read(); > - if (rc < 0) > - return rc; > - > /* Read indexes, then verify. */ > cons = intf->rsp_cons; > prod = intf->rsp_prod; > + if (cons == prod) > + return bytes; > + > if (!check_indexes(cons, prod)) { > intf->rsp_cons = intf->rsp_prod = 0; > return -EIO; > @@ -185,17 +214,229 @@ int xb_read(void *data, unsigned len) > memcpy(data, src, avail); > data += avail; > len -= avail; > + bytes += avail; > > /* Other side must not see free space until we've copied out */ > virt_mb(); > intf->rsp_cons += avail; > > - pr_debug("Finished read of %i bytes (%i to go)\n", avail, len); > - > /* Implies mb(): other side will see the updated consumer. */ > - notify_remote_via_evtchn(xen_store_evtchn); > + if (intf->rsp_prod - cons >= XENSTORE_RING_SIZE) > + notify_remote_via_evtchn(xen_store_evtchn); > } > > + return bytes; > +} > + > +static int process_msg(void) > +{ > + static struct xs_thread_state_read state; > + struct xb_req_data *req; > + int err; > + unsigned int len; > + > + if (!state.in_msg) { > + state.in_msg = true; > + state.in_hdr = true; > + state.used = 0; > + > + /* > + * We must disallow save/restore while reading a message. > + * A partial read across s/r leaves us out of sync with > + * xenstored. > + */ > + mutex_lock(&xs_response_mutex); > + > + if (!xb_data_to_read()) { > + /* We raced with save/restore: pending data 'gone'. */ > + mutex_unlock(&xs_response_mutex); > + state.in_msg = false; > + return 0; > + } > + } > + > + if (state.in_hdr) { > + if (state.used != sizeof(state.msg)) { > + err = xb_read((void *)&state.msg + state.used, > + sizeof(state.msg) - state.used); > + if (err < 0) > + goto out; > + state.used += err; > + if (state.used != sizeof(state.msg)) > + return 0; Would it be possible to do locking at the caller? I understand that you are trying to hold the lock across multiple invocations of this function but it feels somewhat counter-intuitive and bug-prone. If it's not possible then at least please add a comment explaining locking algorithm. > + if (state.msg.len > XENSTORE_PAYLOAD_MAX) { > + err = -EINVAL; > + goto out; > + } > + } > + > + len = state.msg.len + 1; > + if (state.msg.type == XS_WATCH_EVENT) > + len += sizeof(*state.watch); > + > + state.alloc = kmalloc(len, GFP_NOIO | __GFP_HIGH); Why can't you kmalloc to state.body only when type!=XS_WATCH_EVENT ? > + if (!state.alloc) > + return -ENOMEM; > + > + if (state.msg.type == XS_WATCH_EVENT) > + state.body = state.watch->body; > + else > + state.body = state.alloc; > + state.in_hdr = false; > + state.used = 0; > + } > + > + err = xb_read(state.body + state.used, state.msg.len - state.used); > + if (err < 0) > + goto out; > + > + state.used += err; > + if (state.used != state.msg.len) > + return 0; > + > + state.body[state.msg.len] = '\0'; > + > + if (state.msg.type == XS_WATCH_EVENT) { > + state.watch->len = state.msg.len; > + err = xs_watch_msg(state.watch); > + } else { > + err = -ENOENT; > + mutex_lock(&xb_write_mutex); > + list_for_each_entry(req, &xs_reply_list, list) { > + if (req->msg.req_id == state.msg.req_id) { > + if (req->state == xb_req_state_wait_reply) { > + req->msg.type = state.msg.type; > + req->msg.len = state.msg.len; > + req->body = state.body; > + req->state = xb_req_state_got_reply; > + list_del(&req->list); > + req->cb(req); > + } else { > + list_del(&req->list); > + kfree(req); > + } > + err = 0; > + break; > + } > + } > + mutex_unlock(&xb_write_mutex); > + if (err) > + goto out; > + } > + > + mutex_unlock(&xs_response_mutex); > + > + state.in_msg = false; > + state.alloc = NULL; > + return err; > + > + out: > + mutex_unlock(&xs_response_mutex); > + state.in_msg = false; > + kfree(state.alloc); > + state.alloc = NULL; > + return err; > +} > + > +static int process_writes(void) > +{ > + static struct xs_thread_state_write state; > + void *base; > + unsigned int len; > + int err = 0; > + > + if (!xb_data_to_write()) > + return 0; > + > + mutex_lock(&xb_write_mutex); > + > + if (!state.req) { > + state.req = list_first_entry(&xb_write_list, > + struct xb_req_data, list); > + state.idx = -1; > + state.used = 0; > + } > + > + if (state.req->state == xb_req_state_aborted) > + goto out_err; > + > + while (state.idx < state.req->num_vecs) { > + if (state.idx < 0) { > + base = &state.req->msg; > + len = sizeof(state.req->msg); > + } else { > + base = state.req->vec[state.idx].iov_base; > + len = state.req->vec[state.idx].iov_len; > + } > + err = xb_write(base + state.used, len - state.used); > + if (err < 0) > + goto out_err; > + state.used += err; > + if (state.used != len) > + goto out; > + > + state.idx++; > + state.used = 0; > + } > + > + /* > + * You would expect the following to be racy, but as the response is > + * being read by our thread there is no risk of req being freed > + * under our feet. > + */ I don't think I understand this (and it's missing a "so" or something like that between "thread" and "there"). If this is not racy, why are we doing this under xb_write_mutex? > + list_del(&state.req->list); > + state.req->state = xb_req_state_wait_reply; > + list_add_tail(&state.req->list, &xs_reply_list); > + state.req = NULL; > + > + out: > + mutex_unlock(&xb_write_mutex); > + > + return 0; > + > + out_err: > + state.req->msg.type = XS_ERROR; > + state.req->err = err; You don't seem to need this for xb_req_state_aborted since you are freeing state_req. OTOH, why shouldn't aborted requests generate an error reply as well? > + list_del(&state.req->list); > + if (state.req->state == xb_req_state_aborted) > + kfree(state.req); > + else { > + state.req->state = xb_req_state_got_reply; > + wake_up(&state.req->wq); > + } > + > + mutex_unlock(&xb_write_mutex); > + > + state.req = NULL; > + > + return err; > +} > + > +static int xb_thread_work(void) > +{ > + return xb_data_to_read() || xb_data_to_write(); > +} > + > +static int xenbus_thread(void *unused) > +{ > + int err; > + > + while (!kthread_should_stop()) { > + if (wait_event_interruptible(xb_waitq, xb_thread_work())) > + continue; > + > + err = process_msg(); > + if (err == -ENOMEM) > + schedule(); > + else if (err) > + pr_warn("error %d while reading message\n", err); > + > + err = process_writes(); > + if (err) > + pr_warn("error %d while writing message\n", err); Is there a chance that errors are persistent and you then spam the log? -boris > + } > + > + xenbus_task = NULL; > return 0; > } > > @@ -223,6 +464,7 @@ int xb_init_comms(void) > rebind_evtchn_irq(xen_store_evtchn, xenbus_irq); > } else { > int err; > + > err = bind_evtchn_to_irqhandler(xen_store_evtchn, wake_waiting, > 0, "xenbus", &xb_waitq); > if (err < 0) { > @@ -231,6 +473,13 @@ int xb_init_comms(void) > } > > xenbus_irq = err; > + > + if (!xenbus_task) { > + xenbus_task = kthread_run(xenbus_thread, NULL, > + "xenbus"); > + if (IS_ERR(xenbus_task)) > + return PTR_ERR(xenbus_task); > + } > } > > return 0; > > > > > _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxx https://lists.xen.org/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |