Re: [Minios-devel] [UNIKRAFT PATCH] lib/uknetdev: Introduce receive buffer allocator callback
Hey Sharan,
On 14.12.18 13:33, Sharan Santhanam wrote:
Hello Simon,
Please find the comment inline.
Thanks & Regards
Sharan
On 12/14/18 11:21 AM, Simon Kuenzer wrote:
Hey Sharan,
thanks a lot for your comments. Let me know what you think of my
replies ;-).
Thanks,
Simon
On 13.12.18 17:54, Sharan Santhanam wrote:
Hello Simon,
Please find my comments inline.
Thanks & Regards
Sharan
On 12/13/18 7:44 AM, Simon Kuenzer wrote:
Sorry, this patch is a bit long but I wanted to keep everything
functionally working.
On 13.12.18 07:41, Simon Kuenzer wrote:
The idea of having a fill-up parameter on the receive function is
inconvenient because a user never knows how many receive buffers have
to be programmed before calling the receive function.
This commit introduces registering a callback when configuring a
receive queue. This callback has to be provided by the NETDEV API user
and is called by the driver whenever it programs new receive
buffers to
the receive queue. In order to still provide performance, this
allocation callback function is called for a batch of receive buffers.
This new mechanism replaces the fill-up parameter on the receive
function.
This commit also adopts virtio-net in order to comply with the API
change.
Signed-off-by: Simon Kuenzer <simon.kuenzer@xxxxxxxxx>
---
lib/uknetdev/include/uk/netdev.h | 41 +++----------
lib/uknetdev/include/uk/netdev_core.h | 26 +++++++-
lib/uknetdev/netdev.c | 1 +
plat/drivers/include/virtio/virtqueue.h | 9 +++
plat/drivers/virtio/virtio_net.c | 104 ++++++++++++++++++++------------
plat/drivers/virtio/virtio_ring.c | 9 ++-
6 files changed, 114 insertions(+), 76 deletions(-)
diff --git a/lib/uknetdev/include/uk/netdev.h b/lib/uknetdev/include/uk/netdev.h
index f0fa769..b5ce96d 100644
--- a/lib/uknetdev/include/uk/netdev.h
+++ b/lib/uknetdev/include/uk/netdev.h
@@ -419,9 +419,7 @@ static inline int uk_netdev_rxq_intr_disable(struct uk_netdev *dev,
}
/**
- * Receive one packet and re-program used receive descriptor
- * Please note that before any packet can be received, the receive queue
- * has to be filled up with empty netbufs (see fillup parameter).
+ * Receive one packet and re-program used receive descriptors
*
* @param dev
* The Unikraft Network Device.
@@ -431,52 +429,29 @@ static inline int uk_netdev_rxq_intr_disable(struct uk_netdev *dev,
* to uk_netdev_configure().
  * @param pkt
  *   Reference to netbuf pointer which will point to the received packet
- *   after the function call. Can be NULL if function is used to program
- *   receive descriptors only.
- * @param fillup
- *   Array of netbufs that should be used to program used descriptors again.
- *   Each of the netbufs should be freshly allocated/initialized and not part
- *   of any chain.
- *   `fillup` can be `NULL` but without re-programming of used descriptors no
- *   new packets can be received at some point.
- * @param fillup_count
- *   Length of `fillup` array. After the function call, `fillup_count` returns
- *   the number of left and unused netbufs on the array. `fillup_count` has
- *   to be 0 if `fillup` is `NULL`.
+ *   after the function call. `pkt` must never be `NULL`.
  * @return
- *   - (0): No packet available or `pkt` was set to NULL,
- *          check `fillup_count` for used `fillup` netbufs
- *   - (1): `pkt` points to received netbuf,
- *          check `fillup_count` for used `fillup` netbufs
+ *   - (0): No packet available
+ *   - (1): `pkt` points to received netbuf
  *   - (2): `pkt` points to received netbuf but more received packets are
  *          available on the receive queue. When interrupts are used, they are
- *          disabled until 1 is returned on subsequent calls,
- *          check `fillup_count` for used `fillup` netbufs
+ *          disabled until 1 is returned on subsequent calls
  *   - (<0): Error code from driver
  */
 static inline int uk_netdev_rx_one(struct uk_netdev *dev,
 				   uint16_t queue_id,
-				   struct uk_netbuf **pkt,
-				   struct uk_netbuf *fillup[],
-				   uint16_t *fillup_count)
+				   struct uk_netbuf **pkt)
I like the idea of having a callback refill the user buffer but in
the previous API, we had a way of communicating with the user the
number of buffers refilled. In this API, we fail silently if the
buffer refill did not work.
The actual failure happens now within the `uk_netdev_alloc_rxpkts()`
callback which I think makes the receive API much easier to use.
I agree the callback provides a cleaner interface for filling up the
memory buffers.
This callback is provided by the user, so it is the user who is aware
of the failure anyway.
I would prefer if we add a status parameter or a return code regarding
the error. Otherwise the user may have to set a flag in the callback to
hand over information from the allocator to the receive function.
Hmm, probably the easiest to use with the least overhead are return
codes as bit flags. I would add a flag indicating that a queue
underrun happened (fewer descriptors re-programmed for receiving packets
than slots available on the queue). Programs interested in it could
check this flag and react accordingly. Others would ignore it. What
do you think?
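For illustration, such bit-flag return codes could look like the sketch below. The flag names and the helper are hypothetical, not part of the uknetdev API; the point is only that independent conditions (packet received, more pending, underrun) combine into one return value:

```c
#include <stdint.h>

/* Illustrative status bits a receive function could return
 * (names are hypothetical, not the actual uknetdev API). */
#define NETDEV_STATUS_SUCCESS   (0x1) /* a packet was received */
#define NETDEV_STATUS_MORE      (0x2) /* more packets waiting on the queue */
#define NETDEV_STATUS_UNDERRUN  (0x4) /* fewer descriptors re-programmed
                                       * than slots available */

/* Mock receive result: ORs status bits depending on queue state. */
static int mock_rx_status(int got_pkt, int more_pending, int alloc_shortfall)
{
	int status = 0;

	if (got_pkt)
		status |= NETDEV_STATUS_SUCCESS;
	if (more_pending)
		status |= NETDEV_STATUS_MORE;
	if (alloc_shortfall)
		status |= NETDEV_STATUS_UNDERRUN;
	return status;
}
```

A caller that cares about memory pressure checks `status & NETDEV_STATUS_UNDERRUN`; all others simply ignore the bit, so the common path stays unchanged.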
{
UK_ASSERT(dev);
UK_ASSERT(dev->rx_one);
UK_ASSERT(queue_id < CONFIG_LIBUKNETDEV_MAXNBQUEUES);
UK_ASSERT(dev->_data->state == UK_NETDEV_RUNNING);
UK_ASSERT(!PTRISERR(dev->_rx_queue[queue_id]));
- UK_ASSERT((!fillup && fillup_count) || fillup);
+ UK_ASSERT(pkt);
- return dev->rx_one(dev, dev->_rx_queue[queue_id], pkt,
- fillup, fillup_count);
+ return dev->rx_one(dev, dev->_rx_queue[queue_id], pkt);
}
/**
- * Shortcut for only filling up a receive queue with empty netbufs
- */
-#define uk_netdev_rx_fillup(dev, queue_id, fillup, fillup_count) \
-	uk_netdev_rx_one((dev), (queue_id), NULL, (fillup), (fillup_count))
-
-/**
* Transmit one packet
*
* @param dev
diff --git a/lib/uknetdev/include/uk/netdev_core.h b/lib/uknetdev/include/uk/netdev_core.h
index b77c45a..d30886d 100644
--- a/lib/uknetdev/include/uk/netdev_core.h
+++ b/lib/uknetdev/include/uk/netdev_core.h
@@ -176,6 +176,25 @@ typedef void (*uk_netdev_queue_event_t)(struct uk_netdev *dev,
uint16_t queue_id, void *argp);
/**
+ * User callback used by the driver to allocate netbufs
+ * that are used to set up receive descriptors.
+ *
+ * @param argp
+ *   User-provided argument.
+ * @param pkts
+ *   Array for netbuf pointers that the function should allocate.
+ * @param count
+ *   Number of netbufs requested (equal to length of pkts).
+ * @return
+ *   Number of successfully allocated netbufs,
+ *   has to be in range [0, count].
+ *   References to allocated packets are placed to pkts[0]...pkts[count - 1].
+ */
+typedef uint16_t (*uk_netdev_alloc_rxpkts)(void *argp,
+ struct uk_netbuf *pkts[],
+ uint16_t count);
+
Don't we want to return an error to the user in case buffer
allocation failed?
It is an interface doing batching. The API user is implementing the
function, so it is up to the user to handle the error. I think the
driver shouldn't care too much about it because a likely error is that
receive buffer pools may be dimensioned too small to cope with incoming
packet rates. It would be even more painful to handle errors in the
driver other than just leaving descriptors unprogrammed.
Because of this, I think it is best to give the driver what we could
successfully allocate. Actually, the driver could guess that something
went wrong (e.g., out of memory) if the function returns fewer netbufs
than requested with `count`. `0` is also a valid return value (so no
netbuf allocated), which results in no descriptor being programmed.
I am not suggesting that the driver handle the error. Our driver is the
user of the API and we must forward an error return from the allocator
to the user of the receive function. Maybe on returning '0' we report
back to the user that the allocator failed.
I would now report back as soon as the returned count is less than the
requested number of allocations.
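To illustrate the contract under discussion, a user-provided callback could draw from a pre-allocated pool and simply return however many netbufs it managed to hand out. This is a minimal sketch with a mocked pool: `struct netbuf`, `pool_storage`, and the pool layout are stand-ins, not uknetdev or uk_allocpool APIs:

```c
#include <stddef.h>
#include <stdint.h>

/* Mocked netbuf and pool; real code would use struct uk_netbuf and an
 * allocator such as a uk_alloc-backed pool. */
struct netbuf { char payload[2048]; };

#define POOL_SIZE 8
static struct netbuf pool_storage[POOL_SIZE];
static size_t pool_free = POOL_SIZE;

/* Callback shape mirroring uk_netdev_alloc_rxpkts: fill pkts[0..count-1],
 * return how many allocations succeeded (may be less than count, or 0). */
static uint16_t alloc_rxpkts(void *argp, struct netbuf *pkts[], uint16_t count)
{
	uint16_t i;

	(void)argp; /* unused in this sketch */
	for (i = 0; i < count; i++) {
		if (pool_free == 0)
			break; /* pool exhausted: report a partial allocation */
		pool_free--;
		pkts[i] = &pool_storage[pool_free];
	}
	return i; /* driver programs exactly this many descriptors */
}
```

The driver compares the returned count against the requested one; a shortfall signals memory pressure without needing a separate error channel.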
+/**
 * A structure used to configure an Unikraft network device RX queue.
*/
struct uk_netdev_rxqueue_conf {
@@ -183,6 +202,9 @@ struct uk_netdev_rxqueue_conf {
 	void *callback_cookie;           /**< Argument pointer for callback. */
 	struct uk_alloc *a;              /**< Allocator for descriptors. */
+
+	uk_netdev_alloc_rxpkts alloc_rxpkts; /**< Allocator for rx netbufs */
+	void *alloc_rxpkts_argp;             /**< Argument for alloc_rxpkts */
 #ifdef CONFIG_LIBUKNETDEV_DISPATCHERTHREADS
 	struct uk_sched *s;              /**< Scheduler for dispatcher. */
 #endif
@@ -266,9 +288,7 @@ typedef int (*uk_netdev_rxq_intr_disable_t)(struct uk_netdev *dev,
/** Driver callback type to retrieve one packet from a RX queue. */
typedef int (*uk_netdev_rx_one_t)(struct uk_netdev *dev,
struct uk_netdev_rx_queue *queue,
- struct uk_netbuf **pkt,
- struct uk_netbuf *fillup[],
- uint16_t *fillup_count);
+ struct uk_netbuf **pkt);
/** Driver callback type to submit one packet to a TX queue. */
typedef int (*uk_netdev_tx_one_t)(struct uk_netdev *dev,
diff --git a/lib/uknetdev/netdev.c b/lib/uknetdev/netdev.c
index 8f7dd6e..3b92f62 100644
--- a/lib/uknetdev/netdev.c
+++ b/lib/uknetdev/netdev.c
@@ -333,6 +333,7 @@ int uk_netdev_rxq_configure(struct uk_netdev *dev, uint16_t queue_id,
UK_ASSERT(dev->ops->rxq_configure);
UK_ASSERT(queue_id < CONFIG_LIBUKNETDEV_MAXNBQUEUES);
UK_ASSERT(rx_conf);
+ UK_ASSERT(rx_conf->alloc_rxpkts);
#ifdef CONFIG_LIBUKNETDEV_DISPATCHERTHREADS
UK_ASSERT((rx_conf->callback && rx_conf->s)
|| !rx_conf->callback);
diff --git a/plat/drivers/include/virtio/virtqueue.h b/plat/drivers/include/virtio/virtqueue.h
index 3d72c50..e8bfe29 100644
--- a/plat/drivers/include/virtio/virtqueue.h
+++ b/plat/drivers/include/virtio/virtqueue.h
@@ -184,6 +184,15 @@ struct virtqueue *virtqueue_create(__u16 queue_id, __u16 nr_descs, __u16 align,
struct virtio_dev *vdev, struct uk_alloc *a);
/**
+ * Return the number of available descriptors of a virtqueue.
+ * @param vq
+ *   A reference to the virtqueue.
+ * @return __u16
+ *   Number of available descriptors
+ */
+__u16 virtqueue_avail(struct virtqueue *vq);
+
+/**
* Check the virtqueue if full.
* @param vq
* A reference to the virtqueue.
diff --git a/plat/drivers/virtio/virtio_net.c b/plat/drivers/virtio/virtio_net.c
index 75a6cd6..5c7162e 100644
--- a/plat/drivers/virtio/virtio_net.c
+++ b/plat/drivers/virtio/virtio_net.c
@@ -128,6 +128,9 @@ struct uk_netdev_rx_queue {
uint16_t nb_desc;
/* The flag to interrupt on the transmit queue */
uint8_t intr_enabled;
+ /* User-provided receive buffer allocation function */
+ uk_netdev_alloc_rxpkts alloc_rxpkts;
+ void *alloc_rxpkts_argp;
/* Reference to the uk_netdev */
struct uk_netdev *ndev;
/* The scatter list and its associated fragements */
@@ -197,9 +200,7 @@ static int virtio_netdev_xmit(struct uk_netdev *dev,
struct uk_netbuf *pkt);
static int virtio_netdev_recv(struct uk_netdev *dev,
struct uk_netdev_rx_queue *queue,
- struct uk_netbuf **pkt,
- struct uk_netbuf *fillup[],
- uint16_t *fillup_count);
+ struct uk_netbuf **pkt);
 static const struct uk_hwaddr *virtio_net_mac_get(struct uk_netdev *n);
static __u16 virtio_net_mtu_get(struct uk_netdev *n);
static unsigned virtio_net_promisc_get(struct uk_netdev *n);
@@ -213,7 +214,7 @@ static int virtio_netdev_rxq_enqueue(struct uk_netdev_rx_queue *rxq,
struct uk_netbuf *netbuf);
 static int virtio_netdev_recv_done(struct virtqueue *vq, void *priv);
static int virtio_netdev_rx_fillup(struct uk_netdev_rx_queue *rxq,
- struct uk_netbuf **netbuf, __u16 *count);
+ int notify);
/**
* Static global constants
@@ -262,39 +263,62 @@ static void virtio_netdev_xmit_free(struct uk_netdev_tx_queue *txq)
uk_pr_debug("Free %"__PRIu16" descriptors\n", cnt);
}
-static int virtio_netdev_rx_fillup(struct uk_netdev_rx_queue *rxq,
-				   struct uk_netbuf **netbuf, __u16 *count)
+#define RX_FILLUP_BATCHLEN 64
+
+static int virtio_netdev_rx_fillup(struct uk_netdev_rx_queue *rxq, int notify)
{
+ struct uk_netbuf *netbuf[RX_FILLUP_BATCHLEN];
int rc = 0;
- __u16 i = 0;
- __u16 cnt = 0;
+ __u16 i, j;
+ __u16 req;
+ __u16 cnt;
+ __u16 filled = 0;
 	/**
 	 * Fixed amount of memory is allocated to each received buffer. In
 	 * our case, since we don't support jumbo frames or LRO yet, we
 	 * require that the buffer fed to the ring descriptor is at least
 	 * ethernet MTU + virtio net header.
+	 * Because we are using 2 descriptors for a single netbuf, our
+	 * effective queue size is just the half.
 	 */
-	for (i = 0; i < *count; i++) {
-		rc = virtio_netdev_rxq_enqueue(rxq, netbuf[i]);
-		if (rc == -ENOSPC) {
-			uk_pr_debug("No more place available to add descriptors\n");
-			rc = 0;
-			break;
-		} else if (unlikely(rc < 0)) {
-			uk_pr_err("Failed to add a buffer to the virtqueue: %d\n",
-				  rc);
-			break;
+	while ((req = (virtqueue_avail(rxq->vq) / 2))) {
+		req = MIN(req, RX_FILLUP_BATCHLEN);
+		cnt = rxq->alloc_rxpkts(rxq->alloc_rxpkts_argp, netbuf, req);
+		for (i = 0; i < cnt; i++) {
+			uk_pr_debug("Enqueue netbuf %"PRIu16"/%"PRIu16" (%p) to virtqueue %p...\n",
+				    i + 1, cnt, netbuf[i], rxq);
+			rc = virtio_netdev_rxq_enqueue(rxq, netbuf[i]);
While enqueuing into the virtio ring, the API returns the number of
descriptors available. The moment rc == 0, the queue is full.
As a suggestion, we could also change "virtqueue_buffer_dequeue" to
return the number of descriptors available, thereby eliminating the
need to fetch the queue availability separately:
int virtqueue_buffer_dequeue(struct virtqueue *vq, void **buf, __u32 *len);
I agree, sounds right. I will look into this.
+ if (unlikely(rc < 0)) {
Do we want to report ring full as an error?
That is now a case that shouldn't happen anymore because this
function queries the number of available slots before asking for
allocations. It will fit exactly or be less (in case we are under memory
pressure). If not, something really bad happened in the driver.
However, the general-purpose error handling also catches this kind
of failure. I don't think we need separate error handling for this
anymore.
Yeah, seems reasonable.
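The query-then-allocate fill-up logic discussed here can be mocked in isolation. In this sketch, `avail_slots`, `BATCHLEN`, and `mock_alloc` are stand-ins for `virtqueue_avail()`, `RX_FILLUP_BATCHLEN`, and the user's `alloc_rxpkts` callback; each netbuf consumes 2 descriptors, so the effective queue size is half the available slots:

```c
#include <stdint.h>

#define BATCHLEN 4

/* Stand-ins for driver state: free descriptor slots and allocatable bufs. */
static uint16_t avail_slots;
static uint16_t allocatable;

/* Mock allocator: returns min(count, allocatable), mimicking a user
 * callback that may deliver fewer netbufs under memory pressure. */
static uint16_t mock_alloc(uint16_t count)
{
	uint16_t n = (count < allocatable) ? count : allocatable;

	allocatable -= n;
	return n;
}

/* Fill-up loop: request at most BATCHLEN netbufs per round, never more
 * than the queue can hold; stop early on an allocator shortfall. */
static uint16_t fillup(void)
{
	uint16_t req, cnt, filled = 0;

	while ((req = avail_slots / 2)) {
		req = (req < BATCHLEN) ? req : BATCHLEN;
		cnt = mock_alloc(req);
		filled += cnt;
		avail_slots -= 2 * cnt; /* each netbuf takes 2 slots */
		if (cnt < req)
			break; /* partial allocation: memory pressure */
	}
	return filled;
}
```

Because the loop asks only for what the queue can hold, enqueue can never fail with "queue full"; the only early exit is an allocation shortfall.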
+			uk_pr_err("Failed to add a buffer to receive virtqueue %p: %d\n",
+				  rxq, rc);
+
+ /*
+ * Release netbufs that we are not going
+ * to use anymore
+ */
+ for (j = i; j < cnt; j++)
Should we free this memory or give it back to user? Wouldn't freeing
it up make it expensive?
Because this is an unlikely, terrible situation, I think the performance
of the error clean-up is less important. The problem I see with your
suggestion is that we would need another callback to give the buffers
back to the user, and that would bloat the API. However, the user is
actually able to hook into the free operation already: the destructor
callback of netbufs can be used for this.
agreed
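The destructor hook mentioned above could be sketched as follows. Here `struct netbuf` and its `dtor` field are a minimal stand-in for `struct uk_netbuf`'s destructor callback, and the `recycled` counter stands in for returning the buffer to a user-owned pool:

```c
#include <stddef.h>

/* Minimal stand-in for struct uk_netbuf with a destructor hook. */
struct netbuf {
	void (*dtor)(struct netbuf *nb); /* called when the buffer is freed */
};

static unsigned int recycled; /* stand-in for a user-owned buffer pool */

/* User destructor: instead of losing the buffer on free, account it
 * back to the pool. */
static void recycle_dtor(struct netbuf *nb)
{
	(void)nb;
	recycled++;
}

/* Mirrors what a free routine does with the destructor callback:
 * invoke the user hook if one is registered. */
static void netbuf_free(struct netbuf *nb)
{
	if (nb->dtor)
		nb->dtor(nb);
}
```

With this hook, the driver can simply free unused netbufs in its error path and the user still gets them back, with no extra API surface.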
+ uk_netbuf_free(netbuf[j]);
+ goto out;
+ }
+ filled++;
+ }
+
+		if (unlikely(cnt < req)) {
+			uk_pr_debug("Failed to complete fill-up of receive virtqueue %p: Out of memory",
+				    rxq);
+			goto out;
}
- cnt++;
}
- *count = *count - cnt;
+
+out:
+	uk_pr_debug("Programmed %"PRIu16" receive netbufs to receive virtqueue %p\n",
+		    filled, rxq);
/**
* Notify the host, when we submit new descriptor(s).
*/
- if (cnt)
+ if (notify && filled)
virtqueue_host_notify(rxq->vq);
return rc;
@@ -498,34 +522,29 @@ static int virtio_netdev_rxq_dequeue(struct uk_netdev_rx_queue *rxq,
static int virtio_netdev_recv(struct uk_netdev *dev,
struct uk_netdev_rx_queue *queue,
- struct uk_netbuf **pkt,
- struct uk_netbuf *fillup[],
- uint16_t *fillup_count)
+ struct uk_netbuf **pkt)
{
int rc = 0;
int cnt = 0;
UK_ASSERT(dev && queue);
- UK_ASSERT(!fillup || (fillup && *fillup_count > 0));
+ UK_ASSERT(pkt);
- if (pkt && (queue->intr_enabled & VTNET_INTR_USR_EN_MASK)) {
+ if (queue->intr_enabled & VTNET_INTR_USR_EN_MASK) {
virtqueue_intr_disable(queue->vq);
queue->intr_enabled &= ~(VTNET_INTR_EN);
}
- if (pkt) {
- rc = virtio_netdev_rxq_dequeue(queue, pkt);
- if (unlikely(rc < 0)) {
- uk_pr_err("Failed to dequeue the packet: %d\n", rc);
- goto err_exit;
- }
- cnt = rc;
+ rc = virtio_netdev_rxq_dequeue(queue, pkt);
+ if (unlikely(rc < 0)) {
+ uk_pr_err("Failed to dequeue the packet: %d\n", rc);
+ goto err_exit;
}
- if (fillup)
- virtio_netdev_rx_fillup(queue, fillup, fillup_count);
+ cnt = rc;
+ virtio_netdev_rx_fillup(queue, 1);
/* Enable interrupt only when user had previously enabled it */
- if (pkt && (queue->intr_enabled & VTNET_INTR_USR_EN_MASK)) {
+ if (queue->intr_enabled & VTNET_INTR_USR_EN_MASK) {
/* Need to enable the interrupt on the last packet */
rc = virtqueue_intr_enable(queue->vq);
if (rc == 1 && cnt == 0) {
@@ -542,11 +561,13 @@ static int virtio_netdev_recv(struct uk_netdev *dev,
/* Need to enable the interrupt on the last packet */
rc = virtqueue_intr_enable(queue->vq);
cnt = (rc == 1) ? 2 : 1;
+ /* Since we received something, we need to fillup */
+ virtio_netdev_rx_fillup(queue, 1);
} else if (cnt > 0) {
/* When there is packet in the buffer */
cnt = (rc == 1) ? 2 : 1;
}
- } else if (pkt && cnt > 0) {
+ } else if (cnt > 0) {
/**
* For polling case, we report always there are further
* packets unless the queue is empty.
@@ -573,6 +594,8 @@ static struct uk_netdev_rx_queue *virtio_netdev_rx_queue_setup(
UK_ASSERT(n);
UK_ASSERT(conf);
+ UK_ASSERT(conf->alloc_rxpkts);
+
vndev = to_virtionetdev(n);
if (queue_id >= vndev->max_vqueue_pairs) {
uk_pr_err("Invalid virtqueue identifier: %"__PRIu16"\n",
@@ -589,6 +612,11 @@ static struct uk_netdev_rx_queue *virtio_netdev_rx_queue_setup(
goto err_exit;
}
rxq = &vndev->rxqs[rc];
+ rxq->alloc_rxpkts = conf->alloc_rxpkts;
+ rxq->alloc_rxpkts_argp = conf->alloc_rxpkts_argp;
+
+ /* Allocate receive buffers for this queue */
+ virtio_netdev_rx_fillup(rxq, 0);
exit:
return rxq;
diff --git a/plat/drivers/virtio/virtio_ring.c b/plat/drivers/virtio/virtio_ring.c
index 02d568a..5eaa7e7 100644
--- a/plat/drivers/virtio/virtio_ring.c
+++ b/plat/drivers/virtio/virtio_ring.c
@@ -416,12 +416,17 @@ void virtqueue_destroy(struct virtqueue *vq, struct uk_alloc *a)
uk_free(a, vrq);
}
-int virtqueue_is_full(struct virtqueue *vq)
+__u16 virtqueue_avail(struct virtqueue *vq)
{
struct virtqueue_vring *vrq;
UK_ASSERT(vq);
vrq = to_virtqueue_vring(vq);
- return (vrq->desc_avail == 0);
+ return vrq->desc_avail;
+}
+
+int virtqueue_is_full(struct virtqueue *vq)
+{
+ return (virtqueue_avail(vq) == 0);
}
_______________________________________________
Minios-devel mailing list
Minios-devel@xxxxxxxxxxxxxxxxxxxx
https://lists.xenproject.org/mailman/listinfo/minios-devel