[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Minios-devel] [UNIKRAFT PATCH v2] lib/uknetdev: Introduce receive buffer allocator callback



The idea of having a fill-up parameter on the receive function is
inconvenient because a user never knows how many receive buffers have
to be programmed before calling the receive function.
This commit introduces registering a callback when configuring a
receive queue. This callback has to be provided by the NETDEV API user
and is called by the driver whenever it programs new receive buffers to
the receive queue. In order to still provide performance, this
allocation callback function is called for a batch of receive buffers.
This new mechanism replaces the fill-up parameter on the receive function.

This commit also adapts virtio-net in order to comply with the API change.

Signed-off-by: Simon Kuenzer <simon.kuenzer@xxxxxxxxx>
---
 lib/uknetdev/include/uk/netdev.h        |  41 ++---------
 lib/uknetdev/include/uk/netdev_core.h   |  26 ++++++-
 lib/uknetdev/netdev.c                   |   1 +
 plat/drivers/include/virtio/virtqueue.h |  13 +++-
 plat/drivers/virtio/virtio_net.c        | 125 +++++++++++++++++++++-----------
 plat/drivers/virtio/virtio_ring.c       |  10 +--
 6 files changed, 127 insertions(+), 89 deletions(-)

diff --git a/lib/uknetdev/include/uk/netdev.h b/lib/uknetdev/include/uk/netdev.h
index f0fa769..b5ce96d 100644
--- a/lib/uknetdev/include/uk/netdev.h
+++ b/lib/uknetdev/include/uk/netdev.h
@@ -419,9 +419,7 @@ static inline int uk_netdev_rxq_intr_disable(struct 
uk_netdev *dev,
 }
 
 /**
- * Receive one packet and re-program used receive descriptor
- * Please note that before any packet can be received, the receive queue
- * has to be filled up with empty netbufs (see fillup parameter).
+ * Receive one packet and re-program used receive descriptors
  *
  * @param dev
  *   The Unikraft Network Device.
@@ -431,52 +429,29 @@ static inline int uk_netdev_rxq_intr_disable(struct 
uk_netdev *dev,
  *   to uk_netdev_configure().
  * @param pkt
  *   Reference to netbuf pointer which will be point to the received packet
- *   after the function call. Can be NULL if function is used to program
- *   receive descriptors only.
- * @param fillup
- *   Array of netbufs that should be used to program used descriptors again.
- *   Each of the netbuf should be freshly allocated/initialized and not part
- *   of any chain.
- *   `fillup` can be `NULL` but without re-programming of used descriptors no
- *   new packets can be received at some point.
- * @param fillup_count
- *   Length of `fillup` array. After the function call, `fillup_count` returns
- *   the number of left and unused netbufs on the array. `fillup_count` has to
- *   to 0 if `fillup` is `NULL`.
+ *   after the function call. `pkt` must never be `NULL`.
  * @return
- *   - (0): No packet available or `pkt` was set to NULL,
- *          check `fillup_count` for used `fillup` netbufs
- *   - (1): `pkt` points to received netbuf,
- *          check `fillup_count` for used `fillup` netbufs
+ *   - (0): No packet available
+ *   - (1): `pkt` points to received netbuf
  *   - (2): `pkt` points to received netbuf but more received packets are
  *          available on the receive queue. When interrupts are used, they are
- *          disabled until 1 is returned on subsequent calls,
- *          check `fillup_count` for used `fillup` netbufs
+ *          disabled until 1 is returned on subsequent calls
  *   - (<0): Error code from driver
  */
 static inline int uk_netdev_rx_one(struct uk_netdev *dev, uint16_t queue_id,
-                                  struct uk_netbuf **pkt,
-                                  struct uk_netbuf *fillup[],
-                                  uint16_t *fillup_count)
+                                  struct uk_netbuf **pkt)
 {
        UK_ASSERT(dev);
        UK_ASSERT(dev->rx_one);
        UK_ASSERT(queue_id < CONFIG_LIBUKNETDEV_MAXNBQUEUES);
        UK_ASSERT(dev->_data->state == UK_NETDEV_RUNNING);
        UK_ASSERT(!PTRISERR(dev->_rx_queue[queue_id]));
-       UK_ASSERT((!fillup && fillup_count) || fillup);
+       UK_ASSERT(pkt);
 
-       return dev->rx_one(dev, dev->_rx_queue[queue_id], pkt,
-                          fillup, fillup_count);
+       return dev->rx_one(dev, dev->_rx_queue[queue_id], pkt);
 }
 
 /**
- * Shortcut for only filling up a receive queue with empty netbufs
- */
-#define uk_netdev_rx_fillup(dev, queue_id, fillup, fillup_count)       \
-       uk_netdev_rx_one((dev), (queue_id), NULL, (fillup), (fillup_count))
-
-/**
  * Transmit one packet
  *
  * @param dev
diff --git a/lib/uknetdev/include/uk/netdev_core.h 
b/lib/uknetdev/include/uk/netdev_core.h
index b77c45a..d30886d 100644
--- a/lib/uknetdev/include/uk/netdev_core.h
+++ b/lib/uknetdev/include/uk/netdev_core.h
@@ -176,6 +176,25 @@ typedef void (*uk_netdev_queue_event_t)(struct uk_netdev 
*dev,
                                        uint16_t queue_id, void *argp);
 
 /**
+ * User callback used by the driver to allocate netbufs
+ * that are used to setup receive descriptors.
+ *
+ * @param argp
+ *   User-provided argument.
+ * @param pkts
+ *   Array for netbuf pointers that the function should allocate.
+ * @param count
+ *   Number of netbufs requested (equal to length of pkts).
+ * @return
+ *   Number of successfully allocated netbufs,
+ *   has to be in range [0, count].
+ *   References to allocated packets are placed to pkts[0]...pkts[count -1].
+ */
+typedef uint16_t (*uk_netdev_alloc_rxpkts)(void *argp,
+                                          struct uk_netbuf *pkts[],
+                                          uint16_t count);
+
+/**
  * A structure used to configure an Unikraft network device RX queue.
  */
 struct uk_netdev_rxqueue_conf {
@@ -183,6 +202,9 @@ struct uk_netdev_rxqueue_conf {
        void *callback_cookie;            /**< Argument pointer for callback. */
 
        struct uk_alloc *a;               /**< Allocator for descriptors. */
+
+       uk_netdev_alloc_rxpkts alloc_rxpkts; /**< Allocator for rx netbufs */
+       void *alloc_rxpkts_argp;             /**< Argument for alloc_rxpkts */
 #ifdef CONFIG_LIBUKNETDEV_DISPATCHERTHREADS
        struct uk_sched *s;               /**< Scheduler for dispatcher. */
 #endif
@@ -266,9 +288,7 @@ typedef int (*uk_netdev_rxq_intr_disable_t)(struct 
uk_netdev *dev,
 /** Driver callback type to retrieve one packet from a RX queue. */
 typedef int (*uk_netdev_rx_one_t)(struct uk_netdev *dev,
                                  struct uk_netdev_rx_queue *queue,
-                                 struct uk_netbuf **pkt,
-                                 struct uk_netbuf *fillup[],
-                                 uint16_t *fillup_count);
+                                 struct uk_netbuf **pkt);
 
 /** Driver callback type to submit one packet to a TX queue. */
 typedef int (*uk_netdev_tx_one_t)(struct uk_netdev *dev,
diff --git a/lib/uknetdev/netdev.c b/lib/uknetdev/netdev.c
index 8f7dd6e..3b92f62 100644
--- a/lib/uknetdev/netdev.c
+++ b/lib/uknetdev/netdev.c
@@ -333,6 +333,7 @@ int uk_netdev_rxq_configure(struct uk_netdev *dev, uint16_t 
queue_id,
        UK_ASSERT(dev->ops->rxq_configure);
        UK_ASSERT(queue_id < CONFIG_LIBUKNETDEV_MAXNBQUEUES);
        UK_ASSERT(rx_conf);
+       UK_ASSERT(rx_conf->alloc_rxpkts);
 #ifdef CONFIG_LIBUKNETDEV_DISPATCHERTHREADS
        UK_ASSERT((rx_conf->callback && rx_conf->s)
                  || !rx_conf->callback);
diff --git a/plat/drivers/include/virtio/virtqueue.h 
b/plat/drivers/include/virtio/virtqueue.h
index 3d72c50..a4181f5 100644
--- a/plat/drivers/include/virtio/virtqueue.h
+++ b/plat/drivers/include/virtio/virtqueue.h
@@ -121,14 +121,19 @@ int virtqueue_notify_enabled(struct virtqueue *vq);
  *
  * @param vq
  *     Reference to the virtqueue.
+ * @param cookie
+ *      Reference to a reference that will point to the cookie that was
+ *      submitted with the dequeued descriptor after successful exit of this
+ *      function.
  * @param len
  *     Reference to the length of the data packet.
  * @return
- *     On Success, returns a reference to cookie that was submitted with
- *     descriptor.
- *     On failure, returns NULL with the length unmodified.
+ *     >= 0 A buffer was dequeued from the ring and the count indicates
+ *     the number of used slots in the ring after dequeueing.
+ *     < 0 Failed to dequeue a buffer, the output parameters cookie and len
+ *      are unmodified.
  */
-void *virtqueue_buffer_dequeue(struct virtqueue *vq, __u32 *len);
+int virtqueue_buffer_dequeue(struct virtqueue *vq, void **cookie, __u32 *len);
 
 /**
  * Create a descriptor chain starting at index head,
diff --git a/plat/drivers/virtio/virtio_net.c b/plat/drivers/virtio/virtio_net.c
index 75a6cd6..1088ccc 100644
--- a/plat/drivers/virtio/virtio_net.c
+++ b/plat/drivers/virtio/virtio_net.c
@@ -128,6 +128,9 @@ struct uk_netdev_rx_queue {
        uint16_t nb_desc;
        /* The flag to interrupt on the transmit queue */
        uint8_t intr_enabled;
+       /* User-provided receive buffer allocation function */
+       uk_netdev_alloc_rxpkts alloc_rxpkts;
+       void *alloc_rxpkts_argp;
        /* Reference to the uk_netdev */
        struct uk_netdev *ndev;
        /* The scatter list and its associated fragements */
@@ -197,9 +200,7 @@ static int virtio_netdev_xmit(struct uk_netdev *dev,
                              struct uk_netbuf *pkt);
 static int virtio_netdev_recv(struct uk_netdev *dev,
                              struct uk_netdev_rx_queue *queue,
-                             struct uk_netbuf **pkt,
-                             struct uk_netbuf *fillup[],
-                             uint16_t *fillup_count);
+                             struct uk_netbuf **pkt);
 static const struct uk_hwaddr *virtio_net_mac_get(struct uk_netdev *n);
 static __u16 virtio_net_mtu_get(struct uk_netdev *n);
 static unsigned virtio_net_promisc_get(struct uk_netdev *n);
@@ -213,7 +214,7 @@ static int virtio_netdev_rxq_enqueue(struct 
uk_netdev_rx_queue *rxq,
                                     struct uk_netbuf *netbuf);
 static int virtio_netdev_recv_done(struct virtqueue *vq, void *priv);
 static int virtio_netdev_rx_fillup(struct uk_netdev_rx_queue *rxq,
-               struct uk_netbuf **netbuf, __u16 *count);
+                                  __u16 num, int notify);
 
 /**
  * Static global constants
@@ -245,12 +246,15 @@ static void virtio_netdev_xmit_free(struct 
uk_netdev_tx_queue *txq)
 {
        struct uk_netbuf *pkt = NULL;
        int cnt = 0;
+       int rc;
 
        for (;;) {
-               pkt = (struct uk_netbuf *)
-                       virtqueue_buffer_dequeue(txq->vq, NULL);
-               if (!pkt)
+               rc = virtqueue_buffer_dequeue(txq->vq, (void **) &pkt, NULL);
+               if (rc < 0)
                        break;
+
+               UK_ASSERT(pkt);
+
                /**
                 * Releasing the free buffer back to netbuf. The netbuf could
                 * use the destructor to inform the stack regarding the free up
@@ -262,42 +266,70 @@ static void virtio_netdev_xmit_free(struct 
uk_netdev_tx_queue *txq)
        uk_pr_debug("Free %"__PRIu16" descriptors\n", cnt);
 }
 
+#define RX_FILLUP_BATCHLEN 64
+
 static int virtio_netdev_rx_fillup(struct uk_netdev_rx_queue *rxq,
-               struct uk_netbuf **netbuf, __u16 *count)
+                                  __u16 nb_desc,
+                                  int notify)
 {
+       struct uk_netbuf *netbuf[RX_FILLUP_BATCHLEN];
        int rc = 0;
-       __u16 i = 0;
+       int ret = 0;
+       __u16 i, j;
+       __u16 req;
        __u16 cnt = 0;
+       __u16 filled = 0;
 
        /**
         * Fixed amount of memory is allocated to each received buffer. In
         * our case since we don't support jumbo frame or LRO yet we require
         * that the buffer feed to the ring descriptor is atleast
         * ethernet MTU + virtio net header.
+        * Because we use 2 descriptors for a single netbuf, our effective
+        * queue size is just half of that.
         */
-       for (i = 0; i < *count; i++) {
-               rc = virtio_netdev_rxq_enqueue(rxq, netbuf[i]);
-               if (rc == -ENOSPC) {
-                       uk_pr_debug(
-                               "No more place available to add descriptors\n");
-                       rc = 0;
-                       break;
-               } else if (unlikely(rc < 0)) {
-                       uk_pr_err("Failed to add a buffer to the virtqueue: 
%d\n",
-                                 rc);
-                       break;
+       nb_desc = ALIGN_DOWN(nb_desc, 2);
+       while (filled < nb_desc) {
+               req = MIN(nb_desc / 2, RX_FILLUP_BATCHLEN);
+               cnt = rxq->alloc_rxpkts(rxq->alloc_rxpkts_argp, netbuf, req);
+               for (i = 0; i < cnt; i++) {
+                       uk_pr_debug("Enqueue netbuf %"PRIu16"/%"PRIu16" (%p) to 
virtqueue %p...\n",
+                                   i + 1, cnt, netbuf[i], rxq);
+                       rc = virtio_netdev_rxq_enqueue(rxq, netbuf[i]);
+                       if (unlikely(rc < 0)) {
+                               uk_pr_err("Failed to add a buffer to receive 
virtqueue %p: %d\n",
+                                         rxq, rc);
+
+                               /*
+                                * Release netbufs that we are not going
+                                * to use anymore
+                                */
+                               for (j = i; j < cnt; j++)
+                                       uk_netbuf_free(netbuf[j]);
+                               return rc;
+                       }
+               }
+               filled += (cnt * 2);
+
+               if (unlikely(cnt < req)) {
+                       uk_pr_debug("Incomplete fill-up of netbufs on receive 
virtqueue %p: Out of memory",
+                                   rxq);
+                       ret = 1;
+                       goto out;
                }
-               cnt++;
        }
-       *count = *count - cnt;
+
+out:
+       uk_pr_debug("Programmed %"PRIu16" receive netbufs to receive virtqueue 
%p\n",
+                   filled / 2, rxq);
 
        /**
         * Notify the host, when we submit new descriptor(s).
         */
-       if (cnt)
+       if (notify && filled)
                virtqueue_host_notify(rxq->vq);
 
-       return rc;
+       return ret;
 }
 
 static int virtio_netdev_xmit(struct uk_netdev *dev,
@@ -463,17 +495,18 @@ static int virtio_netdev_rxq_enqueue(struct 
uk_netdev_rx_queue *rxq,
 static int virtio_netdev_rxq_dequeue(struct uk_netdev_rx_queue *rxq,
                                     struct uk_netbuf **netbuf)
 {
+       int ret;
        int rc = 0;
        struct uk_netbuf *buf = NULL;
        __u32 len;
 
        UK_ASSERT(netbuf);
 
-       buf = (struct uk_netbuf *)virtqueue_buffer_dequeue(rxq->vq, &len);
-       if (!buf) {
+       ret = virtqueue_buffer_dequeue(rxq->vq, (void **) &buf, &len);
+       if (ret < 0) {
                uk_pr_debug("No data available in the queue\n");
                *netbuf = NULL;
-               return 0;
+               return rxq->nb_desc;
        }
        if (unlikely((len < VIRTIO_HDR_LEN + ETH_HDR_LEN)
                     || (len > VIRTIO_PKT_BUFFER_LEN))) {
@@ -493,39 +526,34 @@ static int virtio_netdev_rxq_dequeue(struct 
uk_netdev_rx_queue *rxq,
        UK_ASSERT(rc == 1);
        *netbuf = buf;
 
-       return 1;
+       return ret;
 }
 
 static int virtio_netdev_recv(struct uk_netdev *dev,
                              struct uk_netdev_rx_queue *queue,
-                             struct uk_netbuf **pkt,
-                             struct uk_netbuf *fillup[],
-                             uint16_t *fillup_count)
+                             struct uk_netbuf **pkt)
 {
        int rc = 0;
        int cnt = 0;
 
        UK_ASSERT(dev && queue);
-       UK_ASSERT(!fillup || (fillup && *fillup_count > 0));
+       UK_ASSERT(pkt);
 
-       if (pkt && (queue->intr_enabled & VTNET_INTR_USR_EN_MASK)) {
+       if (queue->intr_enabled & VTNET_INTR_USR_EN_MASK) {
                virtqueue_intr_disable(queue->vq);
                queue->intr_enabled &= ~(VTNET_INTR_EN);
        }
 
-       if (pkt) {
-               rc = virtio_netdev_rxq_dequeue(queue, pkt);
-               if (unlikely(rc < 0)) {
-                       uk_pr_err("Failed to dequeue the packet: %d\n", rc);
-                       goto err_exit;
-               }
-               cnt = rc;
+       rc = virtio_netdev_rxq_dequeue(queue, pkt);
+       if (unlikely(rc < 0)) {
+               uk_pr_err("Failed to dequeue the packet: %d\n", rc);
+               goto err_exit;
        }
-       if (fillup)
-               virtio_netdev_rx_fillup(queue, fillup, fillup_count);
+       cnt = (*pkt) ? 1 : 0;
+       virtio_netdev_rx_fillup(queue, (queue->nb_desc - rc), 1);
 
        /* Enable interrupt only when user had previously enabled it */
-       if (pkt && (queue->intr_enabled & VTNET_INTR_USR_EN_MASK)) {
+       if (queue->intr_enabled & VTNET_INTR_USR_EN_MASK) {
                /* Need to enable the interrupt on the last packet */
                rc = virtqueue_intr_enable(queue->vq);
                if (rc == 1 && cnt == 0) {
@@ -542,11 +570,13 @@ static int virtio_netdev_recv(struct uk_netdev *dev,
                        /* Need to enable the interrupt on the last packet */
                        rc = virtqueue_intr_enable(queue->vq);
                        cnt = (rc == 1) ? 2 : 1;
+                       /* Since we received something, we need to fillup */
+                       virtio_netdev_rx_fillup(queue, (queue->nb_desc - rc), 
1);
                } else if (cnt > 0) {
                        /* When there is packet in the buffer */
                        cnt = (rc == 1) ? 2 : 1;
                }
-       } else if (pkt && cnt > 0) {
+       } else if (cnt > 0) {
                /**
                 * For polling case, we report always there are further
                 * packets unless the queue is empty.
@@ -573,6 +603,8 @@ static struct uk_netdev_rx_queue 
*virtio_netdev_rx_queue_setup(
 
        UK_ASSERT(n);
        UK_ASSERT(conf);
+       UK_ASSERT(conf->alloc_rxpkts);
+
        vndev = to_virtionetdev(n);
        if (queue_id >= vndev->max_vqueue_pairs) {
                uk_pr_err("Invalid virtqueue identifier: %"__PRIu16"\n",
@@ -589,6 +621,11 @@ static struct uk_netdev_rx_queue 
*virtio_netdev_rx_queue_setup(
                goto err_exit;
        }
        rxq  = &vndev->rxqs[rc];
+       rxq->alloc_rxpkts = conf->alloc_rxpkts;
+       rxq->alloc_rxpkts_argp = conf->alloc_rxpkts_argp;
+
+       /* Allocate receive buffers for this queue */
+       virtio_netdev_rx_fillup(rxq, rxq->nb_desc, 0);
 
 exit:
        return rxq;
diff --git a/plat/drivers/virtio/virtio_ring.c 
b/plat/drivers/virtio/virtio_ring.c
index 02d568a..56b1b6c 100644
--- a/plat/drivers/virtio/virtio_ring.c
+++ b/plat/drivers/virtio/virtio_ring.c
@@ -260,19 +260,19 @@ __phys_addr virtqueue_physaddr(struct virtqueue *vq)
        return ukplat_virt_to_phys(vrq->vring_mem);
 }
 
-void *virtqueue_buffer_dequeue(struct virtqueue *vq, __u32 *len)
+int virtqueue_buffer_dequeue(struct virtqueue *vq, void **cookie, __u32 *len)
 {
        struct virtqueue_vring *vrq = NULL;
        __u16 used_idx, head_idx;
        struct vring_used_elem *elem;
-       void *cookie;
 
        UK_ASSERT(vq);
+       UK_ASSERT(cookie);
        vrq = to_virtqueue_vring(vq);
 
        /* No new descriptor since last dequeue operation */
        if (!virtqueue_hasdata(vrq))
-               return NULL;
+               return -ENOMSG;
        used_idx = vrq->last_used_desc_idx++ & (vrq->vring.num - 1);
        elem = &vrq->vring.used->ring[used_idx];
        /**
@@ -283,10 +283,10 @@ void *virtqueue_buffer_dequeue(struct virtqueue *vq, 
__u32 *len)
        head_idx = elem->id;
        if (len)
                *len = elem->len;
-       cookie = vrq->vq_info[head_idx].cookie;
+       *cookie = vrq->vq_info[head_idx].cookie;
        virtqueue_detach_desc(vrq, head_idx);
        vrq->vq_info[head_idx].cookie = NULL;
-       return cookie;
+       return (vrq->vring.num - vrq->desc_avail);
 }
 
 int virtqueue_buffer_enqueue(struct virtqueue *vq, void *cookie,
-- 
2.7.4


_______________________________________________
Minios-devel mailing list
Minios-devel@xxxxxxxxxxxxxxxxxxxx
https://lists.xenproject.org/mailman/listinfo/minios-devel

 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.