[Xen-devel] [PATCH v2 13/18] argo: implement the sendv op; evtchn: expose send_guest_global_virq



The sendv operation is invoked to perform a synchronous send of the
buffers contained in iovs to a remote domain's registered ring.

It takes (a minimal guest-side invocation is sketched after this list):
 * A destination address (domid, port) for the ring to send to.
   The lookup performs a most-specific match, to allow for wildcard
   receivers.
 * A source address, used to inform the destination of where to reply.
 * The address of an array of iovs containing the data to send,
 * ... and the length of that array of iovs,
 * and a 32-bit message type, available to communicate message context
   data (eg. kernel-to-kernel, separate from the application data).
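
For illustration, a minimal guest-side invocation might look like the
sketch below. The hypercall wrapper name is an assumption for
illustration only (it is not defined by this patch), and own_domid,
peer_domid, buf and len stand for caller-supplied values:

    xen_argo_send_addr_t send_addr = {
        .src = { .port = 0x1,  .domain_id = own_domid,  .pad = 0 },
        .dst = { .port = 0x10, .domain_id = peer_domid, .pad = 0 },
    };
    xen_argo_iov_t iov = { .iov_len = len, .pad = 0 };

    /* iov_hnd carries the guest address of the payload buffer */
    set_xen_guest_handle(iov.iov_hnd, buf);

    /* arg1: addresses, arg2: iov array, arg3: niov, arg4: message type */
    ret = HYPERVISOR_argo_message_op(XEN_ARGO_MESSAGE_OP_sendv,
                                     &send_addr, &iov, 1, message_type);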

If insufficient space exists in the destination ring, the operation
returns -EAGAIN and Xen will notify the caller when sufficient space
becomes available.
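
A sender is expected to retry once that notification arrives; roughly
(a sketch: wait_for_argo_virq is a placeholder for whatever wakeup
mechanism the guest kernel uses):

    do {
        ret = HYPERVISOR_argo_message_op(XEN_ARGO_MESSAGE_OP_sendv,
                                         &send_addr, &iov, 1, message_type);
        if ( ret == -EAGAIN )
            /*
             * Xen has queued a space-available notification for this
             * sender: block until VIRQ_ARGO_MESSAGE fires, then retry.
             */
            wait_for_argo_virq();
    } while ( ret == -EAGAIN );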

Accesses to the ring indices are appropriately atomic. The rings are
mapped into Xen's private address space for writing as needed, and the
mappings are retained for later use.

When locating the destination ring, a check is performed via a cookie
installed at ring registration time, to ensure that the source domain
is the same as it was when the ring was registered.

Fixed-size types are used in areas of this code where avoiding integer
overflow is important.

Notifications are sent to guests via VIRQ, and send_guest_global_virq
is exposed by this change to enable argo to call it. VIRQ_ARGO_MESSAGE
claims the VIRQ number previously reserved for this purpose (#11).

After consideration, the VIRQ notification method has been selected
rather than sending events using the evtchn functions directly, because:

* no current event channel type is an exact fit for the intended
  behaviour. ECS_IPI is closest, but it disallows migration to
  other VCPUs, a restriction that is not necessarily wanted for Argo.

* at the point of argo_init, allocation of an event channel is
  complicated because none of the guest VCPUs are initialized yet,
  and the event channel logic expects a valid event channel to have
  a present VCPU.

* at the point of signalling a notification, the VIRQ logic is already
  defensive: if d->vcpu[0] is NULL, the notification is silently
  dropped. The evtchn_send logic is not so defensive: vcpu[0] must not
  be NULL, otherwise a null pointer dereference occurs.

Using a VIRQ removes the need for the guest to query which event
channel its notifications will be delivered on. This is also likely to
simplify establishing future L0/L1 nested hypervisor argo communication.
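
For reference, a guest binds this VIRQ through the standard event
channel interface. A minimal sketch, in which the handler registration
call is a placeholder for the guest kernel's own plumbing:

    struct evtchn_bind_virq bind = {
        .virq = VIRQ_ARGO_MESSAGE,
        .vcpu = 0,              /* global VIRQs are bound on VCPU 0 */
    };

    if ( !HYPERVISOR_event_channel_op(EVTCHNOP_bind_virq, &bind) )
        /* bind.port now receives all argo notifications for this domain */
        bind_evtchn_to_handler(bind.port, argo_notification_handler);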

Signed-off-by: Christopher Clark <christopher.clark6@xxxxxxxxxxxxxx>
---
Changes since v1:

v1 #15 feedback, Jan: sendv op : s/ECONNREFUSED/ESRCH/ if ! dest dom
v1 #5 (#15) feedback Paul: sendv: use currd in do_argo_message_op
v1 #13 (#15) feedback Paul: sendv op: do/while -> goto; reindent
v1 #15 feedback Paul: sendv op: make page var: unsigned
v1 #15 feedback Paul: sendv op: new local var for PAGE_SIZE - offset
v1 #8 feedback Jan: XEN_GUEST_HANDLE : C89 compliance
v1 rebase after switching register op from pfns to page descriptors
v1 self: move iov DEFINE_XEN_GUEST_HANDLE out of public header into argo.c
v1 #13 (#15) feedback Paul: fix loglevel for guest-triggered messages
v1 : add compat xlat.lst entries
v1 self: switched notification to send_guest_global_virq instead of event
v1: fix gprintk use for ARM as its defn dislikes split format strings
v1: init len variable to satisfy ARM compiler initialized checking
v1 #13 feedback Jan: rename page var
v1:#14 feedback Jan: uint8_t* -> void*
v1: #13 feedback Jan: public namespace: prefix with xen
v1: #13 feedback Jan: blank line after case op in do_argo_message_op
v1: #15 feedback Jan: add comments explaining why the writes don't overrun
v1: self: add ASSERT to support comment that overrun cannot happen
v1: self: fail on short writes where guest manipulated the iov_lens
v1: self: rename ent id to domain_id
v1: self: add moan for iov rewrite
v1. feedback #15 Jan: require the pad bits are zero
v1. feedback #15 Jan: drop NULL check in argo_signal_domain as now using VIRQ
v1. self: store domain_cookie in pending ent
v1. feedback #15 Jan: use unsigned where possible
v1. feedback Jan: use handle type for iov_base in public iov interface
v1. self: log whenever visible error occurs
v1 feedback #15, Jan: drop unnecessary mb
v1 self: only update internal tx_ptr if able to return success
         and update the visible tx_ptr
v1 self: log on failure to map ring to update visible tx_ptr
v1 feedback #15 Jan: add comment re: notification size policy
v1 self/Roger? remove errant space after sizeof
v1. feedback #15 Jan: require iov pad be zero
v1. self: rename iov_base to iov_hnd for handle in public iov interface
v1: feedback #15 Jan: handle upper-halves of hypercall args; changes some
    types in function signatures to match.
v1: self: add dprintk to sendv
v1: self: add debug output to argo_iov_count
v1. feedback #14 Jan: blank line before return in argo_iov_count
v1 feedback #15 Jan: verify src id, not override

 xen/common/argo.c          | 746 +++++++++++++++++++++++++++++++++++++++++++++
 xen/common/event_channel.c |   2 +-
 xen/include/public/argo.h  |  64 ++++
 xen/include/public/xen.h   |   2 +-
 xen/include/xen/event.h    |   7 +
 xen/include/xlat.lst       |   2 +
 6 files changed, 821 insertions(+), 2 deletions(-)

diff --git a/xen/common/argo.c b/xen/common/argo.c
index cbb17a3..ed50415 100644
--- a/xen/common/argo.c
+++ b/xen/common/argo.c
@@ -25,13 +25,17 @@
 #include <xen/guest_access.h>
 #include <xen/nospec.h>
 #include <xen/time.h>
+#include <xsm/xsm.h>
 #include <public/argo.h>
 
 #define ARGO_MAX_RINGS_PER_DOMAIN       128U
 
 DEFINE_XEN_GUEST_HANDLE(xen_argo_page_descr_t);
 DEFINE_XEN_GUEST_HANDLE(xen_argo_addr_t);
+DEFINE_XEN_GUEST_HANDLE(xen_argo_iov_t);
+DEFINE_XEN_GUEST_HANDLE(xen_argo_send_addr_t);
 DEFINE_XEN_GUEST_HANDLE(xen_argo_ring_t);
+DECLARE_XEN_GUEST_HANDLE_NULL(uint8_t);
 
 /* pfn type: 64-bit on all architectures */
 typedef uint64_t argo_pfn_t;
@@ -182,6 +186,18 @@ static DEFINE_RWLOCK(argo_lock); /* L1 */
 #endif
 
 /*
+ * notification to guests
+ */
+
+static void
+argo_signal_domain(struct domain *d)
+{
+    argo_dprintk("signalling domid:%d\n", d->domain_id);
+
+    send_guest_global_virq(d, VIRQ_ARGO_MESSAGE);
+}
+
+/*
  * ring buffer
  */
 
@@ -285,6 +301,519 @@ argo_update_tx_ptr(struct argo_ring_info *ring_info, uint32_t tx_ptr)
     return 0;
 }
 
+static int
+argo_memcpy_to_guest_ring(struct argo_ring_info *ring_info,
+                          uint32_t offset,
+                          const void *src,
+                          XEN_GUEST_HANDLE(uint8_t) src_hnd,
+                          uint32_t len)
+{
+    unsigned int mfns_index = offset >> PAGE_SHIFT;
+    void *dst;
+    int ret;
+    unsigned int src_offset = 0;
+
+    ASSERT(spin_is_locked(&ring_info->lock));
+
+    offset &= ~PAGE_MASK;
+
+    if ( (len > XEN_ARGO_MAX_RING_SIZE) || (offset > XEN_ARGO_MAX_RING_SIZE) )
+        return -EFAULT;
+
+    while ( (offset + len) > PAGE_SIZE )
+    {
+        unsigned int head_len = PAGE_SIZE - offset;
+
+        ret = argo_ring_map_page(ring_info, mfns_index, &dst);
+        if ( ret )
+            return ret;
+
+        if ( src )
+        {
+            memcpy(dst + offset, src + src_offset, head_len);
+            src_offset += head_len;
+        }
+        else
+        {
+            ret = copy_from_guest_errno(dst + offset, src_hnd, head_len);
+            if ( ret )
+                return ret;
+
+            guest_handle_add_offset(src_hnd, head_len);
+        }
+
+        mfns_index++;
+        len -= head_len;
+        offset = 0;
+    }
+
+    ret = argo_ring_map_page(ring_info, mfns_index, &dst);
+    if ( ret )
+    {
+        argo_dprintk("argo: ring (vm%u:%x vm%d) %p attempted to map page"
+               " %d of %d\n", ring_info->id.addr.domain_id,
+               ring_info->id.addr.port, ring_info->id.partner, ring_info,
+               mfns_index, ring_info->nmfns);
+        return ret;
+    }
+
+    if ( src )
+        memcpy(dst + offset, src + src_offset, len);
+    else
+        ret = copy_from_guest_errno(dst + offset, src_hnd, len);
+
+    return ret;
+}
+
+static int
+argo_ringbuf_get_rx_ptr(struct argo_ring_info *ring_info, uint32_t *rx_ptr)
+{
+    void *src;
+    xen_argo_ring_t *ringp;
+    int ret;
+
+    ASSERT(spin_is_locked(&ring_info->lock));
+
+    if ( !ring_info->nmfns || ring_info->nmfns < ring_info->npage )
+        return -EINVAL;
+
+    ret = argo_ring_map_page(ring_info, 0, &src);
+    if ( ret )
+        return ret;
+
+    ringp = (xen_argo_ring_t *)src;
+
+    *rx_ptr = read_atomic(&ringp->rx_ptr);
+
+    return 0;
+}
+
+/*
+ * argo_sanitize_ring creates a modified copy of the ring pointers
+ * where the rx_ptr is rounded up to ensure it is aligned, and then
+ * ring wrap is handled. Simplifies safe use of the rx_ptr for
+ * available space calculation.
+ */
+static void
+argo_sanitize_ring(xen_argo_ring_t *ring,
+                   const struct argo_ring_info *ring_info)
+{
+    uint32_t rx_ptr = ring->rx_ptr;
+
+    ring->tx_ptr = ring_info->tx_ptr;
+    ring->len = ring_info->len;
+
+    rx_ptr = XEN_ARGO_ROUNDUP(rx_ptr);
+    if ( rx_ptr >= ring_info->len )
+        rx_ptr = 0;
+
+    ring->rx_ptr = rx_ptr;
+}
+
+/*
+ * argo_iov_count returns its count on success via an out variable
+ * to avoid potential for a negative return value to be used incorrectly
+ * (eg. coerced into an unsigned variable resulting in a large incorrect value)
+ */
+static int
+argo_iov_count(XEN_GUEST_HANDLE_PARAM(xen_argo_iov_t) iovs, unsigned long niov,
+               uint32_t *count)
+{
+    xen_argo_iov_t iov;
+    uint32_t sum_iov_lens = 0;
+    int ret;
+
+    if ( niov > XEN_ARGO_MAXIOV )
+        return -EINVAL;
+
+    while ( niov-- )
+    {
+        ret = copy_from_guest_errno(&iov, iovs, 1);
+        if ( ret )
+            return ret;
+
+        /* valid iovs must have the padding field set to zero */
+        if ( iov.pad )
+        {
+            argo_dprintk("invalid iov: padding is not zero\n");
+            return -EINVAL;
+        }
+
+        /* check each to protect sum against integer overflow */
+        if ( iov.iov_len > XEN_ARGO_MAX_RING_SIZE )
+        {
+            argo_dprintk("invalid iov_len: too big (%u)>%llu\n",
+                         iov.iov_len, XEN_ARGO_MAX_RING_SIZE);
+            return -EINVAL;
+        }
+
+        sum_iov_lens += iov.iov_len;
+
+        /*
+         * Again protect sum from integer overflow
+         * and ensure total msg size will be within bounds.
+         */
+        if ( sum_iov_lens > XEN_ARGO_MAX_MSG_SIZE )
+        {
+            argo_dprintk("invalid iov series: total message too big\n");
+            return -EINVAL;
+        }
+
+        guest_handle_add_offset(iovs, 1);
+    }
+
+    *count = sum_iov_lens;
+
+    return 0;
+}
+
+static int
+argo_ringbuf_insert(struct domain *d,
+                    struct argo_ring_info *ring_info,
+                    const struct xen_argo_ring_id *src_id,
+                    XEN_GUEST_HANDLE_PARAM(xen_argo_iov_t) iovs,
+                    unsigned long niov, uint32_t message_type,
+                    unsigned long *out_len)
+{
+    xen_argo_ring_t ring;
+    struct xen_argo_ring_message_header mh = { 0 };
+    int32_t sp;
+    int32_t ret = 0;
+    uint32_t len = 0;
+    uint32_t iov_len;
+    uint32_t sum_iov_len = 0;
+
+    ASSERT(spin_is_locked(&ring_info->lock));
+
+    /*
+     * Obtain the total size of data to transmit -- sets the 'len' variable
+     * -- and sanity check that the iovs conform to size and number limits.
+     * Reads each iov in the array from the guest for the first time
+     * (nb: just reading the iov structs, not the actual data to transmit).
+     * Enforced below: Once the value 'len' has been determined, no more than
+     * 'len' bytes of guest data (plus the message header) will be sent in this
+     * operation.
+     *
+     * len is used to determine that sufficient space exists in the destination
+     * ring for the message -- aborting the send with EAGAIN if not --
+     * to enable populating the message size field in message header, and for
+     * bounds checking while performing the data transmission.
+     */
+    ret = argo_iov_count(iovs, niov, &len);
+    if ( ret )
+        goto out;
+
+    if ( ((XEN_ARGO_ROUNDUP(len) +
+            sizeof(struct xen_argo_ring_message_header)) >= ring_info->len) ||
+         (len > XEN_ARGO_MAX_MSG_SIZE) )
+    {
+        ret = -EMSGSIZE;
+        goto out;
+    }
+
+    ret = argo_ringbuf_get_rx_ptr(ring_info, &ring.rx_ptr);
+    if ( ret )
+        goto out;
+
+    argo_sanitize_ring(&ring, ring_info);
+
+    argo_dprintk("ring.tx_ptr=%d ring.rx_ptr=%d ring.len=%d"
+                 " ring_info->tx_ptr=%d\n",
+                 ring.tx_ptr, ring.rx_ptr, ring.len, ring_info->tx_ptr);
+
+    if ( ring.rx_ptr == ring.tx_ptr )
+        sp = ring_info->len;
+    else
+    {
+        sp = ring.rx_ptr - ring.tx_ptr;
+        if ( sp < 0 )
+            sp += ring.len;
+    }
+
+    if ( (XEN_ARGO_ROUNDUP(len) +
+            sizeof(struct xen_argo_ring_message_header)) >= sp )
+    {
+        argo_dprintk("EAGAIN\n");
+        ret = -EAGAIN;
+        goto out;
+    }
+
+    mh.len = len + sizeof(struct xen_argo_ring_message_header);
+    mh.source.port = src_id->addr.port;
+    mh.source.domain_id = src_id->addr.domain_id;
+    mh.message_type = message_type;
+
+    /*
+     * For this copy to the guest ring, tx_ptr is always 16-byte aligned
+     * and the message header is 16 bytes long.
+     */
+    BUILD_BUG_ON(
+        sizeof(struct xen_argo_ring_message_header) != XEN_ARGO_ROUNDUP(1));
+
+    /*
+     * First data write into the destination ring: fixed size, message header.
+     * This cannot overrun because the available free space (value in 'sp')
+     * is checked above and must be at least this size.
+     */
+    ret = argo_memcpy_to_guest_ring(ring_info,
+                                    ring.tx_ptr + sizeof(xen_argo_ring_t),
+                                    &mh,
+                                    XEN_GUEST_HANDLE_NULL(uint8_t),
+                                    sizeof(mh));
+    if ( ret )
+    {
+        gprintk(XENLOG_ERR,
+                "argo: failed to write message header to ring (vm%u:%x 
vm%d)\n",
+                ring_info->id.addr.domain_id, ring_info->id.addr.port,
+                ring_info->id.partner);
+
+        goto out;
+    }
+
+    ring.tx_ptr += sizeof(mh);
+    if ( ring.tx_ptr == ring_info->len )
+        ring.tx_ptr = 0;
+
+    while ( niov-- )
+    {
+        XEN_GUEST_HANDLE_64(uint8_t) buf_hnd;
+        xen_argo_iov_t iov;
+
+        /*
+         * This is the second read of the iov from the guest
+         * -- see comments inline below.
+         */
+        ret = copy_from_guest_errno(&iov, iovs, 1);
+        if ( ret )
+        {
+            gprintk(XENLOG_ERR,
+                    "argo: failed to re-read iov (vm%u:%x vm%d)\n",
+                    ring_info->id.addr.domain_id, ring_info->id.addr.port,
+                    ring_info->id.partner);
+
+            goto out;
+        }
+
+        /* Reserve the padding bits: require that they must be zero */
+        if ( iov.pad != 0 )
+        {
+            gprintk(XENLOG_ERR,
+                    "argo: iov.pad reserved bits != 0 ring (vm%u:%x vm%d)\n",
+                    ring_info->id.addr.domain_id, ring_info->id.addr.port,
+                    ring_info->id.partner);
+
+            ret = -EINVAL;
+            goto out;
+        }
+
+        buf_hnd = iov.iov_hnd;
+        iov_len = iov.iov_len;
+
+        /* If no data is provided in this iov, moan and skip on to the next */
+        if ( !iov_len )
+        {
+            gprintk(XENLOG_ERR,
+                    "argo: no data iov_len=0 iov_hnd=%p ring (vm%u:%x vm%d)\n",
+                    buf_hnd.p,
+                    ring_info->id.addr.domain_id,
+                    ring_info->id.addr.port,
+                    ring_info->id.partner);
+
+            guest_handle_add_offset(iovs, 1);
+            continue;
+        }
+
+        /*
+         * The iov lens could have been modified since the first read above but
+         * each is checked again for continued conformance against
+         * XEN_ARGO_MAX_MSG_SIZE here:
+         */
+        if ( iov_len > XEN_ARGO_MAX_MSG_SIZE )
+        {
+            gprintk(XENLOG_ERR,
+                    "argo: iov len %"PRIx32" too big, ring (vm%u:%x vm%d)\n",
+                    iov_len,
+                    ring_info->id.addr.domain_id, ring_info->id.addr.port,
+                    ring_info->id.partner);
+
+            ret = -EINVAL;
+            goto out;
+        }
+
+        /*
+         * and the running total of data processed ('sum_iov_len') is checked
+         * against 'len', which we counted at the beginning with the first iov
+         * read, so the total data provided cannot exceed that limit:
+         * if it does, transmission is aborted.
+         */
+        sum_iov_len += iov_len;
+        if ( sum_iov_len > len )
+        {
+            gprintk(XENLOG_ERR,
+                    "argo: len increased %"PRIx32":%"PRIx32" (vm%u:%x vm%d)\n",
+                    len, sum_iov_len,
+                    ring_info->id.addr.domain_id, ring_info->id.addr.port,
+                    ring_info->id.partner);
+
+            ret = -EINVAL;
+            goto out;
+        }
+
+        if ( unlikely(!guest_handle_okay(buf_hnd, iov_len)) )
+        {
+            gprintk(XENLOG_ERR,
+                    "argo: bad iov handle [%p, %"PRIx32"] (vm%u:%x vm%d)\n",
+                    buf_hnd.p, iov_len,
+                    ring_info->id.addr.domain_id, ring_info->id.addr.port,
+                    ring_info->id.partner);
+
+            ret = -EFAULT;
+            goto out;
+        }
+
+        sp = ring.len - ring.tx_ptr;
+
+        /* Check: iov data size versus free space at the tail of the ring */
+        if ( iov_len > sp )
+        {
+            /*
+             * Second possible data write: ring-tail-wrap-write.
+             * Populate the ring tail and update the internal tx_ptr to handle
+             * wrapping at the end of ring.
+             * Size of data written here: sp
+             * which is the exact full amount of free space available at the
+             * tail of the ring, so this cannot overrun.
+             */
+            ret = argo_memcpy_to_guest_ring(ring_info,
+                                            ring.tx_ptr +
+                                                sizeof(xen_argo_ring_t),
+                                            NULL, buf_hnd, sp);
+            if ( ret )
+            {
+                gprintk(XENLOG_ERR,
+                        "argo: failed to copy {%p, %"PRIx32"} (vm%u:%x 
vm%d)\n",
+                        buf_hnd.p, sp,
+                        ring_info->id.addr.domain_id, ring_info->id.addr.port,
+                        ring_info->id.partner);
+
+                goto out;
+            }
+
+            ring.tx_ptr = 0;
+            iov_len -= sp;
+            guest_handle_add_offset(buf_hnd, sp);
+
+            ASSERT(iov_len <= ring.len);
+        }
+
+        /*
+         * Third possible data write: all data remaining for this iov.
+         * Size of data written here: iov_len
+         *
+         * Case 1: if the ring-tail-wrap-write above was performed, then
+         *         iov_len has been decreased by 'sp' and ring.tx_ptr is zero.
+         *
+         *    We know from the first pass of iov_len counting:
+         *      len + sizeof(message_header) <= ring.len
+         *    We also know that the running total, sum_iov_len (which has been
+         *    incremented by each iov_len when they are read the second time)
+         *    cannot exceed len here -- it is bounds checked above -- so both
+         *    these must be true:
+         *       (iov_len <= sum_iov_len) && (sum_iov_len <= len)
+         *    so by transitivity:
+         *       iov_len <= sum_iov_len <= len <= (ring.len - sizeof(msgheader))
+         *    and therefore:
+         *       (iov_len + sizeof(msgheader) <= ring.len) && (ring.tx_ptr == 0)
+         *    so this write cannot overrun here.
+         *
+         * Case 2: ring-tail-wrap-write above was not performed
+         *    -> so iov_len is the guest-supplied value and: (iov_len <= sp)
+         *    ie. less than available space at the tail of the ring:
+         *        so this write cannot overrun.
+         */
+
+        ret = argo_memcpy_to_guest_ring(ring_info,
+                                        ring.tx_ptr + sizeof(xen_argo_ring_t),
+                                        NULL, buf_hnd, iov_len);
+        if ( ret )
+        {
+            gprintk(XENLOG_ERR,
+                    "argo: failed to copy [%p, %"PRIx32"] (vm%u:%x vm%d)\n",
+                    buf_hnd.p, iov_len,
+                    ring_info->id.addr.domain_id, ring_info->id.addr.port,
+                    ring_info->id.partner);
+
+            goto out;
+        }
+
+        ring.tx_ptr += iov_len;
+
+        if ( ring.tx_ptr == ring_info->len )
+            ring.tx_ptr = 0;
+
+        guest_handle_add_offset(iovs, 1);
+    }
+
+    /*
+     * If the guest decided to lower the values in its copy of the iov_lens
+     * between the first read by the hypervisor and the second, a) it's being
+     * rude and b) the effect is that we have performed a short write: the
+     * private ring.tx_ptr will have been updated correctly for size of data
+     * written but the length written in the message header will not match the
+     * tx_ptr increment or the length of data actually copied into the ring.
+     *
+     * A short write could also occur if a bad iov was introduced, such as one
+     * with a iov_len exceeding XEN_ARGO_MAX_MSG_SIZE, or with a data pointer
+     * that turned out to be invalid, triggering an early exit from the iov
+     * processing loop above -- those cases would 'goto out' above.
+     *
+     * So: check the two summed iov lengths and if they mismatch, return error
+     * and do not update the guest-visible tx_ptr (ie. count this as abort).
+     */
+    if ( sum_iov_len != len )
+    {
+        gprintk(XENLOG_ERR,
+          "argo: iov modified: sum_iov_len(%u) != len(%u) ring(vm%u:%x 
vm%d)\n",
+                sum_iov_len, len,
+                ring_info->id.addr.domain_id,
+                ring_info->id.addr.port, ring_info->id.partner);
+
+        ret = -EINVAL;
+        goto out;
+    }
+
+    ring.tx_ptr = XEN_ARGO_ROUNDUP(ring.tx_ptr);
+
+    if ( ring.tx_ptr >= ring_info->len )
+        ring.tx_ptr -= ring_info->len;
+
+    ret = argo_update_tx_ptr(ring_info, ring.tx_ptr);
+    if ( ret )
+    {
+        gprintk(XENLOG_ERR,
+                "argo: failed to update tx_ptr ring(vm%u:%x vm%d)\n",
+                ring_info->id.addr.domain_id,
+                ring_info->id.addr.port, ring_info->id.partner);
+        goto out;
+    }
+
+ out:
+    /*
+     * At this point it is possible to unmap the ring_info, ie:
+     *   argo_ring_unmap(ring_info);
+     * but performance should be improved by not doing so, and retaining
+     * the mapping.
+     * An XSM policy control over level of confidentiality required
+     * versus performance cost could be added to decide that here.
+     * See the similar comment in argo_ring_map_page re: write-only mappings.
+     */
+
+    if ( !ret )
+        *out_len = len;
+
+    return ret;
+}
+
 /*
  * pending
  */
@@ -306,6 +835,61 @@ argo_pending_remove_all(struct argo_ring_info *ring_info)
         argo_pending_remove_ent(pending_ent);
 }
 
+static int
+argo_pending_queue(struct argo_ring_info *ring_info, domid_t src_id,
+                   uint64_t src_cookie, unsigned int len)
+{
+    struct argo_pending_ent *ent;
+
+    ASSERT(spin_is_locked(&ring_info->lock));
+
+    ent = xmalloc(struct argo_pending_ent);
+
+    if ( !ent )
+        return -ENOMEM;
+
+    ent->len = len;
+    ent->domain_id = src_id;
+    ent->domain_cookie = src_cookie;
+
+    hlist_add_head(&ent->node, &ring_info->pending);
+
+    return 0;
+}
+
+static int
+argo_pending_requeue(struct argo_ring_info *ring_info, domid_t src_id,
+                     uint64_t src_cookie, unsigned int len)
+{
+    struct hlist_node *node;
+    struct argo_pending_ent *ent;
+
+    ASSERT(spin_is_locked(&ring_info->lock));
+
+    hlist_for_each_entry(ent, node, &ring_info->pending, node)
+    {
+        if ( (ent->domain_id == src_id) &&
+             (ent->domain_cookie == src_cookie) )
+        {
+            /*
+             * Reuse an existing queue entry for a notification rather than add
+             * another. If the existing entry is waiting for a smaller size than
+             * the current message then adjust the record to wait for the
+             * current (larger) size to be available before triggering a
+             * notification.
+             * This assists the waiting sender by ensuring that whenever a
+             * notification is triggered, there is sufficient space available
+             * for (at least) any one of the messages awaiting transmission.
+             */
+            if ( ent->len < len )
+                ent->len = len;
+            return 0;
+        }
+    }
+
+    return argo_pending_queue(ring_info, src_id, src_cookie, len);
+}
+
 static void argo_ring_remove_mfns(const struct domain *d,
                                   struct argo_ring_info *ring_info)
 {
@@ -565,6 +1149,28 @@ argo_ring_find_info(const struct domain *d, const struct xen_argo_ring_id *id)
     return NULL;
 }
 
+static struct argo_ring_info *
+argo_ring_find_info_by_match(const struct domain *d, uint32_t port,
+                             domid_t partner_id, uint64_t partner_cookie)
+{
+    xen_argo_ring_id_t id;
+    struct argo_ring_info *ring_info;
+
+    ASSERT(rw_is_locked(&d->argo->lock));
+
+    id.addr.port = port;
+    id.addr.domain_id = d->domain_id;
+    id.partner = partner_id;
+
+    ring_info = argo_ring_find_info(d, &id);
+    if ( ring_info && (partner_cookie == ring_info->partner_cookie) )
+        return ring_info;
+
+    id.partner = XEN_ARGO_DOMID_ANY;
+
+    return argo_ring_find_info(d, &id);
+}
+
 static long
 argo_unregister_ring(struct domain *currd,
                      XEN_GUEST_HANDLE_PARAM(xen_argo_ring_t) ring_hnd)
@@ -889,6 +1495,112 @@ argo_register_ring(struct domain *currd,
     return ret;
 }
 
+/*
+ * io
+ */
+
+static long
+argo_sendv(struct domain *src_d, const xen_argo_addr_t *src_addr,
+           const xen_argo_addr_t *dst_addr,
+           XEN_GUEST_HANDLE_PARAM(xen_argo_iov_t) iovs,
+           unsigned long niov, uint32_t message_type)
+{
+    struct domain *dst_d = NULL;
+    struct xen_argo_ring_id src_id;
+    struct argo_ring_info *ring_info;
+    int ret = 0;
+    unsigned long len = 0;
+
+    ASSERT(src_d->domain_id == src_addr->domain_id);
+
+    argo_dprintk("sendv: (%d:%x)->(%d:%x) niov:%lu iov:%p type:%u\n",
+                 src_addr->domain_id, src_addr->port,
+                 dst_addr->domain_id, dst_addr->port,
+                 niov, iovs.p, message_type);
+
+    read_lock(&argo_lock);
+
+    if ( !src_d->argo )
+    {
+        ret = -ENODEV;
+        goto out_unlock;
+    }
+
+    src_id.addr.pad = 0;
+    src_id.addr.port = src_addr->port;
+    src_id.addr.domain_id = src_d->domain_id;
+    src_id.partner = dst_addr->domain_id;
+
+    dst_d = get_domain_by_id(dst_addr->domain_id);
+    if ( !dst_d )
+    {
+        argo_dprintk("!dst_d, ESRCH\n");
+        ret = -ESRCH;
+        goto out_unlock;
+    }
+
+    if ( !dst_d->argo )
+    {
+        argo_dprintk("!dst_d->argo, ECONNREFUSED\n");
+        ret = -ECONNREFUSED;
+        goto out_unlock;
+    }
+
+    ret = xsm_argo_send(src_d, dst_d);
+    if ( ret )
+    {
+        gprintk(XENLOG_ERR, "argo: XSM REJECTED %i -> %i\n",
+                src_addr->domain_id, dst_addr->domain_id);
+        goto out_unlock;
+    }
+
+    read_lock(&dst_d->argo->lock);
+
+    ring_info = argo_ring_find_info_by_match(dst_d, dst_addr->port,
+                                             src_addr->domain_id,
+                                             src_d->argo->domain_cookie);
+    if ( !ring_info )
+    {
+        gprintk(XENLOG_ERR,
+                "argo: vm%u connection refused, src (vm%u:%x) dst (vm%u:%x)\n",
+                current->domain->domain_id,
+                src_id.addr.domain_id, src_id.addr.port,
+                dst_addr->domain_id, dst_addr->port);
+
+        ret = -ECONNREFUSED;
+        goto out_unlock2;
+    }
+
+    spin_lock(&ring_info->lock);
+
+    ret = argo_ringbuf_insert(dst_d, ring_info, &src_id,
+                              iovs, niov, message_type, &len);
+    if ( ret == -EAGAIN )
+    {
+        argo_dprintk("argo_ringbuf_sendv failed, EAGAIN\n");
+        /* requeue to issue a notification when space is there */
+        if ( argo_pending_requeue(ring_info, src_addr->domain_id,
+                                  src_d->argo->domain_cookie, len) )
+             ret = -ENOMEM;
+    }
+
+    spin_unlock(&ring_info->lock);
+
+    if ( ret >= 0 )
+        argo_signal_domain(dst_d);
+
+ out_unlock2:
+    read_unlock(&dst_d->argo->lock);
+
+ out_unlock:
+    if ( dst_d )
+        put_domain(dst_d);
+
+    read_unlock(&argo_lock);
+
+    return ( ret < 0 ) ? ret : len;
+}
+
 long
 do_argo_message_op(unsigned int cmd, XEN_GUEST_HANDLE_PARAM(void) arg1,
                    XEN_GUEST_HANDLE_PARAM(void) arg2,
@@ -958,6 +1670,40 @@ do_argo_message_op(unsigned int cmd, XEN_GUEST_HANDLE_PARAM(void) arg1,
         break;
     }
 
+    case XEN_ARGO_MESSAGE_OP_sendv:
+    {
+        xen_argo_send_addr_t send_addr;
+
+        XEN_GUEST_HANDLE_PARAM(xen_argo_send_addr_t) send_addr_hnd =
+            guest_handle_cast(arg1, xen_argo_send_addr_t);
+        XEN_GUEST_HANDLE_PARAM(xen_argo_iov_t) iovs =
+            guest_handle_cast(arg2, xen_argo_iov_t);
+        /* arg3 is niov */
+        /* arg4 is message_type */
+
+        if ( unlikely(!guest_handle_okay(send_addr_hnd, 1)) )
+            break;
+        rc = copy_from_guest_errno(&send_addr, send_addr_hnd, 1);
+        if ( rc )
+            break;
+
+        if ( unlikely((arg3 > XEN_ARGO_MAXIOV) || (arg4 & ~0xffffffffUL)) )
+        {
+            rc = -EINVAL;
+            break;
+        }
+
+        if ( unlikely(send_addr.src.domain_id != currd->domain_id) )
+        {
+            rc = -EPERM;
+            break;
+        }
+
+        rc = argo_sendv(currd, &send_addr.src, &send_addr.dst,
+                        iovs, arg3, arg4);
+        break;
+    }
+
     default:
         rc = -EOPNOTSUPP;
         break;
diff --git a/xen/common/event_channel.c b/xen/common/event_channel.c
index f34d4f0..6fbe346 100644
--- a/xen/common/event_channel.c
+++ b/xen/common/event_channel.c
@@ -746,7 +746,7 @@ void send_guest_vcpu_virq(struct vcpu *v, uint32_t virq)
     spin_unlock_irqrestore(&v->virq_lock, flags);
 }
 
-static void send_guest_global_virq(struct domain *d, uint32_t virq)
+void send_guest_global_virq(struct domain *d, uint32_t virq)
 {
     unsigned long flags;
     int port;
diff --git a/xen/include/public/argo.h b/xen/include/public/argo.h
index 24696e2..d075930 100644
--- a/xen/include/public/argo.h
+++ b/xen/include/public/argo.h
@@ -42,6 +42,33 @@
 #define XEN_ARGO_MAX_RING_SIZE  (0x1000000ULL)
 
 /*
+ * XEN_ARGO_MAXIOV : maximum number of iovs accepted in a single sendv.
+ * Rationale for the value:
+ * The Linux argo driver never passes more than two iovs.
+ * Linux defines UIO_MAXIOV as 1024.
+ * POSIX mandates at least 16 -- not that this is a POSIX API of course.
+ *
+ * Limit the total amount of data posted in a single argo operation to
+ * no more than 2^31 bytes to reduce risk of integer overflow defects.
+ * Each argo iov can hold ~ 2^24 bytes, so set ARGO_MAXIOV to 2^(31-24),
+ * minus one to enable simple efficient bounds checking via masking: 127.
+ */
+#define XEN_ARGO_MAXIOV          127U
+
+DEFINE_XEN_GUEST_HANDLE(uint8_t);
+
+typedef struct xen_argo_iov
+{
+#ifdef XEN_GUEST_HANDLE_64
+    XEN_GUEST_HANDLE_64(uint8_t) iov_hnd;
+#else
+    uint64_t iov_hnd;
+#endif
+    uint32_t iov_len;
+    uint32_t pad;
+} xen_argo_iov_t;
+
+/*
  * Page descriptor: encoding both page address and size in a 64-bit value.
  * Intended to allow ABI to support use of different granularity pages.
  * example of how to populate:
@@ -59,6 +86,12 @@ typedef struct xen_argo_addr
     uint16_t pad;
 } xen_argo_addr_t;
 
+typedef struct xen_argo_send_addr
+{
+    xen_argo_addr_t src;
+    xen_argo_addr_t dst;
+} xen_argo_send_addr_t;
+
 typedef struct xen_argo_ring_id
 {
     xen_argo_addr_t addr;
@@ -150,4 +183,35 @@ struct xen_argo_ring_message_header
  */
 #define XEN_ARGO_MESSAGE_OP_unregister_ring     2
 
+/*
+ * XEN_ARGO_MESSAGE_OP_sendv
+ *
+ * Send a list of buffers contained in iovs.
+ *
+ * The send address struct specifies the source and destination addresses
+ * for the message being sent, which are used to find the destination ring:
+ * Xen first looks for a most-specific match with a registered ring with
+ *  (id.addr == dst) and (id.partner == sending_domain) ;
+ * if that fails, it then looks for a wildcard match (aka multicast receiver)
+ * where (id.addr == dst) and (id.partner == DOMID_ANY).
+ *
+ * For each iov entry, send iov_len bytes from iov_base to the destination ring.
+ * If insufficient space exists in the destination ring, it will return -EAGAIN
+ * and Xen will notify the caller when sufficient space becomes available.
+ *
+ * The message type is a 32-bit data field available to communicate message
+ * context data (eg. kernel-to-kernel, rather than application layer).
+ *
+ * arg1: XEN_GUEST_HANDLE(xen_argo_send_addr_t) source and dest addresses
+ * arg2: XEN_GUEST_HANDLE(xen_argo_iov_t) iovs
+ * arg3: unsigned long niov
+ * arg4: unsigned long message type
+ */
+#define XEN_ARGO_MESSAGE_OP_sendv               5
+
+/* The maximum size of a guest message that may be sent on an Argo ring. */
+#define XEN_ARGO_MAX_MSG_SIZE ((XEN_ARGO_MAX_RING_SIZE) - \
+        (sizeof(struct xen_argo_ring_message_header)) - \
+        XEN_ARGO_ROUNDUP(1))
+
 #endif
diff --git a/xen/include/public/xen.h b/xen/include/public/xen.h
index 5f4f760..efd65c4 100644
--- a/xen/include/public/xen.h
+++ b/xen/include/public/xen.h
@@ -178,7 +178,7 @@ DEFINE_XEN_GUEST_HANDLE(xen_ulong_t);
 #define VIRQ_CON_RING   8  /* G. (DOM0) Bytes received on console            */
 #define VIRQ_PCPU_STATE 9  /* G. (DOM0) PCPU state changed                   */
 #define VIRQ_MEM_EVENT  10 /* G. (DOM0) A memory event has occurred          */
-#define VIRQ_XC_RESERVED 11 /* G. Reserved for XenClient                     */
+#define VIRQ_ARGO_MESSAGE 11 /* G. Argo interdomain message notification     */
 #define VIRQ_ENOMEM     12 /* G. (DOM0) Low on heap memory       */
 #define VIRQ_XENPMU     13 /* V.  PMC interrupt                              */
 
diff --git a/xen/include/xen/event.h b/xen/include/xen/event.h
index ebb879e..4650887 100644
--- a/xen/include/xen/event.h
+++ b/xen/include/xen/event.h
@@ -29,6 +29,13 @@ void send_guest_vcpu_virq(struct vcpu *v, uint32_t virq);
 void send_global_virq(uint32_t virq);
 
 /*
+ * send_guest_global_virq:
+ *  @d:        Domain to which VIRQ should be sent
+ *  @virq:     Virtual IRQ number (VIRQ_*), must be global
+ */
+void send_guest_global_virq(struct domain *d, uint32_t virq);
+
+/*
  * sent_global_virq_handler: Set a global VIRQ handler.
  *  @d:        New target domain for this VIRQ
  *  @virq:     Virtual IRQ number (VIRQ_*), must be global
diff --git a/xen/include/xlat.lst b/xen/include/xlat.lst
index 33e4fd1..742b546 100644
--- a/xen/include/xlat.lst
+++ b/xen/include/xlat.lst
@@ -151,3 +151,5 @@
 ?      argo_addr                       argo.h
 ?      argo_ring_id                    argo.h
 ?      argo_ring                       argo.h
+?      argo_iov                        argo.h
+?      argo_send_addr                  argo.h
-- 
2.7.4

