Re: [Xen-devel] [PATCH v5 09/15] argo: implement the sendv op; evtchn: expose send_guest_global_virq
On Tue, Jan 22, 2019 at 4:08 AM Roger Pau Monné <roger.pau@xxxxxxxxxx> wrote: > > On Mon, Jan 21, 2019 at 01:59:49AM -0800, Christopher Clark wrote: > > sendv operation is invoked to perform a synchronous send of buffers > > contained in iovs to a remote domain's registered ring. > > > > It takes: > > * A destination address (domid, port) for the ring to send to. > > It performs a most-specific match lookup, to allow for wildcard. > > * A source address, used to inform the destination of where to reply. > > * The address of an array of iovs containing the data to send > > * .. and the length of that array of iovs > > * and a 32-bit message type, available to communicate message context > > data (eg. kernel-to-kernel, separate from the application data). > > > > If insufficient space exists in the destination ring, it will return > > -EAGAIN and Xen will notify the caller when sufficient space becomes > > available. > > > > Accesses to the ring indices are appropriately atomic. The rings are > > mapped into Xen's private address space to write as needed and the > > mappings are retained for later use. > > > > Notifications are sent to guests via VIRQ and send_guest_global_virq is > > exposed in the change to enable argo to call it. VIRQ_ARGO_MESSAGE is > ^ VIRQ_ARGO > > claimed from the VIRQ previously reserved for this purpose (#11). > > > > The VIRQ notification method is used rather than sending events using > > evtchn functions directly because: > > > > * no current event channel type is an exact fit for the intended > > behaviour. ECS_IPI is closest, but it disallows migration to > > other VCPUs which is not necessarily a requirement for Argo. > > > > * at the point of argo_init, allocation of an event channel is > > complicated by none of the guest VCPUs being initialized yet > > and the event channel logic expects that a valid event channel > > has a present VCPU. > > IMO iff you wanted to use event channels those _must_ be setup by the > guest, ie: the guest argo driver would load, allocate an event channel > and then tell the hypervisor about the event channel that should be > used for argo notifications. > > > +static int > > +memcpy_to_guest_ring(const struct domain *d, struct argo_ring_info > > *ring_info, > > + unsigned int offset, > > + const void *src, XEN_GUEST_HANDLE(uint8_t) src_hnd, > > + unsigned int len) > > +{ > > + unsigned int mfns_index = offset >> PAGE_SHIFT; > > + void *dst; > > + int ret; > > + unsigned int src_offset = 0; > > + > > + ASSERT(LOCKING_L3(d, ring_info)); > > + > > + offset &= ~PAGE_MASK; > > + > > + if ( len + offset > XEN_ARGO_MAX_RING_SIZE ) > > + return -EFAULT; > > + > > + while ( len ) > > + { > > + unsigned int head_len = (offset + len) > PAGE_SIZE ? PAGE_SIZE - > > offset > > + : len; > > IMO that would be clearer as: > > head_len = min(PAGE_SIZE - offset, len); You're right that the calculated result should be the same, but I've left this unchanged because I think the reason for using that value (ie. intent) is clearer in the form it has: it's not about trying to find the smallest amount of data to write, it's about only writing up to the PAGE_SIZE boundary, starting at offset. > > But anyway, this should go away when you move to using vmap. > > [...] 
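[For reference, a minimal standalone sketch of the page-boundary chunking that memcpy_to_guest_ring performs, i.e. why the head_len expression is equivalent to min(PAGE_SIZE - offset, len) but expresses "copy only up to the next page boundary". This is not the Xen code: copy_to_paged_ring and page_base are hypothetical stand-ins for the mfn-indexed ring mapping lookup, and there is no guest-handle handling here.]

```c
/*
 * Standalone sketch (not Xen code) of the per-page chunked copy in
 * memcpy_to_guest_ring: each memcpy stops at the next PAGE_SIZE boundary.
 */
#include <stdint.h>
#include <string.h>
#include <assert.h>

#define PAGE_SIZE 4096u

/* Hypothetical stand-in for looking up the mapping of ring page N. */
static void *page_base(void **pages, unsigned int page_index)
{
    return pages[page_index];
}

static void copy_to_paged_ring(void **pages, unsigned int offset,
                               const void *src, unsigned int len)
{
    unsigned int page_index = offset / PAGE_SIZE;
    unsigned int src_offset = 0;

    offset &= PAGE_SIZE - 1;          /* offset within the current page */

    while ( len )
    {
        /* Copy only up to the end of the current page, as in the patch. */
        unsigned int head_len = (offset + len) > PAGE_SIZE ? PAGE_SIZE - offset
                                                           : len;

        /* Same value as min(PAGE_SIZE - offset, len), given offset < PAGE_SIZE. */
        assert(head_len == (PAGE_SIZE - offset < len ? PAGE_SIZE - offset
                                                     : len));

        memcpy((uint8_t *)page_base(pages, page_index) + offset,
               (const uint8_t *)src + src_offset, head_len);

        src_offset += head_len;
        len -= head_len;
        offset = 0;                   /* later copies start page-aligned */
        page_index++;
    }
}

int main(void)
{
    static uint8_t p0[PAGE_SIZE], p1[PAGE_SIZE];
    void *pages[] = { p0, p1 };
    uint8_t msg[6000];

    memset(msg, 0xAB, sizeof(msg));
    copy_to_paged_ring(pages, 100, msg, sizeof(msg)); /* spans both pages */
    return 0;
}
```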
> > +static int > > +ringbuf_insert(const struct domain *d, struct argo_ring_info *ring_info, > > + const struct argo_ring_id *src_id, > > + XEN_GUEST_HANDLE_PARAM(xen_argo_iov_t) iovs_hnd, > > + unsigned long niov, uint32_t message_type, > > + unsigned long *out_len) > > +{ > > + xen_argo_ring_t ring; > > + struct xen_argo_ring_message_header mh = { }; > > + int sp, ret; > > + unsigned int len = 0; > > + xen_argo_iov_t iovs[XEN_ARGO_MAXIOV]; > > + xen_argo_iov_t *piov; > > + XEN_GUEST_HANDLE(uint8_t) NULL_hnd = > > + guest_handle_from_param(guest_handle_from_ptr(NULL, uint8_t), > > uint8_t); > > + > > + ASSERT(LOCKING_L3(d, ring_info)); > > + > > + ret = __copy_from_guest(iovs, iovs_hnd, niov) ? -EFAULT : 0; > > + if ( ret ) > > + return ret; > > + > > + /* > > + * Obtain the total size of data to transmit -- sets the 'len' variable > > + * -- and sanity check that the iovs conform to size and number limits. > > + * Enforced below: no more than 'len' bytes of guest data > > + * (plus the message header) will be sent in this operation. > > + */ > > + ret = iov_count(iovs, niov, &len); > > + if ( ret ) > > + return ret; > > + > > + /* > > + * Size bounds check against ring size and static maximum message > > limit. > > + * The message must not fill the ring; there must be at least one slot > > + * remaining so we can distinguish a full ring from an empty one. > > + */ > > + if ( ((ROUNDUP_MESSAGE(len) + > > + sizeof(struct xen_argo_ring_message_header)) >= > > ring_info->len) || > > + (len > MAX_ARGO_MESSAGE_SIZE) ) > > len is already checked to be <= MAX_ARGO_MESSAGE_SIZE in iov_count > where it gets set, this is redundant. ack, removed, thanks. > > > + return -EMSGSIZE; > > + > > + ret = get_sanitized_ring(d, &ring, ring_info); > > + if ( ret ) > > + return ret; > > + > > + argo_dprintk("ring.tx_ptr=%u ring.rx_ptr=%u ring len=%u" > > + " ring_info->tx_ptr=%u\n", > > + ring.tx_ptr, ring.rx_ptr, ring_info->len, > > ring_info->tx_ptr); > > + > > + if ( ring.rx_ptr == ring.tx_ptr ) > > + sp = ring_info->len; > > + else > > + { > > + sp = ring.rx_ptr - ring.tx_ptr; > > + if ( sp < 0 ) > > + sp += ring_info->len; > > + } > > + > > + /* > > + * Size bounds check against currently available space in the ring. > > + * Again: the message must not fill the ring leaving no space > > remaining. > > + */ > > + if ( (ROUNDUP_MESSAGE(len) + > > + sizeof(struct xen_argo_ring_message_header)) >= sp ) > > + { > > + argo_dprintk("EAGAIN\n"); > > + return -EAGAIN; > > + } > > + > > + mh.len = len + sizeof(struct xen_argo_ring_message_header); > > + mh.source.aport = src_id->aport; > > + mh.source.domain_id = src_id->domain_id; > > + mh.message_type = message_type; > > + > > + /* > > + * For this copy to the guest ring, tx_ptr is always 16-byte aligned > > + * and the message header is 16 bytes long. > > + */ > > + BUILD_BUG_ON( > > + sizeof(struct xen_argo_ring_message_header) != ROUNDUP_MESSAGE(1)); > > + > > + /* > > + * First data write into the destination ring: fixed size, message > > header. > > + * This cannot overrun because the available free space (value in 'sp') > > + * is checked above and must be at least this size. 
> > + */ > > + ret = memcpy_to_guest_ring(d, ring_info, > > + ring.tx_ptr + sizeof(xen_argo_ring_t), > > + &mh, NULL_hnd, sizeof(mh)); > > + if ( ret ) > > + { > > + gprintk(XENLOG_ERR, > > + "argo: failed to write message header to ring (vm%u:%x > > vm%u)\n", > > + ring_info->id.domain_id, ring_info->id.aport, > > + ring_info->id.partner_id); > > + > > + return ret; > > + } > > + > > + ring.tx_ptr += sizeof(mh); > > + if ( ring.tx_ptr == ring_info->len ) > > + ring.tx_ptr = 0; > > + > > + for ( piov = iovs; niov--; piov++ ) > > + { > > + XEN_GUEST_HANDLE_64(uint8_t) buf_hnd = piov->iov_hnd; > > + unsigned int iov_len = piov->iov_len; > > + > > + /* If no data is provided in this iov, moan and skip on to the > > next */ > > + if ( !iov_len ) > > + { > > + gprintk(XENLOG_ERR, > > This should likely be WARN or INFO, since it's not an error? Yes, changed to WARNING, ack. > > > + "argo: no data iov_len=0 iov_hnd=%p ring (vm%u:%x > > vm%u)\n", > > + buf_hnd.p, ring_info->id.domain_id, > > ring_info->id.aport, > > + ring_info->id.partner_id); > > + > > + continue; > > + } > > + > > + if ( unlikely(!guest_handle_okay(buf_hnd, iov_len)) ) > > + { > > + gprintk(XENLOG_ERR, > > + "argo: bad iov handle [%p, %u] (vm%u:%x vm%u)\n", > > + buf_hnd.p, iov_len, > > + ring_info->id.domain_id, ring_info->id.aport, > > + ring_info->id.partner_id); > > + > > + return -EFAULT; > > + } > > + > > + sp = ring_info->len - ring.tx_ptr; > > + > > + /* Check: iov data size versus free space at the tail of the ring > > */ > > + if ( iov_len > sp ) > > + { > > + /* > > + * Second possible data write: ring-tail-wrap-write. > > + * Populate the ring tail and update the internal tx_ptr to > > handle > > + * wrapping at the end of ring. > > + * Size of data written here: sp > > + * which is the exact full amount of free space available at > > the > > + * tail of the ring, so this cannot overrun. > > + */ > > + ret = memcpy_to_guest_ring(d, ring_info, > > + ring.tx_ptr + > > sizeof(xen_argo_ring_t), > > + NULL, buf_hnd, sp); > > + if ( ret ) > > + { > > + gprintk(XENLOG_ERR, > > + "argo: failed to copy {%p, %d} (vm%u:%x vm%u)\n", > > + buf_hnd.p, sp, > > + ring_info->id.domain_id, ring_info->id.aport, > > + ring_info->id.partner_id); > > + > > + return ret; > > + } > > + > > + ring.tx_ptr = 0; > > + iov_len -= sp; > > + guest_handle_add_offset(buf_hnd, sp); > > + > > + ASSERT(iov_len <= ring_info->len); > > + } > > + > > + /* > > + * Third possible data write: all data remaining for this iov. > > + * Size of data written here: iov_len > > + * > > + * Case 1: if the ring-tail-wrap-write above was performed, then > > + * iov_len has been decreased by 'sp' and ring.tx_ptr is > > zero. > > + * > > + * We know from checking the result of iov_count: > > + * len + sizeof(message_header) <= ring_info->len > > + * We also know that len is the total of summing all iov_lens, > > so: > > + * iov_len <= len > > + * so by transitivity: > > + * iov_len <= len <= (ring_info->len - sizeof(msgheader)) > > + * and therefore: > > + * (iov_len + sizeof(msgheader) <= ring_info->len) && > > + * (ring.tx_ptr == 0) > > + * so this write cannot overrun here. > > + * > > + * Case 2: ring-tail-wrap-write above was not performed > > + * -> so iov_len is the guest-supplied value and: (iov_len <= > > sp) > > + * ie. less than available space at the tail of the ring: > > + * so this write cannot overrun. 
> > + */ > > + ret = memcpy_to_guest_ring(d, ring_info, > > + ring.tx_ptr + sizeof(xen_argo_ring_t), > > + NULL, buf_hnd, iov_len); > > + if ( ret ) > > + { > > + gprintk(XENLOG_ERR, > > + "argo: failed to copy [%p, %u] (vm%u:%x vm%u)\n", > > + buf_hnd.p, iov_len, ring_info->id.domain_id, > > + ring_info->id.aport, ring_info->id.partner_id); > > + > > + return ret; > > + } > > + > > + ring.tx_ptr += iov_len; > > + > > + if ( ring.tx_ptr == ring_info->len ) > > + ring.tx_ptr = 0; > > + } > > + > > + ring.tx_ptr = ROUNDUP_MESSAGE(ring.tx_ptr); > > + > > + if ( ring.tx_ptr >= ring_info->len ) > > + ring.tx_ptr -= ring_info->len; > > You seem to handle the wrapping after each possible write, so I think > the above is not needed? Maybe it should be an assert instead? The wrap handling is necesssary due to that ROUNDUP_MESSAGE immediately above it. I've added a new comment to make it a bit clearer: Finished writing data from all iovs into the ring: now need to round up tx_ptr to align to the next message boundary, and then wrap if necessary. > > > + > > + update_tx_ptr(d, ring_info, ring.tx_ptr); > > + > > + /* > > + * At this point (and also on an error exit paths from this function) > > it is > > + * possible to unmap the ring_info, ie: > > + * ring_unmap(d, ring_info); > > + * but performance should be improved by not doing so, and retaining > > + * the mapping. > > + * An XSM policy control over level of confidentiality required > > + * versus performance cost could be added to decide that here. > > + */ > > + > > + *out_len = len; > > + > > + return ret; > > +} > > + > > static void > > wildcard_pending_list_remove(domid_t domain_id, struct pending_ent *ent) > > { > > @@ -497,6 +918,25 @@ wildcard_pending_list_remove(domid_t domain_id, struct > > pending_ent *ent) > > } > > > > static void > > +wildcard_pending_list_insert(domid_t domain_id, struct pending_ent *ent) > > +{ > > + struct domain *d = get_domain_by_id(domain_id); > > + > > + if ( !d ) > > + return; > > + > > + ASSERT(LOCKING_Read_L1); > > + > > + if ( d->argo ) > > + { > > + spin_lock(&d->argo->wildcard_L2_lock); > > + list_add(&ent->wildcard_node, &d->argo->wildcard_pend_list); > > + spin_unlock(&d->argo->wildcard_L2_lock); > > + } > > + put_domain(d); > > +} > > + > > +static void > > pending_remove_all(const struct domain *d, struct argo_ring_info > > *ring_info) > > { > > struct list_head *ring_pending = &ring_info->pending; > > @@ -518,6 +958,70 @@ pending_remove_all(const struct domain *d, struct > > argo_ring_info *ring_info) > > ring_info->npending = 0; > > } > > > > +static int > > +pending_queue(const struct domain *d, struct argo_ring_info *ring_info, > > + domid_t src_id, unsigned int len) > > +{ > > + struct pending_ent *ent; > > + > > + ASSERT(LOCKING_L3(d, ring_info)); > > + > > + if ( ring_info->npending >= MAX_PENDING_PER_RING ) > > + return -ENOSPC; > > + > > + ent = xmalloc(struct pending_ent); > > + if ( !ent ) > > + return -ENOMEM; > > + > > + ent->len = len; > > + ent->domain_id = src_id; > > + ent->ring_info = ring_info; > > + > > + if ( ring_info->id.partner_id == XEN_ARGO_DOMID_ANY ) > > + wildcard_pending_list_insert(src_id, ent); > > + list_add(&ent->node, &ring_info->pending); > > + ring_info->npending++; > > + > > + return 0; > > +} > > + > > +static int > > +pending_requeue(const struct domain *d, struct argo_ring_info *ring_info, > > + domid_t src_id, unsigned int len) > > +{ > > + struct list_head *cursor, *head; > > + > > + ASSERT(LOCKING_L3(d, ring_info)); > > + > > + /* List structure is not 
modified here. Update len in a match if > > found. */ > > + head = &ring_info->pending; > > + > > + for ( cursor = head->next; cursor != head; cursor = cursor->next ) > > list_for_each_entry ack > > > long > > do_argo_op(unsigned int cmd, XEN_GUEST_HANDLE_PARAM(void) arg1, > > XEN_GUEST_HANDLE_PARAM(void) arg2, unsigned long arg3, > > @@ -1145,6 +1734,53 @@ do_argo_op(unsigned int cmd, > > XEN_GUEST_HANDLE_PARAM(void) arg1, > > break; > > } > > > > + case XEN_ARGO_OP_sendv: > > + { > > + xen_argo_send_addr_t send_addr; > > + > > + XEN_GUEST_HANDLE_PARAM(xen_argo_send_addr_t) send_addr_hnd = > > + guest_handle_cast(arg1, xen_argo_send_addr_t); > > + XEN_GUEST_HANDLE_PARAM(xen_argo_iov_t) iovs_hnd = > > + guest_handle_cast(arg2, xen_argo_iov_t); > > + /* arg3 is niov */ > > + /* arg4 is message_type. Must be a 32-bit value. */ > > + > > + rc = copy_from_guest(&send_addr, send_addr_hnd, 1) ? -EFAULT : 0; > > + if ( rc ) > > + break; > > + > > + /* > > + * Check padding is zeroed. Reject niov above limit or > > message_types > > + * that are outside 32 bit range. > > + */ > > + if ( unlikely(send_addr.src.pad || send_addr.dst.pad || > > + (arg3 > XEN_ARGO_MAXIOV) || (arg4 & ~0xffffffffUL)) ) > > arg4 & (GB(4) - 1) > > Is clearer IMO, or: > > arg4 > UINT32_MAX I've left the code unchanged, as the mask constant is used multiple places elsewhere in Xen. UINT32_MAX is only used as a threshold value. > > > + { > > + rc = -EINVAL; > > + break; > > + } > > + > > + if ( send_addr.src.domain_id == XEN_ARGO_DOMID_ANY ) > > + send_addr.src.domain_id = currd->domain_id; > > + > > + /* No domain is currently authorized to send on behalf of another > > */ > > + if ( unlikely(send_addr.src.domain_id != currd->domain_id) ) > > + { > > + rc = -EPERM; > > + break; > > + } > > + > > + /* > > + * Check access to the whole array here so we can use the faster > > __copy > > + * operations to read each element later. > > + */ > > + if ( unlikely(!guest_handle_okay(iovs_hnd, arg3)) ) > > You need to set rc to EFAULT here, because the call to copy_from_guest > has set it to 0. ack. > > Alternatively you can change the call above to be: > > if ( copy_from_guest(&send_addr, send_addr_hnd, 1) ) > return -EFAULT; > > So rc doesn't get set to 0 on success. > With those taken care of: > > Reviewed-by: Roger Pau Monné <roger.pau@xxxxxxxxxx> _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxxxxxxxxx https://lists.xenproject.org/mailman/listinfo/xen-devel
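[To summarize the ring arithmetic discussed in this thread, here is a minimal standalone model, not the Xen implementation: free space derived from rx_ptr/tx_ptr, the rule that a message must never fill the ring completely, and the post-write rounding of tx_ptr to the next message boundary followed by a wrap, which is the step queried above. ROUNDUP_MESSAGE, tx_ptr and rx_ptr follow the patch; struct ring_state, ring_free_space, message_fits and advance_tx_ptr are invented for illustration, and MSG_HEADER_SIZE assumes the 16-byte header noted in the patch.]

```c
/* Simplified model of the Argo ring index arithmetic discussed above. */
#include <stdint.h>
#include <stdio.h>

#define MESSAGE_ALIGN       16u
#define ROUNDUP_MESSAGE(x)  (((x) + MESSAGE_ALIGN - 1) & ~(MESSAGE_ALIGN - 1))
#define MSG_HEADER_SIZE     16u  /* assumed sizeof(xen_argo_ring_message_header) */

struct ring_state {
    uint32_t len;      /* total ring size, a multiple of MESSAGE_ALIGN */
    uint32_t rx_ptr;   /* consumer index, advanced by the receiving guest */
    uint32_t tx_ptr;   /* producer index, advanced by Xen on send */
};

/* Free space: rx_ptr == tx_ptr means empty, so the whole ring is free. */
static int32_t ring_free_space(const struct ring_state *r)
{
    int32_t sp;

    if ( r->rx_ptr == r->tx_ptr )
        return r->len;

    sp = (int32_t)r->rx_ptr - (int32_t)r->tx_ptr;
    if ( sp < 0 )
        sp += r->len;

    return sp;
}

/*
 * A message must not fill the ring exactly: at least one slot stays free so
 * a full ring can be told apart from an empty one (both: rx_ptr == tx_ptr).
 */
static int message_fits(const struct ring_state *r, uint32_t payload_len)
{
    return (ROUNDUP_MESSAGE(payload_len) + MSG_HEADER_SIZE) <
           (uint32_t)ring_free_space(r);
}

/* Advance tx_ptr past header + payload, then align and wrap if needed. */
static void advance_tx_ptr(struct ring_state *r, uint32_t payload_len)
{
    r->tx_ptr = (r->tx_ptr + MSG_HEADER_SIZE + payload_len) % r->len;

    /* Round up to the next message boundary; this can reach r->len ... */
    r->tx_ptr = ROUNDUP_MESSAGE(r->tx_ptr);

    /* ... so a wrap is still needed here, as noted in the reply above. */
    if ( r->tx_ptr >= r->len )
        r->tx_ptr -= r->len;
}

int main(void)
{
    struct ring_state r = { .len = 4096, .rx_ptr = 0, .tx_ptr = 0 };
    uint32_t payload = 100;

    if ( message_fits(&r, payload) )
        advance_tx_ptr(&r, payload);

    printf("tx_ptr after one %u-byte message: %u\n",
           (unsigned)payload, (unsigned)r.tx_ptr);
    return 0;
}
```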