
Re: [Xen-devel] [PATCH v3 09/15] argo: implement the sendv op; evtchn: expose send_guest_global_virq



Thanks for the review, Roger. Replies inline below.

On Wed, Jan 9, 2019 at 10:57 AM Roger Pau Monné <royger@xxxxxxxxxxx> wrote:
>
> On Mon, Jan 7, 2019 at 8:44 AM Christopher Clark
> <christopher.w.clark@xxxxxxxxx> wrote:
> >
> > sendv operation is invoked to perform a synchronous send of buffers
> > contained in iovs to a remote domain's registered ring.
> >
> > diff --git a/xen/common/argo.c b/xen/common/argo.c
> > index 59ce8c4..4548435 100644
> > --- a/xen/common/argo.c
> > +++ b/xen/common/argo.c

> >
> > +static int
> > +memcpy_to_guest_ring(struct argo_ring_info *ring_info, uint32_t offset,
> > +                     const void *src, XEN_GUEST_HANDLE(uint8_t) src_hnd,
> > +                     uint32_t len)
> > +{
> > +    unsigned int mfns_index = offset >> PAGE_SHIFT;
> > +    void *dst;
> > +    int ret;
> > +    unsigned int src_offset = 0;
> > +
> > +    ASSERT(spin_is_locked(&ring_info->lock));
> > +
> > +    offset &= ~PAGE_MASK;
> > +
> > +    if ( (len > XEN_ARGO_MAX_RING_SIZE) || (offset > XEN_ARGO_MAX_RING_SIZE) )
> > +        return -EFAULT;
> > +
> > +    while ( (offset + len) > PAGE_SIZE )
>
> I think you could map the whole ring in contiguous virtual address
> space and then writing to it would be much more easy, you wouldn't
> need to iterate with memcpy or copy_from_guest, take a look at __vmap.
> You could likely map this when the ring gets setup and keep it mapped
> for the lifetime of the ring.

You're right about that, because map_domain_page_global, which the
current code uses, itself uses vmap. I think there are a couple of
reasons why the code has been implemented the iterative way, though.

The first is that ring resize has been a consideration: it's useful to be
able to increase the size of a live, active ring that is under load without
having to tear down the mappings, find a new virtual address region of the
right size and then remap it. Instead, you can just supply some more memory,
map those pages onto the end of the ring, and ensure both sides know about
the new ring size. Similarly, shrinking a quiet ring can be useful.
However, the "gfn race" that you (correctly) pointed out in an earlier
review, and Jan's related request to drop the "revalidate an existing
mapping on ring reregister" behaviour, motivated removing the section of
code involved. In v3 of the series I've actually just blocked ring resize
entirely, because shrinking a ring needs a walk through the pending
notifications to find any that the new, smaller ring size makes
untriggerable, and I'd like to defer implementing that for now. So the ring
resize reason is more of a consideration for a possible later version of
Argo than the current one.

The second reason is about avoiding exposing the Xen virtual memory
allocator directly to frequent guest-supplied size requests for
contiguous regions (of up to 16GB). With single-page allocations to
build a ring, fragmentation is not a problem, and mischief by a guest
seems difficult. If it were changed to issue requests for contiguous
regions, with variable ring sizes up to the 16GB maximum, it seems that
significant fragmentation could become achievable. I don't know the
practical impact of that, but it seems worth avoiding. Are the other users
of __vmap (or vmap) for multi-gigabyte regions only boot-time or infrequent
operations (livepatch), or actions by privileged (ie. somewhat trusted)
domains (ioremap), or is it already a frequent operation somewhere else?

Given the context above, plus Jason's simplification to the
memcpy_to_guest_ring function, the imminent merge freeze deadline, and the
understanding that this loop and the data structures supporting it have
been tested and are working, would it be acceptable to omit the contiguous
mapping change from the current series?
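
For reference, my understanding of the suggested alternative is roughly the
following: map the whole ring once when it is registered and unmap it at
teardown. This is only an illustrative sketch, assuming a new 'mapped'
pointer field were added to struct argo_ring_info (the mfns/nmfns fields
are the ones the series already keeps per ring):

    /* at ring registration, once the guest-supplied frames are validated: */
    ring_info->mapped = vmap(ring_info->mfns, ring_info->nmfns);
    if ( !ring_info->mapped )
        return -ENOMEM;

    /*
     * memcpy_to_guest_ring would then collapse to a single
     * memcpy/copy_from_guest into (ring_info->mapped + offset),
     * with no per-page iteration.
     */

    /* at ring teardown: */
    vunmap(ring_info->mapped);
    ring_info->mapped = NULL;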

>
> > +    {
> > +        unsigned int head_len = PAGE_SIZE - offset;
> > +
> > +        ret = ring_map_page(ring_info, mfns_index, &dst);
> > +        if ( ret )
> > +            return ret;
> > +
> > +        if ( src )
> > +        {
> > +            memcpy(dst + offset, src + src_offset, head_len);
> > +            src_offset += head_len;
> > +        }
> > +        else
> > +        {
> > +            ret = copy_from_guest(dst + offset, src_hnd, head_len) ?
> > +                    -EFAULT : 0;
> > +            if ( ret )
> > +                return ret;
>
> You can simplify this to:
>
> if ( copy_from_guest(...) )
>     return -EFAULT;

yes! ack - thanks

<snip>
> > +/*
> > + * get_sanitized_ring creates a modified copy of the ring pointers where
> > + * the rx_ptr is rounded up to ensure it is aligned, and then ring
> > + * wrap is handled. Simplifies safe use of the rx_ptr for available
> > + * space calculation.
> > + */
> > +static int
> > +get_sanitized_ring(xen_argo_ring_t *ring, struct argo_ring_info *ring_info)
> > +{
> > +    uint32_t rx_ptr;
> > +    int ret;
> > +
> > +    ret = get_rx_ptr(ring_info, &rx_ptr);
> > +    if ( ret )
> > +        return ret;
> > +
> > +    ring->tx_ptr = ring_info->tx_ptr;
> > +
> > +    rx_ptr = ROUNDUP_MESSAGE(rx_ptr);
> > +    if ( rx_ptr >= ring_info->len )
> > +        rx_ptr = 0;
> > +
> > +    ring->rx_ptr = rx_ptr;
>
> Newline.

ack, thanks

<snip>
> > +/*
> > + * iov_count returns its count on success via an out variable to avoid
> > + * potential for a negative return value to be used incorrectly
> > + * (eg. coerced into an unsigned variable resulting in a large incorrect value)
> > + */
> > +static int
> > +iov_count(const xen_argo_iov_t *piov, unsigned long niov, uint32_t *count)
> > +{
> > +    uint32_t sum_iov_lens = 0;
> > +
> > +    if ( niov > XEN_ARGO_MAXIOV )
> > +        return -EINVAL;
> > +
> > +    while ( niov-- )
>
> I would use a for loop here, that would remove the need to piov++, if
> you want to keep it quite similar:
>
> for ( ; niov--; piov++ )
> {

Yes, that is better - thanks, applied.
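
So the loop becomes something along these lines -- a sketch only, with an
illustrative overflow guard rather than the exact body from the patch:

static int
iov_count(const xen_argo_iov_t *piov, unsigned long niov, uint32_t *count)
{
    uint32_t sum_iov_lens = 0;

    if ( niov > XEN_ARGO_MAXIOV )
        return -EINVAL;

    for ( ; niov--; piov++ )
    {
        /* reject any iov that would push the total past the ring size cap */
        if ( (piov->iov_len > XEN_ARGO_MAX_RING_SIZE) ||
             (sum_iov_lens > XEN_ARGO_MAX_RING_SIZE - piov->iov_len) )
            return -EMSGSIZE;

        sum_iov_lens += piov->iov_len;
    }

    *count = sum_iov_lens;

    return 0;
}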

<snip>
> > +
> > +static int
> > +ringbuf_insert(struct domain *d, struct argo_ring_info *ring_info,
> > +               const struct argo_ring_id *src_id,
> > +               XEN_GUEST_HANDLE_PARAM(xen_argo_iov_t) iovs_hnd,
> > +               unsigned long niov, uint32_t message_type,
> > +               unsigned long *out_len)
> > +{
> > +    xen_argo_ring_t ring;
> > +    struct xen_argo_ring_message_header mh = { 0 };
>
> No need for the 0, { } will achieve exactly the same.

ack, applied

>
> > +    int32_t sp;
> > +    int32_t ret;
> > +    uint32_t len = 0;
> > +    xen_argo_iov_t iovs[XEN_ARGO_MAXIOV];
>
> This seems slightly dangerous, a change of the maximum could cause
> stack overflow depending on the size of xen_argo_iov_t. I think you
> need some comment next to definition of XEN_ARGO_MAXIOV to note that
> increasing this could cause issues.

That makes sense, will do.
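
Perhaps something like this next to the definition (the wording, and the
value shown, are illustrative):

/*
 * XEN_ARGO_MAXIOV : maximum number of iovs accepted in a single sendv.
 * Caution is required if this value is increased: it determines the size
 * of an array of xen_argo_iov_t structs on the hypervisor stack.
 */
#define XEN_ARGO_MAXIOV       8U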

<snip>
> > +    /*
> > +     * First data write into the destination ring: fixed size, message header.
> > +     * This cannot overrun because the available free space (value in 'sp')
> > +     * is checked above and must be at least this size.
> > +     */
> > +    ret = memcpy_to_guest_ring(ring_info, ring.tx_ptr + sizeof(xen_argo_ring_t),
> > +                               &mh, NULL_hnd, sizeof(mh));
> > +    if ( ret )
> > +    {
> > +        gprintk(XENLOG_ERR,
> > +                "argo: failed to write message header to ring (vm%u:%x 
> > vm%d)\n",
> > +                ring_info->id.domain_id, ring_info->id.port,
> > +                ring_info->id.partner_id);
> > +
> > +        goto out;
> > +    }
> > +
> > +    ring.tx_ptr += sizeof(mh);
> > +    if ( ring.tx_ptr == ring_info->len )
> > +        ring.tx_ptr = 0;
> > +
> > +    piov = iovs;
> > +
> > +    while ( niov-- )
>
> AFAICT using a for loop would remove the need to also do a piov++ at
> each iteration.

ack, applied.

<snip>
> > +         * Case 2: ring-tail-wrap-write above was not performed
> > +         *    -> so iov_len is the guest-supplied value and: (iov_len <= sp)
> > +         *    ie. less than available space at the tail of the ring:
> > +         *        so this write cannot overrun.
> > +         */
> > +        ret = memcpy_to_guest_ring(ring_info,
> > +                                   ring.tx_ptr + sizeof(xen_argo_ring_t),
> > +                                   NULL, buf_hnd, iov_len);
> > +        if ( ret )
> > +        {
> > +            gprintk(XENLOG_ERR,
> > +                    "argo: failed to copy [%p, %"PRIx32"] (vm%u:%x 
> > vm%d)\n",
> > +                    buf_hnd.p, iov_len, ring_info->id.domain_id,
> > +                    ring_info->id.port, ring_info->id.partner_id);
> > +
> > +            goto out;
> > +        }
> > +
> > +        ring.tx_ptr += iov_len;
> > +
> > +        if ( ring.tx_ptr == ring_info->len )
> > +            ring.tx_ptr = 0;
> > +
> > +        piov++;
> > +    }
> > +
> > +    ring.tx_ptr = ROUNDUP_MESSAGE(ring.tx_ptr);
> > +
> > +    if ( ring.tx_ptr >= ring_info->len )
> > +        ring.tx_ptr -= ring_info->len;
> > +
> > +    update_tx_ptr(ring_info, ring.tx_ptr);
> > +
> > + out:
>
> Do you really need to out label? *out_len it's only set in the success
> case, so all the error cases that use a 'goto out' could be replaced
> by 'return ret;'.

ack, thanks -- done.
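
eg. for the header write above, the error path just becomes (a sketch of
the rework):

    if ( ret )
    {
        gprintk(XENLOG_ERR,
                "argo: failed to write message header to ring (vm%u:%x vm%d)\n",
                ring_info->id.domain_id, ring_info->id.port,
                ring_info->id.partner_id);

        return ret;
    }

with *out_len only assigned on the successful path at the end of the function.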

<snip>
> > +static int
> > +pending_queue(struct argo_ring_info *ring_info, domid_t src_id,
> > +              unsigned int len)
> > +{
> > +    struct pending_ent *ent;
> > +
> > +    ASSERT(spin_is_locked(&ring_info->lock));
> > +
> > +    if ( ring_info->npending >= MAX_PENDING_PER_RING )
> > +        return -ENOSPC;
> > +
> > +    ent = xmalloc(struct pending_ent);
> > +
>
> Extra newline.

ack

<snip>
> >
> > +static long
> > +sendv(struct domain *src_d, const xen_argo_addr_t *src_addr,
> > +      const xen_argo_addr_t *dst_addr,
> > +      XEN_GUEST_HANDLE_PARAM(xen_argo_iov_t) iovs_hnd, unsigned long niov,
> > +      uint32_t message_type)
> > +{
> > +    struct domain *dst_d = NULL;
> > +    struct argo_ring_id src_id;
> > +    struct argo_ring_info *ring_info;
> > +    int ret = 0;
> > +    unsigned long len = 0;
> > +
> > +    ASSERT(src_d->domain_id == src_addr->domain_id);
> > +
> > +    argo_dprintk("sendv: (%d:%x)->(%d:%x) niov:%lu iov:%p type:%u\n",
> > +                 src_addr->domain_id, src_addr->port,
> > +                 dst_addr->domain_id, dst_addr->port,
> > +                 niov, iovs_hnd.p, message_type);
> > +
> > +    read_lock(&argo_lock);
> > +
> > +    if ( !src_d->argo )
> > +    {
> > +        ret = -ENODEV;
> > +        goto out_unlock;
> > +    }
> > +
> > +    src_id.port = src_addr->port;
> > +    src_id.domain_id = src_d->domain_id;
> > +    src_id.partner_id = dst_addr->domain_id;
> > +
> > +    dst_d = get_domain_by_id(dst_addr->domain_id);
> > +    if ( !dst_d )
> > +    {
> > +        argo_dprintk("!dst_d, ESRCH\n");
> > +        ret = -ESRCH;
> > +        goto out_unlock;
> > +    }
> > +
> > +    if ( !dst_d->argo )
> > +    {
> > +        argo_dprintk("!dst_d->argo, ECONNREFUSED\n");
> > +        ret = -ECONNREFUSED;
> > +        goto out_unlock;
>
> The usage of out_unlock here and in the condition above is wrong,
> since it will unconditionally call read_unlock(&argo_lock); which is
> wrong here because the lock has not yet been acquired.

Sorry, I don't think that's quite right -- if you scroll up a bit
here, you can see where argo_lock is taken unconditionally, just after
the dprintk and before checking whether src_d is argo enabled. The
second lock hasn't been taken yet - but that's not the one being
unlocked on that out_unlock path.
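
To illustrate the structure I mean, abbreviated from the function as posted:

    read_lock(&argo_lock);          /* taken unconditionally, up here */

    if ( !src_d->argo )
    {
        ret = -ENODEV;
        goto out_unlock;            /* argo_lock is held, so this is fine */
    }
    ...
    if ( !dst_d->argo )
    {
        ret = -ECONNREFUSED;
        goto out_unlock;            /* still only argo_lock held */
    }

    read_lock(&dst_d->argo->lock);  /* per-domain lock only taken here */
    ...
 out_unlock:
    read_unlock(&argo_lock);        /* pairs with the read_lock at the top */

so out_unlock only ever drops the global argo_lock, which is held on every
path that reaches it.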

>
> > +    }
> > +
> > +    read_lock(&dst_d->argo->lock);
> > +
> > +    ring_info = ring_find_info_by_match(dst_d, dst_addr->port,
> > +                                        src_addr->domain_id);
> > +    if ( !ring_info )
> > +    {
> > +        gprintk(XENLOG_ERR,
> > +                "argo: vm%u connection refused, src (vm%u:%x) dst 
> > (vm%u:%x)\n",
> > +                current->domain->domain_id, src_id.domain_id, src_id.port,
> > +                dst_addr->domain_id, dst_addr->port);
> > +
> > +        ret = -ECONNREFUSED;
> > +        goto out_unlock2;
> > +    }
> > +
> > +    spin_lock(&ring_info->lock);
> > +
> > +    ret = ringbuf_insert(dst_d, ring_info, &src_id, iovs_hnd, niov,
> > +                         message_type, &len);
> > +    if ( ret == -EAGAIN )
> > +    {
> > +        argo_dprintk("argo_ringbuf_sendv failed, EAGAIN\n");
> > +        /* requeue to issue a notification when space is there */
> > +        ret = pending_requeue(ring_info, src_addr->domain_id, len);
> > +    }
> > +
> > +    spin_unlock(&ring_info->lock);
> > +
> > +    if ( ret >= 0 )
> > +        signal_domain(dst_d);
> > +
> > + out_unlock2:
>
> There's only a single user of the out_unlock2 label, at which point it
> might be easier to read to just put the read_unlock there and just use
> the existing out_unlock label.

ack, will change that.
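
ie. roughly this (a sketch; the straight-line path would then drop
dst_d->argo->lock inline before falling into out_unlock):

    if ( !ring_info )
    {
        gprintk(XENLOG_ERR,
                "argo: vm%u connection refused, src (vm%u:%x) dst (vm%u:%x)\n",
                current->domain->domain_id, src_id.domain_id, src_id.port,
                dst_addr->domain_id, dst_addr->port);

        ret = -ECONNREFUSED;
        read_unlock(&dst_d->argo->lock);
        goto out_unlock;
    }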

Thanks again,

Christopher

_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxxx
https://lists.xenproject.org/mailman/listinfo/xen-devel

 

