
Re: [Xen-devel] [PATCH v5 09/15] argo: implement the sendv op; evtchn: expose send_guest_global_virq



On Tue, Jan 22, 2019 at 4:08 AM Roger Pau Monné <roger.pau@xxxxxxxxxx> wrote:
>
> On Mon, Jan 21, 2019 at 01:59:49AM -0800, Christopher Clark wrote:
> > sendv operation is invoked to perform a synchronous send of buffers
> > contained in iovs to a remote domain's registered ring.
> >
> > It takes:
> >  * A destination address (domid, port) for the ring to send to.
> >    It performs a most-specific match lookup, to allow for wildcard.
> >  * A source address, used to inform the destination of where to reply.
> >  * The address of an array of iovs containing the data to send
> >  * .. and the length of that array of iovs
> >  * and a 32-bit message type, available to communicate message context
> >    data (e.g. kernel-to-kernel, separate from the application data).
> >
> > If insufficient space exists in the destination ring, it will return
> > -EAGAIN and Xen will notify the caller when sufficient space becomes
> > available.
> >
> > Accesses to the ring indices are appropriately atomic. The rings are
> > mapped into Xen's private address space to write as needed and the
> > mappings are retained for later use.
> >
> > Notifications are sent to guests via VIRQ and send_guest_global_virq is
> > exposed in the change to enable argo to call it. VIRQ_ARGO_MESSAGE is
>                                                    ^ VIRQ_ARGO
> > claimed from the VIRQ previously reserved for this purpose (#11).
> >
> > The VIRQ notification method is used rather than sending events using
> > evtchn functions directly because:
> >
> > * no current event channel type is an exact fit for the intended
> >   behaviour. ECS_IPI is closest, but it disallows migration to
> >   other VCPUs which is not necessarily a requirement for Argo.
> >
> > * at the point of argo_init, allocation of an event channel is
> >   complicated by none of the guest VCPUs being initialized yet
> >   and the event channel logic expects that a valid event channel
> >   has a present VCPU.
>
> IMO iff you wanted to use event channels those _must_ be setup by the
> guest, ie: the guest argo driver would load, allocate an event channel
> and then tell the hypervisor about the event channel that should be
> used for argo notifications.
>
> > +static int
> > +memcpy_to_guest_ring(const struct domain *d, struct argo_ring_info *ring_info,
> > +                     unsigned int offset,
> > +                     const void *src, XEN_GUEST_HANDLE(uint8_t) src_hnd,
> > +                     unsigned int len)
> > +{
> > +    unsigned int mfns_index = offset >> PAGE_SHIFT;
> > +    void *dst;
> > +    int ret;
> > +    unsigned int src_offset = 0;
> > +
> > +    ASSERT(LOCKING_L3(d, ring_info));
> > +
> > +    offset &= ~PAGE_MASK;
> > +
> > +    if ( len + offset > XEN_ARGO_MAX_RING_SIZE )
> > +        return -EFAULT;
> > +
> > +    while ( len )
> > +    {
> > +        unsigned int head_len = (offset + len) > PAGE_SIZE ? PAGE_SIZE - offset
> > +                                                           : len;
>
> IMO that would be clearer as:
>
> head_len = min(PAGE_SIZE - offset, len);

You're right that the calculated result is the same, but I've left this
unchanged because I think the reason for using that value (i.e. the
intent) is clearer in its current form: it's not about finding the
smallest amount of data to write, it's about writing only up to the
PAGE_SIZE boundary, starting at offset.
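
To illustrate the distinction (a standalone sketch using the function's
offset and len locals, not the patch code):

    /* Intent-first form, as in the patch: write everything up to the
     * PAGE_SIZE boundary unless the whole remaining length fits first. */
    head_len = (offset + len) > PAGE_SIZE ? PAGE_SIZE - offset : len;

    /* Equivalent min() form, as suggested above. */
    head_len = min(PAGE_SIZE - offset, len);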

>
> But anyway, this should go away when you move to using vmap.
>
> [...]
> > +static int
> > +ringbuf_insert(const struct domain *d, struct argo_ring_info *ring_info,
> > +               const struct argo_ring_id *src_id,
> > +               XEN_GUEST_HANDLE_PARAM(xen_argo_iov_t) iovs_hnd,
> > +               unsigned long niov, uint32_t message_type,
> > +               unsigned long *out_len)
> > +{
> > +    xen_argo_ring_t ring;
> > +    struct xen_argo_ring_message_header mh = { };
> > +    int sp, ret;
> > +    unsigned int len = 0;
> > +    xen_argo_iov_t iovs[XEN_ARGO_MAXIOV];
> > +    xen_argo_iov_t *piov;
> > +    XEN_GUEST_HANDLE(uint8_t) NULL_hnd =
> > +       guest_handle_from_param(guest_handle_from_ptr(NULL, uint8_t), uint8_t);
> > +
> > +    ASSERT(LOCKING_L3(d, ring_info));
> > +
> > +    ret = __copy_from_guest(iovs, iovs_hnd, niov) ? -EFAULT : 0;
> > +    if ( ret )
> > +        return ret;
> > +
> > +    /*
> > +     * Obtain the total size of data to transmit -- sets the 'len' variable
> > +     * -- and sanity check that the iovs conform to size and number limits.
> > +     * Enforced below: no more than 'len' bytes of guest data
> > +     * (plus the message header) will be sent in this operation.
> > +     */
> > +    ret = iov_count(iovs, niov, &len);
> > +    if ( ret )
> > +        return ret;
> > +
> > +    /*
> > +     * Size bounds check against ring size and static maximum message limit.
> > +     * The message must not fill the ring; there must be at least one slot
> > +     * remaining so we can distinguish a full ring from an empty one.
> > +     */
> > +    if ( ((ROUNDUP_MESSAGE(len) +
> > +            sizeof(struct xen_argo_ring_message_header)) >= ring_info->len) ||
> > +         (len > MAX_ARGO_MESSAGE_SIZE) )
>
> len is already checked to be <= MAX_ARGO_MESSAGE_SIZE in iov_count
> where it gets set, this is redundant.

ack, removed, thanks.
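
For reference, a sketch of how the bounds check reads with the redundant
clause dropped:

    if ( (ROUNDUP_MESSAGE(len) +
          sizeof(struct xen_argo_ring_message_header)) >= ring_info->len )
        return -EMSGSIZE;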

>
> > +        return -EMSGSIZE;
> > +
> > +    ret = get_sanitized_ring(d, &ring, ring_info);
> > +    if ( ret )
> > +        return ret;
> > +
> > +    argo_dprintk("ring.tx_ptr=%u ring.rx_ptr=%u ring len=%u"
> > +                 " ring_info->tx_ptr=%u\n",
> > +                 ring.tx_ptr, ring.rx_ptr, ring_info->len, ring_info->tx_ptr);
> > +
> > +    if ( ring.rx_ptr == ring.tx_ptr )
> > +        sp = ring_info->len;
> > +    else
> > +    {
> > +        sp = ring.rx_ptr - ring.tx_ptr;
> > +        if ( sp < 0 )
> > +            sp += ring_info->len;
> > +    }
> > +
> > +    /*
> > +     * Size bounds check against currently available space in the ring.
> > +     * Again: the message must not fill the ring leaving no space remaining.
> > +     */
> > +    if ( (ROUNDUP_MESSAGE(len) +
> > +            sizeof(struct xen_argo_ring_message_header)) >= sp )
> > +    {
> > +        argo_dprintk("EAGAIN\n");
> > +        return -EAGAIN;
> > +    }
> > +
> > +    mh.len = len + sizeof(struct xen_argo_ring_message_header);
> > +    mh.source.aport = src_id->aport;
> > +    mh.source.domain_id = src_id->domain_id;
> > +    mh.message_type = message_type;
> > +
> > +    /*
> > +     * For this copy to the guest ring, tx_ptr is always 16-byte aligned
> > +     * and the message header is 16 bytes long.
> > +     */
> > +    BUILD_BUG_ON(
> > +        sizeof(struct xen_argo_ring_message_header) != ROUNDUP_MESSAGE(1));
> > +
> > +    /*
> > +     * First data write into the destination ring: fixed size, message header.
> > +     * This cannot overrun because the available free space (value in 'sp')
> > +     * is checked above and must be at least this size.
> > +     */
> > +    ret = memcpy_to_guest_ring(d, ring_info,
> > +                               ring.tx_ptr + sizeof(xen_argo_ring_t),
> > +                               &mh, NULL_hnd, sizeof(mh));
> > +    if ( ret )
> > +    {
> > +        gprintk(XENLOG_ERR,
> > +                "argo: failed to write message header to ring (vm%u:%x 
> > vm%u)\n",
> > +                ring_info->id.domain_id, ring_info->id.aport,
> > +                ring_info->id.partner_id);
> > +
> > +        return ret;
> > +    }
> > +
> > +    ring.tx_ptr += sizeof(mh);
> > +    if ( ring.tx_ptr == ring_info->len )
> > +        ring.tx_ptr = 0;
> > +
> > +    for ( piov = iovs; niov--; piov++ )
> > +    {
> > +        XEN_GUEST_HANDLE_64(uint8_t) buf_hnd = piov->iov_hnd;
> > +        unsigned int iov_len = piov->iov_len;
> > +
> > +        /* If no data is provided in this iov, moan and skip on to the next */
> > +        if ( !iov_len )
> > +        {
> > +            gprintk(XENLOG_ERR,
>
> This should likely be WARN or INFO, since it's not an error?

Yes, changed to WARNING, ack.
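
i.e. a sketch of the adjusted call:

    gprintk(XENLOG_WARNING,
            "argo: no data iov_len=0 iov_hnd=%p ring (vm%u:%x vm%u)\n",
            buf_hnd.p, ring_info->id.domain_id, ring_info->id.aport,
            ring_info->id.partner_id);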

>
> > +                    "argo: no data iov_len=0 iov_hnd=%p ring (vm%u:%x 
> > vm%u)\n",
> > +                    buf_hnd.p, ring_info->id.domain_id, 
> > ring_info->id.aport,
> > +                    ring_info->id.partner_id);
> > +
> > +            continue;
> > +        }
> > +
> > +        if ( unlikely(!guest_handle_okay(buf_hnd, iov_len)) )
> > +        {
> > +            gprintk(XENLOG_ERR,
> > +                    "argo: bad iov handle [%p, %u] (vm%u:%x vm%u)\n",
> > +                    buf_hnd.p, iov_len,
> > +                    ring_info->id.domain_id, ring_info->id.aport,
> > +                    ring_info->id.partner_id);
> > +
> > +            return -EFAULT;
> > +        }
> > +
> > +        sp = ring_info->len - ring.tx_ptr;
> > +
> > +        /* Check: iov data size versus free space at the tail of the ring */
> > +        if ( iov_len > sp )
> > +        {
> > +            /*
> > +             * Second possible data write: ring-tail-wrap-write.
> > +             * Populate the ring tail and update the internal tx_ptr to handle
> > +             * wrapping at the end of ring.
> > +             * Size of data written here: sp
> > +             * which is the exact full amount of free space available at the
> > +             * tail of the ring, so this cannot overrun.
> > +             */
> > +            ret = memcpy_to_guest_ring(d, ring_info,
> > +                                       ring.tx_ptr + sizeof(xen_argo_ring_t),
> > +                                       NULL, buf_hnd, sp);
> > +            if ( ret )
> > +            {
> > +                gprintk(XENLOG_ERR,
> > +                        "argo: failed to copy {%p, %d} (vm%u:%x vm%u)\n",
> > +                        buf_hnd.p, sp,
> > +                        ring_info->id.domain_id, ring_info->id.aport,
> > +                        ring_info->id.partner_id);
> > +
> > +                return ret;
> > +            }
> > +
> > +            ring.tx_ptr = 0;
> > +            iov_len -= sp;
> > +            guest_handle_add_offset(buf_hnd, sp);
> > +
> > +            ASSERT(iov_len <= ring_info->len);
> > +        }
> > +
> > +        /*
> > +         * Third possible data write: all data remaining for this iov.
> > +         * Size of data written here: iov_len
> > +         *
> > +         * Case 1: if the ring-tail-wrap-write above was performed, then
> > +         *         iov_len has been decreased by 'sp' and ring.tx_ptr is zero.
> > +         *
> > +         *    We know from checking the result of iov_count:
> > +         *      len + sizeof(message_header) <= ring_info->len
> > +         *    We also know that len is the total of summing all iov_lens, so:
> > +         *       iov_len <= len
> > +         *    so by transitivity:
> > +         *       iov_len <= len <= (ring_info->len - sizeof(msgheader))
> > +         *    and therefore:
> > +         *       (iov_len + sizeof(msgheader) <= ring_info->len) &&
> > +         *       (ring.tx_ptr == 0)
> > +         *    so this write cannot overrun here.
> > +         *
> > +         * Case 2: ring-tail-wrap-write above was not performed
> > +         *    -> so iov_len is the guest-supplied value and: (iov_len <= sp)
> > +         *    ie. less than available space at the tail of the ring:
> > +         *        so this write cannot overrun.
> > +         */
> > +        ret = memcpy_to_guest_ring(d, ring_info,
> > +                                   ring.tx_ptr + sizeof(xen_argo_ring_t),
> > +                                   NULL, buf_hnd, iov_len);
> > +        if ( ret )
> > +        {
> > +            gprintk(XENLOG_ERR,
> > +                    "argo: failed to copy [%p, %u] (vm%u:%x vm%u)\n",
> > +                    buf_hnd.p, iov_len, ring_info->id.domain_id,
> > +                    ring_info->id.aport, ring_info->id.partner_id);
> > +
> > +            return ret;
> > +        }
> > +
> > +        ring.tx_ptr += iov_len;
> > +
> > +        if ( ring.tx_ptr == ring_info->len )
> > +            ring.tx_ptr = 0;
> > +    }
> > +
> > +    ring.tx_ptr = ROUNDUP_MESSAGE(ring.tx_ptr);
> > +
> > +    if ( ring.tx_ptr >= ring_info->len )
> > +        ring.tx_ptr -= ring_info->len;
>
> You seem to handle the wrapping after each possible write, so I think
> the above is not needed? Maybe it should be an assert instead?

The wrap handling is necessary because of the ROUNDUP_MESSAGE immediately
above it.

I've added a new comment to make it a bit clearer:

Finished writing data from all iovs into the ring: now need to
round up tx_ptr to align to the next message boundary, and then
wrap if necessary.
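
A worked example of why the wrap is still needed (assuming the 16-byte
message alignment from this patch and a hypothetical 4096-byte ring):

    /* If the final iov write ended at tx_ptr == 4090, then
     * ROUNDUP_MESSAGE(4090) == 4096 == ring_info->len, which is one
     * past the last valid ring index, so it must wrap back to zero: */
    ring.tx_ptr = ROUNDUP_MESSAGE(ring.tx_ptr);
    if ( ring.tx_ptr >= ring_info->len )
        ring.tx_ptr -= ring_info->len;   /* tx_ptr becomes 0 */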

>
> > +
> > +    update_tx_ptr(d, ring_info, ring.tx_ptr);
> > +
> > +    /*
> > +     * At this point (and also on error exit paths from this function) it is
> > +     * possible to unmap the ring_info, ie:
> > +     *   ring_unmap(d, ring_info);
> > +     * but performance should be improved by not doing so, and retaining
> > +     * the mapping.
> > +     * An XSM policy control over level of confidentiality required
> > +     * versus performance cost could be added to decide that here.
> > +     */
> > +
> > +    *out_len = len;
> > +
> > +    return ret;
> > +}
> > +
> >  static void
> >  wildcard_pending_list_remove(domid_t domain_id, struct pending_ent *ent)
> >  {
> > @@ -497,6 +918,25 @@ wildcard_pending_list_remove(domid_t domain_id, struct pending_ent *ent)
> >  }
> >
> >  static void
> > +wildcard_pending_list_insert(domid_t domain_id, struct pending_ent *ent)
> > +{
> > +    struct domain *d = get_domain_by_id(domain_id);
> > +
> > +    if ( !d )
> > +        return;
> > +
> > +    ASSERT(LOCKING_Read_L1);
> > +
> > +    if ( d->argo )
> > +    {
> > +        spin_lock(&d->argo->wildcard_L2_lock);
> > +        list_add(&ent->wildcard_node, &d->argo->wildcard_pend_list);
> > +        spin_unlock(&d->argo->wildcard_L2_lock);
> > +    }
> > +    put_domain(d);
> > +}
> > +
> > +static void
> >  pending_remove_all(const struct domain *d, struct argo_ring_info *ring_info)
> >  {
> >      struct list_head *ring_pending = &ring_info->pending;
> > @@ -518,6 +958,70 @@ pending_remove_all(const struct domain *d, struct argo_ring_info *ring_info)
> >      ring_info->npending = 0;
> >  }
> >
> > +static int
> > +pending_queue(const struct domain *d, struct argo_ring_info *ring_info,
> > +              domid_t src_id, unsigned int len)
> > +{
> > +    struct pending_ent *ent;
> > +
> > +    ASSERT(LOCKING_L3(d, ring_info));
> > +
> > +    if ( ring_info->npending >= MAX_PENDING_PER_RING )
> > +        return -ENOSPC;
> > +
> > +    ent = xmalloc(struct pending_ent);
> > +    if ( !ent )
> > +        return -ENOMEM;
> > +
> > +    ent->len = len;
> > +    ent->domain_id = src_id;
> > +    ent->ring_info = ring_info;
> > +
> > +    if ( ring_info->id.partner_id == XEN_ARGO_DOMID_ANY )
> > +        wildcard_pending_list_insert(src_id, ent);
> > +    list_add(&ent->node, &ring_info->pending);
> > +    ring_info->npending++;
> > +
> > +    return 0;
> > +}
> > +
> > +static int
> > +pending_requeue(const struct domain *d, struct argo_ring_info *ring_info,
> > +                domid_t src_id, unsigned int len)
> > +{
> > +    struct list_head *cursor, *head;
> > +
> > +    ASSERT(LOCKING_L3(d, ring_info));
> > +
> > +    /* List structure is not modified here. Update len in a match if found. */
> > +    head = &ring_info->pending;
> > +
> > +    for ( cursor = head->next; cursor != head; cursor = cursor->next )
>
> list_for_each_entry

ack
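
i.e. a sketch of the reworked loop (assuming the matching entry should
wait for the larger of the two lengths; the exact update rule is as in
the patch):

    struct pending_ent *ent;

    list_for_each_entry(ent, &ring_info->pending, node)
    {
        if ( ent->domain_id == src_id )
        {
            /* Update the recorded length in the matching entry. */
            if ( ent->len < len )
                ent->len = len;
            return 0;
        }
    }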

>
> >  long
> >  do_argo_op(unsigned int cmd, XEN_GUEST_HANDLE_PARAM(void) arg1,
> >             XEN_GUEST_HANDLE_PARAM(void) arg2, unsigned long arg3,
> > @@ -1145,6 +1734,53 @@ do_argo_op(unsigned int cmd, XEN_GUEST_HANDLE_PARAM(void) arg1,
> >          break;
> >      }
> >
> > +    case XEN_ARGO_OP_sendv:
> > +    {
> > +        xen_argo_send_addr_t send_addr;
> > +
> > +        XEN_GUEST_HANDLE_PARAM(xen_argo_send_addr_t) send_addr_hnd =
> > +            guest_handle_cast(arg1, xen_argo_send_addr_t);
> > +        XEN_GUEST_HANDLE_PARAM(xen_argo_iov_t) iovs_hnd =
> > +            guest_handle_cast(arg2, xen_argo_iov_t);
> > +        /* arg3 is niov */
> > +        /* arg4 is message_type. Must be a 32-bit value. */
> > +
> > +        rc = copy_from_guest(&send_addr, send_addr_hnd, 1) ? -EFAULT : 0;
> > +        if ( rc )
> > +            break;
> > +
> > +        /*
> > +         * Check padding is zeroed. Reject niov above limit or message_types
> > +         * that are outside 32 bit range.
> > +         */
> > +        if ( unlikely(send_addr.src.pad || send_addr.dst.pad ||
> > +                      (arg3 > XEN_ARGO_MAXIOV) || (arg4 & ~0xffffffffUL)) )
>
> arg4 & (GB(4) - 1)
>
> Is clearer IMO, or:
>
> arg4 > UINT32_MAX

I've left the code unchanged, as the mask constant is used in multiple
places elsewhere in Xen, whereas UINT32_MAX is only used as a threshold
value.
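
For comparison, the equivalent forms discussed (a sketch; both test
whether arg4 exceeds the 32-bit range):

    /* Mask form used in the patch, matching the constant's use
     * elsewhere in Xen: */
    if ( arg4 & ~0xffffffffUL )
        rc = -EINVAL;

    /* Threshold form: */
    if ( arg4 > UINT32_MAX )
        rc = -EINVAL;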

>
> > +        {
> > +            rc = -EINVAL;
> > +            break;
> > +        }
> > +
> > +        if ( send_addr.src.domain_id == XEN_ARGO_DOMID_ANY )
> > +            send_addr.src.domain_id = currd->domain_id;
> > +
> > +        /* No domain is currently authorized to send on behalf of another */
> > +        if ( unlikely(send_addr.src.domain_id != currd->domain_id) )
> > +        {
> > +            rc = -EPERM;
> > +            break;
> > +        }
> > +
> > +        /*
> > +         * Check access to the whole array here so we can use the faster __copy
> > +         * operations to read each element later.
> > +         */
> > +        if ( unlikely(!guest_handle_okay(iovs_hnd, arg3)) )
>
> You need to set rc to EFAULT here, because the call to copy_from_guest
> has set it to 0.

ack.
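
i.e. a sketch of the fix:

    if ( unlikely(!guest_handle_okay(iovs_hnd, arg3)) )
    {
        rc = -EFAULT;
        break;
    }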

>
> Alternatively you can change the call above to be:
>
> if ( copy_from_guest(&send_addr, send_addr_hnd, 1) )
>     return -EFAULT;
>
> So rc doesn't get set to 0 on success.

> With those taken care of:
>
> Reviewed-by: Roger Pau Monné <roger.pau@xxxxxxxxxx>
