[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Minios-devel] [UNIKRAFT PATCH] lib/uknetdev: Introduce receive buffer allocator callback



Hey Sharan,

thanks a lot for your comments. Let me know what you think to my replies ;-).

Thanks,

Simon

On 13.12.18 17:54, Sharan Santhanam wrote:
Hello Simon,


Please find my comments inline.

Thanks & Regards
Sharan


On 12/13/18 7:44 AM, Simon Kuenzer wrote:
Sorry, this patch is a bit long but I wanted to keep everything functional working.

On 13.12.18 07:41, Simon Kuenzer wrote:
The idea of having a fill-up parameter on the receive function is
inconvenient because a user never knows how many receive buffers have
to be programmed before calling the receive function.
This commit introduces registering a callback when configuring a
receive queue. This callback has to be provided by the NETDEV API user
and is called by the driver whenever it programs new receive buffers to
the receive queue. In order to still provide performance, this
allocation callback function is called for a batch of receive buffers.
This new mechanism replaces the fill-up parameter on the receive function.

This commit also adopts virtio-net in order to comply with the API change.

Signed-off-by: Simon Kuenzer <simon.kuenzer@xxxxxxxxx>
---
  lib/uknetdev/include/uk/netdev.h        |  41 +++----------
  lib/uknetdev/include/uk/netdev_core.h   |  26 +++++++-
  lib/uknetdev/netdev.c                   |   1 +
  plat/drivers/include/virtio/virtqueue.h |   9 +++
  plat/drivers/virtio/virtio_net.c        | 104 ++++++++++++++++++++------------
  plat/drivers/virtio/virtio_ring.c       |   9 ++-
  6 files changed, 114 insertions(+), 76 deletions(-)

diff --git a/lib/uknetdev/include/uk/netdev.h b/lib/uknetdev/include/uk/netdev.h
index f0fa769..b5ce96d 100644
--- a/lib/uknetdev/include/uk/netdev.h
+++ b/lib/uknetdev/include/uk/netdev.h
@@ -419,9 +419,7 @@ static inline int uk_netdev_rxq_intr_disable(struct uk_netdev *dev,
  }
  /**
- * Receive one packet and re-program used receive descriptor
- * Please note that before any packet can be received, the receive queue
- * has to be filled up with empty netbufs (see fillup parameter).
+ * Receive one packet and re-program used receive descriptors
   *
   * @param dev
   *   The Unikraft Network Device.
@@ -431,52 +429,29 @@ static inline int uk_netdev_rxq_intr_disable(struct uk_netdev *dev,
   *   to uk_netdev_configure().
   * @param pkt
   *   Reference to netbuf pointer which will be point to the received packet - *   after the function call. Can be NULL if function is used to program
- *   receive descriptors only.
- * @param fillup
- *   Array of netbufs that should be used to program used descriptors again. - *   Each of the netbuf should be freshly allocated/initialized and not part
- *   of any chain.
- *   `fillup` can be `NULL` but without re-programming of used descriptors no
- *   new packets can be received at some point.
- * @param fillup_count
- *   Length of `fillup` array. After the function call, `fillup_count` returns - *   the number of left and unused netbufs on the array. `fillup_count` has to
- *   to 0 if `fillup` is `NULL`.
+ *   after the function call. `pkt` has never to be `NULL`.
   * @return
- *   - (0): No packet available or `pkt` was set to NULL,
- *          check `fillup_count` for used `fillup` netbufs
- *   - (1): `pkt` points to received netbuf,
- *          check `fillup_count` for used `fillup` netbufs
+ *   - (0): No packet available
+ *   - (1): `pkt` points to received netbuf
   *   - (2): `pkt` points to received netbuf but more received packets are    *          available on the receive queue. When interrupts are used, they are
- *          disabled until 1 is returned on subsequent calls,
- *          check `fillup_count` for used `fillup` netbufs
+ *          disabled until 1 is returned on subsequent calls
   *   - (<0): Error code from driver
   */
  static inline int uk_netdev_rx_one(struct uk_netdev *dev, uint16_t queue_id,
-                   struct uk_netbuf **pkt,
-                   struct uk_netbuf *fillup[],
-                   uint16_t *fillup_count)
+                   struct uk_netbuf **pkt)
I like the idea of having a callback refill the user buffer but in the previous API, we had a way of communicating with the user the number of buffers refilled. In this API, we fail silently if the buffer refill did not work.


The actual failure happens now within the `uk_netdev_alloc_rxpkts()` callback which I think makes the receive API much easier to use. This callback is provided by the user so it is the user who is aware of the failure anyways.

  {
      UK_ASSERT(dev);
      UK_ASSERT(dev->rx_one);
      UK_ASSERT(queue_id < CONFIG_LIBUKNETDEV_MAXNBQUEUES);
      UK_ASSERT(dev->_data->state == UK_NETDEV_RUNNING);
      UK_ASSERT(!PTRISERR(dev->_rx_queue[queue_id]));
-    UK_ASSERT((!fillup && fillup_count) || fillup);
+    UK_ASSERT(pkt);
-    return dev->rx_one(dev, dev->_rx_queue[queue_id], pkt,
-               fillup, fillup_count);
+    return dev->rx_one(dev, dev->_rx_queue[queue_id], pkt);
  }
  /**
- * Shortcut for only filling up a receive queue with empty netbufs
- */
-#define uk_netdev_rx_fillup(dev, queue_id, fillup, fillup_count)    \
-    uk_netdev_rx_one((dev), (queue_id), NULL, (fillup), (fillup_count))
-
-/**
   * Transmit one packet
   *
   * @param dev
diff --git a/lib/uknetdev/include/uk/netdev_core.h b/lib/uknetdev/include/uk/netdev_core.h
index b77c45a..d30886d 100644
--- a/lib/uknetdev/include/uk/netdev_core.h
+++ b/lib/uknetdev/include/uk/netdev_core.h
@@ -176,6 +176,25 @@ typedef void (*uk_netdev_queue_event_t)(struct uk_netdev *dev,
                      uint16_t queue_id, void *argp);
  /**
+ * User callback used by the driver to allocate netbufs
+ * that are used to setup receive descriptors.
+ *
+ * @param argp
+ *   User-provided argument.
+ * @param pkts
+ *   Array for netbuf pointers that the function should allocate.
+ * @param count
+ *   Number of netbufs requested (equal to length of pkts).
+ * @return
+ *   Number of successful allocated netbufs,
+ *   has to be in range [0, count].
+ *   References to allocated packets are placed to pkts[0]...pkts[count -1].
+ */
+typedef uint16_t (*uk_netdev_alloc_rxpkts)(void *argp,
+                       struct uk_netbuf *pkts[],
+                       uint16_t count);
+
Don't we want to return an error to the user in case buffer allocation failed.


It is an interface doing batching. The API user is implementing the function so it is up to the user to handle the error. I think the driver shouldn't care too much about it because a likely error is that receive buffer pools my be dimensioned too small to cope with incoming packet rates. It would be even further painful to handle errors in the driver other than just letting descriptors unprogrammed. Because of this, I think it is best to give the driver what we could successfully allocate. Actually the driver could guess that there was something going wrong (e.g., out of memory) if the function returns less netbufs than requested with `count`. `0` is also valid return value (so no netbuf allocated) which ends up in no descriptor is programmed.

+/**
   * A structure used to configure an Unikraft network device RX queue.
   */
  struct uk_netdev_rxqueue_conf {
@@ -183,6 +202,9 @@ struct uk_netdev_rxqueue_conf {
      void *callback_cookie;            /**< Argument pointer for callback. */       struct uk_alloc *a;               /**< Allocator for descriptors. */
+
+    uk_netdev_alloc_rxpkts alloc_rxpkts; /**< Allocator for rx netbufs */ +    void *alloc_rxpkts_argp;             /**< Argument for alloc_rxpkts */
  #ifdef CONFIG_LIBUKNETDEV_DISPATCHERTHREADS
      struct uk_sched *s;               /**< Scheduler for dispatcher. */
  #endif
@@ -266,9 +288,7 @@ typedef int (*uk_netdev_rxq_intr_disable_t)(struct uk_netdev *dev,
  /** Driver callback type to retrieve one packet from a RX queue. */
  typedef int (*uk_netdev_rx_one_t)(struct uk_netdev *dev,
                    struct uk_netdev_rx_queue *queue,
-                  struct uk_netbuf **pkt,
-                  struct uk_netbuf *fillup[],
-                  uint16_t *fillup_count);
+                  struct uk_netbuf **pkt);
  /** Driver callback type to submit one packet to a TX queue. */
  typedef int (*uk_netdev_tx_one_t)(struct uk_netdev *dev,
diff --git a/lib/uknetdev/netdev.c b/lib/uknetdev/netdev.c
index 8f7dd6e..3b92f62 100644
--- a/lib/uknetdev/netdev.c
+++ b/lib/uknetdev/netdev.c
@@ -333,6 +333,7 @@ int uk_netdev_rxq_configure(struct uk_netdev *dev, uint16_t queue_id,
      UK_ASSERT(dev->ops->rxq_configure);
      UK_ASSERT(queue_id < CONFIG_LIBUKNETDEV_MAXNBQUEUES);
      UK_ASSERT(rx_conf);
+    UK_ASSERT(rx_conf->alloc_rxpkts);
  #ifdef CONFIG_LIBUKNETDEV_DISPATCHERTHREADS
      UK_ASSERT((rx_conf->callback && rx_conf->s)
            || !rx_conf->callback);
diff --git a/plat/drivers/include/virtio/virtqueue.h b/plat/drivers/include/virtio/virtqueue.h
index 3d72c50..e8bfe29 100644
--- a/plat/drivers/include/virtio/virtqueue.h
+++ b/plat/drivers/include/virtio/virtqueue.h
@@ -184,6 +184,15 @@ struct virtqueue *virtqueue_create(__u16 queue_id, __u16 nr_descs, __u16 align,
                     struct virtio_dev *vdev, struct uk_alloc *a);
  /**
+ * Return the number of available descriptors of a virtqueue
+ * @param vq
+ *    A reference to the virtqueue.
+ * @return __uint16
+ *    Number of available descriptors
+ */
+__u16 virtqueue_avail(struct virtqueue *vq);
+
+/**
   * Check the virtqueue if full.
   * @param vq
   *    A reference to the virtqueue.
diff --git a/plat/drivers/virtio/virtio_net.c b/plat/drivers/virtio/virtio_net.c
index 75a6cd6..5c7162e 100644
--- a/plat/drivers/virtio/virtio_net.c
+++ b/plat/drivers/virtio/virtio_net.c
@@ -128,6 +128,9 @@ struct uk_netdev_rx_queue {
      uint16_t nb_desc;
      /* The flag to interrupt on the transmit queue */
      uint8_t intr_enabled;
+    /* User-provided receive buffer allocation function */
+    uk_netdev_alloc_rxpkts alloc_rxpkts;
+    void *alloc_rxpkts_argp;
      /* Reference to the uk_netdev */
      struct uk_netdev *ndev;
      /* The scatter list and its associated fragements */
@@ -197,9 +200,7 @@ static int virtio_netdev_xmit(struct uk_netdev *dev,
                    struct uk_netbuf *pkt);
  static int virtio_netdev_recv(struct uk_netdev *dev,
                    struct uk_netdev_rx_queue *queue,
-                  struct uk_netbuf **pkt,
-                  struct uk_netbuf *fillup[],
-                  uint16_t *fillup_count);
+                  struct uk_netbuf **pkt);
  static const struct uk_hwaddr *virtio_net_mac_get(struct uk_netdev *n);
  static __u16 virtio_net_mtu_get(struct uk_netdev *n);
  static unsigned virtio_net_promisc_get(struct uk_netdev *n);
@@ -213,7 +214,7 @@ static int virtio_netdev_rxq_enqueue(struct uk_netdev_rx_queue *rxq,
                       struct uk_netbuf *netbuf);
  static int virtio_netdev_recv_done(struct virtqueue *vq, void *priv);
  static int virtio_netdev_rx_fillup(struct uk_netdev_rx_queue *rxq,
-        struct uk_netbuf **netbuf, __u16 *count);
+                   int notify);
  /**
   * Static global constants
@@ -262,39 +263,62 @@ static void virtio_netdev_xmit_free(struct uk_netdev_tx_queue *txq)
      uk_pr_debug("Free %"__PRIu16" descriptors\n", cnt);
  }
-static int virtio_netdev_rx_fillup(struct uk_netdev_rx_queue *rxq,
-        struct uk_netbuf **netbuf, __u16 *count)
+#define RX_FILLUP_BATCHLEN 64
+
+static int virtio_netdev_rx_fillup(struct uk_netdev_rx_queue *rxq, int notify)
  {
+    struct uk_netbuf *netbuf[RX_FILLUP_BATCHLEN];
      int rc = 0;
-    __u16 i = 0;
-    __u16 cnt = 0;
+    __u16 i, j;
+    __u16 req;
+    __u16 cnt;
+    __u16 filled = 0;
      /**
       * Fixed amount of memory is allocated to each received buffer. In
       * our case since we don't support jumbo frame or LRO yet we require
       * that the buffer feed to the ring descriptor is atleast
       * ethernet MTU + virtio net header.
+     * Because we using 2 descriptor for a single netbuf, our effective
+     * queue size is just the half.
       */
-    for (i = 0; i < *count; i++) {
-        rc = virtio_netdev_rxq_enqueue(rxq, netbuf[i]);
-        if (rc == -ENOSPC) {
-            uk_pr_debug(
-                "No more place available to add descriptors\n");
-            rc = 0;
-            break;
-        } else if (unlikely(rc < 0)) {
-            uk_pr_err("Failed to add a buffer to the virtqueue: %d\n",
-                  rc);
-            break;
+    while ((req = (virtqueue_avail(rxq->vq) / 2))) {
+        req = MIN(req, RX_FILLUP_BATCHLEN);
+        cnt = rxq->alloc_rxpkts(rxq->alloc_rxpkts_argp, netbuf, req);
+        for (i = 0; i < cnt; i++) {
+            uk_pr_debug("Enqueue netbuf %"PRIu16"/%"PRIu16" (%p) to virtqueue %p...\n",
+                    i + 1, cnt, netbuf[i], rxq);
+            rc = virtio_netdev_rxq_enqueue(rxq, netbuf[i]);
While enqueuing into the virtio-ring the API returns the number of descriptor availble. The moment rc == 0, the queue is full.

As a suggestion we could also change "virtqueue_buffer_dequeue" to return the number of descriptor available, thereby eliminating the need for fetch the queue available.
int virtqueue_buffer_dequeue(struct virtqueue *vq, void **buf,
                   __u32 *len);

I agree, sounds right. I will look into this.

+            if (unlikely(rc < 0)) {

Do we want to report ring full as an error?

That should be now a case that shouldn't happen anymore because this function is querying the number of available slots before asking for allocation. It will exactly fit or less (in case we are in memory pressure). If not, something really bad happened in the driver. However, the general purpose error handling is also catching this kind of failure. I don't think we need a separate error handling for this anymore.

+                uk_pr_err("Failed to add a buffer to receive virtqueue %p: %d\n",
+                      rxq, rc);
+
+                /*
+                 * Release netbufs that we are not going
+                 * to use anymore
+                 */
+                for (j = i; j < cnt; j++)

Should we free this memory or give it back to user? Wouldn't freeing it up make it expensive?

Because we have a unlikely terrible situation, I think the performance of the error clean-up is less important. The problem I see with your suggestion is that we would need another callback to give the buffer back to the user and that would bloat the API. However, the user is actually able to hook into the free operation already: The destructor callback of netbufs can be used for this.

+                    uk_netbuf_free(netbuf[j]);
+                goto out;
+            }
+            filled++;
+        }
+
+        if (unlikely(cnt < req)) {
+            uk_pr_debug("Failed to complete fill-up of receive virtqueue %p: Out of memory",
+                    rxq);
+            goto out;
          }
-        cnt++;
      }
-    *count = *count - cnt;
+
+out:
+    uk_pr_debug("Programmed %"PRIu16" receive netbufs to receive virtqueue %p\n",
+            filled, rxq);
      /**
       * Notify the host, when we submit new descriptor(s).
       */
-    if (cnt)
+    if (notify && filled)
          virtqueue_host_notify(rxq->vq);
      return rc;
@@ -498,34 +522,29 @@ static int virtio_netdev_rxq_dequeue(struct uk_netdev_rx_queue *rxq,
  static int virtio_netdev_recv(struct uk_netdev *dev,
                    struct uk_netdev_rx_queue *queue,
-                  struct uk_netbuf **pkt,
-                  struct uk_netbuf *fillup[],
-                  uint16_t *fillup_count)
+                  struct uk_netbuf **pkt)
  {
      int rc = 0;
      int cnt = 0;
      UK_ASSERT(dev && queue);
-    UK_ASSERT(!fillup || (fillup && *fillup_count > 0));
+    UK_ASSERT(pkt);
-    if (pkt && (queue->intr_enabled & VTNET_INTR_USR_EN_MASK)) {
+    if (queue->intr_enabled & VTNET_INTR_USR_EN_MASK) {
          virtqueue_intr_disable(queue->vq);
          queue->intr_enabled &= ~(VTNET_INTR_EN);
      }
-    if (pkt) {
-        rc = virtio_netdev_rxq_dequeue(queue, pkt);
-        if (unlikely(rc < 0)) {
-            uk_pr_err("Failed to dequeue the packet: %d\n", rc);
-            goto err_exit;
-        }
-        cnt = rc;
+    rc = virtio_netdev_rxq_dequeue(queue, pkt);
+    if (unlikely(rc < 0)) {
+        uk_pr_err("Failed to dequeue the packet: %d\n", rc);
+        goto err_exit;
      }
-    if (fillup)
-        virtio_netdev_rx_fillup(queue, fillup, fillup_count);
+    cnt = rc;
+    virtio_netdev_rx_fillup(queue, 1);
      /* Enable interrupt only when user had previously enabled it */
-    if (pkt && (queue->intr_enabled & VTNET_INTR_USR_EN_MASK)) {
+    if (queue->intr_enabled & VTNET_INTR_USR_EN_MASK) {
          /* Need to enable the interrupt on the last packet */
          rc = virtqueue_intr_enable(queue->vq);
          if (rc == 1 && cnt == 0) {
@@ -542,11 +561,13 @@ static int virtio_netdev_recv(struct uk_netdev *dev,
              /* Need to enable the interrupt on the last packet */
              rc = virtqueue_intr_enable(queue->vq);
              cnt = (rc == 1) ? 2 : 1;
+            /* Since we received something, we need to fillup */
+            virtio_netdev_rx_fillup(queue, 1);
          } else if (cnt > 0) {
              /* When there is packet in the buffer */
              cnt = (rc == 1) ? 2 : 1;
          }
-    } else if (pkt && cnt > 0) {
+    } else if (cnt > 0) {
          /**
           * For polling case, we report always there are further
           * packets unless the queue is empty.
@@ -573,6 +594,8 @@ static struct uk_netdev_rx_queue *virtio_netdev_rx_queue_setup(
      UK_ASSERT(n);
      UK_ASSERT(conf);
+    UK_ASSERT(conf->alloc_rxpkts);
+
      vndev = to_virtionetdev(n);
      if (queue_id >= vndev->max_vqueue_pairs) {
          uk_pr_err("Invalid virtqueue identifier: %"__PRIu16"\n",
@@ -589,6 +612,11 @@ static struct uk_netdev_rx_queue *virtio_netdev_rx_queue_setup(
          goto err_exit;
      }
      rxq  = &vndev->rxqs[rc];
+    rxq->alloc_rxpkts = conf->alloc_rxpkts;
+    rxq->alloc_rxpkts_argp = conf->alloc_rxpkts_argp;
+
+    /* Allocate receive buffers for this queue */
+    virtio_netdev_rx_fillup(rxq, 0);
  exit:
      return rxq;
diff --git a/plat/drivers/virtio/virtio_ring.c b/plat/drivers/virtio/virtio_ring.c
index 02d568a..5eaa7e7 100644
--- a/plat/drivers/virtio/virtio_ring.c
+++ b/plat/drivers/virtio/virtio_ring.c
@@ -416,12 +416,17 @@ void virtqueue_destroy(struct virtqueue *vq, struct uk_alloc *a)
      uk_free(a, vrq);
  }
-int virtqueue_is_full(struct virtqueue *vq)
+__u16 virtqueue_avail(struct virtqueue *vq)
  {
      struct virtqueue_vring *vrq;
      UK_ASSERT(vq);
      vrq = to_virtqueue_vring(vq);
-    return (vrq->desc_avail == 0);
+    return vrq->desc_avail;
+}
+
+int virtqueue_is_full(struct virtqueue *vq)
+{
+    return (virtqueue_avail(vq) == 0);
  }



_______________________________________________
Minios-devel mailing list
Minios-devel@xxxxxxxxxxxxxxxxxxxx
https://lists.xenproject.org/mailman/listinfo/minios-devel

 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.