
[Minios-devel] [UNIKRAFT PATCH v4 1/4] lib/uknetdev: Introduce receive buffer allocator callback



The idea of having a fill-up parameter on the receive function is
inconvenient because a user never knows in advance how many receive
buffers have to be programmed before calling the receive function.
This commit introduces registering a callback when configuring a
receive queue. This callback has to be provided by the NETDEV API user
and is called by the driver whenever it programs new receive buffers to
the receive queue. To preserve performance, this allocation callback is
called for a batch of receive buffers at once.
This new mechanism replaces the fill-up parameter of the receive function.

This commit also adapts virtio-net to comply with the API change.

Signed-off-by: Simon Kuenzer <simon.kuenzer@xxxxxxxxx>
---
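As a usage illustration (not part of the diff below), an API user could
implement the allocator callback roughly as follows. This is a minimal
sketch: the 2048 byte buffer length and the exact parameter list of
`uk_netbuf_alloc_buf()` are assumptions here, check lib/uknetbuf for the
current signature.

    static uint16_t netif_alloc_rxpkts(void *argp, struct uk_netbuf *pkts[],
                                       uint16_t count)
    {
            struct uk_alloc *a = (struct uk_alloc *) argp;
            uint16_t i;

            for (i = 0; i < count; ++i) {
                    /* Assumed buffer length: enough for an Ethernet MTU
                     * plus the virtio-net header */
                    pkts[i] = uk_netbuf_alloc_buf(a, 2048, 0, 0, NULL);
                    if (!pkts[i])
                            break; /* partial allocation is allowed */
            }

            /* Has to be in [0, count]; the driver programs only the
             * netbufs that were actually allocated */
            return i;
    }
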
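The callback is registered through the two new fields of
`struct uk_netdev_rxqueue_conf`. A minimal polling-mode setup could look
like the following sketch (queue identifier 0 and a descriptor count of 0,
i.e. the driver default, are assumptions):

    struct uk_netdev_rxqueue_conf rxq_conf;
    int rc;

    rxq_conf.a                 = a;    /* allocator for ring descriptors */
    rxq_conf.callback          = NULL; /* no receive events: polling mode */
    rxq_conf.callback_cookie   = NULL;
    rxq_conf.alloc_rxpkts      = netif_alloc_rxpkts;
    rxq_conf.alloc_rxpkts_argp = a;    /* handed to the callback as `argp` */

    rc = uk_netdev_rxq_configure(dev, 0, 0, &rxq_conf);
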
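With the fill-up parameters gone, a polling receive loop reduces to the
sketch below; `netif_input()` stands in for a hypothetical hand-off into
the network stack:

    for (;;) {
            struct uk_netbuf *pkt = NULL;
            int ret;

            ret = uk_netdev_rx_one(dev, 0, &pkt);
            if (ret < 0)
                    break;    /* error code from the driver */
            if (ret == 0)
                    continue; /* no packet available, poll again */

            /* ret is 1 or 2: used descriptors were re-programmed
             * internally via `alloc_rxpkts`; 2 means more packets
             * are waiting on the queue */
            netif_input(pkt);
    }
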
 lib/uknetdev/include/uk/netdev.h        |  50 +++------
 lib/uknetdev/include/uk/netdev_core.h   |  26 ++++-
 lib/uknetdev/netdev.c                   |   1 +
 plat/drivers/include/virtio/virtqueue.h |  13 ++-
 plat/drivers/virtio/virtio_net.c        | 128 +++++++++++++++---------
 plat/drivers/virtio/virtio_ring.c       |  10 +-
 6 files changed, 135 insertions(+), 93 deletions(-)

diff --git a/lib/uknetdev/include/uk/netdev.h b/lib/uknetdev/include/uk/netdev.h
index f0fa769f..18878400 100644
--- a/lib/uknetdev/include/uk/netdev.h
+++ b/lib/uknetdev/include/uk/netdev.h
@@ -215,7 +215,10 @@ int uk_netdev_rxq_info_get(struct uk_netdev *dev, uint16_t queue_id,
  *   value.
  * @param rx_conf
  *   The pointer to the configuration data to be used for the receive queue.
- *   Its memory can be released after invoking this function.
+ *   Its memory can be released after invoking this function. Please note that
+ *   the receive buffer allocator (`rx_conf->alloc_rxpkts`) has to be
+ *   interrupt-context-safe when `uk_netdev_rx_one` is going to be called from
+ *   interrupt context.
  * @return
  *   - (0): Success, receive queue correctly set up.
  *   - (-ENOMEM): Unable to allocate the receive ring descriptors.
@@ -419,9 +422,11 @@ static inline int uk_netdev_rxq_intr_disable(struct uk_netdev *dev,
 }
 
 /**
- * Receive one packet and re-program used receive descriptor
- * Please note that before any packet can be received, the receive queue
- * has to be filled up with empty netbufs (see fillup parameter).
+ * Receive one packet and re-program used receive descriptors.
+ * If this function is called from interrupt context (e.g., within the receive
+ * event handler when no dispatcher threads are configured), make sure that the
+ * provided receive buffer allocator function is interrupt-context-safe
+ * (see: `uk_netdev_rxq_configure`).
  *
  * @param dev
  *   The Unikraft Network Device.
@@ -431,51 +436,28 @@ static inline int uk_netdev_rxq_intr_disable(struct uk_netdev *dev,
  *   to uk_netdev_configure().
  * @param pkt
  *   Reference to netbuf pointer which will be point to the received packet
- *   after the function call. Can be NULL if function is used to program
- *   receive descriptors only.
- * @param fillup
- *   Array of netbufs that should be used to program used descriptors again.
- *   Each of the netbuf should be freshly allocated/initialized and not part
- *   of any chain.
- *   `fillup` can be `NULL` but without re-programming of used descriptors no
- *   new packets can be received at some point.
- * @param fillup_count
- *   Length of `fillup` array. After the function call, `fillup_count` returns
- *   the number of left and unused netbufs on the array. `fillup_count` has to
- *   to 0 if `fillup` is `NULL`.
+ *   after the function call. `pkt` must never be `NULL`.
  * @return
- *   - (0): No packet available or `pkt` was set to NULL,
- *          check `fillup_count` for used `fillup` netbufs
- *   - (1): `pkt` points to received netbuf,
- *          check `fillup_count` for used `fillup` netbufs
+ *   - (0): No packet available
+ *   - (1): `pkt` points to received netbuf
  *   - (2): `pkt` points to received netbuf but more received packets are
  *          available on the receive queue. When interrupts are used, they are
- *          disabled until 1 is returned on subsequent calls,
- *          check `fillup_count` for used `fillup` netbufs
+ *          disabled until 1 is returned on subsequent calls
  *   - (<0): Error code from driver
  */
 static inline int uk_netdev_rx_one(struct uk_netdev *dev, uint16_t queue_id,
-                                  struct uk_netbuf **pkt,
-                                  struct uk_netbuf *fillup[],
-                                  uint16_t *fillup_count)
+                                  struct uk_netbuf **pkt)
 {
        UK_ASSERT(dev);
        UK_ASSERT(dev->rx_one);
        UK_ASSERT(queue_id < CONFIG_LIBUKNETDEV_MAXNBQUEUES);
        UK_ASSERT(dev->_data->state == UK_NETDEV_RUNNING);
        UK_ASSERT(!PTRISERR(dev->_rx_queue[queue_id]));
-       UK_ASSERT((!fillup && fillup_count) || fillup);
+       UK_ASSERT(pkt);
 
-       return dev->rx_one(dev, dev->_rx_queue[queue_id], pkt,
-                          fillup, fillup_count);
+       return dev->rx_one(dev, dev->_rx_queue[queue_id], pkt);
 }
 
-/**
- * Shortcut for only filling up a receive queue with empty netbufs
- */
-#define uk_netdev_rx_fillup(dev, queue_id, fillup, fillup_count)       \
-       uk_netdev_rx_one((dev), (queue_id), NULL, (fillup), (fillup_count))
-
 /**
  * Transmit one packet
  *
diff --git a/lib/uknetdev/include/uk/netdev_core.h b/lib/uknetdev/include/uk/netdev_core.h
index b77c45a4..d30886de 100644
--- a/lib/uknetdev/include/uk/netdev_core.h
+++ b/lib/uknetdev/include/uk/netdev_core.h
@@ -175,6 +175,25 @@ enum uk_netdev_einfo_type {
 typedef void (*uk_netdev_queue_event_t)(struct uk_netdev *dev,
                                        uint16_t queue_id, void *argp);
 
+/**
+ * User callback used by the driver to allocate netbufs
+ * that are used to set up receive descriptors.
+ *
+ * @param argp
+ *   User-provided argument.
+ * @param pkts
+ *   Array to be filled with pointers to the allocated netbufs.
+ * @param count
+ *   Number of netbufs requested (equal to the length of `pkts`).
+ * @return
+ *   Number of successfully allocated netbufs;
+ *   has to be in range [0, count].
+ *   References to the allocated netbufs are placed in pkts[0]...pkts[N - 1],
+ *   where N is the return value.
+ */
+typedef uint16_t (*uk_netdev_alloc_rxpkts)(void *argp,
+                                          struct uk_netbuf *pkts[],
+                                          uint16_t count);
+
 /**
  * A structure used to configure an Unikraft network device RX queue.
  */
@@ -183,6 +202,9 @@ struct uk_netdev_rxqueue_conf {
        void *callback_cookie;            /**< Argument pointer for callback. */
 
        struct uk_alloc *a;               /**< Allocator for descriptors. */
+
+       uk_netdev_alloc_rxpkts alloc_rxpkts; /**< Allocator for rx netbufs */
+       void *alloc_rxpkts_argp;             /**< Argument for alloc_rxpkts */
 #ifdef CONFIG_LIBUKNETDEV_DISPATCHERTHREADS
        struct uk_sched *s;               /**< Scheduler for dispatcher. */
 #endif
@@ -266,9 +288,7 @@ typedef int (*uk_netdev_rxq_intr_disable_t)(struct uk_netdev *dev,
 /** Driver callback type to retrieve one packet from a RX queue. */
 typedef int (*uk_netdev_rx_one_t)(struct uk_netdev *dev,
                                  struct uk_netdev_rx_queue *queue,
-                                 struct uk_netbuf **pkt,
-                                 struct uk_netbuf *fillup[],
-                                 uint16_t *fillup_count);
+                                 struct uk_netbuf **pkt);
 
 /** Driver callback type to submit one packet to a TX queue. */
 typedef int (*uk_netdev_tx_one_t)(struct uk_netdev *dev,
diff --git a/lib/uknetdev/netdev.c b/lib/uknetdev/netdev.c
index 8f7dd6e1..3b92f622 100644
--- a/lib/uknetdev/netdev.c
+++ b/lib/uknetdev/netdev.c
@@ -333,6 +333,7 @@ int uk_netdev_rxq_configure(struct uk_netdev *dev, uint16_t queue_id,
        UK_ASSERT(dev->ops->rxq_configure);
        UK_ASSERT(queue_id < CONFIG_LIBUKNETDEV_MAXNBQUEUES);
        UK_ASSERT(rx_conf);
+       UK_ASSERT(rx_conf->alloc_rxpkts);
 #ifdef CONFIG_LIBUKNETDEV_DISPATCHERTHREADS
        UK_ASSERT((rx_conf->callback && rx_conf->s)
                  || !rx_conf->callback);
diff --git a/plat/drivers/include/virtio/virtqueue.h b/plat/drivers/include/virtio/virtqueue.h
index 3d72c500..a4181f57 100644
--- a/plat/drivers/include/virtio/virtqueue.h
+++ b/plat/drivers/include/virtio/virtqueue.h
@@ -121,14 +121,19 @@ int virtqueue_notify_enabled(struct virtqueue *vq);
  *
  * @param vq
  *     Reference to the virtqueue.
+ * @param cookie
+ *     Reference to a pointer that will be set to the cookie that was
+ *     submitted with the dequeued descriptor after successful exit of this
+ *     function.
  * @param len
  *     Reference to the length of the data packet.
  * @return
- *     On Success, returns a reference to cookie that was submitted with
- *     descriptor.
- *     On failure, returns NULL with the length unmodified.
+ *     (>= 0): A buffer was dequeued from the ring; the value indicates
+ *     the number of used slots in the ring after dequeueing.
+ *     (< 0): Failed to dequeue a buffer; the output parameters cookie and
+ *     len are unmodified.
  */
-void *virtqueue_buffer_dequeue(struct virtqueue *vq, __u32 *len);
+int virtqueue_buffer_dequeue(struct virtqueue *vq, void **cookie, __u32 *len);
 
 /**
  * Create a descriptor chain starting at index head,
diff --git a/plat/drivers/virtio/virtio_net.c b/plat/drivers/virtio/virtio_net.c
index 75a6cd69..4a41f94b 100644
--- a/plat/drivers/virtio/virtio_net.c
+++ b/plat/drivers/virtio/virtio_net.c
@@ -128,6 +128,9 @@ struct uk_netdev_rx_queue {
        uint16_t nb_desc;
        /* The flag to interrupt on the transmit queue */
        uint8_t intr_enabled;
+       /* User-provided receive buffer allocation function */
+       uk_netdev_alloc_rxpkts alloc_rxpkts;
+       void *alloc_rxpkts_argp;
        /* Reference to the uk_netdev */
        struct uk_netdev *ndev;
        /* The scatter list and its associated fragements */
@@ -197,9 +200,7 @@ static int virtio_netdev_xmit(struct uk_netdev *dev,
                              struct uk_netbuf *pkt);
 static int virtio_netdev_recv(struct uk_netdev *dev,
                              struct uk_netdev_rx_queue *queue,
-                             struct uk_netbuf **pkt,
-                             struct uk_netbuf *fillup[],
-                             uint16_t *fillup_count);
+                             struct uk_netbuf **pkt);
 static const struct uk_hwaddr *virtio_net_mac_get(struct uk_netdev *n);
 static __u16 virtio_net_mtu_get(struct uk_netdev *n);
 static unsigned virtio_net_promisc_get(struct uk_netdev *n);
@@ -212,8 +213,8 @@ static int virtio_netdev_rxq_dequeue(struct uk_netdev_rx_queue *rxq,
 static int virtio_netdev_rxq_enqueue(struct uk_netdev_rx_queue *rxq,
                                     struct uk_netbuf *netbuf);
 static int virtio_netdev_recv_done(struct virtqueue *vq, void *priv);
-static int virtio_netdev_rx_fillup(struct uk_netdev_rx_queue *rxq,
-               struct uk_netbuf **netbuf, __u16 *count);
+static void virtio_netdev_rx_fillup(struct uk_netdev_rx_queue *rxq,
+                                   __u16 nb_desc, int notify);
 
 /**
  * Static global constants
@@ -245,12 +246,15 @@ static void virtio_netdev_xmit_free(struct uk_netdev_tx_queue *txq)
 {
        struct uk_netbuf *pkt = NULL;
        int cnt = 0;
+       int rc;
 
        for (;;) {
-               pkt = (struct uk_netbuf *)
-                       virtqueue_buffer_dequeue(txq->vq, NULL);
-               if (!pkt)
+               rc = virtqueue_buffer_dequeue(txq->vq, (void **) &pkt, NULL);
+               if (rc < 0)
                        break;
+
+               UK_ASSERT(pkt);
+
                /**
                 * Releasing the free buffer back to netbuf. The netbuf could
                 * use the destructor to inform the stack regarding the free up
@@ -262,42 +266,66 @@ static void virtio_netdev_xmit_free(struct uk_netdev_tx_queue *txq)
        uk_pr_debug("Free %"__PRIu16" descriptors\n", cnt);
 }
 
-static int virtio_netdev_rx_fillup(struct uk_netdev_rx_queue *rxq,
-               struct uk_netbuf **netbuf, __u16 *count)
+#define RX_FILLUP_BATCHLEN 64
+
+static void virtio_netdev_rx_fillup(struct uk_netdev_rx_queue *rxq,
+                                   __u16 nb_desc,
+                                   int notify)
 {
+       struct uk_netbuf *netbuf[RX_FILLUP_BATCHLEN];
        int rc = 0;
-       __u16 i = 0;
+       __u16 i, j;
+       __u16 req;
        __u16 cnt = 0;
+       __u16 filled = 0;
 
        /**
         * Fixed amount of memory is allocated to each received buffer. In
         * our case since we don't support jumbo frame or LRO yet we require
         * that the buffer feed to the ring descriptor is atleast
         * ethernet MTU + virtio net header.
+        * Because we use 2 descriptors for a single netbuf, our effective
+        * queue size is just half of it.
         */
-       for (i = 0; i < *count; i++) {
-               rc = virtio_netdev_rxq_enqueue(rxq, netbuf[i]);
-               if (rc == -ENOSPC) {
-                       uk_pr_debug(
-                               "No more place available to add descriptors\n");
-                       rc = 0;
-                       break;
-               } else if (unlikely(rc < 0)) {
-                       uk_pr_err("Failed to add a buffer to the virtqueue: 
%d\n",
-                                 rc);
-                       break;
+       nb_desc = ALIGN_DOWN(nb_desc, 2);
+       while (filled < nb_desc) {
+               req = MIN((nb_desc - filled) / 2, RX_FILLUP_BATCHLEN);
+               cnt = rxq->alloc_rxpkts(rxq->alloc_rxpkts_argp, netbuf, req);
+               for (i = 0; i < cnt; i++) {
+                       uk_pr_debug("Enqueue netbuf %"PRIu16"/%"PRIu16" (%p) to 
virtqueue %p...\n",
+                                   i + 1, cnt, netbuf[i], rxq);
+                       rc = virtio_netdev_rxq_enqueue(rxq, netbuf[i]);
+                       if (unlikely(rc < 0)) {
+                               uk_pr_err("Failed to add a buffer to receive 
virtqueue %p: %d\n",
+                                         rxq, rc);
+
+                               /*
+                                * Release netbufs that we are not going
+                                * to use anymore
+                                */
+                               for (j = i; j < cnt; j++)
+                                       uk_netbuf_free(netbuf[j]);
+                               goto out;
+                       }
+                       filled += 2;
+               }
+
+               if (unlikely(cnt < req)) {
+                       uk_pr_debug("Incomplete fill-up of netbufs on receive 
virtqueue %p: Out of memory",
+                                   rxq);
+                       goto out;
                }
-               cnt++;
        }
-       *count = *count - cnt;
+
+out:
+       uk_pr_debug("Programmed %"PRIu16" receive netbufs to receive virtqueue 
%p (status %x)\n",
+                   filled / 2, rxq, status);
 
        /**
         * Notify the host, when we submit new descriptor(s).
         */
-       if (cnt)
+       if (notify && filled)
                virtqueue_host_notify(rxq->vq);
-
-       return rc;
 }
 
 static int virtio_netdev_xmit(struct uk_netdev *dev,
@@ -463,17 +491,18 @@ static int virtio_netdev_rxq_enqueue(struct uk_netdev_rx_queue *rxq,
 static int virtio_netdev_rxq_dequeue(struct uk_netdev_rx_queue *rxq,
                                     struct uk_netbuf **netbuf)
 {
+       int ret;
        int rc = 0;
        struct uk_netbuf *buf = NULL;
        __u32 len;
 
        UK_ASSERT(netbuf);
 
-       buf = (struct uk_netbuf *)virtqueue_buffer_dequeue(rxq->vq, &len);
-       if (!buf) {
+       ret = virtqueue_buffer_dequeue(rxq->vq, (void **) &buf, &len);
+       if (ret < 0) {
                uk_pr_debug("No data available in the queue\n");
                *netbuf = NULL;
-               return 0;
+               return rxq->nb_desc;
        }
        if (unlikely((len < VIRTIO_HDR_LEN + ETH_HDR_LEN)
                     || (len > VIRTIO_PKT_BUFFER_LEN))) {
@@ -493,39 +522,34 @@ static int virtio_netdev_rxq_dequeue(struct uk_netdev_rx_queue *rxq,
        UK_ASSERT(rc == 1);
        *netbuf = buf;
 
-       return 1;
+       return ret;
 }
 
 static int virtio_netdev_recv(struct uk_netdev *dev,
                              struct uk_netdev_rx_queue *queue,
-                             struct uk_netbuf **pkt,
-                             struct uk_netbuf *fillup[],
-                             uint16_t *fillup_count)
+                             struct uk_netbuf **pkt)
 {
        int rc = 0;
        int cnt = 0;
 
        UK_ASSERT(dev && queue);
-       UK_ASSERT(!fillup || (fillup && *fillup_count > 0));
+       UK_ASSERT(pkt);
 
-       if (pkt && (queue->intr_enabled & VTNET_INTR_USR_EN_MASK)) {
+       if (queue->intr_enabled & VTNET_INTR_USR_EN_MASK) {
                virtqueue_intr_disable(queue->vq);
                queue->intr_enabled &= ~(VTNET_INTR_EN);
        }
 
-       if (pkt) {
-               rc = virtio_netdev_rxq_dequeue(queue, pkt);
-               if (unlikely(rc < 0)) {
-                       uk_pr_err("Failed to dequeue the packet: %d\n", rc);
-                       goto err_exit;
-               }
-               cnt = rc;
+       rc = virtio_netdev_rxq_dequeue(queue, pkt);
+       if (unlikely(rc < 0)) {
+               uk_pr_err("Failed to dequeue the packet: %d\n", rc);
+               goto err_exit;
        }
-       if (fillup)
-               virtio_netdev_rx_fillup(queue, fillup, fillup_count);
+       cnt = (*pkt) ? 1 : 0;
+       virtio_netdev_rx_fillup(queue, (queue->nb_desc - rc), 1);
 
        /* Enable interrupt only when user had previously enabled it */
-       if (pkt && (queue->intr_enabled & VTNET_INTR_USR_EN_MASK)) {
+       if (queue->intr_enabled & VTNET_INTR_USR_EN_MASK) {
                /* Need to enable the interrupt on the last packet */
                rc = virtqueue_intr_enable(queue->vq);
                if (rc == 1 && cnt == 0) {
@@ -539,6 +563,9 @@ static int virtio_netdev_recv(struct uk_netdev *dev,
                                          rc);
                                goto err_exit;
                        }
+                       /* Since we received something, we need to fill up */
+                       virtio_netdev_rx_fillup(queue, (queue->nb_desc - rc), 1);
+
                        /* Need to enable the interrupt on the last packet */
                        rc = virtqueue_intr_enable(queue->vq);
                        cnt = (rc == 1) ? 2 : 1;
@@ -546,7 +573,7 @@ static int virtio_netdev_recv(struct uk_netdev *dev,
                        /* When there is packet in the buffer */
                        cnt = (rc == 1) ? 2 : 1;
                }
-       } else if (pkt && cnt > 0) {
+       } else if (cnt > 0) {
                /**
                 * For polling case, we report always there are further
                 * packets unless the queue is empty.
@@ -573,6 +600,8 @@ static struct uk_netdev_rx_queue *virtio_netdev_rx_queue_setup(
 
        UK_ASSERT(n);
        UK_ASSERT(conf);
+       UK_ASSERT(conf->alloc_rxpkts);
+
        vndev = to_virtionetdev(n);
        if (queue_id >= vndev->max_vqueue_pairs) {
                uk_pr_err("Invalid virtqueue identifier: %"__PRIu16"\n",
@@ -589,6 +618,11 @@ static struct uk_netdev_rx_queue *virtio_netdev_rx_queue_setup(
                goto err_exit;
        }
        rxq  = &vndev->rxqs[rc];
+       rxq->alloc_rxpkts = conf->alloc_rxpkts;
+       rxq->alloc_rxpkts_argp = conf->alloc_rxpkts_argp;
+
+       /* Allocate receive buffers for this queue */
+       virtio_netdev_rx_fillup(rxq, rxq->nb_desc, 0);
 
 exit:
        return rxq;
diff --git a/plat/drivers/virtio/virtio_ring.c b/plat/drivers/virtio/virtio_ring.c
index 02d568aa..56b1b6cd 100644
--- a/plat/drivers/virtio/virtio_ring.c
+++ b/plat/drivers/virtio/virtio_ring.c
@@ -260,19 +260,19 @@ __phys_addr virtqueue_physaddr(struct virtqueue *vq)
        return ukplat_virt_to_phys(vrq->vring_mem);
 }
 
-void *virtqueue_buffer_dequeue(struct virtqueue *vq, __u32 *len)
+int virtqueue_buffer_dequeue(struct virtqueue *vq, void **cookie, __u32 *len)
 {
        struct virtqueue_vring *vrq = NULL;
        __u16 used_idx, head_idx;
        struct vring_used_elem *elem;
-       void *cookie;
 
        UK_ASSERT(vq);
+       UK_ASSERT(cookie);
        vrq = to_virtqueue_vring(vq);
 
        /* No new descriptor since last dequeue operation */
        if (!virtqueue_hasdata(vrq))
-               return NULL;
+               return -ENOMSG;
        used_idx = vrq->last_used_desc_idx++ & (vrq->vring.num - 1);
        elem = &vrq->vring.used->ring[used_idx];
        /**
@@ -283,10 +283,10 @@ void *virtqueue_buffer_dequeue(struct virtqueue *vq, __u32 *len)
        head_idx = elem->id;
        if (len)
                *len = elem->len;
-       cookie = vrq->vq_info[head_idx].cookie;
+       *cookie = vrq->vq_info[head_idx].cookie;
        virtqueue_detach_desc(vrq, head_idx);
        vrq->vq_info[head_idx].cookie = NULL;
-       return cookie;
+       return (vrq->vring.num - vrq->desc_avail);
 }
 
 int virtqueue_buffer_enqueue(struct virtqueue *vq, void *cookie,
-- 
2.17.1

