[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[UNIKRAFT PATCH 2/5] lib/ukmpi: Initial port of FreeBSD's buf_ring.h



From: Alexander Jung <alexander.jung@xxxxxxxxx>

This commit introduces a Unikraft-centric implementation of
FreeBSD's buf_ring.h message passing ring buffer header.  Most of
the functionality is provided inline with this header.

Signed-off-by: Alexander Jung <alexander.jung@xxxxxxxxx>
---
 lib/ukmpi/include/uk/ring.h | 474 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 474 insertions(+)
 create mode 100644 lib/ukmpi/include/uk/ring.h

diff --git a/lib/ukmpi/include/uk/ring.h b/lib/ukmpi/include/uk/ring.h
new file mode 100644
index 0000000..6673244
--- /dev/null
+++ b/lib/ukmpi/include/uk/ring.h
@@ -0,0 +1,474 @@
+/* SPDX-License-Identifier: BSD-2-Clause-FreeBSD */
+/*
+ * Authors: Kip Macy <kmacy@xxxxxxxxxxx>
+ *          Alexander Jung <alexander.jung@xxxxxxxxx>
+ *
+ * Copyright (c) 2007-2009, Kip Macy <kmacy@xxxxxxxxxxx>
+ *               2020, NEC Laboratories Europe GmbH, NEC Corporation.
+ *               All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * $FreeBSD$
+ */
+/*
+ * Simple ring implementation to handle object references.
+ *
+ * Inspired by FreeBSD and modified (commit-id: c45cce1).
+ */
+#ifndef __UK_RING_H__
+#define __UK_RING_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+
+#include <errno.h>
+#include <uk/mutex.h>
+#include <uk/print.h>
+#include <uk/config.h>
+#include <uk/assert.h>
+#include <uk/plat/lcpu.h>
+#include <uk/arch/atomic.h>
+
+
+/**
+ * Ring of object references with separate producer and consumer indices.
+ *
+ * The pointer slots live in the trailing `ring` array directly behind the
+ * index fields.
+ */
+struct uk_ring {
+  /* Slot a producer reserves next; moved by the CAS in uk_ring_enqueue() */
+  volatile uint32_t  prod_head;
+  /* Published producer index; cons_head == prod_tail is treated as empty */
+  volatile uint32_t  prod_tail;  
+  /* Added to the index difference in uk_ring_count() */
+  int                prod_size;
+  /* Bit mask applied when advancing the producer indices */
+  int                prod_mask;
+  /* Incremented whenever uk_ring_enqueue() gives up with -ENOBUFS */
+  uint64_t           drops;
+  /* Slot consumed next; moved by the dequeue and advance helpers */
+  volatile uint32_t  cons_head;
+  /* Written after a slot was consumed; compared against by the full check */
+  volatile uint32_t  cons_tail;
+  /* Not referenced by the inline helpers here; presumably mirrors prod_size */
+  int                cons_size;
+  /* Bit mask applied when advancing the consumer indices */
+  int                cons_mask;
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+  /* Mutex the single-consumer debug checks expect to be held */
+  struct uk_mutex   *lock;
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+  /* Pointer slots, indexed by the masked head/tail values */
+  void              *ring[0];
+};
+
+
+/**
+ * Multi-producer safe lock-free ring buffer enqueue.
+ *
+ * Interrupts are masked while the slot is reserved and filled.  The caller's
+ * interrupt state is restored on every exit path, so calling this with
+ * interrupts already disabled does not re-enable them.
+ *
+ * @param br
+ *  Reference to the ring structure.
+ * @param buf
+ *  Pointer to store in the ring.
+ * @return
+ *  0 on success, -ENOBUFS if the ring is full (br->drops is incremented).
+ */
+static __inline int
+uk_ring_enqueue(struct uk_ring *br, void *buf)
+{
+  uint32_t prod_head, prod_next, cons_tail;
+  unsigned long irqf;
+
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+  int i;
+  /*
+   * Note: A buffer that was looked at via a peek and then put back is still
+   * stored in the ring, so enqueueing it again triggers this (possibly
+   * spurious) report.
+   */
+  for (i = br->cons_head; i != br->prod_head;
+       i = ((i + 1) & br->cons_mask))
+    if(br->ring[i] == buf)
+      uk_pr_crit("buf=%p already enqueue at %d prod=%d cons=%d\n",
+          buf, i, br->prod_tail, br->cons_tail);
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+
+  /* Remember the IRQ state instead of unconditionally enabling on exit */
+  irqf = ukplat_lcpu_save_irqf();
+
+  /*
+   * Reserve a slot by moving prod_head forward.
+   *
+   * NOTE(review): the loop condition assumes ukarch_compare_exchange_sync()
+   * evaluates to non-zero exactly when the exchange succeeded -- confirm
+   * against uk/arch/atomic.h.
+   */
+  do {
+    prod_head = br->prod_head;
+    prod_next = (prod_head + 1) & br->prod_mask;
+    cons_tail = br->cons_tail;
+
+    if (prod_next == cons_tail) {
+      /*
+       * Looks full.  Re-read both indices behind a read barrier and only
+       * report the drop if neither of them moved in the meantime.
+       */
+      rmb();
+      if (prod_head == br->prod_head &&
+          cons_tail == br->cons_tail) {
+        br->drops++;
+        ukplat_lcpu_restore_irqf(irqf);
+        return -ENOBUFS;
+      }
+      continue;
+    }
+  } while (!ukarch_compare_exchange_sync(&br->prod_head, prod_head,
+                                         prod_next));
+
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+  if (br->ring[prod_head] != NULL)
+    uk_pr_crit("dangling value in enqueue\n");
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+
+  br->ring[prod_head] = buf;
+
+  /*
+   * If there are other enqueues in progress that preceded us, we need to wait
+   * for them to complete.
+   *
+   * NOTE(review): while this wait is compiled out, producers running
+   * concurrently on several CPUs could publish prod_tail out of order.
+   */
+  /* TODO: Provide cpu_spinwait() */
+#if 0
+  while (br->prod_tail != prod_head)
+    cpu_spinwait();
+#endif
+
+  /* Publish the slot to consumers */
+  ukarch_store_n(&br->prod_tail, prod_next);
+  ukplat_lcpu_restore_irqf(irqf);
+
+  return 0;
+}
+
+
+/**
+ * Multi-consumer safe dequeue.
+ *
+ * Interrupts are masked while the slot is claimed and read.  The caller's
+ * interrupt state is restored on every exit path, so calling this with
+ * interrupts already disabled does not re-enable them.
+ *
+ * @param br
+ *  Reference to the ring structure.
+ * @return
+ *  The pointer taken from the consumer head, or NULL if the ring is empty.
+ */
+static __inline void *
+uk_ring_dequeue(struct uk_ring *br)
+{
+  void *buf;
+  uint32_t cons_head, cons_next;
+  unsigned long irqf;
+
+  /* Remember the IRQ state instead of unconditionally enabling on exit */
+  irqf = ukplat_lcpu_save_irqf();
+
+  /*
+   * Claim the head slot by moving cons_head forward.
+   *
+   * NOTE(review): the loop condition assumes ukarch_compare_exchange_sync()
+   * evaluates to non-zero exactly when the exchange succeeded -- confirm
+   * against uk/arch/atomic.h.
+   */
+  do {
+    cons_head = br->cons_head;
+    cons_next = (cons_head + 1) & br->cons_mask;
+
+    /* Nothing published by the producers: the ring is empty */
+    if (cons_head == br->prod_tail) {
+      ukplat_lcpu_restore_irqf(irqf);
+      return (NULL);
+    }
+  } while (!ukarch_compare_exchange_sync(&br->cons_head, cons_head,
+                                         cons_next));
+
+  buf = br->ring[cons_head];
+
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+  /* Lets uk_ring_enqueue() detect a slot that was never consumed */
+  br->ring[cons_head] = NULL;
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+
+  /*
+   * If there are other dequeues in progress that preceded us, we need to wait
+   * for them to complete.
+   *
+   * NOTE(review): while this wait is compiled out, consumers running
+   * concurrently on several CPUs could publish cons_tail out of order.
+   */
+  /* TODO: Provide cpu_spinwait() */
+#if 0
+  while (br->cons_tail != cons_head)
+    cpu_spinwait();
+#endif
+
+  /* Hand the slot back to the producers */
+  ukarch_store_n(&br->cons_tail, cons_next);
+
+  ukplat_lcpu_restore_irqf(irqf);
+
+  return buf;
+}
+
+
+/**
+ * Single-consumer dequeue use where dequeue is protected by a lock e.g. a
+ * network driver's tx queue lock.
+ *
+ * @param br
+ *  Reference to the ring structure.
+ * @return
+ *  The pointer taken from the consumer head, or NULL if the ring is empty.
+ */
+static __inline void *
+uk_ring_dequeue_single(struct uk_ring *br)
+{
+  void *buf;
+  uint32_t cons_head, cons_next, prod_tail;
+
+  /* TODO: Provide atomic_load_acq_32() */
+#if 0
+  /*
+   * This is a workaround to allow using uk_ring on ARM and ARM64.
+   *
+   * ARM64TODO: Fix uk_ring in a generic way.
+   *
+   * REMARKS: It is suspected that cons_head does not require
+   *   load_acq operation, but this change was extensively tested
+   *   and confirmed it's working. To be reviewed once again in
+   *   FreeBSD-12.
+   *
+   * Preventing following situation:
+   *
+   *   uk_ring_enqueue()                 uk_ring_dequeue_single()
+   *   --------------------------------  --------------------------------
+   *                                     cons_head = br->cons_head;
+   *   atomic_cmpset_acq_32(
+   *       &br->prod_head, ...);
+   *                                     buf = br->ring[cons_head];  <1>
+   *   br->ring[prod_head] = buf;
+   *   atomic_store_rel_32(
+   *       &br->prod_tail, ...);
+   *                                     prod_tail = br->prod_tail;
+   *                                     if (cons_head == prod_tail)
+   *                                       return NULL;
+   *                                     <condition is false and code
+   *                                      uses invalid (old) buf>
+   *
+   * <1> The load (on core 1) from br->ring[cons_head] can be reordered
+   *     (speculatively read) by the CPU.
+   */
+#if defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64)
+  cons_head = atomic_load_acq_32(&br->cons_head);
+#else
+  cons_head = br->cons_head;
+#endif /* defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64) */
+#else
+  cons_head = br->cons_head;
+#endif /* 0 */
+
+  /* TODO: Load prod_tail via atomic_load_acq_32() once it is provided */
+  prod_tail = br->prod_tail;
+  cons_next = (cons_head + 1) & br->cons_mask;
+
+  /* Nothing published by the producers: the ring is empty */
+  if (cons_head == prod_tail)
+    return NULL;
+
+  br->cons_head = cons_next;
+  buf = br->ring[cons_head];
+
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+  br->ring[cons_head] = NULL;
+
+  /* Same guard as uk_ring_peek(): only check a lock that was supplied */
+  if ((br->lock != NULL) && !uk_mutex_is_locked(br->lock))
+    uk_pr_crit("lock not held on single consumer dequeue\n");
+
+  /* With a single consumer both consumer indices must still agree here */
+  if (br->cons_tail != cons_head)
+    uk_pr_crit("inconsistent list cons_tail=%d cons_head=%d\n",
+        br->cons_tail, cons_head);
+
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+
+  br->cons_tail = cons_next;
+
+  return buf;
+}
+
+
+/**
+ * Single-consumer advance after a peek use where it is protected by a lock
+ * e.g. a network driver's tx queue lock.
+ *
+ * Moves both consumer indices past the head entry without returning it.
+ * Does nothing if the ring is empty.
+ *
+ * @param br
+ *  Reference to the ring structure.
+ */
+static __inline void
+uk_ring_advance_single(struct uk_ring *br)
+{
+  uint32_t cons_head, cons_next;
+  uint32_t prod_tail;
+
+  cons_head = br->cons_head;
+  prod_tail = br->prod_tail;
+
+  cons_next = (cons_head + 1) & br->cons_mask;
+
+  /* Nothing published by the producers: there is no entry to skip */
+  if (cons_head == prod_tail)
+    return;
+
+  br->cons_head = cons_next;
+
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+  /* Lets uk_ring_enqueue() detect a slot that was never consumed */
+  br->ring[cons_head] = NULL;
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+
+  br->cons_tail = cons_next;
+}
+
+
+/**
+ * Overwrite the entry at the consumer head without moving any index.
+ *
+ * Meant for a consumer that inspected the head entry with the peek()
+ * function, did *not* dequeue it, and now has to hand it back, e.g. because
+ * the transmit queue of a driver is full.  Most likely the value written is
+ * the one that is already stored there, but a lower transmit function may
+ * have replaced the buffer in the meantime.  The store is always performed
+ * (it is probably cheaper than comparing first); a multi-queue version would
+ * need the compare and an atomic.
+ *
+ * @param br
+ *  Reference to the ring structure.  The ring must not be empty.
+ * @param new
+ *  Pointer to place at the consumer head.
+ */
+static __inline void
+uk_ring_putback_single(struct uk_ring *br, void *new)
+{
+  void **slot;
+
+  /* Buffer ring has none in putback */
+  UK_ASSERT(br->cons_head != br->prod_tail);
+
+  slot = &br->ring[br->cons_head];
+  *slot = new;
+}
+
+
+/**
+ * Look at the first entry in the ring without modifying the ring.
+ *
+ * Race-prone if not protected by a lock.
+ *
+ * @param br
+ *  Reference to the ring structure.
+ * @return
+ *  The pointer at the consumer head, or NULL if the ring is empty.
+ */
+static __inline void *
+uk_ring_peek(struct uk_ring *br)
+{
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+  if (br->lock != NULL) {
+    if (!uk_mutex_is_locked(br->lock))
+      uk_pr_crit("lock not held on single consumer dequeue\n");
+  }
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+
+  /*
+   * No memory barrier is needed: the consumer side is under our control and
+   * prod_tail is at worst a lagging indicator, so the worst case is a NULL
+   * return immediately after a buffer has been enqueued.
+   */
+  return (br->cons_head != br->prod_tail) ? br->ring[br->cons_head] : NULL;
+}
+
+
+/**
+ * Single-consumer peek that also clears the peeked slot in debug builds.
+ *
+ * No index is moved.  With CONFIG_LIBUKMPI_RING_DEBUG the slot at the
+ * consumer head is reset to NULL after it was read; otherwise this only
+ * reads the slot.
+ *
+ * @param br
+ *  Reference to the ring structure.
+ * @return
+ *  The pointer at the consumer head, or NULL if the ring is empty.
+ */
+static __inline void *
+uk_ring_peek_clear_single(struct uk_ring *br)
+{
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+  void *ret;
+
+  /* Same check and report as the other single-consumer helpers */
+  if ((br->lock != NULL) && !uk_mutex_is_locked(br->lock))
+    uk_pr_crit("lock not held on single consumer dequeue\n");
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+
+  if (br->cons_head == br->prod_tail)
+    return (NULL);
+
+  /* TODO: Provide atomic_thread_fence_acq */
+#if 0
+#if defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64)
+  /*
+   * The barrier is required here on ARM and ARM64 to ensure that
+   * br->ring[br->cons_head] is not fetched before the above condition is
+   * checked.  Without the barrier the buffer could be fetched before the
+   * enqueue puts it into br; the enqueue then updates the array and
+   * prod_tail, the emptiness check passes, and the previously fetched (and
+   * invalid) buffer is returned.
+   */
+  atomic_thread_fence_acq();
+#endif /* defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64) */
+#endif /* 0 */
+
+#ifdef CONFIG_LIBUKMPI_RING_DEBUG
+  /*
+   * Single consumer, i.e. cons_head will not move while we are
+   * running, so an atomic swap is not necessary here.
+   */
+  ret = br->ring[br->cons_head];
+  br->ring[br->cons_head] = NULL;
+  return (ret);
+#else
+  return (br->ring[br->cons_head]);
+#endif /* CONFIG_LIBUKMPI_RING_DEBUG */
+}
+
+
+/**
+ * Check whether the ring has no free slot left for a producer.
+ *
+ * @param br
+ *  Reference to the ring structure.
+ * @return
+ *  Non-zero if advancing prod_head by one would reach cons_tail, 0 otherwise.
+ */
+static __inline int
+uk_ring_full(struct uk_ring *br)
+{
+  uint32_t next_head = (br->prod_head + 1) & br->prod_mask;
+
+  return next_head == br->cons_tail;
+}
+
+
+/**
+ * Check whether the ring holds no published entry.
+ *
+ * @param br
+ *  Reference to the ring structure.
+ * @return
+ *  Non-zero if cons_head equals prod_tail, 0 otherwise.
+ */
+static __inline int
+uk_ring_empty(struct uk_ring *br)
+{
+  uint32_t head = br->cons_head;
+  uint32_t tail = br->prod_tail;
+
+  return head == tail;
+}
+
+
+/**
+ * Number of entries currently held in the ring.
+ *
+ * @param br
+ *  Reference to the ring structure.
+ * @return
+ *  The distance from cons_tail to prod_tail, wrapped with prod_mask.
+ */
+static __inline int
+uk_ring_count(struct uk_ring *br)
+{
+  uint32_t used = br->prod_size + br->prod_tail - br->cons_tail;
+
+  return used & br->prod_mask;
+}
+
+
+/**
+ * Create a new ring buffer.
+ *
+ * Declaration only; the definition is not part of this header.
+ *
+ * @param count
+ *  The size of the ring buffer.  NOTE(review): the inline helpers wrap the
+ *  indices with a bit mask, so this presumably has to be a power of two --
+ *  confirm against the implementation.
+ * @param a
+ *  The memory allocator to use when creating the ring buffer.
+ * @param flags
+ *  Additional flags to specify to the ring.
+ * @param lock
+ *  The mutex to use when debugging the ring buffer.
+ * @return
+ *  The new ring.  NOTE(review): presumably NULL on failure -- confirm
+ *  against the implementation.
+ */
+struct uk_ring *
+uk_ring_alloc(int count, struct uk_alloc *a, int flags, struct uk_mutex *lock);
+
+
+/**
+ * Free the ring from use.
+ *
+ * Declaration only; the definition is not part of this header.
+ *
+ * @param br
+ *  Reference to the ring structure.
+ * @param a
+ *  The memory allocator to use when freeing the object.
+ */
+void
+uk_ring_free(struct uk_ring *br, struct uk_alloc *a);
+
+
+/* Close the extern "C" block opened at the top of this header */
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* __UK_RING_H__ */
-- 
2.11.0




 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.