Re: [Xen-devel] [PATCH v3 09/15] argo: implement the sendv op; evtchn: expose send_guest_global_virq
On Mon, Jan 7, 2019 at 8:44 AM Christopher Clark
<christopher.w.clark@xxxxxxxxx> wrote:
>
> sendv operation is invoked to perform a synchronous send of buffers
> contained in iovs to a remote domain's registered ring.
>
> It takes:
>  * A destination address (domid, port) for the ring to send to.
>    It performs a most-specific match lookup, to allow for wildcard.
>  * A source address, used to inform the destination of where to reply.
>  * The address of an array of iovs containing the data to send
>  * .. and the length of that array of iovs
>  * and a 32-bit message type, available to communicate message context
>    data (eg. kernel-to-kernel, separate from the application data).
>
> If insufficient space exists in the destination ring, it will return
> -EAGAIN and Xen will notify the caller when sufficient space becomes
> available.
>
> Accesses to the ring indices are appropriately atomic. The rings are
> mapped into Xen's private address space to write as needed and the
> mappings are retained for later use.
>
> Fixed-size types are used in some areas within this code where caution
> around avoiding integer overflow is important.
>
> Notifications are sent to guests via VIRQ and send_guest_global_virq is
> exposed in the change to enable argo to call it. VIRQ_ARGO_MESSAGE is
> claimed from the VIRQ previously reserved for this purpose (#11).
>
> The VIRQ notification method is used rather than sending events using
> evtchn functions directly because:
>
> * no current event channel type is an exact fit for the intended
>   behaviour. ECS_IPI is closest, but it disallows migration to
>   other VCPUs which is not necessarily a requirement for Argo.
>
> * at the point of argo_init, allocation of an event channel is
>   complicated by none of the guest VCPUs being initialized yet
>   and the event channel logic expects that a valid event channel
>   has a present VCPU.
> > * at the point of signalling a notification, the VIRQ logic is already > defensive: if d->vcpu[0] is NULL, the notification is just silently > dropped, whereas the evtchn_send logic is not so defensive: vcpu[0] > must not be NULL, otherwise a null pointer dereference occurs. > > Using a VIRQ removes the need for the guest to query to determine which > event channel notifications will be delivered on. This is also likely to > simplify establishing future L0/L1 nested hypervisor argo communication. > > Signed-off-by: Christopher Clark <christopher.clark6@xxxxxxxxxxxxxx> > --- > The previous double-read of iovs from guest memory has been removed. > > v2 self: use ring_info backpointer in pending_ent to maintain npending > v2 feedback Jan: drop cookie, implement teardown > v2 self: pending_queue: reap stale ents when in need of space > v2 self: pending_requeue: reclaim ents for stale domains > v2.feedback Jan: only override sender domid if DOMID_ANY > v2 feedback Jan: drop message from argo_message_op > v2 self: check npending vs maximum limit > v2 self: get_sanitized_ring instead of get_rx_ptr > v2 feedback v1#13 Jan: remove double read from ringbuf insert, lower MAX_IOV > v2 self: make iov_count const > v2 self: iov_count : return EMSGSIZE for message too big > v2 self: OVERHAUL > v2 self: s/argo_pending_ent/pending_ent/g > v2 feedback v1#13 Roger: use OS-supplied roundup; drop from public header > v1,2 feedback Jan/Roger/Paul: drop errno returning guest access functions > v1 feedback Roger, Jan: drop argo prefix on static functions > v1 feedback #13 Jan: drop guest_handle_okay when using copy_from_guest > - reorder do_argo_op logic > v2 self: add _hnd suffix to iovs variable name to indicate guest handle type > v2 self: replace use of XEN_GUEST_HANDLE_NULL with two existing macros > > v1 #15 feedback, Jan: sendv op : s/ECONNREFUSED/ESRCH/ > v1 #5 (#15) feedback Paul: sendv: use currd in do_argo_message_op > v1 #13 (#15) feedback Paul: sendv op: do/while reindent 
only > v1 #13 (#15) feedback Paul: sendv op: do/while: argo_ringbuf_insert to goto > style > v1 #13 (#15) feedback Paul: sendv op: do/while: reindent only again > v1 #13 (#15) feedback Paul: sendv op: do/while : goto > v1 #15 feedback Paul: sendv op: make page var: unsigned > v1 #15 feedback Paul: sendv op: new local var for PAGE_SIZE - offset > v1 #8 feedback Jan: XEN_GUEST_HANDLE : C89 compliance > v1 rebase after switching register op from pfns to page descriptors > v1 self: move iov DEFINE_XEN_GUEST_HANDLE out of public header into argo.c > v1 #13 (#15) feedback Paul: fix loglevel for guest-triggered messages > v1 : add compat xlat.lst entries > v1 self: switched notification to send_guest_global_virq instead of event > v1: fix gprintk use for ARM as its defn dislikes split format strings > v1: init len variable to satisfy ARM compiler initialized checking > v1 #13 feedback Jan: rename page var > v1:#14 feedback Jan: uint8_t* -> void* > v1: #13 feedback Jan: public namespace: prefix with xen > v1: #13 feedback Jan: blank line after case op in do_argo_message_op > v1: #15 feedback Jan: add comments explaining why the writes don't overrun > v1: self: add ASSERT to support comment that overrun cannot happen > v1: self: fail on short writes where guest manipulated the iov_lens > v1: self: rename ent id to domain_id > v1: self: add moan for iov rewrite > v1. feedback #15 Jan: require the pad bits are zero > v1. feedback #15 Jan: drop NULL check in argo_signal_domain as now using VIRQ > v1. self: store domain_cookie in pending ent > v1. feedback #15 Jan: use unsigned where possible > v1. feedback Jan: use handle type for iov_base in public iov interface > v1. 
self: log whenever visible error occurs > v1 feedback #15, Jan: drop unnecessary mb > v1 self: only update internal tx_ptr if able to return success > and update the visible tx_ptr > v1 self: log on failure to map ring to update visible tx_ptr > v1 feedback #15 Jan: add comment re: notification size policy > v1 self/Roger? remove errant space after sizeof > v1. feedback #15 Jan: require iov pad be zero > v1. self: rename iov_base to iov_hnd for handle in public iov interface > v1: feedback #15 Jan: handle upper-halves of hypercall args; changes some > types in function signatures to match. > v1: self: add dprintk to sendv > v1: self: add debug output to argo_iov_count > v1. feedback #14 Jan: blank line before return in argo_iov_count > v1 feedback #15 Jan: verify src id, not override > > xen/common/argo.c | 653 > +++++++++++++++++++++++++++++++++++++++++++++ > xen/common/event_channel.c | 2 +- > xen/include/public/argo.h | 60 +++++ > xen/include/public/xen.h | 2 +- > xen/include/xen/event.h | 7 + > xen/include/xlat.lst | 2 + > 6 files changed, 724 insertions(+), 2 deletions(-) > > diff --git a/xen/common/argo.c b/xen/common/argo.c > index 59ce8c4..4548435 100644 > --- a/xen/common/argo.c > +++ b/xen/common/argo.c > @@ -29,14 +29,21 @@ > #include <public/argo.h> > > #define MAX_RINGS_PER_DOMAIN 128U > +#define MAX_PENDING_PER_RING 32U > > /* All messages on the ring are padded to a multiple of the slot size. */ > #define ROUNDUP_MESSAGE(a) (ROUNDUP((a), XEN_ARGO_MSG_SLOT_SIZE)) > > +/* The maximum size of a message that may be sent on the largest Argo ring. 
> */ > +#define MAX_ARGO_MESSAGE_SIZE ((XEN_ARGO_MAX_RING_SIZE) - \ > + (sizeof(struct xen_argo_ring_message_header)) - ROUNDUP_MESSAGE(1)) > + > DEFINE_XEN_GUEST_HANDLE(xen_argo_addr_t); > +DEFINE_XEN_GUEST_HANDLE(xen_argo_iov_t); > DEFINE_XEN_GUEST_HANDLE(xen_argo_page_descr_t); > DEFINE_XEN_GUEST_HANDLE(xen_argo_register_ring_t); > DEFINE_XEN_GUEST_HANDLE(xen_argo_ring_t); > +DEFINE_XEN_GUEST_HANDLE(xen_argo_send_addr_t); > DEFINE_XEN_GUEST_HANDLE(xen_argo_unregister_ring_t); > > /* Xen command line option to enable argo */ > @@ -250,6 +257,14 @@ hash_index(const struct argo_ring_id *id) > } > > static void > +signal_domain(struct domain *d) > +{ > + argo_dprintk("signalling domid:%d\n", d->domain_id); > + > + send_guest_global_virq(d, VIRQ_ARGO_MESSAGE); > +} > + > +static void > ring_unmap(struct argo_ring_info *ring_info) > { > unsigned int i; > @@ -342,6 +357,413 @@ update_tx_ptr(struct argo_ring_info *ring_info, > uint32_t tx_ptr) > smp_wmb(); > } > > +static int > +memcpy_to_guest_ring(struct argo_ring_info *ring_info, uint32_t offset, > + const void *src, XEN_GUEST_HANDLE(uint8_t) src_hnd, > + uint32_t len) > +{ > + unsigned int mfns_index = offset >> PAGE_SHIFT; > + void *dst; > + int ret; > + unsigned int src_offset = 0; > + > + ASSERT(spin_is_locked(&ring_info->lock)); > + > + offset &= ~PAGE_MASK; > + > + if ( (len > XEN_ARGO_MAX_RING_SIZE) || (offset > XEN_ARGO_MAX_RING_SIZE) > ) > + return -EFAULT; > + > + while ( (offset + len) > PAGE_SIZE ) I think you could map the whole ring in contiguous virtual address space and then writing to it would be much more easy, you wouldn't need to iterate with memcpy or copy_from_guest, take a look at __vmap. You could likely map this when the ring gets setup and keep it mapped for the lifetime of the ring. 
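For anyone following along, the shape of the loop being discussed is roughly this -- a portable toy model, not the Xen code: fixed-size buffers stand in for the pages that ring_map_page() returns, and error handling is omitted:

```c
#include <assert.h>
#include <string.h>

#define TOY_PAGE_SIZE 4096u

/* Toy stand-in for the mapped ring: one buffer per ring page
 * (the real code obtains each one via ring_map_page()). */
static unsigned char pages[4][TOY_PAGE_SIZE];

/*
 * Copy len bytes to byte 'offset' of the paged buffer, splitting the
 * copy at every page boundary -- the same shape as the loop in
 * memcpy_to_guest_ring(). With a contiguous vmap-style mapping this
 * whole function would collapse to a single memcpy.
 */
static void paged_copy(unsigned int offset, const unsigned char *src,
                       unsigned int len)
{
    unsigned int page = offset / TOY_PAGE_SIZE;

    offset %= TOY_PAGE_SIZE;
    while ( offset + len > TOY_PAGE_SIZE )
    {
        unsigned int head_len = TOY_PAGE_SIZE - offset;

        memcpy(&pages[page][offset], src, head_len);
        src += head_len;
        len -= head_len;
        page++;
        offset = 0;
    }
    memcpy(&pages[page][offset], src, len);
}
```

i.e. a contiguous mapping removes both the per-page iteration and the head/tail split logic; only the copy-source handling (memcpy vs copy_from_guest) would remain.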
> + { > + unsigned int head_len = PAGE_SIZE - offset; > + > + ret = ring_map_page(ring_info, mfns_index, &dst); > + if ( ret ) > + return ret; > + > + if ( src ) > + { > + memcpy(dst + offset, src + src_offset, head_len); > + src_offset += head_len; > + } > + else > + { > + ret = copy_from_guest(dst + offset, src_hnd, head_len) ? > + -EFAULT : 0; > + if ( ret ) > + return ret; You can simplify this to: if ( copy_from_guest(...) ) return -EFAULT; > + > + guest_handle_add_offset(src_hnd, head_len); > + } > + > + mfns_index++; > + len -= head_len; > + offset = 0; > + } > + > + ret = ring_map_page(ring_info, mfns_index, &dst); > + if ( ret ) > + { > + argo_dprintk("argo: ring (vm%u:%x vm%d) %p attempted to map page" > + " %d of %d\n", ring_info->id.domain_id, > ring_info->id.port, > + ring_info->id.partner_id, ring_info, mfns_index, > + ring_info->nmfns); > + return ret; > + } > + > + if ( src ) > + memcpy(dst + offset, src + src_offset, len); > + else > + ret = copy_from_guest(dst + offset, src_hnd, len) ? -EFAULT : 0; > + > + return ret; > +} > + > +/* > + * Use this with caution: rx_ptr is under guest control and may be bogus. > + * See get_sanitized_ring for a safer alternative. > + */ > +static int > +get_rx_ptr(struct argo_ring_info *ring_info, uint32_t *rx_ptr) > +{ > + void *src; > + xen_argo_ring_t *ringp; > + int ret; > + > + ASSERT(spin_is_locked(&ring_info->lock)); > + > + if ( !ring_info->nmfns || ring_info->nmfns < ring_info->npage ) > + return -EINVAL; > + > + ret = ring_map_page(ring_info, 0, &src); > + if ( ret ) > + return ret; > + > + ringp = (xen_argo_ring_t *)src; > + > + *rx_ptr = read_atomic(&ringp->rx_ptr); > + > + return 0; > +} > + > +/* > + * get_sanitized_ring creates a modified copy of the ring pointers where > + * the rx_ptr is rounded up to ensure it is aligned, and then ring > + * wrap is handled. Simplifies safe use of the rx_ptr for available > + * space calculation. 
> + */ > +static int > +get_sanitized_ring(xen_argo_ring_t *ring, struct argo_ring_info *ring_info) > +{ > + uint32_t rx_ptr; > + int ret; > + > + ret = get_rx_ptr(ring_info, &rx_ptr); > + if ( ret ) > + return ret; > + > + ring->tx_ptr = ring_info->tx_ptr; > + > + rx_ptr = ROUNDUP_MESSAGE(rx_ptr); > + if ( rx_ptr >= ring_info->len ) > + rx_ptr = 0; > + > + ring->rx_ptr = rx_ptr; Newline. > + return 0; > +} > + > +/* > + * iov_count returns its count on success via an out variable to avoid > + * potential for a negative return value to be used incorrectly > + * (eg. coerced into an unsigned variable resulting in a large incorrect > value) > + */ > +static int > +iov_count(const xen_argo_iov_t *piov, unsigned long niov, uint32_t *count) > +{ > + uint32_t sum_iov_lens = 0; > + > + if ( niov > XEN_ARGO_MAXIOV ) > + return -EINVAL; > + > + while ( niov-- ) I would use a for loop here, that would remove the need to piov++, if you want to keep it quite similar: for ( ; niov--; piov++ ) { ... > + { > + /* valid iovs must have the padding field set to zero */ > + if ( piov->pad ) > + { > + argo_dprintk("invalid iov: padding is not zero\n"); > + return -EINVAL; > + } > + > + /* check each to protect sum against integer overflow */ > + if ( piov->iov_len > XEN_ARGO_MAX_RING_SIZE ) > + { > + argo_dprintk("invalid iov_len: too big (%u)>%llu\n", > + piov->iov_len, XEN_ARGO_MAX_RING_SIZE); > + return -EINVAL; > + } > + > + sum_iov_lens += piov->iov_len; > + > + /* > + * Again protect sum from integer overflow > + * and ensure total msg size will be within bounds. 
> + */ > + if ( sum_iov_lens > MAX_ARGO_MESSAGE_SIZE ) > + { > + argo_dprintk("invalid iov series: total message too big\n"); > + return -EMSGSIZE; > + } > + > + piov++; > + } > + > + *count = sum_iov_lens; > + > + return 0; > +} > + > +static int > +ringbuf_insert(struct domain *d, struct argo_ring_info *ring_info, > + const struct argo_ring_id *src_id, > + XEN_GUEST_HANDLE_PARAM(xen_argo_iov_t) iovs_hnd, > + unsigned long niov, uint32_t message_type, > + unsigned long *out_len) > +{ > + xen_argo_ring_t ring; > + struct xen_argo_ring_message_header mh = { 0 }; No need for the 0, { } will achieve exactly the same. > + int32_t sp; > + int32_t ret; > + uint32_t len = 0; > + xen_argo_iov_t iovs[XEN_ARGO_MAXIOV]; This seems slightly dangerous, a change of the maximum could cause stack overflow depending on the size of xen_argo_iov_t. I think you need some comment next to definition of XEN_ARGO_MAXIOV to note that increasing this could cause issues. > + xen_argo_iov_t *piov; > + XEN_GUEST_HANDLE(uint8_t) NULL_hnd = > + guest_handle_from_param(guest_handle_from_ptr(NULL, uint8_t), > uint8_t); > + > + ASSERT(spin_is_locked(&ring_info->lock)); > + > + ret = __copy_from_guest(iovs, iovs_hnd, niov) ? -EFAULT : 0; > + if ( ret ) > + goto out; > + > + /* > + * Obtain the total size of data to transmit -- sets the 'len' variable > + * -- and sanity check that the iovs conform to size and number limits. > + * Enforced below: no more than 'len' bytes of guest data > + * (plus the message header) will be sent in this operation. > + */ > + ret = iov_count(iovs, niov, &len); > + if ( ret ) > + goto out; > + > + /* > + * Size bounds check against ring size and static maximum message limit. > + * The message must not fill the ring; there must be at least one slot > + * remaining so we can distinguish a full ring from an empty one. 
> + */ > + if ( ((ROUNDUP_MESSAGE(len) + > + sizeof(struct xen_argo_ring_message_header)) >= ring_info->len) > || > + (len > MAX_ARGO_MESSAGE_SIZE) ) > + { > + ret = -EMSGSIZE; > + goto out; > + } > + > + ret = get_sanitized_ring(&ring, ring_info); > + if ( ret ) > + goto out; > + > + argo_dprintk("ring.tx_ptr=%d ring.rx_ptr=%d ring len=%d" > + " ring_info->tx_ptr=%d\n", > + ring.tx_ptr, ring.rx_ptr, ring_info->len, > ring_info->tx_ptr); > + > + if ( ring.rx_ptr == ring.tx_ptr ) > + sp = ring_info->len; > + else > + { > + sp = ring.rx_ptr - ring.tx_ptr; > + if ( sp < 0 ) > + sp += ring_info->len; > + } > + > + /* > + * Size bounds check against currently available space in the ring. > + * Again: the message must not fill the ring leaving no space remaining. > + */ > + if ( (ROUNDUP_MESSAGE(len) + > + sizeof(struct xen_argo_ring_message_header)) >= sp ) > + { > + argo_dprintk("EAGAIN\n"); > + ret = -EAGAIN; > + goto out; > + } > + > + mh.len = len + sizeof(struct xen_argo_ring_message_header); > + mh.source.port = src_id->port; > + mh.source.domain_id = src_id->domain_id; > + mh.message_type = message_type; > + > + /* > + * For this copy to the guest ring, tx_ptr is always 16-byte aligned > + * and the message header is 16 bytes long. > + */ > + BUILD_BUG_ON( > + sizeof(struct xen_argo_ring_message_header) != ROUNDUP_MESSAGE(1)); > + > + /* > + * First data write into the destination ring: fixed size, message > header. > + * This cannot overrun because the available free space (value in 'sp') > + * is checked above and must be at least this size. 
> + */ > + ret = memcpy_to_guest_ring(ring_info, ring.tx_ptr + > sizeof(xen_argo_ring_t), > + &mh, NULL_hnd, sizeof(mh)); > + if ( ret ) > + { > + gprintk(XENLOG_ERR, > + "argo: failed to write message header to ring (vm%u:%x > vm%d)\n", > + ring_info->id.domain_id, ring_info->id.port, > + ring_info->id.partner_id); > + > + goto out; > + } > + > + ring.tx_ptr += sizeof(mh); > + if ( ring.tx_ptr == ring_info->len ) > + ring.tx_ptr = 0; > + > + piov = iovs; > + > + while ( niov-- ) AFAICT using a for loop would remove the need to also do a piov++ at each iteration. > + { > + XEN_GUEST_HANDLE_64(uint8_t) buf_hnd = piov->iov_hnd; > + uint32_t iov_len = piov->iov_len; > + > + /* If no data is provided in this iov, moan and skip on to the next > */ > + if ( !iov_len ) > + { > + gprintk(XENLOG_ERR, > + "argo: no data iov_len=0 iov_hnd=%p ring (vm%u:%x > vm%d)\n", > + buf_hnd.p, ring_info->id.domain_id, ring_info->id.port, > + ring_info->id.partner_id); > + > + piov++; > + continue; > + } > + > + if ( unlikely(!guest_handle_okay(buf_hnd, iov_len)) ) > + { > + gprintk(XENLOG_ERR, > + "argo: bad iov handle [%p, %"PRIx32"] (vm%u:%x vm%d)\n", > + buf_hnd.p, iov_len, > + ring_info->id.domain_id, ring_info->id.port, > + ring_info->id.partner_id); > + > + ret = -EFAULT; > + goto out; > + } > + > + sp = ring_info->len - ring.tx_ptr; > + > + /* Check: iov data size versus free space at the tail of the ring */ > + if ( iov_len > sp ) > + { > + /* > + * Second possible data write: ring-tail-wrap-write. > + * Populate the ring tail and update the internal tx_ptr to > handle > + * wrapping at the end of ring. > + * Size of data written here: sp > + * which is the exact full amount of free space available at the > + * tail of the ring, so this cannot overrun. 
> + */ > + ret = memcpy_to_guest_ring(ring_info, > + ring.tx_ptr + sizeof(xen_argo_ring_t), > + NULL, buf_hnd, sp); > + if ( ret ) > + { > + gprintk(XENLOG_ERR, > + "argo: failed to copy {%p, %"PRIx32"} (vm%u:%x > vm%d)\n", > + buf_hnd.p, sp, > + ring_info->id.domain_id, ring_info->id.port, > + ring_info->id.partner_id); > + > + goto out; > + } > + > + ring.tx_ptr = 0; > + iov_len -= sp; > + guest_handle_add_offset(buf_hnd, sp); > + > + ASSERT(iov_len <= ring_info->len); > + } > + > + /* > + * Third possible data write: all data remaining for this iov. > + * Size of data written here: iov_len > + * > + * Case 1: if the ring-tail-wrap-write above was performed, then > + * iov_len has been decreased by 'sp' and ring.tx_ptr is > zero. > + * > + * We know from checking the result of iov_count: > + * len + sizeof(message_header) <= ring_info->len > + * We also know that len is the total of summing all iov_lens, so: > + * iov_len <= len > + * so by transitivity: > + * iov_len <= len <= (ring_info->len - sizeof(msgheader)) > + * and therefore: > + * (iov_len + sizeof(msgheader) <= ring_info->len) && > + * (ring.tx_ptr == 0) > + * so this write cannot overrun here. > + * > + * Case 2: ring-tail-wrap-write above was not performed > + * -> so iov_len is the guest-supplied value and: (iov_len <= sp) > + * ie. less than available space at the tail of the ring: > + * so this write cannot overrun. 
> + */ > + ret = memcpy_to_guest_ring(ring_info, > + ring.tx_ptr + sizeof(xen_argo_ring_t), > + NULL, buf_hnd, iov_len); > + if ( ret ) > + { > + gprintk(XENLOG_ERR, > + "argo: failed to copy [%p, %"PRIx32"] (vm%u:%x vm%d)\n", > + buf_hnd.p, iov_len, ring_info->id.domain_id, > + ring_info->id.port, ring_info->id.partner_id); > + > + goto out; > + } > + > + ring.tx_ptr += iov_len; > + > + if ( ring.tx_ptr == ring_info->len ) > + ring.tx_ptr = 0; > + > + piov++; > + } > + > + ring.tx_ptr = ROUNDUP_MESSAGE(ring.tx_ptr); > + > + if ( ring.tx_ptr >= ring_info->len ) > + ring.tx_ptr -= ring_info->len; > + > + update_tx_ptr(ring_info, ring.tx_ptr); > + > + out: Do you really need to out label? *out_len it's only set in the success case, so all the error cases that use a 'goto out' could be replaced by 'return ret;'. > + /* > + * At this point it is possible to unmap the ring_info, ie: > + * ring_unmap(ring_info); > + * but performance should be improved by not doing so, and retaining > + * the mapping. > + * An XSM policy control over level of confidentiality required > + * versus performance cost could be added to decide that here. > + * See the similar comment in ring_map_page re: write-only mappings. 
> + */ > + > + if ( !ret ) > + *out_len = len; > + > + return ret; > +} > + > static void > wildcard_pending_list_remove(domid_t domain_id, struct pending_ent *ent) > { > @@ -359,6 +781,22 @@ wildcard_pending_list_remove(domid_t domain_id, struct > pending_ent *ent) > } > > static void > +wildcard_pending_list_insert(domid_t domain_id, struct pending_ent *ent) > +{ > + struct domain *d = get_domain_by_id(domain_id); > + if ( !d ) > + return; > + > + if ( d->argo ) > + { > + spin_lock(&d->argo->wildcard_lock); > + hlist_add_head(&ent->wildcard_node, &d->argo->wildcard_pend_list); > + spin_unlock(&d->argo->wildcard_lock); > + } > + put_domain(d); > +} > + > +static void > pending_remove_all(struct argo_ring_info *ring_info) > { > struct hlist_node *node, *next; > @@ -374,6 +812,67 @@ pending_remove_all(struct argo_ring_info *ring_info) > ring_info->npending = 0; > } > > +static int > +pending_queue(struct argo_ring_info *ring_info, domid_t src_id, > + unsigned int len) > +{ > + struct pending_ent *ent; > + > + ASSERT(spin_is_locked(&ring_info->lock)); > + > + if ( ring_info->npending >= MAX_PENDING_PER_RING ) > + return -ENOSPC; > + > + ent = xmalloc(struct pending_ent); > + Extra newline. > + if ( !ent ) > + return -ENOMEM; > + > + ent->len = len; > + ent->domain_id = src_id; > + ent->ring_info = ring_info; > + > + if ( ring_info->id.partner_id == XEN_ARGO_DOMID_ANY ) > + wildcard_pending_list_insert(src_id, ent); > + hlist_add_head(&ent->node, &ring_info->pending); > + ring_info->npending++; > + > + return 0; > +} > + > +static int > +pending_requeue(struct argo_ring_info *ring_info, domid_t src_id, > + unsigned int len) > +{ > + struct hlist_node *node; > + struct pending_ent *ent; > + > + ASSERT(spin_is_locked(&ring_info->lock)); > + > + hlist_for_each_entry(ent, node, &ring_info->pending, node) > + { > + if ( ent->domain_id == src_id ) > + { > + /* > + * Reuse an existing queue entry for a notification rather than > add > + * another. 
If the existing entry is waiting for a smaller size > than > + * the current message then adjust the record to wait for the > + * current (larger) size to be available before triggering a > + * notification. > + * This assists the waiting sender by ensuring that whenever a > + * notification is triggered, there is sufficient space available > + * for (at least) any one of the messages awaiting transmission. > + */ > + if ( ent->len < len ) > + ent->len = len; > + > + return 0; > + } > + } > + > + return pending_queue(ring_info, src_id, len); > +} > + > static void > wildcard_rings_pending_remove(struct domain *d) > { > @@ -667,6 +1166,28 @@ ring_find_info(const struct domain *d, const struct > argo_ring_id *id) > return NULL; > } > > +static struct argo_ring_info * > +ring_find_info_by_match(const struct domain *d, uint32_t port, > + domid_t partner_id) > +{ > + struct argo_ring_id id; > + struct argo_ring_info *ring_info; > + > + ASSERT(rw_is_locked(&d->argo->lock)); > + > + id.port = port; > + id.domain_id = d->domain_id; > + id.partner_id = partner_id; > + > + ring_info = ring_find_info(d, &id); > + if ( ring_info ) > + return ring_info; > + > + id.partner_id = XEN_ARGO_DOMID_ANY; > + > + return ring_find_info(d, &id); > +} > + > static struct argo_send_info * > send_find_info(const struct domain *d, const struct argo_ring_id *id) > { > @@ -1005,6 +1526,95 @@ register_ring(struct domain *currd, > return ret; > } > > +static long > +sendv(struct domain *src_d, const xen_argo_addr_t *src_addr, > + const xen_argo_addr_t *dst_addr, > + XEN_GUEST_HANDLE_PARAM(xen_argo_iov_t) iovs_hnd, unsigned long niov, > + uint32_t message_type) > +{ > + struct domain *dst_d = NULL; > + struct argo_ring_id src_id; > + struct argo_ring_info *ring_info; > + int ret = 0; > + unsigned long len = 0; > + > + ASSERT(src_d->domain_id == src_addr->domain_id); > + > + argo_dprintk("sendv: (%d:%x)->(%d:%x) niov:%lu iov:%p type:%u\n", > + src_addr->domain_id, src_addr->port, > + 
dst_addr->domain_id, dst_addr->port, > + niov, iovs_hnd.p, message_type); > + > + read_lock(&argo_lock); > + > + if ( !src_d->argo ) > + { > + ret = -ENODEV; > + goto out_unlock; > + } > + > + src_id.port = src_addr->port; > + src_id.domain_id = src_d->domain_id; > + src_id.partner_id = dst_addr->domain_id; > + > + dst_d = get_domain_by_id(dst_addr->domain_id); > + if ( !dst_d ) > + { > + argo_dprintk("!dst_d, ESRCH\n"); > + ret = -ESRCH; > + goto out_unlock; > + } > + > + if ( !dst_d->argo ) > + { > + argo_dprintk("!dst_d->argo, ECONNREFUSED\n"); > + ret = -ECONNREFUSED; > + goto out_unlock; The usage of out_unlock here and in the condition above is wrong, since it will unconditionally call read_unlock(&argo_lock); which is wrong here because the lock has not yet been acquired. > + } > + > + read_lock(&dst_d->argo->lock); > + > + ring_info = ring_find_info_by_match(dst_d, dst_addr->port, > + src_addr->domain_id); > + if ( !ring_info ) > + { > + gprintk(XENLOG_ERR, > + "argo: vm%u connection refused, src (vm%u:%x) dst > (vm%u:%x)\n", > + current->domain->domain_id, src_id.domain_id, src_id.port, > + dst_addr->domain_id, dst_addr->port); > + > + ret = -ECONNREFUSED; > + goto out_unlock2; > + } > + > + spin_lock(&ring_info->lock); > + > + ret = ringbuf_insert(dst_d, ring_info, &src_id, iovs_hnd, niov, > + message_type, &len); > + if ( ret == -EAGAIN ) > + { > + argo_dprintk("argo_ringbuf_sendv failed, EAGAIN\n"); > + /* requeue to issue a notification when space is there */ > + ret = pending_requeue(ring_info, src_addr->domain_id, len); > + } > + > + spin_unlock(&ring_info->lock); > + > + if ( ret >= 0 ) > + signal_domain(dst_d); > + > + out_unlock2: There's only a single user of the out_unlock2 label, at which point it might be easier to read to just put the read_unlock there and just use the existing out_unlock label. Thanks, Roger. 
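P.S. For reference, the space accounting that the two size checks in ringbuf_insert rely on -- the sp computation plus the strict rejection of writes that would consume all free space -- can be sanity-checked with a small standalone model (arbitrary constants, not the patch code):

```c
#include <assert.h>

/*
 * Toy model of the free-space calculation in ringbuf_insert():
 * equal pointers mean an empty ring (the whole length is free);
 * otherwise free space is rx_ptr - tx_ptr, wrapped modulo ring_len.
 */
static int ring_space(int rx_ptr, int tx_ptr, int ring_len)
{
    int sp;

    if ( rx_ptr == tx_ptr )
        return ring_len;

    sp = rx_ptr - tx_ptr;
    if ( sp < 0 )
        sp += ring_len;

    return sp;
}

/*
 * A message fits only if strictly smaller than the free space:
 * writing exactly sp bytes would make tx_ptr == rx_ptr again, which
 * is indistinguishable from an empty ring -- hence the >= rejection
 * in the patch, which always leaves at least one slot free.
 */
static int fits(int msg_len, int rx_ptr, int tx_ptr, int ring_len)
{
    return msg_len < ring_space(rx_ptr, tx_ptr, ring_len);
}
```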
_______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxxxxxxxxx https://lists.xenproject.org/mailman/listinfo/xen-devel