[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] Re: [Xen-devel] [PATCH v3 09/15] argo: implement the sendv op; evtchn: expose send_guest_global_virq
Thanks for the review, Roger. Replies inline below. On Wed, Jan 9, 2019 at 10:57 AM Roger Pau Monné <royger@xxxxxxxxxxx> wrote: > > to.On Mon, Jan 7, 2019 at 8:44 AM Christopher Clark > <christopher.w.clark@xxxxxxxxx> wrote: > > > > sendv operation is invoked to perform a synchronous send of buffers > > contained in iovs to a remote domain's registered ring. > > > > diff --git a/xen/common/argo.c b/xen/common/argo.c > > index 59ce8c4..4548435 100644 > > --- a/xen/common/argo.c > > +++ b/xen/common/argo.c > > > > +static int > > +memcpy_to_guest_ring(struct argo_ring_info *ring_info, uint32_t offset, > > + const void *src, XEN_GUEST_HANDLE(uint8_t) src_hnd, > > + uint32_t len) > > +{ > > + unsigned int mfns_index = offset >> PAGE_SHIFT; > > + void *dst; > > + int ret; > > + unsigned int src_offset = 0; > > + > > + ASSERT(spin_is_locked(&ring_info->lock)); > > + > > + offset &= ~PAGE_MASK; > > + > > + if ( (len > XEN_ARGO_MAX_RING_SIZE) || (offset > > > XEN_ARGO_MAX_RING_SIZE) ) > > + return -EFAULT; > > + > > + while ( (offset + len) > PAGE_SIZE ) > > I think you could map the whole ring in contiguous virtual address > space and then writing to it would be much more easy, you wouldn't > need to iterate with memcpy or copy_from_guest, take a look at __vmap. > You could likely map this when the ring gets setup and keep it mapped > for the lifetime of the ring. You're right about that, because map_domain_page_global, which the current code uses, uses vmap itself. I think there's a couple of reasons why the code has been implemented the iterative way though. The first is that I think ring resize has been a consideration: it's useful to be able to increase the size of a live and active ring that is under load without having to tear down the mappings, find a new virtual address region of the right size and then remap it: you can just supply some more memory and map those pages onto the end of the ring, and ensure both sides know about the new ring size. Similarly, shrinking a quiet ring can be useful. However, the "gfn race" that you (correctly) pointed out in an earlier review, and Jan's related request to drop the "revalidate an existing mapping on ring reregister" motivated removal of a section of the code involved, and then in v3 of the series, I've actually just blocked ring resize because it's missing a walk through the pending notifications to find any that have become untriggerable with the new ring size when a ring is shrunk and I'd like to defer implementing that for now. So the ring resize reason is more of a consideration for a possible later version of Argo than the current one. The second reason is about avoiding exposing the Xen virtual memory allocator directly to frequent guest-supplied size requests for contiguous regions (of up to 16GB). With single-page allocations to build a ring, fragmentation is not a problem, and mischief by a guest seems difficult. Changing it to issue requests for contiguous regions, with variable ring sizes up to the maximum of 16GB, it seems like significant fragmentation may be achievable. I don't know the practical impact of that but it seems worth avoiding. Are the other users of __vmap (or vmap) for multi-gigabyte regions only either boot-time, infrequent operations (livepatch), or for actions by privileged (ie. somewhat trusted) domains (ioremap), or is it already a frequent operation somewhere else? Given the context above, and Jason's simplification to the memcpy_to_guest_ring function, plus the imminent merge freeze deadline, and the understanding that this loop and the related data structures supporting it have been tested and are working, would it be acceptable to omit making this contiguous mapping change from this current series? > > > + { > > + unsigned int head_len = PAGE_SIZE - offset; > > + > > + ret = ring_map_page(ring_info, mfns_index, &dst); > > + if ( ret ) > > + return ret; > > + > > + if ( src ) > > + { > > + memcpy(dst + offset, src + src_offset, head_len); > > + src_offset += head_len; > > + } > > + else > > + { > > + ret = copy_from_guest(dst + offset, src_hnd, head_len) ? > > + -EFAULT : 0; > > + if ( ret ) > > + return ret; > > You can simplify this to: > > if ( copy_from_guest(...) ) > return -EFAULT; yes! ack - thanks <snip> > > +/* > > + * get_sanitized_ring creates a modified copy of the ring pointers where > > + * the rx_ptr is rounded up to ensure it is aligned, and then ring > > + * wrap is handled. Simplifies safe use of the rx_ptr for available > > + * space calculation. > > + */ > > +static int > > +get_sanitized_ring(xen_argo_ring_t *ring, struct argo_ring_info *ring_info) > > +{ > > + uint32_t rx_ptr; > > + int ret; > > + > > + ret = get_rx_ptr(ring_info, &rx_ptr); > > + if ( ret ) > > + return ret; > > + > > + ring->tx_ptr = ring_info->tx_ptr; > > + > > + rx_ptr = ROUNDUP_MESSAGE(rx_ptr); > > + if ( rx_ptr >= ring_info->len ) > > + rx_ptr = 0; > > + > > + ring->rx_ptr = rx_ptr; > > Newline. ack, thanks <snip> > > +/* > > + * iov_count returns its count on success via an out variable to avoid > > + * potential for a negative return value to be used incorrectly > > + * (eg. coerced into an unsigned variable resulting in a large incorrect > > value) > > + */ > > +static int > > +iov_count(const xen_argo_iov_t *piov, unsigned long niov, uint32_t *count) > > +{ > > + uint32_t sum_iov_lens = 0; > > + > > + if ( niov > XEN_ARGO_MAXIOV ) > > + return -EINVAL; > > + > > + while ( niov-- ) > > I would use a for loop here, that would remove the need to piov++, if > you want to keep it quite similar: > > for ( ; niov--; piov++ ) > { Yes, that is better - thanks, applied. <snip> > > + > > +static int > > +ringbuf_insert(struct domain *d, struct argo_ring_info *ring_info, > > + const struct argo_ring_id *src_id, > > + XEN_GUEST_HANDLE_PARAM(xen_argo_iov_t) iovs_hnd, > > + unsigned long niov, uint32_t message_type, > > + unsigned long *out_len) > > +{ > > + xen_argo_ring_t ring; > > + struct xen_argo_ring_message_header mh = { 0 }; > > No need for the 0, { } will achieve exactly the same. ack, applied > > > + int32_t sp; > > + int32_t ret; > > + uint32_t len = 0; > > + xen_argo_iov_t iovs[XEN_ARGO_MAXIOV]; > > This seems slightly dangerous, a change of the maximum could cause > stack overflow depending on the size of xen_argo_iov_t. I think you > need some comment next to definition of XEN_ARGO_MAXIOV to note that > increasing this could cause issues. That makes sense, will do. <snip> > > + /* > > + * First data write into the destination ring: fixed size, message > > header. > > + * This cannot overrun because the available free space (value in 'sp') > > + * is checked above and must be at least this size. > > + */ > > + ret = memcpy_to_guest_ring(ring_info, ring.tx_ptr + > > sizeof(xen_argo_ring_t), > > + &mh, NULL_hnd, sizeof(mh)); > > + if ( ret ) > > + { > > + gprintk(XENLOG_ERR, > > + "argo: failed to write message header to ring (vm%u:%x > > vm%d)\n", > > + ring_info->id.domain_id, ring_info->id.port, > > + ring_info->id.partner_id); > > + > > + goto out; > > + } > > + > > + ring.tx_ptr += sizeof(mh); > > + if ( ring.tx_ptr == ring_info->len ) > > + ring.tx_ptr = 0; > > + > > + piov = iovs; > > + > > + while ( niov-- ) > > AFAICT using a for loop would remove the need to also do a piov++ at > each iteration. ack, applied. <snip> > > + * Case 2: ring-tail-wrap-write above was not performed > > + * -> so iov_len is the guest-supplied value and: (iov_len <= > > sp) > > + * ie. less than available space at the tail of the ring: > > + * so this write cannot overrun. > > + */ > > + ret = memcpy_to_guest_ring(ring_info, > > + ring.tx_ptr + sizeof(xen_argo_ring_t), > > + NULL, buf_hnd, iov_len); > > + if ( ret ) > > + { > > + gprintk(XENLOG_ERR, > > + "argo: failed to copy [%p, %"PRIx32"] (vm%u:%x > > vm%d)\n", > > + buf_hnd.p, iov_len, ring_info->id.domain_id, > > + ring_info->id.port, ring_info->id.partner_id); > > + > > + goto out; > > + } > > + > > + ring.tx_ptr += iov_len; > > + > > + if ( ring.tx_ptr == ring_info->len ) > > + ring.tx_ptr = 0; > > + > > + piov++; > > + } > > + > > + ring.tx_ptr = ROUNDUP_MESSAGE(ring.tx_ptr); > > + > > + if ( ring.tx_ptr >= ring_info->len ) > > + ring.tx_ptr -= ring_info->len; > > + > > + update_tx_ptr(ring_info, ring.tx_ptr); > > + > > + out: > > Do you really need to out label? *out_len it's only set in the success > case, so all the error cases that use a 'goto out' could be replaced > by 'return ret;'. ack, thanks -- done. <snip> > > +static int > > +pending_queue(struct argo_ring_info *ring_info, domid_t src_id, > > + unsigned int len) > > +{ > > + struct pending_ent *ent; > > + > > + ASSERT(spin_is_locked(&ring_info->lock)); > > + > > + if ( ring_info->npending >= MAX_PENDING_PER_RING ) > > + return -ENOSPC; > > + > > + ent = xmalloc(struct pending_ent); > > + > > Extra newline. ack <snip> > > > > +static long > > +sendv(struct domain *src_d, const xen_argo_addr_t *src_addr, > > + const xen_argo_addr_t *dst_addr, > > + XEN_GUEST_HANDLE_PARAM(xen_argo_iov_t) iovs_hnd, unsigned long niov, > > + uint32_t message_type) > > +{ > > + struct domain *dst_d = NULL; > > + struct argo_ring_id src_id; > > + struct argo_ring_info *ring_info; > > + int ret = 0; > > + unsigned long len = 0; > > + > > + ASSERT(src_d->domain_id == src_addr->domain_id); > > + > > + argo_dprintk("sendv: (%d:%x)->(%d:%x) niov:%lu iov:%p type:%u\n", > > + src_addr->domain_id, src_addr->port, > > + dst_addr->domain_id, dst_addr->port, > > + niov, iovs_hnd.p, message_type); > > + > > + read_lock(&argo_lock); > > + > > + if ( !src_d->argo ) > > + { > > + ret = -ENODEV; > > + goto out_unlock; > > + } > > + > > + src_id.port = src_addr->port; > > + src_id.domain_id = src_d->domain_id; > > + src_id.partner_id = dst_addr->domain_id; > > + > > + dst_d = get_domain_by_id(dst_addr->domain_id); > > + if ( !dst_d ) > > + { > > + argo_dprintk("!dst_d, ESRCH\n"); > > + ret = -ESRCH; > > + goto out_unlock; > > + } > > + > > + if ( !dst_d->argo ) > > + { > > + argo_dprintk("!dst_d->argo, ECONNREFUSED\n"); > > + ret = -ECONNREFUSED; > > + goto out_unlock; > > The usage of out_unlock here and in the condition above is wrong, > since it will unconditionally call read_unlock(&argo_lock); which is > wrong here because the lock has not yet been acquired. Sorry, I don't think that's quite right -- if you scroll up a bit here, you can see where argo_lock is taken unconditionally, just after the dprintk and before checking whether src_d is argo enabled. The second lock hasn't been taken yet - but that's not the one being unlocked on that out_unlock path. > > > + } > > + > > + read_lock(&dst_d->argo->lock); > > + > > + ring_info = ring_find_info_by_match(dst_d, dst_addr->port, > > + src_addr->domain_id); > > + if ( !ring_info ) > > + { > > + gprintk(XENLOG_ERR, > > + "argo: vm%u connection refused, src (vm%u:%x) dst > > (vm%u:%x)\n", > > + current->domain->domain_id, src_id.domain_id, src_id.port, > > + dst_addr->domain_id, dst_addr->port); > > + > > + ret = -ECONNREFUSED; > > + goto out_unlock2; > > + } > > + > > + spin_lock(&ring_info->lock); > > + > > + ret = ringbuf_insert(dst_d, ring_info, &src_id, iovs_hnd, niov, > > + message_type, &len); > > + if ( ret == -EAGAIN ) > > + { > > + argo_dprintk("argo_ringbuf_sendv failed, EAGAIN\n"); > > + /* requeue to issue a notification when space is there */ > > + ret = pending_requeue(ring_info, src_addr->domain_id, len); > > + } > > + > > + spin_unlock(&ring_info->lock); > > + > > + if ( ret >= 0 ) > > + signal_domain(dst_d); > > + > > + out_unlock2: > > There's only a single user of the out_unlock2 label, at which point it > might be easier to read to just put the read_unlock there and just use > the existing out_unlock label. ack, will change that. Thanks again, Christopher _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxxxxxxxxx https://lists.xenproject.org/mailman/listinfo/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |