[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [UNIKRAFT PATCH, v2, 03/15] lib/ukring: Adapt {ring.c, ring.h} to Unikraft



Hi Alexander,

Please see inline.

On 7/21/20 6:39 PM, Alexander Jung wrote:
> * Includes Unikraft-centric headers and types; and,
> * Disables ARM{32,64}-specific atomic actions as unsupported;
> 
> Signed-off-by: Alexander Jung <alexander.jung@xxxxxxxxx>
> ---
>  lib/ukring/include/uk/ring.h | 81 +++++++++++++++++++-----------------
>  lib/ukring/ring.c            | 39 ++++++++++-------
>  2 files changed, 66 insertions(+), 54 deletions(-)
> 
> diff --git a/lib/ukring/include/uk/ring.h b/lib/ukring/include/uk/ring.h
> index 4f0d80c..021331a 100644
> --- a/lib/ukring/include/uk/ring.h
> +++ b/lib/ukring/include/uk/ring.h
> @@ -32,13 +32,13 @@
>  #ifndef  _SYS_BUF_RING_H_
>  #define  _SYS_BUF_RING_H_
>  
> -#include <machine/cpu.h>
> -
> -#ifdef DEBUG_BUFRING
> -#include <sys/lock.h>
> -#include <sys/mutex.h>
> -#endif
> -
> +#include <errno.h>
> +#include <uk/mutex.h>
> +#include <uk/print.h>
> +#include <uk/config.h>
> +#include <uk/assert.h>
> +#include <uk/plat/lcpu.h>
> +#include <uk/arch/atomic.h>
>  
>  struct buf_ring {
>    volatile uint32_t  br_prod_head;
> @@ -51,7 +51,7 @@ struct buf_ring {
>    int                br_cons_size;
>    int                br_cons_mask;
>  #ifdef DEBUG_BUFRING
> -  struct mtx        *br_lock;
> +  struct uk_mutex   *br_lock;
>  #endif
>    void              *br_ring[0] __aligned(CACHE_LINE_SIZE);
>  };
> @@ -72,16 +72,16 @@ buf_ring_enqueue(struct buf_ring *br, void *buf)
>    /*
>     * Note: It is possible to encounter an mbuf that was removed
>     * via drbr_peek(), and then re-added via drbr_putback() and
> -   * trigger a spurious panic.
> +   * trigger spurious critical messages.

A rule of thumb is that when we import code from elsewhere we try to
change it as little as possible. Given that we should actually trigger a
panic here, I think it would be better to leave this as it was.

>     */
>    for (i = br->br_cons_head; i != br->br_prod_head;
>         i = ((i + 1) & br->br_cons_mask))
>      if (br->br_ring[i] == buf)
> -      panic("buf=%p already enqueue at %d prod=%d cons=%d",
> +      uk_pr_crit("buf=%p already enqueue at %d prod=%d cons=%d\n",
>            buf, i, br->br_prod_tail, br->br_cons_tail);

The 'panic' is a widespread approach in OS and embedded development.
When we encounter some kind of catastrophic event or state, we should
output a message and stop/abort/exit/crash. So it's not OK to just
print a message and continue running. In Unikraft the equivalent of
panic is UK_CRASH().

>  #endif
>  
> -  critical_enter();
> +  ukplat_lcpu_disable_irq();

Actually this is not fine. critical_enter() disables preemption and
issues a memory barrier; it does not disable interrupts. The Linux
kernel has the same abstraction, called preempt_disable() [1], which
does the same thing. If there is no preemption, as is currently the
case for Unikraft, preempt_disable() reduces to barrier() [2].

So please define these two abstractions in some header under the global
`include/uk` and use them here. Personally I find the `preempt_disable`
name more suggestive, but it's up to you which name you choose. You can
even keep the critical_enter name as a wrapper used only by code
imported from FreeBSD (e.g. #ifdef FREEBSD_CODE \ #define
critical_enter() preempt_disable()).

[1]
https://elixir.bootlin.com/linux/latest/source/include/linux/preempt.h#L169
[2]
https://elixir.bootlin.com/linux/latest/source/include/linux/preempt.h#L242

>  
>    do {
>      prod_head = br->br_prod_head;
> @@ -92,16 +92,16 @@ buf_ring_enqueue(struct buf_ring *br, void *buf)
>        rmb();
>        if (prod_head == br->br_prod_head && cons_tail == br->br_cons_tail) {
>          br->br_drops++;
> -        critical_exit();
> +        ukplat_lcpu_enable_irq();
>          return ENOBUFS;
>        }
>        continue;
>      }
> -  } while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
> +  } while (!ukarch_compare_exchange_sync(&br->br_prod_head, prod_head, 
> prod_next));
>  
>  #ifdef DEBUG_BUFRING
>    if (br->br_ring[prod_head] != NULL)
> -    panic("dangling value in enqueue");
> +    uk_pr_crit("dangling value in enqueue\n");

Panic please.

>  #endif
>  
>    br->br_ring[prod_head] = buf;
> @@ -114,8 +114,8 @@ buf_ring_enqueue(struct buf_ring *br, void *buf)
>    while (br->br_prod_tail != prod_head)
>      cpu_spinwait();
>  
> -  atomic_store_rel_int(&br->br_prod_tail, prod_next);
> -  critical_exit();
> +  ukarch_store_n(&br->br_prod_tail, prod_next);
> +  ukplat_lcpu_enable_irq();
>  
>    return 0;
>  }
> @@ -130,17 +130,17 @@ buf_ring_dequeue_mc(struct buf_ring *br)
>    uint32_t cons_head, cons_next;
>    void *buf;
>  
> -  critical_enter();
> +  ukplat_lcpu_disable_irq();
>  
>    do {
>      cons_head = br->br_cons_head;
>      cons_next = (cons_head + 1) & br->br_cons_mask;
>  
>      if (cons_head == br->br_prod_tail) {
> -      critical_exit();
> +      ukplat_lcpu_enable_irq();
>        return NULL;
>      }
> -  } while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));
> +  } while (!ukarch_compare_exchange_sync(&br->br_cons_head, cons_head, 
> cons_next));
>  
>    buf = br->br_ring[cons_head];
>  
> @@ -156,8 +156,8 @@ buf_ring_dequeue_mc(struct buf_ring *br)
>    while (br->br_cons_tail != cons_head)
>      cpu_spinwait();
>  
> -  atomic_store_rel_int(&br->br_cons_tail, cons_next);
> -  critical_exit();
> +  ukarch_store_n(&br->br_cons_tail, cons_next);
> +  ukplat_lcpu_enable_irq();
>  
>    return buf;
>  }
> @@ -201,12 +201,16 @@ buf_ring_dequeue_sc(struct buf_ring *br)
>     *
>     * <1> Load (on core 1) from br->br_ring[cons_head] can be reordered 
> (speculative readed) by CPU.
>     */
> -#if defined(__arm__) || defined(__aarch64__)
> -  cons_head = atomic_load_acq_32(&br->br_cons_head);
> +#if defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64)
> +  /* TODO: Provide atomic_load_acq_32() */
> +  /* cons_head = atomic_load_acq_32(&br->br_cons_head); */
> +  cons_head = &br->br_cons_head;

An approach we used before was to insert an #error directive in order to
force the developer to fix the issue if they wanted to build for an
unsupported architecture. Otherwise we will surely forget to do this
and/or lose a lot of time debugging the thing.

>  #else
>    cons_head = br->br_cons_head;
>  #endif
> -  prod_tail = atomic_load_acq_32(&br->br_prod_tail);
> +  /* TODO: Provide atomic_load_acq_32() */
> +  /* prod_tail = atomic_load_acq_32(&br->br_prod_tail); */
> +  prod_tail = &br->br_prod_tail;

But actually we do have atomic operations defined for these ones:
ukarch_load_n().

>  
>    cons_next = (cons_head + 1) & br->br_cons_mask;
>  #ifdef PREFETCH_DEFINED
> @@ -230,11 +234,11 @@ buf_ring_dequeue_sc(struct buf_ring *br)
>  #ifdef DEBUG_BUFRING
>    br->br_ring[cons_head] = NULL;
>  
> -  if (!mtx_owned(br->br_lock))
> -    panic("lock not held on single consumer dequeue");
> +  if (!uk_mutex_is_locked(r->lock))
> +    uk_pr_crit("lock not held on single consumer dequeue\n");
>  
>    if (br->br_cons_tail != cons_head)
> -    panic("inconsistent list cons_tail=%d cons_head=%d",
> +    uk_pr_crit("inconsistent list cons_tail=%d cons_head=%d\n",
>          br->br_cons_tail, cons_head);

Panic please.

>  #endif
>  
> @@ -288,8 +292,8 @@ buf_ring_advance_sc(struct buf_ring *br)
>  static __inline void
>  buf_ring_putback_sc(struct buf_ring *br, void *new)
>  {
> -  KASSERT(br->br_cons_head != br->br_prod_tail,
> -    ("Buf-Ring has none in putback"));
> +  /* Buffer ring has none in putback */
> +  UK_ASSERT(br->br_cons_head != br->br_prod_tail);
>    br->br_ring[br->br_cons_head] = new;
>  }
>  
> @@ -302,8 +306,8 @@ static __inline void *
>  buf_ring_peek(struct buf_ring *br)
>  {
>  #ifdef DEBUG_BUFRING
> -  if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
> -    panic("lock not held on single consumer dequeue");
> +  if ((r->lock != NULL) && !uk_mutex_is_locked(r->lock))
> +    uk_pr_crit("lock not held on single consumer dequeue\n");

Panic please.

>  #endif
>  
>    /*
> @@ -324,14 +328,14 @@ buf_ring_peek_clear_sc(struct buf_ring *br)
>  #ifdef DEBUG_BUFRING
>    void *ret;
>  
> -  if (!mtx_owned(br->br_lock))
> -    panic("lock not held on single consumer dequeue");
> +  if (!uk_mutex_is_locked(r->lock))
> +    uk_pr_crit("lock not held on single consumer dequeue\n");

Panic please.

>  #endif
>  
>    if (br->br_cons_head == br->br_prod_tail)
>      return NULL;
>  
> -#if defined(__arm__) || defined(__aarch64__)
> +#if defined(CONFIG_ARCH_ARM_32) || defined(CONFIG_ARCH_ARM_64)
>    /*
>     * The barrier is required there on ARM and ARM64 to ensure, that
>     * br->br_ring[br->br_cons_head] will not be fetched before the above
> @@ -342,7 +346,8 @@ buf_ring_peek_clear_sc(struct buf_ring *br)
>     * conditional check will be true, so we will return previously fetched
>     * (and invalid) buffer.
>     */
> -  atomic_thread_fence_acq();
> +  /* TODO: Provide atomic_thread_fence_acq(); */
> +  /* atomic_thread_fence_acq(); */

Add an #error here please.

>  #endif
>  
>  #ifdef DEBUG_BUFRING
> @@ -378,8 +383,8 @@ buf_ring_count(struct buf_ring *br)
>        & br->br_prod_mask;
>  }
>  
> -struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int 
> flags,
> -    struct mtx *);
> -void buf_ring_free(struct buf_ring *br, struct malloc_type *type);
> +struct buf_ring *buf_ring_alloc(struct uk_alloc *a, int count, int flags,
> +    struct uk_mutex *);

Actually I think it would be better to keep the original parameter
order. More than that, `struct uk_alloc *a` and `struct malloc_type
*type` are kind of semantically equivalent. Same for buf_ring_free().

> +void buf_ring_free(struct uk_alloc *a, struct buf_ring *br);
>  
>  #endif
> \ No newline at end of file
> diff --git a/lib/ukring/ring.c b/lib/ukring/ring.c
> index ed5e8f7..3f3e053 100644
> --- a/lib/ukring/ring.c
> +++ b/lib/ukring/ring.c
> @@ -26,28 +26,35 @@
>   * SUCH DAMAGE.
>   */
>  
> -#include <sys/cdefs.h>
> -__FBSDID("$FreeBSD$");
> -
>  #include <sys/param.h>
> -#include <sys/systm.h>
> -#include <sys/kernel.h>
> -#include <sys/malloc.h>
> -#include <sys/ktr.h>
> -#include <sys/buf_ring.h>
> +#include <uk/ring.h>
> +#include <uk/assert.h>
> +#include <uk/alloc.h>
> +#include <uk/mutex.h>
> +#include <uk/config.h>
> +#include <uk/print.h>
> +
> +#ifndef POWER_OF_2
> +#define POWER_OF_2(x)   (((x)) && (!((x) & ((x) - 1))))
> +#endif

We already have POWER_OF_2() in essentials.h.

>  
> -struct buf_ring *
> -buf_ring_alloc(int count, struct malloc_type *type, int flags, struct mtx 
> *lock)
> +struct uk_ring *
> +uk_ring_alloc(struct uk_alloc *a, int count, int flags, struct uk_mutex 
> *lock)
>  {
>    struct buf_ring *br;
>  
> -  KASSERT(powerof2(count), ("buf ring must be size power of 2"));
> +  if (a == NULL)
> +    a = uk_alloc_get_default();
> +
> +  /* Buf ring must be size power of 2 */
> +  UK_ASSERT(POWER_OF_2(count));
>  
> -  br = malloc(sizeof(struct buf_ring) + count*sizeof(caddr_t),
> -      type, flags|M_ZERO);
> +  r = uk_malloc(a, sizeof(struct uk_ring) + count * sizeof(caddr_t));
>  
> -  if (br == NULL)
> +  if (br == NULL) {
> +    uk_pr_err("Could not allocate ring: out of memory\n");

I keep seeing this 'out of memory' addition. I think it's redundant —
what other reason could there be for an allocation failure?

>      return NULL;
> +  }
>  
>  #ifdef DEBUG_BUFRING
>    br->br_lock = lock;
> @@ -62,7 +69,7 @@ buf_ring_alloc(int count, struct malloc_type *type, int 
> flags, struct mtx *lock)
>  }
>  
>  void
> -buf_ring_free(struct buf_ring *br, struct malloc_type *type)
> +uk_ring_free(struct uk_alloc *a, struct uk_ring *r)
>  {
> -  free(br, type);
> +  uk_free(a, br);
>  }
> \ No newline at end of file
> 



 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.