[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [PATCH v4] xenbus: support large messages



Hello,

Sorry, it seems I missed that mail :/

Added my tag below.

BTW, I didn't see the mb() fix between rsp_cons+= and reading rsp_prod
on the Linux side?

Samuel

Juergen Gross, le mer. 24 nov. 2021 08:00:55 +0100, a ecrit:
> Ping?
> 
> On 04.10.21 11:40, Juergen Gross wrote:
> > Today the implementation of the xenbus protocol in Mini-OS will only
> > allow to transfer the complete message to or from the ring page buffer.
> > This is limiting the maximum message size to lower values as the xenbus
> > protocol normally would allow.
> > 
> > Change that by allowing to transfer the xenbus message in chunks as
> > soon as they are available.
> > 
> > Avoid crashing Mini-OS in case of illegal data read from the ring
> > buffer.
> > 
> > Signed-off-by: Juergen Gross <jgross@xxxxxxxx>

Reviewed-by: Samuel Thibault <samuel.thibault@xxxxxxxxxxxx>

> > ---
> > V2:
> > - drop redundant if (Samuel Thibault)
> > - move rmb() (Samuel Thibault)
> > V3:
> > - correct notification test (Samuel Thibault)
> > V4:
> > - more memory barriers (Samuel Thibault)
> > ---
> >   xenbus/xenbus.c | 210 ++++++++++++++++++++++++++++--------------------
> >   1 file changed, 122 insertions(+), 88 deletions(-)
> > 
> > diff --git a/xenbus/xenbus.c b/xenbus/xenbus.c
> > index 23de61e..b687678 100644
> > --- a/xenbus/xenbus.c
> > +++ b/xenbus/xenbus.c
> > @@ -29,6 +29,7 @@
> >   #include <xen/hvm/params.h>
> >   #include <mini-os/spinlock.h>
> >   #include <mini-os/xmalloc.h>
> > +#include <mini-os/semaphore.h>
> >   #define min(x,y) ({                       \
> >           typeof(x) tmpx = (x);                 \
> > @@ -46,6 +47,7 @@
> >   static struct xenstore_domain_interface *xenstore_buf;
> >   static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
> >   DECLARE_WAIT_QUEUE_HEAD(xenbus_watch_queue);
> > +static __DECLARE_SEMAPHORE_GENERIC(xb_write_sem, 1);
> >   xenbus_event_queue xenbus_events;
> >   static struct watch {
> > @@ -231,75 +233,103 @@ char *xenbus_wait_for_state_change(const char* path, 
> > XenbusState *state, xenbus_
> >   }
> > +static void xenbus_read_data(char *buf, unsigned int len)
> > +{
> > +    unsigned int off = 0;
> > +    unsigned int prod, cons;
> > +    unsigned int size;
> > +
> > +    while (off != len)
> > +    {
> > +        wait_event(xb_waitq, xenstore_buf->rsp_prod != 
> > xenstore_buf->rsp_cons);
> > +
> > +        prod = xenstore_buf->rsp_prod;
> > +        cons = xenstore_buf->rsp_cons;
> > +        DEBUG("Rsp_cons %d, rsp_prod %d.\n", cons, prod);
> > +        size = min(len - off, prod - cons);
> > +
> > +        rmb();   /* Make sure data read from ring is ordered with 
> > rsp_prod. */
> > +        memcpy_from_ring(xenstore_buf->rsp, buf + off,
> > +                         MASK_XENSTORE_IDX(cons), size);
> > +        off += size;
> > +        mb();    /* memcpy() and rsp_cons update must not be reordered. */
> > +        xenstore_buf->rsp_cons += size;
> > +        mb();    /* rsp_cons must be visible before we look at rsp_prod. */
> > +        if (xenstore_buf->rsp_prod - cons >= XENSTORE_RING_SIZE)
> > +            notify_remote_via_evtchn(xenbus_evtchn);
> > +    }
> > +}
> > +
> >   static void xenbus_thread_func(void *ign)
> >   {
> >       struct xsd_sockmsg msg;
> > -    unsigned prod = xenstore_buf->rsp_prod;
> > +    char *data;
> >       for (;;) {
> > -        wait_event(xb_waitq, prod != xenstore_buf->rsp_prod);
> > -        while (1) {
> > -            prod = xenstore_buf->rsp_prod;
> > -            DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons,
> > -                  xenstore_buf->rsp_prod);
> > -            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons < 
> > sizeof(msg))
> > -                break;
> > -            rmb();
> > -            memcpy_from_ring(xenstore_buf->rsp, &msg,
> > -                             MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
> > -                             sizeof(msg));
> > -            DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
> > -                  xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, 
> > msg.req_id);
> > -
> > -            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons <
> > -                sizeof(msg) + msg.len)
> > -                break;
> > -
> > -            DEBUG("Message is good.\n");
> > -
> > -            if (msg.type == XS_WATCH_EVENT) {
> > -                struct xenbus_event *event = malloc(sizeof(*event) + 
> > msg.len);
> > -                xenbus_event_queue *events = NULL;
> > -                char *data = (char*)event + sizeof(*event);
> > -                struct watch *watch;
> > -
> > -                memcpy_from_ring(xenstore_buf->rsp, data,
> > -                    MASK_XENSTORE_IDX(xenstore_buf->rsp_cons + 
> > sizeof(msg)),
> > -                    msg.len);
> > -
> > -                event->path = data;
> > -                event->token = event->path + strlen(event->path) + 1;
> > -
> > -                mb();
> > -                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
> > -
> > -                for (watch = watches; watch; watch = watch->next)
> > -                    if (!strcmp(watch->token, event->token)) {
> > -                        events = watch->events;
> > -                        break;
> > -                    }
> > -
> > -                if (events) {
> > -                    event->next = *events;
> > -                    *events = event;
> > -                    wake_up(&xenbus_watch_queue);
> > -                } else {
> > -                    printk("unexpected watch token %s\n", event->token);
> > -                    free(event);
> > +        xenbus_read_data((char *)&msg, sizeof(msg));
> > +        DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
> > +              xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
> > +
> > +        if (msg.len > XENSTORE_PAYLOAD_MAX) {
> > +            printk("Xenstore violates protocol, message longer than 
> > allowed.\n");
> > +            return;
> > +        }
> > +
> > +        if (msg.type == XS_WATCH_EVENT) {
> > +            struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
> > +            xenbus_event_queue *events = NULL;
> > +            struct watch *watch;
> > +            char *c;
> > +            int zeroes = 0;
> > +
> > +            data = (char*)event + sizeof(*event);
> > +            xenbus_read_data(data, msg.len);
> > +
> > +            for (c = data; c < data + msg.len; c++)
> > +                if (!*c)
> > +                    zeroes++;
> > +            if (zeroes != 2) {
> > +                printk("Xenstore: illegal watch event data\n");
> > +                free(event);
> > +                continue;
> > +            }
> > +
> > +            event->path = data;
> > +            event->token = event->path + strlen(event->path) + 1;
> > +
> > +            for (watch = watches; watch; watch = watch->next)
> > +                if (!strcmp(watch->token, event->token)) {
> > +                    events = watch->events;
> > +                    break;
> >                   }
> > +
> > +            if (events) {
> > +                event->next = *events;
> > +                *events = event;
> > +                wake_up(&xenbus_watch_queue);
> >               } else {
> > -                req_info[msg.req_id].reply = malloc(sizeof(msg) + msg.len);
> > -                memcpy_from_ring(xenstore_buf->rsp, 
> > req_info[msg.req_id].reply,
> > -                                 MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
> > -                                 msg.len + sizeof(msg));
> > -                mb();
> > -                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
> > -                wake_up(&req_info[msg.req_id].waitq);
> > +                printk("Xenstore: unexpected watch token %s\n", 
> > event->token);
> > +                free(event);
> >               }
> > -            wmb();
> > -            notify_remote_via_evtchn(xenbus_evtchn);
> > +            continue;
> >           }
> > +
> > +        data = malloc(sizeof(msg) + msg.len);
> > +        memcpy(data, &msg, sizeof(msg));
> > +        xenbus_read_data(data + sizeof(msg), msg.len);
> > +
> > +        if (msg.req_id >= NR_REQS || !req_info[msg.req_id].in_use) {
> > +            printk("Xenstore: illegal request id %d\n", msg.req_id);
> > +            free(data);
> > +            continue;
> > +        }
> > +
> > +        DEBUG("Message is good.\n");
> > +
> > +        req_info[msg.req_id].reply = data;
> > +
> > +        wake_up(&req_info[msg.req_id].waitq);
> >       }
> >   }
> > @@ -451,36 +481,40 @@ static void xb_write(int type, int req_id, 
> > xenbus_transaction_t trans_id,
> >       cur_req = &header_req;
> > -    BUG_ON(len > XENSTORE_RING_SIZE);
> > -    /* Wait for the ring to drain to the point where we can send the
> > -       message. */
> > -    prod = xenstore_buf->req_prod;
> > -    if (prod + len - xenstore_buf->req_cons > XENSTORE_RING_SIZE)
> > -    {
> > -        /* Wait for there to be space on the ring */
> > -        DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n",
> > -                prod, len, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
> > -        wait_event(xb_waitq,
> > -                xenstore_buf->req_prod + len - xenstore_buf->req_cons <=
> > -                XENSTORE_RING_SIZE);
> > -        DEBUG("Back from wait.\n");
> > -        prod = xenstore_buf->req_prod;
> > -    }
> > +    BUG_ON(len > XENSTORE_PAYLOAD_MAX);
> > +
> > +    /* Make sure we are the only thread trying to write. */
> > +    down(&xb_write_sem);
> > -    /* We're now guaranteed to be able to send the message without
> > -       overflowing the ring.  Do so. */
> > +    /* Send the message in chunks using free ring space when available. */
> >       total_off = 0;
> >       req_off = 0;
> > -    while (total_off < len)
> > +    while (total_off < len)
> >       {
> > +        prod = xenstore_buf->req_prod;
> > +        if (prod - xenstore_buf->req_cons >= XENSTORE_RING_SIZE)
> > +        {
> > +            /* Send evtchn to notify remote */
> > +            notify_remote_via_evtchn(xenbus_evtchn);
> > +
> > +            /* Wait for there to be space on the ring */
> > +            DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n", prod,
> > +                  len - total_off, xenstore_buf->req_cons, 
> > XENSTORE_RING_SIZE);
> > +            wait_event(xb_waitq,
> > +                       prod - xenstore_buf->req_cons < XENSTORE_RING_SIZE);
> > +            DEBUG("Back from wait.\n");
> > +        }
> > +
> >           this_chunk = min(cur_req->len - req_off,
> > -                XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
> > +                         XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
> > +        this_chunk = min(this_chunk,
> > +                         xenstore_buf->req_cons + XENSTORE_RING_SIZE - 
> > prod);
> >           memcpy((char *)xenstore_buf->req + MASK_XENSTORE_IDX(prod),
> > -                (char *)cur_req->data + req_off, this_chunk);
> > +               (char *)cur_req->data + req_off, this_chunk);
> >           prod += this_chunk;
> >           req_off += this_chunk;
> >           total_off += this_chunk;
> > -        if (req_off == cur_req->len)
> > +        if (req_off == cur_req->len)
> >           {
> >               req_off = 0;
> >               if (cur_req == &header_req)
> > @@ -488,20 +522,20 @@ static void xb_write(int type, int req_id, 
> > xenbus_transaction_t trans_id,
> >               else
> >                   cur_req++;
> >           }
> > +
> > +        /* Remote must see entire message before updating indexes */
> > +        wmb();
> > +        xenstore_buf->req_prod = prod;
> >       }
> > +    /* Send evtchn to notify remote */
> > +    notify_remote_via_evtchn(xenbus_evtchn);
> > +
> >       DEBUG("Complete main loop of xb_write.\n");
> >       BUG_ON(req_off != 0);
> >       BUG_ON(total_off != len);
> > -    BUG_ON(prod > xenstore_buf->req_cons + XENSTORE_RING_SIZE);
> > -    /* Remote must see entire message before updating indexes */
> > -    wmb();
> > -
> > -    xenstore_buf->req_prod += len;
> > -
> > -    /* Send evtchn to notify remote */
> > -    notify_remote_via_evtchn(xenbus_evtchn);
> > +    up(&xb_write_sem);
> >   }
> >   /* Send a mesasge to xenbus, in the same fashion as xb_write, and
> > 
> 



 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.