[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [PATCH 09/22] Add a very basic netchannel2 implementation.



This is functional, in the sense that packets can be sent and
received, but lacks any advanced features.

Signed-off-by: Steven Smith <steven.smith@xxxxxxxxxx>
---
 drivers/xen/Kconfig                            |   24 +
 drivers/xen/Makefile                           |    1 +
 drivers/xen/netchannel2/Makefile               |   12 +
 drivers/xen/netchannel2/chan.c                 |  659 ++++++++++++++++++++++++
 drivers/xen/netchannel2/netback2.c             |  354 +++++++++++++
 drivers/xen/netchannel2/netchan2.c             |   32 ++
 drivers/xen/netchannel2/netchannel2_core.h     |  351 +++++++++++++
 drivers/xen/netchannel2/netchannel2_endpoint.h |   63 +++
 drivers/xen/netchannel2/netfront2.c            |  488 ++++++++++++++++++
 drivers/xen/netchannel2/recv_packet.c          |  216 ++++++++
 drivers/xen/netchannel2/rscb.c                 |  385 ++++++++++++++
 drivers/xen/netchannel2/util.c                 |  230 +++++++++
 drivers/xen/netchannel2/xmit_packet.c          |  318 ++++++++++++
 include/xen/interface/io/netchannel2.h         |  106 ++++
 include/xen/interface/io/uring.h               |  426 +++++++++++++++
 15 files changed, 3665 insertions(+), 0 deletions(-)
 create mode 100644 drivers/xen/netchannel2/Makefile
 create mode 100644 drivers/xen/netchannel2/chan.c
 create mode 100644 drivers/xen/netchannel2/netback2.c
 create mode 100644 drivers/xen/netchannel2/netchan2.c
 create mode 100644 drivers/xen/netchannel2/netchannel2_core.h
 create mode 100644 drivers/xen/netchannel2/netchannel2_endpoint.h
 create mode 100644 drivers/xen/netchannel2/netfront2.c
 create mode 100644 drivers/xen/netchannel2/recv_packet.c
 create mode 100644 drivers/xen/netchannel2/rscb.c
 create mode 100644 drivers/xen/netchannel2/util.c
 create mode 100644 drivers/xen/netchannel2/xmit_packet.c
 create mode 100644 include/xen/interface/io/netchannel2.h
 create mode 100644 include/xen/interface/io/uring.h

diff --git a/drivers/xen/Kconfig b/drivers/xen/Kconfig
index ed4b89b..a081b73 100644
--- a/drivers/xen/Kconfig
+++ b/drivers/xen/Kconfig
@@ -210,6 +210,30 @@ config XEN_SCSI_FRONTEND
          The SCSI frontend driver allows the kernel to access SCSI Devices
          within another guest OS.
 
+config XEN_NETCHANNEL2
+       tristate "Net channel 2 support"
+       depends on XEN && NET
+       default y
+       help
+         Xen netchannel2 driver support.  This allows a domain to act as
+         either the backend or frontend part of a netchannel2 connection.
+         Unless you are building a dedicated device-driver domain, you
+         almost certainly want to say Y here.
+
+         If you say Y or M here, you should also say Y to one or both of
+         ``Net channel 2 backend support'' and ``Net channel 2 frontend
+         support'', below.
+
+config XEN_NETDEV2_BACKEND
+       bool "Net channel 2 backend support"
+       depends on XEN_BACKEND && XEN_NETCHANNEL2
+       default XEN_BACKEND
+
+config XEN_NETDEV2_FRONTEND
+       bool "Net channel 2 frontend support"
+       depends on XEN_NETCHANNEL2
+       default y
+
 config XEN_GRANT_DEV
        tristate "User-space granted page access driver"
        default XEN_PRIVILEGED_GUEST
diff --git a/drivers/xen/Makefile b/drivers/xen/Makefile
index 873e5a3..68eb231 100644
--- a/drivers/xen/Makefile
+++ b/drivers/xen/Makefile
@@ -30,4 +30,5 @@ obj-$(CONFIG_XEN_GRANT_DEV)   += gntdev/
 obj-$(CONFIG_XEN_NETDEV_ACCEL_SFC_UTIL)                += sfc_netutil/
 obj-$(CONFIG_XEN_NETDEV_ACCEL_SFC_FRONTEND)    += sfc_netfront/
 obj-$(CONFIG_XEN_NETDEV_ACCEL_SFC_BACKEND)     += sfc_netback/
+obj-$(CONFIG_XEN_NETCHANNEL2)          += netchannel2/
 obj-$(CONFIG_XEN_ACPI_WMI_WRAPPER)              += acpi-wmi/
diff --git a/drivers/xen/netchannel2/Makefile b/drivers/xen/netchannel2/Makefile
new file mode 100644
index 0000000..bdad6da
--- /dev/null
+++ b/drivers/xen/netchannel2/Makefile
@@ -0,0 +1,12 @@
+obj-$(CONFIG_XEN_NETCHANNEL2) += netchannel2.o
+
+netchannel2-objs := chan.o netchan2.o rscb.o util.o \
+       xmit_packet.o recv_packet.o
+
+ifeq ($(CONFIG_XEN_NETDEV2_BACKEND),y)
+netchannel2-objs += netback2.o
+endif
+
+ifeq ($(CONFIG_XEN_NETDEV2_FRONTEND),y)
+netchannel2-objs += netfront2.o
+endif
diff --git a/drivers/xen/netchannel2/chan.c b/drivers/xen/netchannel2/chan.c
new file mode 100644
index 0000000..e3ad981
--- /dev/null
+++ b/drivers/xen/netchannel2/chan.c
@@ -0,0 +1,659 @@
+#include <linux/kernel.h>
+#include <linux/kthread.h>
+#include <linux/gfp.h>
+#include <linux/etherdevice.h>
+#include <linux/interrupt.h>
+#include <linux/netdevice.h>
+#include <linux/slab.h>
+#include <linux/spinlock.h>
+#include <linux/delay.h>
+#include <linux/version.h>
+#include <xen/evtchn.h>
+#include <xen/xenbus.h>
+
+#include "netchannel2_endpoint.h"
+#include "netchannel2_core.h"
+
+static int process_ring(struct napi_struct *napi,
+                       int work_avail);
+
+/* Event channel interrupt handler: schedule NAPI processing whenever
+   the peer has produced messages we haven't consumed, or our transmit
+   queue is currently blocked. */
+static irqreturn_t nc2_int(int irq, void *dev_id)
+{
+       struct netchannel2_ring_pair *ring = dev_id;
+
+       if (ring->irq != -1 &&
+           (ring->cons_ring.sring->prod != ring->cons_ring.cons_pvt ||
+            ring->interface->is_stopped))
+               nc2_kick(ring);
+       return IRQ_HANDLED;
+}
+
+/* Process all incoming messages.  The function is given an
+   IRQ-disabled reference for the interface, and must dispose of it
+   (either by enabling the IRQ or re-introducing it to the pending
+   list).  Alternatively, the function can stop the ring being
+   processed again by leaking the reference (e.g. when the remote
+   endpoint is misbehaving). */
+/* Returns -1 if we used all the available work without finishing, or
+   the amount of work used otherwise. */
+static int process_messages(struct netchannel2_ring_pair *ncrp,
+                           int work_avail,
+                           struct sk_buff_head *pending_rx_queue)
+{
+       struct netchannel2_msg_hdr hdr;
+       RING_IDX prod;
+       struct netchannel2 *nc = ncrp->interface;
+       int work_done;
+
+       /* NOTE(review): work_done starts at 1, not 0 -- presumably to
+          charge a fixed cost per poll invocation; confirm. */
+       work_done = 1;
+
+retry:
+       prod = ncrp->cons_ring.sring->prod;
+       /* Order the read of the producer index above against the
+          message reads below. */
+       rmb();
+       while (work_done < work_avail &&
+              prod != ncrp->cons_ring.cons_pvt) {
+               /* Peek at the header of the next unconsumed message. */
+               nc2_copy_from_ring(&ncrp->cons_ring, &hdr, sizeof(hdr));
+               if (hdr.size < sizeof(hdr)) {
+                       printk(KERN_WARNING "Other end sent too-small message 
(%d)\n",
+                              hdr.size);
+                       goto done;
+               }
+               if (hdr.size > ncrp->cons_ring.payload_bytes) {
+                       /* This one message is bigger than the whole
+                          ring -> other end is clearly misbehaving.
+                          We won't take any more messages from this
+                          ring. */
+                       printk(KERN_WARNING "Other end sent enormous message 
(%d > %zd)\n",
+                              hdr.size,
+                              ncrp->cons_ring.payload_bytes);
+                       goto done;
+               }
+
+               switch (hdr.type) {
+               case NETCHANNEL2_MSG_SET_MAX_PACKETS:
+                       nc2_handle_set_max_packets_msg(ncrp, &hdr);
+                       break;
+               case NETCHANNEL2_MSG_PACKET:
+                       nc2_handle_packet_msg(nc, ncrp, &hdr,
+                                             pending_rx_queue);
+                       break;
+               case NETCHANNEL2_MSG_FINISH_PACKET:
+                       nc2_handle_finish_packet_msg(nc, ncrp, &hdr);
+                       break;
+               case NETCHANNEL2_MSG_PAD:
+                       break;
+               default:
+                       /* Drop bad messages.  We should arguably stop
+                          processing the ring at this point, because
+                          the ring is probably corrupt.  However, if
+                          it is corrupt then one of the other checks
+                          will hit soon enough, and doing it this way
+                          should make it a bit easier to add new
+                          message types in future. */
+                       pr_debug("Bad message type %d from peer!\n",
+                                hdr.type);
+                       break;
+               }
+               /* Message slots are padded to an 8 byte boundary on
+                  the ring. */
+               hdr.size = (hdr.size + 7) & ~7;
+               ncrp->cons_ring.cons_pvt += hdr.size;
+
+               work_done++;
+               /* Budget exhausted: return without napi_complete() so
+                  the NAPI core polls us again. */
+               if (work_done == work_avail)
+                       return -1;
+       }
+
+       /* The producer may have advanced while we were working;
+          re-check before completing. */
+       if (unlikely(prod != ncrp->cons_ring.sring->prod))
+               goto retry;
+
+       /* Dispose of our IRQ-disable reference. */
+       /* NOTE(review): the malformed-message paths above also land
+          here without advancing cons_pvt, so the final check below
+          will likely see the same bad message and re-kick -- this
+          looks like a busy loop on a misbehaving peer.  The header
+          comment says such paths should leak the reference instead;
+          confirm intent. */
+done:
+       napi_complete(&ncrp->napi);
+       enable_irq(ncrp->irq);
+
+       if (nc2_final_check_for_messages(&ncrp->cons_ring,
+                                        prod)) {
+               /* More work to do still. */
+               nc2_kick(ncrp);
+       }
+
+       return work_done;
+}
+
+/* Flush out all pending metadata messages on ring @ncrp, and then
+   update the ring pointers to indicate that we've done so.  Fire the
+   event channel if necessary. */
+static void flush_rings(struct netchannel2_ring_pair *ncrp)
+{
+       int need_kick;
+
+       /* Push out batched grant-copy hypercalls and queued metadata
+          messages before publishing the new ring pointers. */
+       flush_hypercall_batcher(&ncrp->pending_rx_hypercalls,
+                               nc2_rscb_on_gntcopy_fail);
+       send_finish_packet_messages(ncrp);
+       if (ncrp->need_advertise_max_packets)
+               advertise_max_packets(ncrp);
+
+       need_kick = 0;
+       if (nc2_finish_messages(&ncrp->cons_ring)) {
+               need_kick = 1;
+               /* If we need an event on the consumer ring, we always
+                  need to notify the other end, even if we don't have
+                  any messages which would normally be considered
+                  urgent. */
+               ncrp->pending_time_sensitive_messages = 1;
+       }
+       if (nc2_flush_ring(&ncrp->prod_ring))
+               need_kick = 1;
+       /* Notification batching: only fire the event channel when a
+          time-sensitive message is pending; otherwise remember the
+          kick (delayed_kick) and send it once one shows up. */
+       if (need_kick ||
+           (ncrp->delayed_kick && ncrp->pending_time_sensitive_messages)) {
+               if (ncrp->pending_time_sensitive_messages) {
+                       notify_remote_via_irq(ncrp->irq);
+                       ncrp->delayed_kick = 0;
+               } else {
+                       ncrp->delayed_kick = 1;
+               }
+               ncrp->pending_time_sensitive_messages = 0;
+       }
+}
+
+/* Process incoming messages, and then flush outgoing metadata
+ * messages.  We also try to unjam the xmit queue if any of the
+ * incoming messages would give us permission to send more stuff. */
+/* This is given an IRQ-disable reference, and must dispose of it. */
+static int nc2_poll(struct netchannel2_ring_pair *ncrp, int work_avail,
+                   struct sk_buff_head *rx_queue)
+{
+       int work_done;
+
+       if (!ncrp->is_attached) {
+               /* Ring is gone: dispose of the IRQ-disable reference
+                  and report no work done. */
+               napi_complete(&ncrp->napi);
+               enable_irq(ncrp->irq);
+               return 0;
+       }
+
+       work_done = process_messages(ncrp, work_avail, rx_queue);
+
+       flush_rings(ncrp);
+
+       /* process_messages() returns -1 when the budget ran out before
+          it completed NAPI; report the full budget so the NAPI core
+          schedules another poll. */
+       if (work_done < 0)
+               return work_avail;
+       else
+               return work_done;
+}
+
+/* Like skb_queue_purge(), but hand every packet back through
+   release_tx_packet() instead of kfree_skb(). */
+void nc2_queue_purge(struct netchannel2_ring_pair *ncrp,
+                    struct sk_buff_head *queue)
+{
+       struct sk_buff *skb;
+
+       while ((skb = skb_dequeue(queue)) != NULL)
+               release_tx_packet(ncrp, skb);
+}
+
+/* struct net_device stop() method.  Packets still waiting for a
+   delayed transmit are dropped and accounted as tx_dropped. */
+static int nc2_stop(struct net_device *nd)
+{
+       struct netchannel2 *priv = netdev_priv(nd);
+       unsigned dropped;
+
+       spin_lock_bh(&priv->rings.lock);
+       dropped = skb_queue_len(&priv->pending_skbs);
+       priv->stats.tx_dropped += dropped;
+       nc2_queue_purge(&priv->rings, &priv->pending_skbs);
+       spin_unlock_bh(&priv->rings.lock);
+
+       return 0;
+}
+
+/* Kick a netchannel2 interface so that the poll() method runs
+ * soon. */
+/* This has semi release-like semantics, so you can set flags
+   lock-free and be guaranteed that the poll() method will eventually
+   run and see the flag set, without doing any explicit locking. */
+void nc2_kick(struct netchannel2_ring_pair *ncrp)
+{
+       if (napi_schedule_prep(&ncrp->napi)) {
+               /* Mask the IRQ for the duration of the poll; it is
+                  re-enabled when message processing finishes (see
+                  process_messages()/nc2_poll()). */
+               disable_irq_nosync(ncrp->irq);
+               __napi_schedule(&ncrp->napi);
+       }
+}
+
+/* struct net_device open() method: schedule ring processing for this
+   interface. */
+static int nc2_open(struct net_device *nd)
+{
+       struct netchannel2 *priv = netdev_priv(nd);
+
+       nc2_kick(&priv->rings);
+       return 0;
+}
+
+/* Read a MAC address from xenstore node @prefix/@node into @addr
+ * (six bytes).  Call without holding locks.  Returns 0 on success or
+ * <0 on error. */
+static int read_mac_address(const char *prefix, const char *node,
+                           unsigned char *addr)
+{
+       unsigned octet[6];
+       int err;
+       int i;
+
+       err = xenbus_scanf(XBT_NIL, prefix, node, "%x:%x:%x:%x:%x:%x",
+                          &octet[0], &octet[1], &octet[2],
+                          &octet[3], &octet[4], &octet[5]);
+       if (err < 0)
+               return err;
+       if (err != 6)
+               return -EINVAL;
+       for (i = 0; i < 6; i++) {
+               /* Each component must fit in a byte. */
+               if (octet[i] > 0xff)
+                       return -EINVAL;
+               addr[i] = octet[i];
+       }
+       return 0;
+}
+
+/* Release resources associated with a ring pair.  It is assumed that
+   the ring pair has already been detached (which stops the IRQ and
+   un-pends the ring). */
+void cleanup_ring_pair(struct netchannel2_ring_pair *ncrp)
+{
+       /* Detach must already have cleared the shared ring pointers. */
+       BUG_ON(ncrp->prod_ring.sring);
+       BUG_ON(ncrp->cons_ring.sring);
+
+       drop_pending_tx_packets(ncrp);
+       nc2_queue_purge(ncrp, &ncrp->release_on_flush_batcher);
+       /* NOTE(review): assumes 0 is never a valid grant-reference
+          pool handle -- confirm against the gnttab API. */
+       if (ncrp->gref_pool != 0)
+               gnttab_free_grant_references(ncrp->gref_pool);
+       netif_napi_del(&ncrp->napi);
+}
+
+/* Initialise the ring-pair state embedded in @nc.  Returns 0 on
+   success, or -1 (not an errno) if the grant reference pool cannot be
+   allocated. */
+int init_ring_pair(struct netchannel2_ring_pair *ncrp,
+                  struct netchannel2 *nc)
+{
+       unsigned x;
+
+       ncrp->interface = nc;
+       spin_lock_init(&ncrp->lock);
+       /* -1 means no IRQ bound yet; see nc2_int(). */
+       ncrp->irq = -1;
+
+       /* Chain every TX packet slot into a free list headed at slot
+          0 and terminated by INVALID_TXP_INDEX. */
+       for (x = 0; x < NR_TX_PACKETS - 1; x++)
+               txp_set_next_free(ncrp->tx_packets + x, x + 1);
+       txp_set_next_free(ncrp->tx_packets + x, INVALID_TXP_INDEX);
+       ncrp->head_free_tx_packet = 0;
+
+       skb_queue_head_init(&ncrp->pending_tx_queue);
+       skb_queue_head_init(&ncrp->release_on_flush_batcher);
+
+       /* Pre-allocate one grant reference per TX packet slot. */
+       if (gnttab_alloc_grant_references(NR_TX_PACKETS,
+                                         &ncrp->gref_pool) < 0)
+               return -1;
+
+       netif_napi_add(ncrp->interface->net_device, &ncrp->napi,
+                      process_ring, 64);
+       napi_enable(&ncrp->napi);
+
+       return 0;
+}
+
+/* struct net_device get_stats() method. */
+static struct net_device_stats *nc2_get_stats(struct net_device *nd)
+{
+       struct netchannel2 *priv = netdev_priv(nd);
+
+       return &priv->stats;
+}
+
+/* Create a new netchannel2 structure. Call with no locks held.
+   Returns NULL on error.  The xenbus device must remain valid for as
+   long as the netchannel2 structure does.  The core does not take out
+   any kind of reference count on it, but will refer to it throughout
+   the returned netchannel2's life. */
+struct netchannel2 *nc2_new(struct xenbus_device *xd)
+{
+       struct net_device *netdev;
+       struct netchannel2 *nc;
+       int err;
+       int local_trusted;
+       int remote_trusted;
+       int filter_mac;
+
+       if (!gnttab_subpage_grants_available()) {
+               printk(KERN_ERR "netchannel2 needs version 2 grant tables\n");
+               return NULL;
+       }
+
+       /* Policy knobs live in our xenstore directory; fall back to
+          conservative defaults if the toolstack didn't write them. */
+       if (xenbus_scanf(XBT_NIL, xd->nodename, "local-trusted",
+                        "%d", &local_trusted) != 1) {
+               printk(KERN_WARNING "Can't tell whether local endpoint is trusted; assuming it is.\n");
+               local_trusted = 1;
+       }
+
+       if (xenbus_scanf(XBT_NIL, xd->nodename, "remote-trusted",
+                        "%d", &remote_trusted) != 1) {
+               /* Bug fix: this warning previously said ``local
+                  endpoint'', copy-and-pasted from the branch above. */
+               printk(KERN_WARNING "Can't tell whether remote endpoint is trusted; assuming it isn't.\n");
+               remote_trusted = 0;
+       }
+
+       if (xenbus_scanf(XBT_NIL, xd->nodename, "filter-mac",
+                        "%d", &filter_mac) != 1) {
+               /* Default to filtering exactly when the remote is
+                  untrusted. */
+               if (remote_trusted) {
+                       printk(KERN_WARNING "Can't tell whether to filter MAC addresses from remote domain; filtering off.\n");
+                       filter_mac = 0;
+               } else {
+                       printk(KERN_WARNING "Can't tell whether to filter MAC addresses from remote domain; filtering on.\n");
+                       filter_mac = 1;
+               }
+       }
+
+       netdev = alloc_etherdev(sizeof(*nc));
+       if (netdev == NULL)
+               return NULL;
+
+       nc = netdev_priv(netdev);
+       memset(nc, 0, sizeof(*nc));
+       nc->magic = NETCHANNEL2_MAGIC;
+       nc->net_device = netdev;
+       nc->xenbus_device = xd;
+
+       nc->remote_trusted = remote_trusted;
+       nc->local_trusted = local_trusted;
+       nc->rings.filter_mac = filter_mac;
+
+       skb_queue_head_init(&nc->pending_skbs);
+       if (init_ring_pair(&nc->rings, nc) < 0)
+               goto err_free_netdev;
+
+       netdev->open = nc2_open;
+       netdev->stop = nc2_stop;
+       netdev->hard_start_xmit = nc2_start_xmit;
+       netdev->get_stats = nc2_get_stats;
+
+       /* We need to hold the ring lock in order to send messages
+          anyway, so there's no point in Linux doing additional
+          synchronisation. */
+       netdev->features = NETIF_F_LLTX;
+
+       SET_NETDEV_DEV(netdev, &xd->dev);
+
+       err = read_mac_address(xd->nodename, "remote-mac",
+                              nc->rings.remote_mac);
+       if (err == 0)
+               err = read_mac_address(xd->nodename, "mac", netdev->dev_addr);
+       if (err == 0)
+               err = register_netdev(netdev);
+       if (err != 0)
+               goto err_cleanup_rings;
+
+       return nc;
+
+       /* Bug fix: these paths used to call nc2_release(), which
+          unconditionally calls unregister_netdev() -- invalid here
+          because the netdev was never (successfully) registered.
+          Tear down directly instead. */
+err_cleanup_rings:
+       cleanup_ring_pair(&nc->rings);
+err_free_netdev:
+       free_netdev(netdev);
+       return NULL;
+}
+
+/* Release a netchannel2 structure previously allocated with
+ * nc2_new().  Call with no locks held.         The rings will be
+ * automatically detach if necessary. */
+/* NOTE(review): this unconditionally calls unregister_netdev(), so
+ * it must only be used on a device for which register_netdev()
+ * succeeded -- verify all callers. */
+void nc2_release(struct netchannel2 *nc)
+{
+       netif_carrier_off(nc->net_device);
+
+       unregister_netdev(nc->net_device);
+
+       nc2_detach_rings(nc);
+
+       /* Unregistering the net device stops any netdev methods from
+          running, and detaching the rings stops the napi methods, so
+          we're now the only thing accessing this netchannel2
+          structure and we can tear it down with impunity. */
+
+       cleanup_ring_pair(&nc->rings);
+
+       /* Drop any skbs still queued for a delayed transmit. */
+       nc2_queue_purge(&nc->rings, &nc->pending_skbs);
+
+       free_netdev(nc->net_device);
+}
+
+/* Wire up both shared rings and mark the ring pair attached.  Called
+   with the ring lock held (see nc2_attach_rings()). */
+static void _nc2_attach_rings(struct netchannel2_ring_pair *ncrp,
+                             struct netchannel2_sring_cons *cons_sring,
+                             const volatile void *cons_payload,
+                             size_t cons_size,
+                             struct netchannel2_sring_prod *prod_sring,
+                             void *prod_payload,
+                             size_t prod_size,
+                             domid_t otherend_id)
+{
+       BUG_ON(prod_sring == NULL);
+       BUG_ON(cons_sring == NULL);
+
+       ncrp->prod_ring.sring = prod_sring;
+       ncrp->prod_ring.payload_bytes = prod_size;
+       ncrp->prod_ring.prod_pvt = 0;
+       ncrp->prod_ring.payload = prod_payload;
+
+       ncrp->cons_ring.sring = cons_sring;
+       ncrp->cons_ring.payload_bytes = cons_size;
+       /* Request an event as soon as the next message is produced. */
+       ncrp->cons_ring.sring->prod_event = ncrp->cons_ring.sring->prod + 1;
+       ncrp->cons_ring.cons_pvt = 0;
+       ncrp->cons_ring.payload = cons_payload;
+
+       ncrp->otherend_id = otherend_id;
+
+       ncrp->is_attached = 1;
+
+       /* Make sure the other end learns our packet limit. */
+       ncrp->need_advertise_max_packets = 1;
+}
+
+/* Attach a netchannel2 structure to a ring pair.  The endpoint is
+   also expected to set up an event channel after calling this before
+   using the interface.  Returns 0 on success or <0 on error. */
+int nc2_attach_rings(struct netchannel2 *nc,
+                    struct netchannel2_sring_cons *cons_sring,
+                    const volatile void *cons_payload,
+                    size_t cons_size,
+                    struct netchannel2_sring_prod *prod_sring,
+                    void *prod_payload,
+                    size_t prod_size,
+                    domid_t otherend_id)
+{
+       struct netchannel2_ring_pair *ncrp = &nc->rings;
+
+       spin_lock_bh(&ncrp->lock);
+       _nc2_attach_rings(ncrp, cons_sring, cons_payload, cons_size,
+                         prod_sring, prod_payload, prod_size, otherend_id);
+       spin_unlock_bh(&ncrp->lock);
+
+       netif_carrier_on(nc->net_device);
+
+       /* Prod the ring so the new connection gets processed. */
+       nc2_kick(ncrp);
+
+       return 0;
+}
+
+/* Tear down the ring-pair state under the ring lock; the shared
+   rings must not be touched again once this returns. */
+static void _detach_rings(struct netchannel2_ring_pair *ncrp)
+{
+       spin_lock_bh(&ncrp->lock);
+       /* We need to release all of the pending transmission packets,
+          because they're never going to complete now that we've lost
+          the ring. */
+       drop_pending_tx_packets(ncrp);
+
+       /* NOTE(review): disable_irq() can sleep waiting for a running
+          handler, but we're inside spin_lock_bh() here -- confirm
+          this is safe in this configuration. */
+       disable_irq(ncrp->irq);
+
+       BUG_ON(ncrp->nr_tx_packets_outstanding);
+       ncrp->max_tx_packets_outstanding = 0;
+
+       /* No way of sending pending finish messages now; drop
+        * them. */
+       ncrp->pending_finish.prod = 0;
+       ncrp->pending_finish.cons = 0;
+
+       ncrp->cons_ring.sring = NULL;
+       ncrp->prod_ring.sring = NULL;
+       ncrp->is_attached = 0;
+
+       spin_unlock_bh(&ncrp->lock);
+}
+
+/* Detach from the rings.  This includes unmapping them and stopping
+   the interrupt. */
+/* Careful: the netdev methods may still be running at this point. */
+/* This is not allowed to wait for the other end, because it might
+   have gone away (e.g. over suspend/resume). */
+static void nc2_detach_ring(struct netchannel2_ring_pair *ncrp)
+{
+       if (ncrp->is_attached) {
+               napi_disable(&ncrp->napi);
+               _detach_rings(ncrp);
+       }
+}
+
+/* Trivial wrapper around nc2_detach_ring().  Make the ring no longer
+   used. */
+void nc2_detach_rings(struct netchannel2 *nc)
+{
+       nc2_detach_ring(&nc->rings);
+
+       /* Okay, all async access to the ring is stopped.  Kill the
+          irqhandlers.  (It might be better to do this from the
+          _detach_ring() functions, but you're not allowed to
+          free_irq() from interrupt context, and tasklets are close
+          enough to cause problems). */
+
+       if (nc->rings.irq >= 0)
+               unbind_from_irqhandler(nc->rings.irq, &nc->rings);
+       /* -1 marks ``no IRQ bound''; see nc2_int(). */
+       nc->rings.irq = -1;
+}
+
+#if defined(CONFIG_XEN_NETDEV2_BACKEND)
+/* Connect to an event channel port in a remote domain.  Returns 0 on
+   success or <0 on error.  The port is automatically disconnected
+   when the channel is released or if the rings are detached.  This
+   should not be called if the port is already open. */
+int nc2_connect_evtchn(struct netchannel2 *nc, domid_t domid,
+                      int evtchn)
+{
+       int irq;
+
+       BUG_ON(nc->rings.irq >= 0);
+
+       irq = bind_interdomain_evtchn_to_irqhandler(domid,
+                                                   evtchn,
+                                                   nc2_int,
+                                                   IRQF_SAMPLE_RANDOM,
+                                                   "netchannel2",
+                                                   &nc->rings);
+       if (irq < 0)
+               return irq;
+
+       nc->rings.irq = irq;
+       nc->rings.evtchn = irq_to_evtchn_port(irq);
+       return 0;
+}
+#endif
+
+#if defined(CONFIG_XEN_NETDEV2_FRONTEND)
+/* Listen for incoming event channel connections from domain domid.
+   Similar semantics to nc2_connect_evtchn(). */
+int nc2_listen_evtchn(struct netchannel2 *nc, domid_t domid)
+{
+       int irq;
+
+       BUG_ON(nc->rings.irq >= 0);
+
+       irq = bind_listening_port_to_irqhandler(domid,
+                                               nc2_int,
+                                               IRQF_SAMPLE_RANDOM,
+                                               "netchannel2",
+                                               &nc->rings);
+       if (irq < 0)
+               return irq;
+
+       nc->rings.irq = irq;
+       nc->rings.evtchn = irq_to_evtchn_port(irq);
+       return 0;
+}
+#endif
+
+/* Find the local event channel port which was allocated by
+ * nc2_listen_evtchn() or nc2_connect_evtchn().  It is an error to
+ * call this when there is no event channel connected. */
+int nc2_get_evtchn_port(struct netchannel2 *nc)
+{
+       BUG_ON(nc->rings.irq < 0);
+
+       return nc->rings.evtchn;
+}
+
+/* @ncrp has been recently nc2_kick()ed.  Do all of the necessary
+   stuff. */
+/* NAPI poll method: take incoming messages, push out pending
+   transmissions, and unblock the device queue if ring space has
+   become available. */
+static int process_ring(struct napi_struct *napi,
+                       int work_avail)
+{
+       struct netchannel2_ring_pair *ncrp =
+               container_of(napi, struct netchannel2_ring_pair, napi);
+       struct netchannel2 *nc = ncrp->interface;
+       struct sk_buff *skb;
+       int work_done;
+       struct sk_buff_head rx_queue;
+
+       skb_queue_head_init(&rx_queue);
+
+       spin_lock(&ncrp->lock);
+
+       /* Pick up incoming messages. */
+       work_done = nc2_poll(ncrp, work_avail, &rx_queue);
+
+       /* Transmit pending packets. */
+       if (!skb_queue_empty(&ncrp->pending_tx_queue)) {
+               skb = __skb_dequeue(&ncrp->pending_tx_queue);
+               do {
+                       if (!nc2_really_start_xmit(ncrp, skb)) {
+                               /* Requeue the packet so that we will try
+                                  when the ring is less busy */
+                               __skb_queue_head(&ncrp->pending_tx_queue, skb);
+                               break;
+                       }
+                       skb = __skb_dequeue(&ncrp->pending_tx_queue);
+               } while (skb != NULL);
+
+               flush_rings(ncrp);
+
+               /* Packets whose release was deferred until the rings
+                  had been flushed can be freed now. */
+               while ((skb = __skb_dequeue(&ncrp->release_on_flush_batcher)))
+                       release_tx_packet(ncrp, skb);
+       }
+
+       if (nc->is_stopped) {
+               /* If the other end has processed some messages, there
+                  may be space on the ring for a delayed send from
+                  earlier.  Process it now. */
+               while (1) {
+                       skb = skb_peek_tail(&nc->pending_skbs);
+                       if (!skb)
+                               break;
+                       if (prepare_xmit_allocate_resources(nc, skb) < 0) {
+                               /* Still stuck */
+                               break;
+                       }
+                       __skb_unlink(skb, &nc->pending_skbs);
+                       queue_packet_to_interface(skb, ncrp);
+               }
+               if (skb_queue_empty(&nc->pending_skbs)) {
+                       nc->is_stopped = 0;
+                       netif_wake_queue(nc->net_device);
+               }
+       }
+
+       spin_unlock(&ncrp->lock);
+
+       /* Hand the received packets to the stack outside the ring
+          lock. */
+       receive_pending_skbs(&rx_queue);
+
+       return work_done;
+}
diff --git a/drivers/xen/netchannel2/netback2.c 
b/drivers/xen/netchannel2/netback2.c
new file mode 100644
index 0000000..fd6f238
--- /dev/null
+++ b/drivers/xen/netchannel2/netback2.c
@@ -0,0 +1,354 @@
+#include <linux/kernel.h>
+#include <linux/gfp.h>
+#include <linux/vmalloc.h>
+#include <xen/gnttab.h>
+#include <xen/xenbus.h>
+#include <xen/interface/io/netchannel2.h>
+
+#include "netchannel2_core.h"
+#include "netchannel2_endpoint.h"
+
+#define NETBACK2_MAGIC 0xb5e99485
+struct netback2 {
+       unsigned magic;
+       struct xenbus_device *xenbus_device;
+
+       struct netchannel2 *chan;
+
+       struct grant_mapping b2f_mapping;
+       struct grant_mapping f2b_mapping;
+       struct grant_mapping control_mapping;
+
+       int attached;
+
+       struct xenbus_watch shutdown_watch;
+       int have_shutdown_watch;
+};
+
+/* Recover our netback2 state from a xenbus device.  driver_data is
+   set in netback2_probe(); the magic check catches a device which
+   isn't one of ours (or a stale/corrupted pointer). */
+static struct netback2 *xenbus_device_to_nb2(struct xenbus_device *xd)
+{
+       struct netback2 *nb = xd->dev.driver_data;
+       BUG_ON(nb->magic != NETBACK2_MAGIC);
+       return nb;
+}
+
+/* Read a range of grants out of xenstore and map them in gm.  Any
+   existing mapping in gm is released. Returns 0 on success or <0 on
+   error.  On error, gm is preserved, and xenbus_dev_fatal() is
+   called. */
+static int map_grants(struct netback2 *nd, const char *prefix,
+                     struct grant_mapping *gm)
+{
+       struct xenbus_device *xd = nd->xenbus_device;
+       int err;
+       char buf[32];
+       int i;
+       unsigned nr_pages;
+       grant_ref_t grefs[MAX_GRANT_MAP_PAGES];
+
+       sprintf(buf, "%s-nr-pages", prefix);
+       err = xenbus_scanf(XBT_NIL, xd->otherend, buf, "%u", &nr_pages);
+       if (err == -ENOENT) {
+               /* Key absent: frontend wants a single-page ring. */
+               nr_pages = 1;
+       } else if (err != 1) {
+               if (err < 0) {
+                       xenbus_dev_fatal(xd, err, "reading %s", buf);
+                       return err;
+               } else {
+                       xenbus_dev_fatal(xd, err, "reading %s as integer",
+                                        buf);
+                       return -EINVAL;
+               }
+       }
+
+       /* nr_pages comes from the (untrusted) frontend's xenstore
+          area; bound it before using it to index the on-stack
+          grefs[] array, or a malicious frontend could overflow the
+          stack. */
+       if (nr_pages < 1 || nr_pages > MAX_GRANT_MAP_PAGES) {
+               xenbus_dev_fatal(xd, -ERANGE,
+                                "%s/%s has invalid page count %u",
+                                xd->otherend, buf, nr_pages);
+               return -ERANGE;
+       }
+
+       for (i = 0; i < nr_pages; i++) {
+               sprintf(buf, "%s-ref-%d", prefix, i);
+               err = xenbus_scanf(XBT_NIL, xd->otherend, buf, "%u",
+                                  &grefs[i]);
+               if (err != 1) {
+                       if (err < 0) {
+                               xenbus_dev_fatal(xd,
+                                                err,
+                                                "reading gref %d from %s/%s",
+                                                i,
+                                                xd->otherend,
+                                                buf);
+                       } else {
+                               xenbus_dev_fatal(xd,
+                                                -EINVAL,
+                                                "expected an integer at %s/%s",
+                                                xd->otherend,
+                                                buf);
+                               err = -EINVAL;
+                       }
+                       return err;
+               }
+       }
+
+       err = nc2_map_grants(gm, grefs, nr_pages, xd->otherend_id);
+       if (err < 0)
+               xenbus_dev_fatal(xd, err, "mapping ring %s from %s",
+                                prefix, xd->otherend);
+       return err;
+}
+
+/* Undo the effects of attach_to_frontend */
+/* Safe to call repeatedly: only the first call after a successful
+   attach does any work. */
+static void detach_from_frontend(struct netback2 *nb)
+{
+       if (!nb->attached)
+               return;
+       /* Quiesce the channel before tearing down the grant mappings
+          which back its rings. */
+       nc2_detach_rings(nb->chan);
+       nc2_unmap_grants(&nb->b2f_mapping);
+       nc2_unmap_grants(&nb->f2b_mapping);
+       nc2_unmap_grants(&nb->control_mapping);
+       nb->attached = 0;
+}
+
+/* Map the frontend's rings and connect its event channel.  Returns 0
+   on success or a negative errno; on failure xenbus_dev_fatal() has
+   already been called.  Idempotent: a second call while attached is
+   a no-op. */
+static int attach_to_frontend(struct netback2 *nd)
+{
+       int err;
+       int evtchn;
+       struct xenbus_device *xd = nd->xenbus_device;
+       struct netchannel2 *nc = nd->chan;
+       struct netchannel2_backend_shared *nbs;
+
+       if (nd->attached)
+               return 0;
+
+       /* Attach the shared memory bits */
+       err = map_grants(nd, "b2f-ring", &nd->b2f_mapping);
+       if (err)
+               return err;
+       err = map_grants(nd, "f2b-ring", &nd->f2b_mapping);
+       if (err)
+               return err;
+       err = map_grants(nd, "control", &nd->control_mapping);
+       if (err)
+               return err;
+       nbs = nd->control_mapping.mapping->addr;
+       err = nc2_attach_rings(nc,
+                              &nbs->cons,
+                              nd->f2b_mapping.mapping->addr,
+                              nd->f2b_mapping.nr_pages * PAGE_SIZE,
+                              &nbs->prod,
+                              nd->b2f_mapping.mapping->addr,
+                              nd->b2f_mapping.nr_pages * PAGE_SIZE,
+                              xd->otherend_id);
+       if (err < 0) {
+               xenbus_dev_fatal(xd, err, "attaching to rings");
+               return err;
+       }
+
+       /* Connect the event channel.  xenbus_scanf() returns 1 on a
+          successful parse; anything else (including 0) means we have
+          no usable port number and evtchn would be left
+          uninitialised, so treat it as an error too. */
+       err = xenbus_scanf(XBT_NIL, xd->otherend, "event-channel", "%u",
+                          &evtchn);
+       if (err != 1) {
+               if (err >= 0)
+                       err = -EINVAL;
+               xenbus_dev_fatal(xd, err, "reading %s/event-channel",
+                                xd->otherend);
+               return err;
+       }
+       err = nc2_connect_evtchn(nd->chan, xd->otherend_id, evtchn);
+       if (err < 0) {
+               xenbus_dev_fatal(xd, err, "binding to event channel");
+               return err;
+       }
+
+       /* All done */
+       nd->attached = 1;
+
+       return 0;
+}
+
+/* Xenbus state-change callback: drive our side of the connection
+   state machine in response to frontend transitions. */
+static void frontend_changed(struct xenbus_device *xd,
+                            enum xenbus_state frontend_state)
+{
+       struct netback2 *nb = xenbus_device_to_nb2(xd);
+       int err;
+
+       switch (frontend_state) {
+       case XenbusStateInitialising:
+               /* If the frontend does a kexec following a crash, we
+                  can end up bounced back here even though we're
+                  attached.  Try to recover by detaching from the old
+                  rings. */
+               /* (A normal shutdown, and even a normal kexec, would
+                * have gone through Closed first, so we'll already be
+                * detached, and this is pointless but harmless.) */
+               detach_from_frontend(nb);
+
+               /* Tell the frontend what sort of rings we're willing
+                  to accept. */
+               xenbus_printf(XBT_NIL, nb->xenbus_device->nodename,
+                             "max-sring-pages", "%d", MAX_GRANT_MAP_PAGES);
+
+               /* Start the device bring-up bit of the state
+                * machine. */
+               xenbus_switch_state(nb->xenbus_device, XenbusStateInitWait);
+               break;
+
+       case XenbusStateInitWait:
+               /* Frontend doesn't use this state.  Note
+                  xenbus_dev_fatal() takes a negative errno, as in
+                  every other call in this file. */
+               xenbus_dev_fatal(xd, -EINVAL,
+                                "unexpected frontend state InitWait");
+               break;
+
+       case XenbusStateInitialised:
+       case XenbusStateConnected:
+               /* Frontend has advertised its rings to us */
+               err = attach_to_frontend(nb);
+               if (err >= 0)
+                       xenbus_switch_state(xd, XenbusStateConnected);
+               break;
+
+       case XenbusStateClosing:
+               detach_from_frontend(nb);
+               xenbus_switch_state(xd, XenbusStateClosed);
+               break;
+
+       case XenbusStateClosed:
+               detach_from_frontend(nb);
+               xenbus_switch_state(xd, XenbusStateClosed);
+               if (!xenbus_dev_is_online(xd))
+                       device_unregister(&xd->dev);
+               break;
+
+       case XenbusStateUnknown:
+               detach_from_frontend(nb);
+               xenbus_switch_state(xd, XenbusStateClosed);
+               device_unregister(&xd->dev);
+               break;
+
+       default:
+               /* Ignore transitions to unknown states */
+               break;
+       }
+}
+
+/* Hotplug callback: tell userspace (udev/hotplug scripts) which
+   network interface corresponds to this xenbus device. */
+static int netback2_uevent(struct xenbus_device *xd,
+                          struct kobj_uevent_env *env)
+{
+       struct netback2 *nb = xenbus_device_to_nb2(xd);
+
+       add_uevent_var(env, "vif=%s", nb->chan->net_device->name);
+
+       return 0;
+}
+
+/* Begin an orderly shutdown.  Moving to Closing prompts the frontend
+   to detach; frontend_changed() then completes the teardown. */
+static void netback2_shutdown(struct xenbus_device *xd)
+{
+       xenbus_switch_state(xd, XenbusStateClosing);
+}
+
+/* Watch callback on <nodename>/shutdown-request.  The toolstack
+   writes "force" (tear down immediately) or "normal" (orderly
+   shutdown); the key is removed once the request has been acted
+   on. */
+static void shutdown_watch_callback(struct xenbus_watch *watch,
+                                   const char **vec,
+                                   unsigned int len)
+{
+       struct netback2 *nb =
+               container_of(watch, struct netback2, shutdown_watch);
+       char *type;
+
+       type = xenbus_read(XBT_NIL, nb->xenbus_device->nodename,
+                          "shutdown-request", NULL);
+       if (IS_ERR(type)) {
+               /* ENOENT just means nobody has asked for a shutdown
+                  yet (the watch also fires on key removal). */
+               if (PTR_ERR(type) != -ENOENT)
+                       printk(KERN_WARNING "Cannot read %s/%s: %ld\n",
+                              nb->xenbus_device->nodename, "shutdown-request",
+                              PTR_ERR(type));
+               return;
+       }
+       if (strcmp(type, "force") == 0) {
+               detach_from_frontend(nb);
+               xenbus_switch_state(nb->xenbus_device, XenbusStateClosed);
+       } else if (strcmp(type, "normal") == 0) {
+               netback2_shutdown(nb->xenbus_device);
+       } else {
+               printk(KERN_WARNING
+                      "Unrecognised shutdown request %s from tools\n",
+                      type);
+       }
+       xenbus_rm(XBT_NIL, nb->xenbus_device->nodename, "shutdown-request");
+       kfree(type);
+}
+
+/* Probe a new vif2 backend device: allocate our per-device state,
+   set up the shutdown-request watch, and create the netchannel2
+   instance.  Returns 0 or a negative errno (previously this passed a
+   positive ENOMEM to xenbus_dev_fatal() and discarded the error from
+   register_xenbus_watch()). */
+static int netback2_probe(struct xenbus_device *xd,
+                         const struct xenbus_device_id *id)
+{
+       struct netback2 *nb;
+       int err = -ENOMEM;
+
+       nb = kzalloc(sizeof(*nb), GFP_KERNEL);
+       if (nb == NULL)
+               goto err;
+       nb->magic = NETBACK2_MAGIC;
+       nb->xenbus_device = xd;
+
+       nb->shutdown_watch.node = kasprintf(GFP_KERNEL, "%s/shutdown-request",
+                                           xd->nodename);
+       if (nb->shutdown_watch.node == NULL)
+               goto err;
+       nb->shutdown_watch.callback = shutdown_watch_callback;
+       err = register_xenbus_watch(&nb->shutdown_watch);
+       if (err)
+               goto err;
+       nb->have_shutdown_watch = 1;
+
+       nb->chan = nc2_new(xd);
+       if (!nb->chan) {
+               err = -ENOMEM;
+               goto err;
+       }
+
+       xd->dev.driver_data = nb;
+
+       kobject_uevent(&xd->dev.kobj, KOBJ_ONLINE);
+
+       return 0;
+
+err:
+       if (nb != NULL) {
+               if (nb->have_shutdown_watch)
+                       unregister_xenbus_watch(&nb->shutdown_watch);
+               kfree(nb->shutdown_watch.node);
+               kfree(nb);
+       }
+       xenbus_dev_fatal(xd, err, "probing netdev");
+       return err;
+}
+
+/* Device removal: undo everything netback2_probe() set up.  The
+   nc2_unmap_grants() calls are no-ops if frontend_changed() already
+   detached us from the rings. */
+static int netback2_remove(struct xenbus_device *xd)
+{
+       struct netback2 *nb = xenbus_device_to_nb2(xd);
+       kobject_uevent(&xd->dev.kobj, KOBJ_OFFLINE);
+       if (nb->chan != NULL)
+               nc2_release(nb->chan);
+       if (nb->have_shutdown_watch)
+               unregister_xenbus_watch(&nb->shutdown_watch);
+       kfree(nb->shutdown_watch.node);
+       nc2_unmap_grants(&nb->b2f_mapping);
+       nc2_unmap_grants(&nb->f2b_mapping);
+       nc2_unmap_grants(&nb->control_mapping);
+       kfree(nb);
+       return 0;
+}
+
+static const struct xenbus_device_id netback2_ids[] = {
+       { "vif2" },
+       { "" }
+};
+
+static struct xenbus_driver netback2 = {
+       .name = "vif2",
+       .ids = netback2_ids,
+       .probe = netback2_probe,
+       .remove = netback2_remove,
+       .otherend_changed = frontend_changed,
+       .uevent = netback2_uevent,
+};
+
+/* Register the vif2 backend driver with xenbus.  Called from
+   netchan2_init(); returns 0 or a negative errno. */
+int __init netback2_init(void)
+{
+       int r;
+
+       r = xenbus_register_backend(&netback2);
+       if (r < 0) {
+               printk(KERN_ERR "error %d registering backend driver.\n",
+                      r);
+       }
+       return r;
+}
diff --git a/drivers/xen/netchannel2/netchan2.c b/drivers/xen/netchannel2/netchan2.c
new file mode 100644
index 0000000..b23b7e4
--- /dev/null
+++ b/drivers/xen/netchannel2/netchan2.c
@@ -0,0 +1,32 @@
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include "netchannel2_endpoint.h"
+
+/* Module entry point: bring up the core, then the frontend and
+   backend endpoints.  On failure, unwind whatever was already
+   initialised (previously nc2_exit() was never called on the error
+   paths, leaking the core's state). */
+static int __init netchan2_init(void)
+{
+       int r;
+
+       r = nc2_init();
+       if (r < 0)
+               return r;
+       r = netfront2_init();
+       if (r < 0)
+               goto err_nc2;
+       r = netback2_init();
+       if (r < 0)
+               goto err_netfront;
+       return 0;
+
+err_netfront:
+       netfront2_exit();
+err_nc2:
+       nc2_exit();
+       return r;
+}
+module_init(netchan2_init);
+
+/* We can't unload if we're acting as a backend. */
+#ifndef CONFIG_XEN_NETDEV2_BACKEND
+static void __exit netchan2_exit(void)
+{
+       netfront2_exit();
+       nc2_exit();
+}
+module_exit(netchan2_exit);
+#endif
+
+MODULE_LICENSE("GPL");
diff --git a/drivers/xen/netchannel2/netchannel2_core.h b/drivers/xen/netchannel2/netchannel2_core.h
new file mode 100644
index 0000000..6ae273d
--- /dev/null
+++ b/drivers/xen/netchannel2/netchannel2_core.h
@@ -0,0 +1,351 @@
+#ifndef NETCHANNEL2_CORE_H__
+#define NETCHANNEL2_CORE_H__
+
+#include <xen/interface/xen.h>
+#include <xen/gnttab.h>
+#include <xen/interface/io/netchannel2.h>
+#include <linux/skbuff.h>
+#include <linux/netdevice.h>
+
+/* After we send this number of frags, we request the other end to
+ * notify us when sending the corresponding finish packet message */
+#define MAX_MAX_COUNT_FRAGS_NO_EVENT 192
+
+/* Very small packets (e.g. TCP pure acks) are sent inline in the
+ * ring, to avoid the hypercall overhead.  This is the largest packet
+ * which will be sent small, in bytes. It should be big enough to
+ * cover the normal headers (i.e. ethernet + IP + TCP = 66 bytes) plus
+ * a little bit of slop for options etc. */
+#define PACKET_PREFIX_SIZE 96
+
+/* How many packets can we have outstanding at any one time?  This
+ * must be small enough that it won't be confused with an sk_buff
+ * pointer; see the txp_slot stuff later. */
+#define NR_TX_PACKETS 256
+
+/* A way of keeping track of a mapping of a bunch of grant references
+   into a contigous chunk of virtual address space.  This is used for
+   things like multi-page rings. */
+#define MAX_GRANT_MAP_PAGES 4
+struct grant_mapping {
+       unsigned nr_pages;
+       grant_handle_t handles[MAX_GRANT_MAP_PAGES];
+       struct vm_struct *mapping;
+};
+
+enum transmit_policy {
+       transmit_policy_unknown = 0,
+       transmit_policy_first = 0xf001,
+       transmit_policy_grant = transmit_policy_first,
+       transmit_policy_small,
+       transmit_policy_last = transmit_policy_small
+};
+
+/* When we send a packet message, we need to tag it with an ID.         That
+   ID is an index into the TXP slot array.  Each slot contains either
+   a pointer to an sk_buff (if it's in use), or the index of the next
+   free slot (if it isn't).  A slot is in use if the contents is >
+   NR_TX_PACKETS, and free otherwise. */
+struct txp_slot {
+       unsigned long __contents;
+};
+
+typedef uint32_t nc2_txp_index_t;
+
+#define INVALID_TXP_INDEX ((nc2_txp_index_t)NR_TX_PACKETS)
+
+static inline int txp_slot_in_use(struct txp_slot *slot)
+{
+       /* Values above NR_TX_PACKETS are sk_buff pointers (slot in
+          use); anything else is a free-list index. */
+       return slot->__contents > NR_TX_PACKETS;
+}
+
+static inline void txp_set_skb(struct txp_slot *slot, struct sk_buff *skb)
+{
+       slot->__contents = (unsigned long)skb;
+}
+
+static inline struct sk_buff *txp_get_skb(struct txp_slot *slot)
+{
+       /* A free slot holds a free-list index, not a pointer. */
+       if (!txp_slot_in_use(slot))
+               return NULL;
+       return (struct sk_buff *)slot->__contents;
+}
+
+static inline void txp_set_next_free(struct txp_slot *slot,
+                                    nc2_txp_index_t idx)
+{
+       slot->__contents = idx;
+}
+
+static inline nc2_txp_index_t txp_get_next_free(struct txp_slot *slot)
+{
+       return (nc2_txp_index_t)slot->__contents;
+}
+
+/* This goes in struct sk_buff::cb */
+struct skb_cb_overlay {
+       struct txp_slot *tp;
+       unsigned nr_fragments;
+       grant_ref_t gref_pool;
+       enum transmit_policy policy;
+       uint8_t failed;
+       uint8_t expecting_finish;
+       uint8_t type;
+       uint16_t inline_prefix_size;
+};
+
+/* Compile-time assertion: the typedef'd array has size -1 (a hard
+   compile error) if the condition is false.  The two-level paste is
+   required so __LINE__ is macro-expanded to the real line number
+   before ## is applied; a direct "a ## __LINE__" would produce the
+   same identifier (__cassert___LINE__) for every use and collide.
+   Using "(x) ? 1 : -1" rather than "(x)-1" also avoids a zero-size
+   array (a GCC extension) when the assertion holds. */
+#define CASSERT_PASTE_(a, b) a ## b
+#define CASSERT_PASTE(a, b) CASSERT_PASTE_(a, b)
+#define CASSERT(x) \
+       typedef unsigned CASSERT_PASTE(__cassert_, __LINE__)[(x) ? 1 : -1]
+CASSERT(sizeof(struct skb_cb_overlay) <= sizeof(((struct sk_buff *)0)->cb));
+
+static inline struct skb_cb_overlay *get_skb_overlay(struct sk_buff *skb)
+{
+       return (struct skb_cb_overlay *)skb->cb;
+}
+
+
+/* Packets for which we need to send FINISH_PACKET messages for as
+   soon as possible. */
+struct pending_finish_packets {
+#define MAX_PENDING_FINISH_PACKETS 256
+       uint32_t ids[MAX_PENDING_FINISH_PACKETS];
+       RING_IDX prod;
+       RING_IDX cons;
+};
+
+#define RX_GRANT_COPY_BATCH 32
+struct hypercall_batcher {
+       unsigned nr_pending_gops;
+       gnttab_copy_t gops[RX_GRANT_COPY_BATCH];
+       void *ctxt[RX_GRANT_COPY_BATCH];
+};
+
+struct netchannel2_ring_pair {
+       struct netchannel2 *interface;
+       /* Main ring lock.  Acquired from bottom halves. */
+       spinlock_t lock;
+
+       struct napi_struct napi;
+
+       /* Protected by the lock.  Initialised at attach_ring() time
+          and de-initialised at detach_ring() time. */
+       struct netchannel2_prod_ring prod_ring;
+       struct netchannel2_cons_ring cons_ring;
+       uint8_t is_attached; /* True if the rings are currently safe to
+                               access. */
+
+       unsigned max_count_frags_no_event;
+       unsigned expected_finish_messages;
+
+       domid_t otherend_id;
+
+       grant_ref_t gref_pool;
+
+       /* The IRQ corresponding to the event channel which is
+          connected to the other end.  This only changes from the
+          xenbus state change handler.  It is notified from lots of
+          other places.  Fortunately, it's safe to notify on an irq
+          after it's been released, so the lack of synchronisation
+          doesn't matter. */
+       int irq;
+       int evtchn;
+
+       /* The MAC address of our peer. */
+       unsigned char remote_mac[ETH_ALEN];
+
+       /* Set if we need to check the source MAC address on incoming
+          packets. */
+       int filter_mac;
+
+       /* A pool of free transmitted_packet structures, threaded on
+          the list member.  Protected by the lock. */
+       nc2_txp_index_t head_free_tx_packet;
+
+       /* Total number of packets on the allocated list.  Protected
+          by the lock. */
+       unsigned nr_tx_packets_outstanding;
+       /* Maximum number of packets which the other end will allow us
+          to keep outstanding at one time.  Valid whenever
+          is_attached is set. */
+       unsigned max_tx_packets_outstanding;
+
+       /* Count number of frags that we have sent to the other side
+          When we reach a max value we request that the other end
+          send an event when sending the corresponding finish message */
+       unsigned count_frags_no_event;
+
+       /* Set if we need to send a SET_MAX_PACKETS message.
+          Protected by the lock. */
+       uint8_t need_advertise_max_packets;
+
+       /* Set if there are messages on the ring which are considered
+          time-sensitive, so that it's necessary to notify the remote
+          endpoint as soon as possible. */
+       uint8_t pending_time_sensitive_messages;
+
+       /* Set if we've previously suppressed a remote notification
+          because none of the messages pending at the time of the
+          flush were time-sensitive.  The remote should be notified
+          as soon as the ring is flushed, even if the normal
+          filtering rules would suppress the event. */
+       uint8_t delayed_kick;
+
+       /* A list of packet IDs which we need to return to the other
+          end as soon as there is space on the ring.  Protected by
+          the lock. */
+       struct pending_finish_packets pending_finish;
+
+       /* transmitted_packet structures which are to be transmitted
+          next time the TX tasklet looks at this interface.
+          Protected by the lock. */
+       struct sk_buff_head pending_tx_queue;
+
+       /* Packets which we'll have finished transmitting as soon as
+          we flush the hypercall batcher.  Protected by the lock. */
+       struct sk_buff_head release_on_flush_batcher;
+
+       struct hypercall_batcher pending_rx_hypercalls;
+
+       /* A pre-allocated pool of TX packets.  The
+          allocated_tx_packets and free_tx_packets linked lists
+          contain elements of this array, and it can also be directly
+          indexed by packet ID.  Protected by the lock. */
+       struct txp_slot tx_packets[NR_TX_PACKETS];
+};
+
+struct netchannel2 {
+#define NETCHANNEL2_MAGIC 0x57c68c1d
+       unsigned magic;
+
+       /* Set when the structure is created and never changed */
+       struct net_device *net_device;
+       struct xenbus_device *xenbus_device;
+
+       /* Set if we trust the remote endpoint. */
+       int remote_trusted;
+       /* Set if the remote endpoint is expected to trust us.
+          There's no guarantee that this is actually correct, but
+          it's useful for optimisation. */
+       int local_trusted;
+
+       struct netchannel2_ring_pair rings;
+
+       /* Packets which we need to transmit soon */
+       struct sk_buff_head pending_skbs;
+
+       /* Flag to indicate that the interface is stopped
+          When the interface is stopped we need to run the tasklet
+          after we receive an interrupt so that we can wake it up */
+       uint8_t is_stopped;
+
+       /* Updates are protected by the lock.  This can be read at any
+        * time without holding any locks, and the rest of Linux is
+        * expected to cope. */
+       struct net_device_stats stats;
+};
+
+/* Submit all batched grant-copy operations in a single hypercall,
+   then invoke on_fail (with the ctxt recorded for that op) for each
+   operation which the hypervisor rejected.  Resets the batch.
+   (Reflowed: the original patch line was wrapped in transit, leaving
+   continuation lines without the leading '+'.) */
+static inline void flush_prepared_grant_copies(struct hypercall_batcher *hb,
+                                              void (*on_fail)(void *ctxt,
+                                                              gnttab_copy_t *gop))
+{
+       unsigned x;
+
+       if (hb->nr_pending_gops == 0)
+               return;
+       /* The hypercall itself only fails on bad arguments, which
+          would be a driver bug. */
+       if (HYPERVISOR_grant_table_op(GNTTABOP_copy, hb->gops,
+                                     hb->nr_pending_gops))
+               BUG();
+       for (x = 0; x < hb->nr_pending_gops; x++)
+               if (hb->gops[x].status != GNTST_okay)
+                       on_fail(hb->ctxt[x], &hb->gops[x]);
+       hb->nr_pending_gops = 0;
+}
+
+/* Reserve the next grant-copy slot in the batcher, flushing first if
+   the batch is full, and record ctxt for the on_fail callback.  The
+   caller fills in the returned gnttab_copy_t.
+   (Reflowed: the original patch lines were wrapped in transit,
+   leaving continuation lines without the leading '+'.) */
+static inline gnttab_copy_t *hypercall_batcher_grant_copy(
+       struct hypercall_batcher *hb,
+       void *ctxt,
+       void (*on_fail)(void *, gnttab_copy_t *gop))
+{
+       if (hb->nr_pending_gops == ARRAY_SIZE(hb->gops))
+               flush_prepared_grant_copies(hb, on_fail);
+       hb->ctxt[hb->nr_pending_gops] = ctxt;
+       return &hb->gops[hb->nr_pending_gops++];
+}
+
+/* Flush any outstanding batched grant copies, invoking on_fail for
+   each operation which didn't complete successfully. */
+static inline void flush_hypercall_batcher(struct hypercall_batcher *hb,
+                                          void (*on_fail)(void *,
+                                                          gnttab_copy_t *gop))
+{
+       flush_prepared_grant_copies(hb, on_fail);
+}
+
+struct sk_buff *handle_receiver_copy_packet(struct netchannel2 *nc,
+                                           struct netchannel2_ring_pair *ncrp,
+                                           struct netchannel2_msg_packet *msg,
+                                           struct netchannel2_msg_hdr *hdr,
+                                           unsigned nr_frags,
+                                           unsigned frags_off);
+
+int prepare_xmit_allocate_small(struct netchannel2_ring_pair *ncrp,
+                                                      struct sk_buff *skb);
+int prepare_xmit_allocate_grant(struct netchannel2_ring_pair *ncrp,
+                               struct sk_buff *skb);
+void xmit_grant(struct netchannel2_ring_pair *ncrp,
+               struct sk_buff *skb,
+               volatile void *msg);
+
+void queue_finish_packet_message(struct netchannel2_ring_pair *ncrp,
+                                uint32_t id, uint8_t flags);
+
+int allocate_txp_slot(struct netchannel2_ring_pair *ncrp,
+                     struct sk_buff *skb);
+void release_txp_slot(struct netchannel2_ring_pair *ncrp,
+                     struct sk_buff *skb);
+/* Releases the txp slot, the grant pool, and the skb */
+void release_tx_packet(struct netchannel2_ring_pair *ncrp,
+                      struct sk_buff *skb);
+
+void fetch_fragment(struct netchannel2_ring_pair *ncrp,
+                   unsigned idx,
+                   struct netchannel2_fragment *frag,
+                   unsigned off);
+
+void nc2_kick(struct netchannel2_ring_pair *ncrp);
+
+int nc2_map_grants(struct grant_mapping *gm,
+                  const grant_ref_t *grefs,
+                  unsigned nr_grefs,
+                  domid_t remote_domain);
+void nc2_unmap_grants(struct grant_mapping *gm);
+
+void queue_packet_to_interface(struct sk_buff *skb,
+                              struct netchannel2_ring_pair *ncrp);
+
+void nc2_rscb_on_gntcopy_fail(void *ctxt, gnttab_copy_t *gop);
+
+int nc2_start_xmit(struct sk_buff *skb, struct net_device *dev);
+int nc2_really_start_xmit(struct netchannel2_ring_pair *ncrp,
+                          struct sk_buff *skb);
+int prepare_xmit_allocate_resources(struct netchannel2 *nc,
+                                   struct sk_buff *skb);
+void nc2_handle_finish_packet_msg(struct netchannel2 *nc,
+                                 struct netchannel2_ring_pair *ncrp,
+                                 struct netchannel2_msg_hdr *hdr);
+void nc2_handle_set_max_packets_msg(struct netchannel2_ring_pair *ncrp,
+                                   struct netchannel2_msg_hdr *hdr);
+void drop_pending_tx_packets(struct netchannel2_ring_pair *ncrp);
+
+void send_finish_packet_messages(struct netchannel2_ring_pair *ncrp);
+void nc2_handle_packet_msg(struct netchannel2 *nc,
+                          struct netchannel2_ring_pair *ncrp,
+                          struct netchannel2_msg_hdr *hdr,
+                          struct sk_buff_head *pending_rx_queue);
+void advertise_max_packets(struct netchannel2_ring_pair *ncrp);
+void receive_pending_skbs(struct sk_buff_head *rx_queue);
+void nc2_queue_purge(struct netchannel2_ring_pair *ncrp,
+                    struct sk_buff_head *queue);
+
+#endif /* !NETCHANNEL2_CORE_H__ */
diff --git a/drivers/xen/netchannel2/netchannel2_endpoint.h b/drivers/xen/netchannel2/netchannel2_endpoint.h
new file mode 100644
index 0000000..2525f23
--- /dev/null
+++ b/drivers/xen/netchannel2/netchannel2_endpoint.h
@@ -0,0 +1,63 @@
+/* Interface between the endpoint implementations (netfront2.c,
+   netback2.c) and the netchannel2 core (chan.c and the various
+   transmission modes).         */
+#ifndef NETCHANNEL2_ENDPOINT_H__
+#define NETCHANNEL2_ENDPOINT_H__
+
+#include <linux/init.h>
+#include <xen/interface/xen.h>
+
+struct netchannel2_sring_prod;
+struct netchannel2_sring_cons;
+struct netchannel2;
+struct xenbus_device;
+
+struct netchannel2 *nc2_new(struct xenbus_device *xd);
+void nc2_release(struct netchannel2 *nc);
+
+int nc2_attach_rings(struct netchannel2 *nc,
+                    struct netchannel2_sring_cons *cons_sring,
+                    const volatile void *cons_payload,
+                    size_t cons_size,
+                    struct netchannel2_sring_prod *prod_sring,
+                    void *prod_payload,
+                    size_t prod_size,
+                    domid_t otherend_id);
+void nc2_detach_rings(struct netchannel2 *nc);
+#if defined(CONFIG_XEN_NETDEV2_FRONTEND)
+int nc2_listen_evtchn(struct netchannel2 *nc, domid_t dom);
+#endif
+#if defined(CONFIG_XEN_NETDEV2_BACKEND)
+int nc2_connect_evtchn(struct netchannel2 *nc, domid_t domid,
+                      int evtchn);
+#endif
+int nc2_get_evtchn_port(struct netchannel2 *nc);
+void nc2_suspend(struct netchannel2 *nc);
+
+void nc2_set_nr_tx_buffers(struct netchannel2 *nc, unsigned nr_buffers);
+
+/* Interface which the endpoints provide to the core. */
+#ifdef CONFIG_XEN_NETDEV2_FRONTEND
+int __init netfront2_init(void);
+void __exit netfront2_exit(void);
+#else
+static inline int netfront2_init(void)
+{
+       return 0;
+}
+static inline void netfront2_exit(void)
+{
+}
+#endif
+#ifdef CONFIG_XEN_NETDEV2_BACKEND
+int __init netback2_init(void);
+#else
+static inline int netback2_init(void)
+{
+       return 0;
+}
+#endif
+int __init nc2_init(void);
+void __exit nc2_exit(void);
+
+#endif /* NETCHANNEL2_ENDPOINT_H__ */
diff --git a/drivers/xen/netchannel2/netfront2.c b/drivers/xen/netchannel2/netfront2.c
new file mode 100644
index 0000000..fb5d426
--- /dev/null
+++ b/drivers/xen/netchannel2/netfront2.c
@@ -0,0 +1,488 @@
+#include <linux/kernel.h>
+#include <linux/gfp.h>
+#include <linux/version.h>
+#include <xen/gnttab.h>
+#include <xen/xenbus.h>
+
+#include "netchannel2_core.h"
+#include "netchannel2_endpoint.h"
+
+#define MAX_SRING_PAGES 4 /* Upper bound on pages per shared ring. */
+
+/* Per-device frontend state, hung off xenbus_device.dev.driver_data. */
+struct netfront2 {
+#define NETFRONT2_MAGIC 0x9268e704
+       unsigned magic; /* Always NETFRONT2_MAGIC; checked on lookup. */
+       struct xenbus_device *xenbus_device;
+
+       /* Frontend-to-backend shared ring and its grant references. */
+       void *f2b_sring;
+       grant_ref_t f2b_grefs[MAX_SRING_PAGES];
+       /* Backend-to-frontend shared ring and its grant references. */
+       void *b2f_sring;
+       grant_ref_t b2f_grefs[MAX_SRING_PAGES];
+
+       /* Single shared control page and the grant covering it. */
+       struct netchannel2_frontend_shared *control_shared;
+       grant_ref_t control_shared_gref;
+
+       int nr_sring_pages; /* Pages per ring (a power of two). */
+       int sring_order;    /* log2(nr_sring_pages). */
+
+       grant_ref_t rings_gref_pool; /* Some pre-allocated grant
+                                       references to cover the shared
+                                       rings. */
+
+       struct netchannel2 *chan;
+
+       int attached; /* True if the shared rings are ready to go. */
+};
+
+/* Recover our per-device state from a xenbus device, sanity-checking
+   the magic number on the way. */
+static struct netfront2 *xenbus_device_to_nf2(struct xenbus_device *xd)
+{
+       struct netfront2 *nf2 = xd->dev.driver_data;
+
+       BUG_ON(nf2->magic != NETFRONT2_MAGIC);
+       return nf2;
+}
+
+/* Try to revoke a bunch of grant references and return the grefs to
+   the rings grefs pool.  Any cleared grefs are set to 0.  Returns 0
+   on success or <0 on error.  Ignores zero entries in the @grefs
+   list, and zeroes any entries which are successfully ended. */
+static int ungrant_access_to_ring(struct netfront2 *nf,
+                                 grant_ref_t *grefs,
+                                 int nr_pages)
+{
+       int i;
+       int succ;
+       int failed;
+
+       failed = 0;
+
+       for (i = 0; i < nr_pages; i++) {
+               if (grefs[i]) {
+                       succ = gnttab_end_foreign_access_ref(grefs[i]);
+                       if (!succ) {
+                               /* XXX we can't recover when this
+                                * happens.  Try to do something
+                                * vaguely plausible, but the device
+                                * is pretty much doomed. */
+                               /* Report the gref itself, not the loop
+                                * index, so the message matches what
+                                * the backend still holds. */
+                               printk(KERN_WARNING "Failed to end access to gref %d\n",
+                                      grefs[i]);
+                               failed = 1;
+                               continue;
+                       }
+                       gnttab_release_grant_reference(&nf->rings_gref_pool,
+                                                      grefs[i]);
+                       grefs[i] = 0;
+               }
+       }
+
+       if (failed)
+               return -EBUSY;
+       else
+               return 0;
+}
+
+/* Allocate and initialise grant references to cover a bunch of pages.
+   @ring should be in the direct-mapped region.  The rings_gref_pool
+   on nf should contain at least @nr_pages references.
+   Already-populated slots in the @grefs list are left unchanged. */
+static void grant_access_to_ring(struct netfront2 *nf,
+                                domid_t otherend,
+                                void *ring,
+                                int *grefs,
+                                int nr_pages)
+{
+       void *p;
+       int i;
+       int ref;
+
+       for (i = 0; i < nr_pages; i++) {
+
+               if (grefs[i] != 0)
+                       continue;
+
+               p = (void *)((unsigned long)ring + PAGE_SIZE * i);
+
+               /* gnttab_claim_grant_reference() signals failure with
+                  a negative errno; keep the result in a signed local
+                  so the BUG_ON below can actually fire (grant_ref_t
+                  is typically unsigned, which would make the test
+                  always false). */
+               ref = gnttab_claim_grant_reference(&nf->rings_gref_pool);
+               /* There should be enough grefs in the pool to handle
+                  the rings. */
+               BUG_ON(ref < 0);
+               gnttab_grant_foreign_access_ref(ref,
+                                               otherend,
+                                               virt_to_mfn(p),
+                                               0);
+               grefs[i] = ref;
+       }
+}
+
+/* Push an already-granted ring into xenstore: writes
+   "<prefix>-nr-pages" and one "<prefix>-ref-<n>" node per grant.
+   Returns 0 on success or a negative errno from xenbus_printf(). */
+static int publish_ring(struct xenbus_transaction xbt,
+                       struct netfront2 *nf,
+                       const char *prefix,
+                       const int *grefs,
+                       int nr_grefs)
+{
+       int i;
+       char buf[32];
+       int err;
+
+       /* snprintf rather than sprintf so an over-long prefix can't
+          run off the end of buf. */
+       snprintf(buf, sizeof(buf), "%s-nr-pages", prefix);
+       err = xenbus_printf(xbt, nf->xenbus_device->nodename, buf,
+                           "%u", nr_grefs);
+       if (err)
+               return err;
+
+       for (i = 0; i < nr_grefs; i++) {
+               /* Caller must have granted every page already. */
+               BUG_ON(grefs[i] == 0);
+               snprintf(buf, sizeof(buf), "%s-ref-%u", prefix, i);
+               err = xenbus_printf(xbt, nf->xenbus_device->nodename,
+                                   buf, "%u", grefs[i]);
+               if (err)
+                       return err;
+       }
+       return 0;
+}
+
+/* Publish both shared rings, the control page, and our event channel
+   port in our xenstore directory, all inside a single transaction.
+   Retries the whole transaction if it ends with -EAGAIN. */
+static int publish_rings(struct netfront2 *nf)
+{
+       int err;
+       struct xenbus_transaction xbt;
+       const char *msg;
+
+again:
+       err = xenbus_transaction_start(&xbt);
+       if (err) {
+               xenbus_dev_fatal(nf->xenbus_device, err,
+                                "starting transaction");
+               return err;
+       }
+
+       err = publish_ring(xbt, nf, "f2b-ring", nf->f2b_grefs,
+                          nf->nr_sring_pages);
+       if (err) {
+               msg = "publishing f2b-ring";
+               goto abort;
+       }
+       err = publish_ring(xbt, nf, "b2f-ring", nf->b2f_grefs,
+                          nf->nr_sring_pages);
+       if (err) {
+               msg = "publishing b2f-ring";
+               goto abort;
+       }
+       err = publish_ring(xbt, nf, "control", &nf->control_shared_gref, 1);
+       if (err) {
+               msg = "publishing control";
+               goto abort;
+       }
+       err = xenbus_printf(xbt, nf->xenbus_device->nodename,
+                           "event-channel", "%u",
+                           nc2_get_evtchn_port(nf->chan));
+       if (err) {
+               msg = "publishing event channel";
+               goto abort;
+       }
+
+       err = xenbus_transaction_end(xbt, 0);
+       if (err) {
+               /* The transaction raced with another xenstore update;
+                  start again from the top. */
+               if (err == -EAGAIN)
+                       goto again;
+               xenbus_dev_fatal(nf->xenbus_device, err,
+                                "completing transaction");
+       }
+
+       return err;
+
+abort:
+       xenbus_transaction_end(xbt, 1);
+       xenbus_dev_fatal(nf->xenbus_device, err, msg);
+       return err;
+}
+
+/* Release the rings.  WARNING: This will leak memory if the other end
+   still has the rings mapped. There isn't really anything we can do
+   about that; the alternative (giving the other end access to
+   whatever Linux puts in the memory after we released it) is probably
+   worse. */
+static void release_rings(struct netfront2 *nf)
+{
+       int have_outstanding_grants;
+
+       have_outstanding_grants = 0;
+
+       /* For each area: only free the pages if every grant on them
+          could be revoked; otherwise the backend may still write to
+          them, so we deliberately leak instead. */
+       if (nf->f2b_sring) {
+               if (ungrant_access_to_ring(nf, nf->f2b_grefs,
+                                          nf->nr_sring_pages) >= 0) {
+                       free_pages((unsigned long)nf->f2b_sring,
+                                  nf->sring_order);
+               } else {
+                       have_outstanding_grants = 1;
+               }
+               nf->f2b_sring = NULL;
+       }
+
+       if (nf->b2f_sring) {
+               if (ungrant_access_to_ring(nf, nf->b2f_grefs,
+                                          nf->nr_sring_pages) >= 0) {
+                       free_pages((unsigned long)nf->b2f_sring,
+                                  nf->sring_order);
+               } else {
+                       have_outstanding_grants = 1;
+               }
+               nf->b2f_sring = NULL;
+       }
+
+       if (nf->control_shared) {
+               if (ungrant_access_to_ring(nf, &nf->control_shared_gref,
+                                          1) >= 0) {
+                       free_page((unsigned long)nf->control_shared);
+               } else {
+                       have_outstanding_grants = 1;
+               }
+               nf->control_shared = NULL;
+       }
+
+       if (have_outstanding_grants != 0) {
+               printk(KERN_WARNING
+                      "Released shared rings while the backend still had them 
mapped; leaking memory\n");
+       }
+
+       /* We can't release the gref pool if there are still
+          references outstanding against it. */
+       if (!have_outstanding_grants) {
+               if (nf->rings_gref_pool)
+                       gnttab_free_grant_references(nf->rings_gref_pool);
+               nf->rings_gref_pool = 0;
+       }
+
+       nf->attached = 0;
+}
+
+/* Negotiate the ring size with the backend, allocate both shared
+   rings and the control page, grant @otherend access to them, and set
+   up our event channel.  On partial failure the already-allocated
+   pieces are left for a later release_rings() to clean up. */
+static int allocate_rings(struct netfront2 *nf, domid_t otherend)
+{
+       int err;
+       int max_sring_pages;
+       int sring_order;
+       int nr_sring_pages;
+       size_t sring_size;
+
+       /* Figure out how big our shared rings are going to be. */
+       err = xenbus_scanf(XBT_NIL, nf->xenbus_device->otherend,
+                          "max-sring-pages", "%d", &max_sring_pages);
+       if (err < 0) {
+               xenbus_dev_fatal(nf->xenbus_device, err,
+                                "reading %s/max-sring-pages",
+                                nf->xenbus_device->otherend);
+               return err;
+       }
+       /* Clamp to our own limit, then round down to a power of two
+          pages so the allocation order is well-defined. */
+       if (max_sring_pages > MAX_SRING_PAGES)
+               max_sring_pages = MAX_SRING_PAGES;
+       sring_order = order_base_2(max_sring_pages);
+       nr_sring_pages = 1 << sring_order;
+       sring_size = nr_sring_pages * PAGE_SIZE;
+
+       /* Drop any rings left over from a previous connection. */
+       release_rings(nf);
+
+       nf->nr_sring_pages = nr_sring_pages;
+       nf->sring_order = sring_order;
+
+       nf->f2b_sring = (void *)__get_free_pages(GFP_KERNEL, sring_order);
+       if (!nf->f2b_sring)
+               return -ENOMEM;
+       memset(nf->f2b_sring, 0, sring_size);
+
+       nf->b2f_sring = (void *)__get_free_pages(GFP_KERNEL, sring_order);
+       if (!nf->b2f_sring)
+               return -ENOMEM;
+       memset(nf->b2f_sring, 0, sring_size);
+
+       nf->control_shared = (void *)get_zeroed_page(GFP_KERNEL);
+       if (!nf->control_shared)
+               return -ENOMEM;
+
+       /* Pre-allocate enough grant references to be sure that we can
+          grant access to both rings without an error. */
+       err = gnttab_alloc_grant_references(nr_sring_pages * 2 + 1,
+                                           &nf->rings_gref_pool);
+       if (err < 0)
+               return err;
+
+       grant_access_to_ring(nf,
+                            otherend,
+                            nf->b2f_sring,
+                            nf->b2f_grefs,
+                            nr_sring_pages);
+       grant_access_to_ring(nf,
+                            otherend,
+                            nf->f2b_sring,
+                            nf->f2b_grefs,
+                            nr_sring_pages);
+       grant_access_to_ring(nf,
+                            otherend,
+                            nf->control_shared,
+                            &nf->control_shared_gref,
+                            1);
+       err = nc2_listen_evtchn(nf->chan, otherend);
+       if (err < 0)
+               return err;
+
+       nf->attached = 1;
+
+       return 0;
+}
+
+/* Xenbus callback: the backend moved to @backend_state.  Drive our
+   half of the connection state machine in response. */
+static void backend_changed(struct xenbus_device *xd,
+                           enum xenbus_state backend_state)
+{
+       struct netfront2 *nf = xenbus_device_to_nf2(xd);
+       int err;
+
+       switch (backend_state) {
+       case XenbusStateInitialising:
+               /* Backend isn't ready yet, don't do anything. */
+               break;
+
+       case XenbusStateInitWait:
+               /* Backend has advertised the ring protocol.  Allocate
+                  the rings, and tell the backend about them. */
+
+               err = 0;
+               if (!nf->attached)
+                       err = allocate_rings(nf, xd->otherend_id);
+               if (err < 0) {
+                       xenbus_dev_fatal(xd, err, "allocating shared rings");
+                       break;
+               }
+               err = publish_rings(nf);
+               if (err >= 0)
+                       xenbus_switch_state(xd, XenbusStateInitialised);
+               break;
+
+       case XenbusStateInitialised:
+               /* Backend isn't supposed to use this state. */
+               /* Pass a negative errno, consistent with every other
+                  xenbus_dev_fatal() call in this file. */
+               xenbus_dev_fatal(xd, -EINVAL,
+                                "unexpected backend state Initialised");
+               break;
+
+       case XenbusStateConnected:
+               /* All ready */
+               err = nc2_attach_rings(nf->chan,
+                                      &nf->control_shared->cons,
+                                      nf->b2f_sring,
+                                      nf->nr_sring_pages * PAGE_SIZE,
+                                      &nf->control_shared->prod,
+                                      nf->f2b_sring,
+                                      nf->nr_sring_pages * PAGE_SIZE,
+                                      nf->xenbus_device->otherend_id);
+               if (err < 0) {
+                       xenbus_dev_fatal(xd, err,
+                                        "failed to attach to rings");
+               } else {
+                       xenbus_switch_state(xd, XenbusStateConnected);
+               }
+               break;
+
+       case XenbusStateClosing:
+               xenbus_switch_state(xd, XenbusStateClosing);
+               break;
+
+       case XenbusStateClosed:
+               /* Tell the tools that it's safe to remove the device
+                  from the bus. */
+               xenbus_frontend_closed(xd);
+               /* Note that we don't release the rings here.  This
+                  means that if the backend moves to a different
+                  domain, we won't be able to reconnect, but it also
+                  limits the amount of memory which can be wasted in
+                  the release_rings() leak if the backend is faulty
+                  or malicious.  It's not obvious which is more
+                  useful, and so I choose the safer but less
+                  featureful approach. */
+               /* This is only a problem if you're using driver
+                  domains and trying to recover from a driver error
+                  by rebooting the backend domain.  The rest of the
+                  tools don't support that, so it's a bit
+                  theoretical.  The memory leaks aren't, though. */
+               break;
+
+       case XenbusStateUnknown:
+               /* The tools have removed the device area from the
+                  store.  Do nothing and rely on xenbus core to call
+                  our remove method. */
+               break;
+
+       default:
+               /* Ignore transitions to unknown states */
+               break;
+       }
+}
+
+/* Xenbus probe callback: allocate the per-device state and the core
+   channel, and stash them in the device.  The only failure mode is
+   running out of memory. */
+static int __devinit netfront_probe(struct xenbus_device *xd,
+                                   const struct xenbus_device_id *id)
+{
+       struct netfront2 *nf;
+
+       nf = kzalloc(sizeof(*nf), GFP_KERNEL);
+       if (nf == NULL)
+               goto err;
+       nf->magic = NETFRONT2_MAGIC;
+       nf->xenbus_device = xd;
+       nf->chan = nc2_new(xd);
+       if (nf->chan == NULL)
+               goto err;
+
+       xd->dev.driver_data = nf;
+
+       return 0;
+
+err:
+       /* kfree(NULL) is a no-op, so this is safe on both paths. */
+       kfree(nf);
+       /* Negative errno, consistent with the file's other
+          xenbus_dev_fatal() call sites. */
+       xenbus_dev_fatal(xd, -ENOMEM, "probing netdev");
+       return -ENOMEM;
+}
+
+/* Xenbus resume callback, invoked after a save/restore cycle. */
+static int netfront_resume(struct xenbus_device *xd)
+{
+       /* We've been suspended and come back.  The rings are
+          therefore dead.  Tear them down. */
+       /* We rely on the normal xenbus state machine to bring them
+          back to life. */
+       struct netfront2 *nf = xenbus_device_to_nf2(xd);
+
+       nc2_detach_rings(nf->chan);
+       release_rings(nf);
+
+       return 0;
+}
+
+/* Xenbus remove callback: release the core channel, the shared
+   rings, and finally the netfront2 structure itself. */
+static int __devexit netfront_remove(struct xenbus_device *xd)
+{
+       struct netfront2 *nf = xenbus_device_to_nf2(xd);
+       if (nf->chan != NULL)
+               nc2_release(nf->chan);
+       release_rings(nf);
+       kfree(nf);
+       return 0;
+}
+
+/* Device class we bind to ("vif2"); the empty string terminates the
+   list. */
+static const struct xenbus_device_id netfront_ids[] = {
+       { "vif2" },
+       { "" }
+};
+MODULE_ALIAS("xen:vif2");
+
+/* Frontend driver registration record for xenbus. */
+static struct xenbus_driver netfront2 = {
+       .name = "vif2",
+       .ids = netfront_ids,
+       .probe = netfront_probe,
+       .remove = __devexit_p(netfront_remove),
+       .otherend_changed = backend_changed,
+       .resume = netfront_resume,
+};
+
+/* Register/unregister the frontend driver with the xenbus core. */
+int __init netfront2_init(void)
+{
+       return xenbus_register_frontend(&netfront2);
+}
+
+void __exit netfront2_exit(void)
+{
+       xenbus_unregister_driver(&netfront2);
+}
diff --git a/drivers/xen/netchannel2/recv_packet.c 
b/drivers/xen/netchannel2/recv_packet.c
new file mode 100644
index 0000000..4678c28
--- /dev/null
+++ b/drivers/xen/netchannel2/recv_packet.c
@@ -0,0 +1,216 @@
+/* Support for receiving individual packets, and all the stuff which
+ * goes with that. */
+#include <linux/kernel.h>
+#include <linux/etherdevice.h>
+#include <linux/version.h>
+#include "netchannel2_core.h"
+
+/* Send as many finish packet messages as will fit on the ring. */
+void send_finish_packet_messages(struct netchannel2_ring_pair *ncrp)
+{
+       struct pending_finish_packets *pfp = &ncrp->pending_finish;
+       struct netchannel2_msg_finish_packet msg;
+
+       /* Drain the pending-finish FIFO for as long as the producer
+          ring has room for another message.  (The old local RING_IDX
+          "cons" copy was a dead store and has been dropped.) */
+       while (pfp->prod != pfp->cons &&
+              nc2_can_send_payload_bytes(&ncrp->prod_ring, sizeof(msg))) {
+               msg.id = pfp->ids[pfp->cons % MAX_PENDING_FINISH_PACKETS];
+               pfp->cons++;
+               nc2_send_message(&ncrp->prod_ring,
+                                NETCHANNEL2_MSG_FINISH_PACKET,
+                                0,
+                                &msg,
+                                sizeof(msg));
+       }
+}
+
+/* Add a packet ID to the finish packet queue. The caller should
+   arrange that send_finish_packet_messages is sent soon to flush the
+   requests out. */
+void queue_finish_packet_message(struct netchannel2_ring_pair *ncrp,
+                                uint32_t id, uint8_t flags)
+{
+       struct pending_finish_packets *queue = &ncrp->pending_finish;
+
+       /* Stash the ID in the next free slot of the circular buffer. */
+       queue->ids[queue->prod % MAX_PENDING_FINISH_PACKETS] = id;
+       queue->prod++;
+
+       /* Packets which asked for an event want a prompt flush. */
+       if (flags & NC2_PACKET_FLAG_need_event)
+               ncrp->pending_time_sensitive_messages = 1;
+}
+
+/* Handle a packet message from the other end.  On success, queues the
+   new skb to the pending skb list.  If the packet is invalid, it is
+   discarded without generating a FINISH message. */
+/* Caution: this drops and re-acquires the ring lock. */
+void nc2_handle_packet_msg(struct netchannel2 *nc,
+                          struct netchannel2_ring_pair *ncrp,
+                          struct netchannel2_msg_hdr *hdr,
+                          struct sk_buff_head *pending_rx_queue)
+{
+       unsigned nr_frags;
+       struct netchannel2_msg_packet msg;
+       struct sk_buff *skb;
+       const unsigned frags_off = sizeof(msg);
+       unsigned frags_bytes;
+
+       /* Refuse the packet if the remote already has the maximum
+          number of unfinished packets outstanding. */
+       if (ncrp->pending_finish.prod - ncrp->pending_finish.cons ==
+           MAX_PENDING_FINISH_PACKETS) {
+               pr_debug("Remote endpoint sent too many packets!\n");
+               nc->stats.rx_errors++;
+               return;
+       }
+
+       /* Validate the message size before copying anything out of
+          the ring. */
+       if (hdr->size < sizeof(msg)) {
+               pr_debug("Packet message too small (%d < %zd)\n", hdr->size,
+                        sizeof(msg));
+               nc->stats.rx_errors++;
+               return;
+       }
+
+       if (hdr->size & 7) {
+               pr_debug("Packet size in ring not multiple of 8: %d\n",
+                        hdr->size);
+               nc->stats.rx_errors++;
+               return;
+       }
+
+       nc2_copy_from_ring(&ncrp->cons_ring, &msg, sizeof(msg));
+
+       /* The fragment descriptors follow the fixed header and the
+          prefix data. */
+       frags_bytes = hdr->size - sizeof(msg) - msg.prefix_size;
+       nr_frags = frags_bytes / sizeof(struct netchannel2_fragment);
+
+       switch (msg.type) {
+       case NC2_PACKET_TYPE_small:
+               if (nr_frags != 0) {
+                       /* Small packets, by definition, have no
+                        * fragments */
+                       pr_debug("Received small packet with %d frags?\n",
+                                nr_frags);
+                       nc->stats.rx_errors++;
+                       return;
+               }
+               /* Any of the receiver functions can handle small
+                  packets as a trivial special case.  Use receiver
+                  copy, since that's the simplest. */
+               skb = handle_receiver_copy_packet(nc, ncrp, &msg, hdr,
+                                                 nr_frags, frags_off);
+               /* No finish message */
+               break;
+       case NC2_PACKET_TYPE_receiver_copy:
+               skb = handle_receiver_copy_packet(nc, ncrp, &msg, hdr,
+                                                 nr_frags, frags_off);
+               queue_finish_packet_message(ncrp, msg.id, msg.flags);
+               break;
+       default:
+               pr_debug("Unknown packet type %d\n", msg.type);
+               nc->stats.rx_errors++;
+               skb = NULL;
+               break;
+       }
+       if (skb != NULL) {
+               nc->stats.rx_bytes += skb->len;
+               nc->stats.rx_packets++;
+               skb->dev = nc->net_device;
+
+               if (ncrp->filter_mac &&
+                   skb_headlen(skb) >= sizeof(struct ethhdr) &&
+                   memcmp(((struct ethhdr *)skb->data)->h_source,
+                          ncrp->remote_mac,
+                          ETH_ALEN)) {
+                       /* We're in filter MACs mode and the source
+                          MAC on this packet is wrong.  Drop it. */
+                       /* (We know that any packet big enough to
+                          contain an ethernet header at all will
+                          contain it in the head space because we do
+                          a pull_through at the end of the type
+                          handler.) */
+                       nc->stats.rx_missed_errors++;
+                       goto err;
+               }
+
+               __skb_queue_tail(pending_rx_queue, skb);
+
+               /* Flush the grant-copy batch once it gets big enough. */
+               if (ncrp->pending_rx_hypercalls.nr_pending_gops >=
+                   RX_GRANT_COPY_BATCH) {
+                       
flush_prepared_grant_copies(&ncrp->pending_rx_hypercalls,
+                                                   nc2_rscb_on_gntcopy_fail);
+                       /* since receive could generate ACKs to the
+                          start_xmit() function we need to release
+                          the ring lock */
+                       spin_unlock(&ncrp->lock);
+                       /* we should receive the packet as soon as the
+                          copy is complete to benefit from cache
+                          locality */
+                       receive_pending_skbs(pending_rx_queue);
+                       spin_lock(&ncrp->lock);
+
+               }
+
+       }
+       return;
+
+err:
+       /* If the receive succeeded part-way, there may be references
+          to the skb in the hypercall batcher.  Flush them out before
+          we release it.  This is a slow path, so we don't care that
+          much about performance. */
+       flush_prepared_grant_copies(&ncrp->pending_rx_hypercalls,
+                                   nc2_rscb_on_gntcopy_fail);
+
+       /* We may need to send a FINISH message here if this was a
+          receiver-map packet.  That should be handled automatically
+          by the kfree_skb(). */
+       kfree_skb(skb);
+       nc->stats.rx_errors++;
+       return;
+}
+
+/* If there is space on the ring, tell the other end how many packets
+   it is allowed to have outstanding at one time, then clear the
+   need_advertise_max_packets flag. */
+void advertise_max_packets(struct netchannel2_ring_pair *ncrp)
+{
+       struct netchannel2_msg_set_max_packets msg;
+
+       /* If the message doesn't fit right now, bail out; the flag
+          stays set, so we'll retry on a later pass. */
+       if (!nc2_can_send_payload_bytes(&ncrp->prod_ring, sizeof(msg)))
+               return;
+
+       msg.max_outstanding_packets = MAX_PENDING_FINISH_PACKETS;
+       nc2_send_message(&ncrp->prod_ring, NETCHANNEL2_MSG_SET_MAX_PACKETS,
+                        0, &msg, sizeof(msg));
+       ncrp->need_advertise_max_packets = 0;
+       ncrp->pending_time_sensitive_messages = 1;
+}
+
+/* Hand every queued skb to the network stack, discarding any whose
+   grant copy failed along the way. */
+void receive_pending_skbs(struct sk_buff_head *pending_rx_queue)
+{
+       struct sk_buff *skb;
+
+       while ((skb = __skb_dequeue(pending_rx_queue)) != NULL) {
+               struct skb_cb_overlay *sco = get_skb_overlay(skb);
+
+               if (unlikely(sco->failed)) {
+                       kfree_skb(skb);
+               } else {
+                       skb->protocol = eth_type_trans(skb, skb->dev);
+                       netif_receive_skb(skb);
+               }
+       }
+}
+
+
+/* These don't really belong here, but it's as good a place as any. */
+/* Module-wide init/exit hooks for the netchannel2 core; nothing to
+   do at present. */
+int __init nc2_init(void)
+{
+       return 0;
+}
+
+void __exit nc2_exit(void)
+{
+}
diff --git a/drivers/xen/netchannel2/rscb.c b/drivers/xen/netchannel2/rscb.c
new file mode 100644
index 0000000..8984f90
--- /dev/null
+++ b/drivers/xen/netchannel2/rscb.c
@@ -0,0 +1,385 @@
+/* Receiver-side copy buffer support */
+#include <linux/kernel.h>
+#include <linux/skbuff.h>
+#include <linux/netdevice.h>
+#include <linux/version.h>
+#include <xen/gnttab.h>
+#include <xen/live_maps.h>
+
+#include "netchannel2_core.h"
+
+/* -------------------------- Receive -------------------------------- */
+
+/* This is called whenever an RSCB grant copy fails.  @ctxt is the
+   sk_buff the failed copy belonged to; mark it failed so that
+   receive_pending_skbs() drops it instead of delivering a partial
+   packet.  The warning is rate-limited and printed at most once per
+   skb. */
+void nc2_rscb_on_gntcopy_fail(void *ctxt, gnttab_copy_t *gop)
+{
+       struct sk_buff *skb = ctxt;
+       struct skb_cb_overlay *sco = get_skb_overlay(skb);
+       if (!sco->failed && net_ratelimit())
+               printk(KERN_WARNING "Dropping RX packet because of copy error\n");
+       sco->failed = 1;
+}
+
+
+/* Copy @size bytes from @offset in grant ref @gref against domain
+   @domid and shove them on the end of @skb.  Returns 0 (failure) if
+   @size exceeds a page or the skb head lacks space; returns 1 on
+   success.  The copy is only queued in the pending_rx_hypercalls
+   batcher here, not performed immediately. */
+static int nc2_grant_copy(struct netchannel2_ring_pair *ncrp,
+                         struct sk_buff *skb,
+                         unsigned offset,
+                         unsigned size,
+                         grant_ref_t gref,
+                         domid_t domid)
+{
+       gnttab_copy_t *gop;
+       void *tail;
+       void *end;
+
+       if (size > PAGE_SIZE)
+               return 0;
+
+       tail = skb_tail_pointer(skb);
+       end = skb_end_pointer(skb);
+
+       if (unlikely(size > (end-tail)))
+               return 0;
+
+       if (unlikely(offset_in_page(tail) + size > PAGE_SIZE)) {
+               /* The destination range would cross a page boundary,
+                  which a single grant-copy op cannot do; copy up to
+                  the boundary first, then fall through for the
+                  remainder. */
+               unsigned f1 = PAGE_SIZE - offset_in_page(tail);
+               /* Recursive, but only ever to depth 1, so okay */
+               if (!nc2_grant_copy(ncrp, skb, offset, f1, gref, domid))
+                       return 0;
+               offset += f1;
+               size -= f1;
+               tail += f1;
+       }
+
+       /* Copy this fragment into the header. */
+       gop = hypercall_batcher_grant_copy(&ncrp->pending_rx_hypercalls,
+                                          skb,
+                                          nc2_rscb_on_gntcopy_fail);
+       gop->flags = GNTCOPY_source_gref;
+       gop->source.domid = domid;
+       gop->source.offset = offset;
+       gop->source.u.ref = gref;
+       gop->dest.domid = DOMID_SELF;
+       gop->dest.offset = offset_in_page(tail);
+       gop->dest.u.gmfn = virt_to_mfn(tail);
+       gop->len = size;
+
+       skb_put(skb, size);
+
+       return 1;
+}
+
+/* We've received a receiver-copy packet message from the remote.
+   Parse it up, build an sk_buff, and return it.  Returns NULL on
+   error.  May also return an skb whose control-block failed flag is
+   set, which the caller is expected to drop. */
+struct sk_buff *handle_receiver_copy_packet(struct netchannel2 *nc,
+                                           struct netchannel2_ring_pair *ncrp,
+                                           struct netchannel2_msg_packet *msg,
+                                           struct netchannel2_msg_hdr *hdr,
+                                           unsigned nr_frags,
+                                           unsigned frags_off)
+{
+       struct netchannel2_fragment frag;
+       unsigned nr_bytes;
+       unsigned x;
+       struct sk_buff *skb;
+       unsigned skb_headsize;
+       int first_frag, first_frag_size;
+       gnttab_copy_t *gop;
+       struct skb_shared_info *shinfo;
+       struct page *new_page;
+
+       if (msg->prefix_size > NETCHANNEL2_MAX_INLINE_BYTES) {
+               pr_debug("Inline prefix too big! (%d > %d)\n",
+                        msg->prefix_size, NETCHANNEL2_MAX_INLINE_BYTES);
+               return NULL;
+       }
+
+       /* Count the number of bytes in the packet.  Be careful: the
+          other end can still access the packet on the ring, so the
+          size could change later. */
+       nr_bytes = msg->prefix_size;
+       for (x = 0; x < nr_frags; x++) {
+               fetch_fragment(ncrp, x, &frag, frags_off);
+               nr_bytes += frag.size;
+       }
+       if (nr_bytes > NETCHANNEL2_MAX_PACKET_BYTES) {
+               pr_debug("Packet too big! (%d > %d)\n", nr_bytes,
+                        NETCHANNEL2_MAX_PACKET_BYTES);
+               return NULL;
+       }
+       if (nr_bytes < 64) {
+               /* Linux sometimes has problems with very small SKBs.
+                  Impose a minimum size of 64 bytes. */
+               /* NOTE(review): after this point nr_bytes is only
+                  used in the pr_debug below; the allocation size is
+                  derived from the prefix and first fragment instead.
+                  Confirm the 64-byte minimum is enforced
+                  elsewhere. */
+               nr_bytes = 64;
+       }
+
+       first_frag = 0;
+       if (nr_frags > 0) {
+               fetch_fragment(ncrp, 0, &frag, frags_off);
+               first_frag_size = frag.size;
+               first_frag = 1;
+       } else {
+               first_frag_size = 0;
+               first_frag = 0;
+       }
+
+       /* We try to have both prefix and the first frag in the skb head
+          if they do not exceed the page size */
+       skb_headsize = msg->prefix_size + first_frag_size + NET_IP_ALIGN;
+       if (skb_headsize >
+           ((PAGE_SIZE - sizeof(struct skb_shared_info) - NET_SKB_PAD) &
+            ~(SMP_CACHE_BYTES - 1))) {
+               skb_headsize = msg->prefix_size + NET_IP_ALIGN;
+               first_frag = 0;
+       }
+
+       skb = dev_alloc_skb(skb_headsize);
+       if (!skb) {
+               /* Drop the packet. */
+               pr_debug("Couldn't allocate a %d byte skb.\n", nr_bytes);
+               nc->stats.rx_dropped++;
+               return NULL;
+       }
+
+       /* Arrange that the IP header is nicely aligned in memory. */
+       skb_reserve(skb, NET_IP_ALIGN);
+
+       /* The inline prefix should always fit in the SKB head.  It
+          lives on the ring immediately after the fragment array. */
+       nc2_copy_from_ring_off(&ncrp->cons_ring,
+                              skb_put(skb, msg->prefix_size),
+                              msg->prefix_size,
+                              frags_off + nr_frags * sizeof(frag));
+
+       /* copy first frag into skb head if it does not cross a
+          page boundary */
+       if (first_frag == 1) {
+               fetch_fragment(ncrp, 0, &frag, frags_off);
+               if (!nc2_grant_copy(ncrp, skb, frag.off, frag.size,
+                                   frag.receiver_copy.gref,
+                                   ncrp->otherend_id)) {
+                       get_skb_overlay(skb)->failed = 1;
+                       return skb;
+               }
+       }
+
+       /* Each remaining fragment gets its own freshly-allocated
+          page, grant-copied from the remote. */
+       shinfo = skb_shinfo(skb);
+       for (x = first_frag; x < nr_frags; x++) {
+               fetch_fragment(ncrp, x, &frag, frags_off);
+
+               /* Allocate a new page for the fragment */
+               new_page = alloc_page(GFP_ATOMIC);
+               if (!new_page) {
+                       get_skb_overlay(skb)->failed = 1;
+                       return skb;
+               }
+
+               gop = hypercall_batcher_grant_copy(&ncrp->pending_rx_hypercalls,
+                                                  skb,
+                                                  nc2_rscb_on_gntcopy_fail);
+               gop->flags = GNTCOPY_source_gref;
+               gop->source.domid = ncrp->otherend_id;
+               gop->source.offset = frag.off;
+               gop->source.u.ref = frag.receiver_copy.gref;
+               gop->dest.domid = DOMID_SELF;
+               gop->dest.offset = 0;
+               gop->dest.u.gmfn = pfn_to_mfn(page_to_pfn(new_page));
+               gop->len = frag.size;
+
+               shinfo->frags[x-first_frag].page = new_page;
+               shinfo->frags[x-first_frag].page_offset = 0;
+               shinfo->frags[x-first_frag].size = frag.size;
+               shinfo->nr_frags++;
+
+               skb->truesize += frag.size;
+               skb->data_len += frag.size;
+               skb->len += frag.size;
+       }
+       return skb;
+}
+
+
+
+/* ------------------------------- Transmit ---------------------------- */
+
+/* Book-keeping carried across the fragment-granting loop for a
+   single transmitted packet. */
+struct grant_packet_plan {
+       /* Next fragment descriptor to fill in in the ring message. */
+       volatile struct netchannel2_fragment *out_fragment;
+       /* Pool of pre-allocated grant references to draw from. */
+       grant_ref_t gref_pool;
+       /* Bytes still to skip because they travel in the inline
+          prefix rather than as granted fragments. */
+       unsigned prefix_avail;
+};
+
+/* Count the fragment descriptors needed to transmit @skb given that
+   the first @prefix_size bytes of the linear area go inline in the
+   ring message: one per page spanned by the rest of the linear data
+   area, plus one per paged fragment. */
+static inline int nfrags_skb(struct sk_buff *skb, int prefix_size)
+{
+       unsigned long start_grant;
+       unsigned long end_grant;
+
+       if (skb_headlen(skb) <= prefix_size)
+               return skb_shinfo(skb)->nr_frags;
+
+       start_grant = ((unsigned long)skb->data + prefix_size) &
+               ~(PAGE_SIZE-1);
+       end_grant = ((unsigned long)skb->data +
+                    skb_headlen(skb) +  PAGE_SIZE - 1) &
+               ~(PAGE_SIZE-1);
+       return ((end_grant - start_grant) >> PAGE_SHIFT)
+               + skb_shinfo(skb)->nr_frags;
+}
+
+/* Allocate the resources needed to send @skb as a receiver-copy
+   packet: a TXP slot plus one grant reference per fragment.
+   Returns 0 on success, -1 if resources are currently exhausted
+   (the caller retries later). */
+int prepare_xmit_allocate_grant(struct netchannel2_ring_pair *ncrp,
+                               struct sk_buff *skb)
+{
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       unsigned nr_fragments;
+       grant_ref_t gref_pool;
+       int err;
+       unsigned inline_prefix_size;
+
+       if (allocate_txp_slot(ncrp, skb) < 0)
+               return -1;
+
+       /* We're going to have to get the remote to issue a grant copy
+          hypercall anyway, so there's no real benefit to shoving the
+          headers inline. */
+       /* (very small packets won't go through here, so there's no
+          chance that we could completely eliminate the grant
+          copy.) */
+       inline_prefix_size = sizeof(struct ethhdr);
+
+       if (skb_co->nr_fragments == 0) {
+               nr_fragments = nfrags_skb(skb, inline_prefix_size);
+
+               /* No-fragments packets should be policy small, not
+                * policy grant. */
+               BUG_ON(nr_fragments == 0);
+
+               skb_co->nr_fragments = nr_fragments;
+       }
+
+       /* Grab the grant references. */
+       err = gnttab_suballoc_grant_references(skb_co->nr_fragments,
+                                              &ncrp->gref_pool,
+                                              &gref_pool);
+       if (err < 0) {
+               release_txp_slot(ncrp, skb);
+               /* Leave skb_co->nr_fragments set, so that we don't
+                  have to recompute it next time around. */
+               return -1;
+       }
+       skb_co->gref_pool = gref_pool;
+       skb_co->inline_prefix_size = inline_prefix_size;
+
+       skb_co->type = NC2_PACKET_TYPE_receiver_copy;
+
+       return 0;
+}
+
+/* Emit one fragment descriptor granting the remote read-only access
+   to bytes [@off_in_page, @off_in_page+@size) of @page, consuming a
+   grant reference from the plan's pool.  Bytes covered by the
+   inline prefix are skipped rather than granted.  Pages we are
+   tracking on behalf of another domain are re-granted with a
+   transitive grant. */
+static void prepare_subpage_grant(struct netchannel2_ring_pair *ncrp,
+                                 struct page *page,
+                                 unsigned off_in_page,
+                                 unsigned size,
+                                 struct grant_packet_plan *plan)
+{
+       volatile struct netchannel2_fragment *frag;
+       domid_t trans_domid;
+       grant_ref_t trans_gref;
+       grant_ref_t gref;
+
+       if (size <= plan->prefix_avail) {
+               /* This fragment is going to be inline -> nothing to
+                * do. */
+               plan->prefix_avail -= size;
+               return;
+       }
+       if (plan->prefix_avail > 0) {
+               /* Part inline, part in payload. */
+               size -= plan->prefix_avail;
+               off_in_page += plan->prefix_avail;
+               plan->prefix_avail = 0;
+       }
+       frag = plan->out_fragment;
+       gref = gnttab_claim_grant_reference(&plan->gref_pool);
+       frag->receiver_copy.gref = gref;
+       if (page_is_tracked(page)) {
+               lookup_tracker_page(page, &trans_domid, &trans_gref);
+               gnttab_grant_foreign_access_ref_trans(gref,
+                                                     ncrp->otherend_id,
+                                                     GTF_readonly,
+                                                     trans_domid,
+                                                     trans_gref);
+       } else {
+               gnttab_grant_foreign_access_ref_subpage(gref,
+                                                       ncrp->otherend_id,
+                                                       virt_to_mfn(page_address(page)),
+                                                       GTF_readonly,
+                                                       off_in_page,
+                                                       size);
+       }
+
+       frag->off = off_in_page;
+       frag->size = size;
+       plan->out_fragment++;
+}
+
+/* Walk the skb's linear data area and emit a fragment descriptor
+   (via prepare_subpage_grant()) for each page-bounded chunk.
+   Always returns 0. */
+static int grant_data_area(struct netchannel2_ring_pair *ncrp,
+                          struct sk_buff *skb,
+                          struct grant_packet_plan *plan)
+{
+       void *ptr = skb->data;
+       unsigned len = skb_headlen(skb);
+       unsigned off;
+       unsigned this_time;
+
+       for (off = 0; off < len; off += this_time) {
+               /* Clamp each chunk so it does not cross a page
+                  boundary. */
+               this_time = len - off;
+               if (this_time + offset_in_page(ptr + off) > PAGE_SIZE)
+                       this_time = PAGE_SIZE - offset_in_page(ptr + off);
+               prepare_subpage_grant(ncrp,
+                                     virt_to_page(ptr + off),
+                                     offset_in_page(ptr + off),
+                                     this_time,
+                                     plan);
+       }
+       return 0;
+}
+
+/* Fill in the fragment descriptors of ring message @msg_buf for
+   @skb, granting the remote access to the linear data area and all
+   paged fragments.  Also sets NC2_PACKET_FLAG_need_event once
+   enough fragments have gone by without requesting an event.
+   Updates skb_co->nr_fragments to the number actually emitted. */
+void xmit_grant(struct netchannel2_ring_pair *ncrp,
+               struct sk_buff *skb,
+               volatile void *msg_buf)
+{
+       volatile struct netchannel2_msg_packet *msg = msg_buf;
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       struct grant_packet_plan plan;
+       unsigned x;
+       struct skb_shared_info *shinfo;
+       skb_frag_t *frag;
+
+       memset(&plan, 0, sizeof(plan));
+       plan.prefix_avail = skb_co->inline_prefix_size;
+       plan.out_fragment = msg->frags;
+       plan.gref_pool = skb_co->gref_pool;
+
+       ncrp->count_frags_no_event += skb_co->nr_fragments;
+       if (ncrp->count_frags_no_event >= ncrp->max_count_frags_no_event) {
+               msg->flags |= NC2_PACKET_FLAG_need_event;
+               ncrp->count_frags_no_event = 0;
+       }
+
+       grant_data_area(ncrp, skb, &plan);
+
+       shinfo = skb_shinfo(skb);
+       for (x = 0; x < shinfo->nr_frags; x++) {
+               frag = &shinfo->frags[x];
+               prepare_subpage_grant(ncrp,
+                                     frag->page,
+                                     frag->page_offset,
+                                     frag->size,
+                                     &plan);
+       }
+
+       /* Some reserved fragments may have been folded into the
+          inline prefix; record how many were really used. */
+       skb_co->nr_fragments = plan.out_fragment - msg->frags;
+}
+
diff --git a/drivers/xen/netchannel2/util.c b/drivers/xen/netchannel2/util.c
new file mode 100644
index 0000000..302dfc1
--- /dev/null
+++ b/drivers/xen/netchannel2/util.c
@@ -0,0 +1,230 @@
+#include <linux/kernel.h>
+#include <linux/list.h>
+#include <linux/skbuff.h>
+#include <linux/version.h>
+#ifdef CONFIG_XEN_NETDEV2_BACKEND
+#include <xen/driver_util.h>
+#endif
+#include <xen/gnttab.h>
+#include "netchannel2_core.h"
+
+/* Take a TXP slot off @ncrp's free list and bind it to @skb.
+   Returns 0 on success, -1 if no slot is free or the outstanding
+   packet limit has been reached. */
+int allocate_txp_slot(struct netchannel2_ring_pair *ncrp,
+                     struct sk_buff *skb)
+{
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       struct txp_slot *tp;
+
+       BUG_ON(skb_co->tp);
+
+       if (ncrp->head_free_tx_packet == INVALID_TXP_INDEX ||
+           ncrp->nr_tx_packets_outstanding ==
+                   ncrp->max_tx_packets_outstanding) {
+               return -1;
+       }
+
+       tp = &ncrp->tx_packets[ncrp->head_free_tx_packet];
+       ncrp->head_free_tx_packet = txp_get_next_free(tp);
+
+       txp_set_skb(tp, skb);
+       skb_co->tp = tp;
+       ncrp->nr_tx_packets_outstanding++;
+       return 0;
+}
+
+/* Release an skb we have finished with.  @nc is currently unused
+   but keeps the signature uniform with other per-interface release
+   paths. */
+static void nc2_free_skb(struct netchannel2 *nc,
+                        struct sk_buff *skb)
+{
+       dev_kfree_skb(skb);
+}
+
+/* Unbind @skb from its TXP slot and return the slot to @ncrp's free
+   list. */
+void release_txp_slot(struct netchannel2_ring_pair *ncrp,
+                     struct sk_buff *skb)
+{
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       struct txp_slot *tp = skb_co->tp;
+
+       BUG_ON(txp_get_skb(tp) != skb);
+
+       /* Try to keep the free TX packet list in order as far as
+        * possible, since that gives slightly better cache behaviour.
+        * It's not worth spending a lot of effort getting this right,
+        * though, so just use a simple heuristic: if we're freeing a
+        * packet, and the previous packet is already free, chain this
+        * packet directly after it, rather than putting it at the
+        * head of the list.  This isn't perfect by any means, but
+        * it's enough that you get nice long runs of contiguous
+        * packets in the free list, and that's all we really need.
+        * Runs much bigger than a cache line aren't really very
+        * useful, anyway. */
+       if (tp != ncrp->tx_packets && !txp_slot_in_use(tp - 1)) {
+               txp_set_next_free(tp, txp_get_next_free(tp - 1));
+               txp_set_next_free(tp - 1, tp - ncrp->tx_packets);
+       } else {
+               txp_set_next_free(tp, ncrp->head_free_tx_packet);
+               ncrp->head_free_tx_packet = tp - ncrp->tx_packets;
+       }
+       skb_co->tp = NULL;
+       ncrp->nr_tx_packets_outstanding--;
+}
+
+/* Tear down a transmitted packet: revoke any grant references it
+   still holds, return them to the ring's pool, release its TXP slot
+   (if any), and free the skb. */
+void release_tx_packet(struct netchannel2_ring_pair *ncrp,
+                      struct sk_buff *skb)
+{
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       struct txp_slot *tp = skb_co->tp;
+       grant_ref_t gref;
+       int r;
+       unsigned cntr;
+
+       if (skb_co->type == NC2_PACKET_TYPE_receiver_copy) {
+               /* Drain the skb's private gref pool, ending each
+                  grant before returning it to the ring pool. */
+               while (1) {
+                       r = gnttab_claim_grant_reference(&skb_co->gref_pool);
+                       if (r == -ENOSPC)
+                               break;
+                       gref = (grant_ref_t)r;
+                       /* It's a subpage grant reference, so Xen
+                          guarantees to release it quickly.  Sit and
+                          wait for it to do so. */
+                       cntr = 0;
+                       while (!gnttab_end_foreign_access_ref(gref)) {
+                               cpu_relax();
+                               if (++cntr % 65536 == 0)
+                                       printk(KERN_WARNING "Having trouble ending gref %d for receiver copy.\n",
+                                              gref);
+                       }
+                       gnttab_release_grant_reference(&ncrp->gref_pool, gref);
+               }
+       } else if (skb_co->gref_pool != 0) {
+               gnttab_subfree_grant_references(skb_co->gref_pool,
+                                               &ncrp->gref_pool);
+       }
+
+       if (tp != NULL)
+               release_txp_slot(ncrp, skb);
+
+       nc2_free_skb(ncrp->interface, skb);
+}
+
+/* Read fragment descriptor @idx of the current packet message from
+   the consumer ring into @frag.  @off is the ring offset of the
+   start of the fragment array. */
+void fetch_fragment(struct netchannel2_ring_pair *ncrp,
+                   unsigned idx,
+                   struct netchannel2_fragment *frag,
+                   unsigned off)
+{
+       nc2_copy_from_ring_off(&ncrp->cons_ring,
+                              frag,
+                              sizeof(*frag),
+                              off + idx * sizeof(*frag));
+}
+
+/* Copy @count bytes from the skb's paged fragment area into its
+ * head, updating the pointers as appropriate.  Fully-drained
+ * leading fragments have their pages released and the remaining
+ * fragments are shuffled down.  The caller should ensure that there
+ * is actually enough space in the head. */
+void pull_through(struct sk_buff *skb, unsigned count)
+{
+       unsigned frag = 0;
+       unsigned this_frag;
+       void *buf;
+       void *va;
+
+       while (count != 0 && frag < skb_shinfo(skb)->nr_frags) {
+               this_frag = skb_shinfo(skb)->frags[frag].size;
+               if (this_frag > count)
+                       this_frag = count;
+               va = page_address(skb_shinfo(skb)->frags[frag].page);
+               buf = skb->tail;
+               memcpy(buf, va + skb_shinfo(skb)->frags[frag].page_offset,
+                      this_frag);
+               skb->tail += this_frag;
+               BUG_ON(skb->tail > skb->end);
+               skb_shinfo(skb)->frags[frag].size -= this_frag;
+               skb_shinfo(skb)->frags[frag].page_offset += this_frag;
+               skb->data_len -= this_frag;
+               count -= this_frag;
+               frag++;
+       }
+       /* Only leading fragments can have been emptied, so a single
+          scan from the front finds them all. */
+       for (frag = 0;
+            frag < skb_shinfo(skb)->nr_frags &&
+                    skb_shinfo(skb)->frags[frag].size == 0;
+            frag++) {
+               put_page(skb_shinfo(skb)->frags[frag].page);
+       }
+       skb_shinfo(skb)->nr_frags -= frag;
+       memmove(skb_shinfo(skb)->frags,
+               skb_shinfo(skb)->frags+frag,
+               sizeof(skb_shinfo(skb)->frags[0]) *
+               skb_shinfo(skb)->nr_frags);
+}
+
+#ifdef CONFIG_XEN_NETDEV2_BACKEND
+
+/* Zap a grant_mapping structure, releasing all mappings and the
+   reserved virtual address space.  Prepare the grant_mapping for
+   re-use.  A never-mapped (or already-zapped) @gm is a no-op. */
+void nc2_unmap_grants(struct grant_mapping *gm)
+{
+       struct gnttab_unmap_grant_ref op[MAX_GRANT_MAP_PAGES];
+       int i;
+
+       if (gm->mapping == NULL)
+               return;
+       for (i = 0; i < gm->nr_pages; i++) {
+               gnttab_set_unmap_op(&op[i],
+                                   (unsigned long)gm->mapping->addr +
+                                           i * PAGE_SIZE,
+                                   GNTMAP_host_map,
+                                   gm->handles[i]);
+       }
+       /* Unmap failure means our book-keeping is corrupt; nothing
+          sensible to do but crash. */
+       if (HYPERVISOR_grant_table_op(GNTTABOP_unmap_grant_ref, op, i))
+               BUG();
+       free_vm_area(gm->mapping);
+       memset(gm, 0, sizeof(*gm));
+}
+
+/* Map @nr_grefs grant references from @remote_domain into a fresh
+   contiguous virtual address range, unmapping and replacing any
+   previous contents of @gm.  @nr_grefs must be a power of two no
+   greater than MAX_GRANT_MAP_PAGES.  Returns 0 on success or a
+   negative errno; on failure @gm is left unchanged. */
+int nc2_map_grants(struct grant_mapping *gm,
+                  const grant_ref_t *grefs,
+                  unsigned nr_grefs,
+                  domid_t remote_domain)
+{
+       struct grant_mapping work;
+       struct gnttab_map_grant_ref op[MAX_GRANT_MAP_PAGES];
+       int i;
+
+       memset(&work, 0, sizeof(work));
+
+       if (nr_grefs > MAX_GRANT_MAP_PAGES || nr_grefs == 0)
+               return -EINVAL;
+
+       if (nr_grefs & (nr_grefs-1)) {
+               /* Must map a power-of-two number of pages. */
+               return -EINVAL;
+       }
+
+       work.nr_pages = nr_grefs;
+       work.mapping = alloc_vm_area(PAGE_SIZE * work.nr_pages);
+       if (!work.mapping)
+               return -ENOMEM;
+       for (i = 0; i < nr_grefs; i++)
+               gnttab_set_map_op(&op[i],
+                                 (unsigned long)work.mapping->addr +
+                                         i * PAGE_SIZE,
+                                 GNTMAP_host_map,
+                                 grefs[i],
+                                 remote_domain);
+
+       if (HYPERVISOR_grant_table_op(GNTTABOP_map_grant_ref, op, nr_grefs))
+               BUG();
+
+       for (i = 0; i < nr_grefs; i++) {
+               if (op[i].status) {
+                       /* Back out the mappings which did succeed. */
+                       work.nr_pages = i;
+                       nc2_unmap_grants(&work);
+                       return -EFAULT;
+               }
+               work.handles[i] = op[i].handle;
+       }
+
+       /* Only replace the old mapping once the new one is known
+          good. */
+       nc2_unmap_grants(gm);
+       *gm = work;
+       return 0;
+}
+#endif
diff --git a/drivers/xen/netchannel2/xmit_packet.c b/drivers/xen/netchannel2/xmit_packet.c
new file mode 100644
index 0000000..92fbabf
--- /dev/null
+++ b/drivers/xen/netchannel2/xmit_packet.c
@@ -0,0 +1,318 @@
+/* Things related to actually sending packet messages, and which is
+   shared across all transmit modes. */
+#include <linux/kernel.h>
+#include <linux/version.h>
+#include "netchannel2_core.h"
+
+/* We limit the number of transmitted packets which can be in flight
+   at any one time, as a somewhat paranoid safety catch. */
+#define MAX_TX_PACKETS MAX_PENDING_FINISH_PACKETS
+
+/* Decide how @skb should be transmitted: entirely inline in the
+   ring ("small") when it is linear and fits in the prefix area,
+   otherwise via receiver-side grant copy ("grant"). */
+static enum transmit_policy transmit_policy(struct netchannel2 *nc,
+                                           struct sk_buff *skb)
+{
+       if (skb->len <= PACKET_PREFIX_SIZE && !skb_is_nonlinear(skb))
+               return transmit_policy_small;
+       else
+               return transmit_policy_grant;
+}
+
+/* Allocate resources for a small packet.  The entire thing will be
+   transmitted in the ring.  This is only called for small, linear
+   SKBs.  It always succeeds, but has an int return type for symmetry
+   with the other prepare_xmit_*() functions. */
+int prepare_xmit_allocate_small(struct netchannel2_ring_pair *ncrp,
+                               struct sk_buff *skb)
+{
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+
+       BUG_ON(skb_is_nonlinear(skb));
+       BUG_ON(skb->len > NETCHANNEL2_MAX_INLINE_BYTES);
+
+       /* The whole packet travels as the inline prefix; no
+          fragments, no grant references. */
+       skb_co->type = NC2_PACKET_TYPE_small;
+       skb_co->gref_pool = 0;
+       skb_co->inline_prefix_size = skb->len;
+
+       return 0;
+}
+
+/* Figure out how much space @skb's packet message will take up on
+   the ring: header + one descriptor per fragment + inline prefix,
+   rounded up to a multiple of 8 bytes. */
+static unsigned get_transmitted_packet_msg_size(struct sk_buff *skb)
+{
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       return (sizeof(struct netchannel2_msg_packet) +
+               sizeof(struct netchannel2_fragment) * skb_co->nr_fragments +
+               skb_co->inline_prefix_size + 7) & ~7;
+}
+
+/* Do the minimum amount of work to be certain that when we come to
+   transmit this packet we won't run out of resources. This includes
+   figuring out how we're going to fragment the packet for
+   transmission, which buffers we're going to use, etc. Return <0 if
+   insufficient resources are available right now, or 0 if we
+   succeed. */
+/* Careful: this may allocate e.g. a TXP slot and then discover that
+   it can't reserve ring space.  In that case, the TXP remains
+   allocated.  The expected case is that the caller will arrange for
+   us to retry the allocation later, in which case we'll pick up the
+   already-allocated buffers. */
+int prepare_xmit_allocate_resources(struct netchannel2 *nc,
+                                   struct sk_buff *skb)
+{
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       enum transmit_policy policy;
+       unsigned msg_size;
+       int r;
+
+       /* policy is sticky: once chosen for this skb it is kept
+          across retries. */
+       if (skb_co->policy == transmit_policy_unknown) {
+               policy = transmit_policy(nc, skb);
+               switch (policy) {
+               case transmit_policy_small:
+                       r = prepare_xmit_allocate_small(&nc->rings, skb);
+                       break;
+               case transmit_policy_grant:
+                       r = prepare_xmit_allocate_grant(&nc->rings, skb);
+                       break;
+               default:
+                       BUG();
+                       /* Shut the compiler up. */
+                       r = -1;
+               }
+               if (r < 0)
+                       return r;
+               skb_co->policy = policy;
+       }
+
+       msg_size = get_transmitted_packet_msg_size(skb);
+       if (nc2_reserve_payload_bytes(&nc->rings.prod_ring, msg_size))
+               return 0;
+
+       return -1;
+}
+
+/* Transmit a packet which has previously been prepared with
+   prepare_xmit_allocate_resources(). */
+/* Once this has been called, the ring must not be flushed until the
+   TX hypercall batcher is (assuming this ring has a hypercall
+   batcher). */
+/* Returns 1 if the message was placed on the ring, 0 if it must be
+   retried later (in which case the ring-space reservation is
+   re-established). */
+int nc2_really_start_xmit(struct netchannel2_ring_pair *ncrp,
+                         struct sk_buff *skb)
+{
+       struct skb_cb_overlay *skb_co = get_skb_overlay(skb);
+       struct netchannel2 *nc = ncrp->interface;
+       unsigned msg_size;
+       volatile struct netchannel2_msg_packet *msg;
+
+       msg_size = get_transmitted_packet_msg_size(skb);
+       /* Un-reserve the space we reserved for the packet. */
+       BUG_ON(ncrp->prod_ring.reserve < msg_size);
+       ncrp->prod_ring.reserve -= msg_size;
+       if (!nc2_can_send_payload_bytes(&ncrp->prod_ring, msg_size)) {
+               /* Aw, crud.  We had to transmit a PAD message at just
+                  the wrong time, and our attempt to reserve ring
+                  space failed.  Delay transmiting this packet
+                  Make sure we redo the space reserve */
+               ncrp->prod_ring.reserve += msg_size;
+               return 0;
+       }
+       __nc2_avoid_ring_wrap(&ncrp->prod_ring, msg_size);
+
+       /* Set up part of the message.  We do the message header
+          itself and the inline prefix.  The individual xmit_*
+          methods are responsible for the fragments.  They may also
+          set some more msg flags. */
+       msg = __nc2_get_message_ptr(&ncrp->prod_ring);
+       msg->hdr.type = NETCHANNEL2_MSG_PACKET;
+       msg->hdr.flags = 0;
+       msg->id = skb_co->tp - ncrp->tx_packets;
+       msg->type = skb_co->type;
+       msg->flags = 0;
+       msg->prefix_size = skb_co->inline_prefix_size;
+
+       /* We cast away the volatile to avoid compiler warnings, and
+          then use barrier()s to discourage gcc from using msg->frags
+          in CSE or somesuch.  It's kind of unlikely that it would,
+          but better to make sure. */
+       barrier();
+       memcpy((void *)(msg->frags + skb_co->nr_fragments),
+              skb->data,
+              skb_co->inline_prefix_size);
+       barrier();
+
+       switch (skb_co->policy) {
+       case transmit_policy_small:
+               /* Nothing to do */
+               break;
+       case transmit_policy_grant:
+               xmit_grant(ncrp, skb, msg);
+               break;
+       default:
+               BUG();
+       }
+
+       /* The transmission method may have decided not to use all the
+          fragments it reserved, which changes the message size. */
+       msg_size = get_transmitted_packet_msg_size(skb);
+       msg->hdr.size = msg_size;
+
+       ncrp->prod_ring.prod_pvt += msg_size;
+
+       BUG_ON(ncrp->prod_ring.bytes_available < msg_size);
+
+       ncrp->prod_ring.bytes_available -= msg_size;
+
+       ncrp->pending_time_sensitive_messages = 1;
+
+       if (skb_co->tp) {
+               ncrp->expected_finish_messages++;
+               /* We're now ready to accept a FINISH message for this
+                  packet. */
+               skb_co->expecting_finish = 1;
+       } else {
+               /* This packet doesn't need a FINISH message.  Queue
+                  it up to be released as soon as we flush the
+                  hypercall batcher and the ring. */
+               nc->stats.tx_bytes += skb->len;
+               nc->stats.tx_packets++;
+               __skb_queue_tail(&ncrp->release_on_flush_batcher, skb);
+       }
+
+       return 1;
+}
+
+/* Arrange that @skb will be sent on ring @ncrp soon.  Assumes that
+   prepare_xmit_allocate_resources() has been successfully called on
+   @skb already.  Kick the worker only on the empty->non-empty
+   transition; a non-empty queue means a kick is already pending. */
+void queue_packet_to_interface(struct sk_buff *skb,
+                              struct netchannel2_ring_pair *ncrp)
+{
+       __skb_queue_tail(&ncrp->pending_tx_queue, skb);
+       if (ncrp->pending_tx_queue.qlen == 1)
+               nc2_kick(ncrp);
+}
+
+/* hard_start_xmit handler: transmit @skb on @dev, or park it on
+   nc->pending_skbs and stop the queue when resources are currently
+   exhausted.  Always returns NETDEV_TX_OK; the packet is dropped
+   (not requeued) when the ring is not attached. */
+int nc2_start_xmit(struct sk_buff *skb, struct net_device *dev)
+{
+       struct netchannel2 *nc = netdev_priv(dev);
+       struct skb_cb_overlay *sco = get_skb_overlay(skb);
+       int r;
+
+       memset(sco, 0, sizeof(*sco));
+
+       spin_lock_bh(&nc->rings.lock);
+
+       if (!nc->rings.is_attached) {
+               spin_unlock_bh(&nc->rings.lock);
+               dev_kfree_skb(skb);
+               nc->stats.tx_dropped++;
+               return NETDEV_TX_OK;
+       }
+
+       r = prepare_xmit_allocate_resources(nc, skb);
+       if (r < 0)
+               goto out_busy;
+       queue_packet_to_interface(skb, &nc->rings);
+       spin_unlock_bh(&nc->rings.lock);
+
+       return NETDEV_TX_OK;
+
+out_busy:
+       /* Some more buffers may have arrived, so kick the worker
+        * thread to go and have a look. */
+       nc2_kick(&nc->rings);
+
+       __skb_queue_tail(&nc->pending_skbs, skb);
+       nc->is_stopped = 1;
+       netif_stop_queue(dev);
+       spin_unlock_bh(&nc->rings.lock);
+       return NETDEV_TX_OK;
+}
+
+
+/* Handle a FINISH_PACKET message from the remote: transmitted
+   packet @msg.id is done with, so revoke its grants and release the
+   skb.  Every field is validated, since a malicious remote can send
+   arbitrary ids. */
+void nc2_handle_finish_packet_msg(struct netchannel2 *nc,
+                                 struct netchannel2_ring_pair *ncrp,
+                                 struct netchannel2_msg_hdr *hdr)
+{
+       struct skb_cb_overlay *sco;
+       struct netchannel2_msg_finish_packet msg;
+       struct txp_slot *tp;
+       struct sk_buff *skb;
+
+       if (hdr->size < sizeof(msg)) {
+               pr_debug("Packet finish message had strange size %d\n",
+                        hdr->size);
+               return;
+       }
+       nc2_copy_from_ring(&ncrp->cons_ring, &msg, sizeof(msg));
+       /* Valid slot indices are 0 .. NR_TX_PACKETS-1; the previous
+          check used > and so allowed an out-of-bounds access to
+          tx_packets[NR_TX_PACKETS]. */
+       if (msg.id >= NR_TX_PACKETS) {
+               pr_debug("Other end tried to end bad packet id %d\n",
+                        msg.id);
+               return;
+       }
+       tp = &ncrp->tx_packets[msg.id];
+       skb = txp_get_skb(tp);
+       if (!skb) {
+               pr_debug("Other end tried to end packet id %d which wasn't in use\n",
+                        msg.id);
+               return;
+       }
+       sco = get_skb_overlay(skb);
+       /* Careful: if the remote is malicious, they may try to end a
+          packet after we allocate it but before we send it (e.g. if
+          we've had to back out because we didn't have enough ring
+          space). */
+       if (!sco->expecting_finish) {
+               pr_debug("Other end finished packet before we sent it?\n");
+               return;
+       }
+       nc->stats.tx_bytes += skb->len;
+       nc->stats.tx_packets++;
+       release_tx_packet(ncrp, skb);
+       ncrp->expected_finish_messages--;
+}
+
+
+/* ------------------------ Control-path operations ---------------------- */
+/* Handle a SET_MAX_PACKETS message, which tells us how many PACKET
+   messages we may have outstanding at once.  Honoured at most once
+   per connection and clamped to MAX_TX_PACKETS as a safety catch. */
+void nc2_handle_set_max_packets_msg(struct netchannel2_ring_pair *ncrp,
+                                   struct netchannel2_msg_hdr *hdr)
+{
+       struct netchannel2_msg_set_max_packets msg;
+
+       if (hdr->size != sizeof(msg)) {
+               pr_debug("Set max packets message had strange size %d\n",
+                        hdr->size);
+               return;
+       }
+       if (ncrp->max_tx_packets_outstanding != 0) {
+               pr_debug("Other end tried to change number of outstanding packets from %d.\n",
+                        ncrp->max_tx_packets_outstanding);
+               return;
+       }
+       nc2_copy_from_ring(&ncrp->cons_ring, &msg, sizeof(msg));
+       /* Limit the number of outstanding packets to something sane.
+          This is a little bit paranoid (it should be safe to set
+          this arbitrarily high), but limiting it avoids nasty
+          surprises in untested configurations. */
+       if (msg.max_outstanding_packets > MAX_TX_PACKETS) {
+               pr_debug("Other end tried to set max outstanding to %d, limiting to %d.\n",
+                        msg.max_outstanding_packets, MAX_TX_PACKETS);
+               ncrp->max_tx_packets_outstanding = MAX_TX_PACKETS;
+       } else {
+               ncrp->max_tx_packets_outstanding = msg.max_outstanding_packets;
+       }
+}
+
+/* Release all packets on the transmitted and pending_tx lists. */
+void drop_pending_tx_packets(struct netchannel2_ring_pair *ncrp)
+{
+       struct sk_buff *skb;
+       unsigned x;
+
+       nc2_queue_purge(ncrp, &ncrp->pending_tx_queue);
+       /* Any skb still bound to a TXP slot was transmitted but not
+          yet FINISHed by the remote; reclaim those too. */
+       for (x = 0; x < NR_TX_PACKETS; x++) {
+               skb = txp_get_skb(&ncrp->tx_packets[x]);
+               if (skb)
+                       release_tx_packet(ncrp, skb);
+       }
+}
+
diff --git a/include/xen/interface/io/netchannel2.h b/include/xen/interface/io/netchannel2.h
new file mode 100644
index 0000000..c45963e
--- /dev/null
+++ b/include/xen/interface/io/netchannel2.h
@@ -0,0 +1,106 @@
+#ifndef __NETCHANNEL2_H__
+#define __NETCHANNEL2_H__
+
+#include <xen/interface/io/uring.h>
+
+/* Tell the other end how many packets it's allowed to have
+ * simultaneously outstanding for transmission.         An endpoint must not
+ * send PACKET messages which would take it over this limit.
+ *
+ * The SET_MAX_PACKETS message must be sent before any PACKET
+ * messages.  It should only be sent once, unless the ring is
+ * disconnected and reconnected.
+ */
+#define NETCHANNEL2_MSG_SET_MAX_PACKETS 1
+struct netchannel2_msg_set_max_packets {
+       struct netchannel2_msg_hdr hdr;
+       uint32_t max_outstanding_packets; /* limit on in-flight PACKETs */
+};
+
+/* Pass a packet to the other end.  The packet consists of a header,
+ * followed by a bunch of fragment descriptors, followed by an inline
+ * packet prefix.  Every fragment descriptor in a packet must be the
+ * same type, and the type is determined by the header.         The receiving
+ * endpoint should respond with a finished_packet message as soon as
+ * possible.  The prefix may be no more than
+ * NETCHANNEL2_MAX_INLINE_BYTES.  Packets may contain no more than
+ * NETCHANNEL2_MAX_PACKET_BYTES bytes of data, including all fragments
+ * and the prefix.
+ */
+#define NETCHANNEL2_MSG_PACKET 2
+#define NETCHANNEL2_MAX_PACKET_BYTES 65536
+#define NETCHANNEL2_MAX_INLINE_BYTES 256
+/* One fragment of a packet's payload. */
+struct netchannel2_fragment {
+       uint16_t size;
+       /* The offset is always relative to the start of the page.
+          For pre_posted packet types, it is not relative to the
+          start of the buffer (although the fragment range will
+          obviously be within the buffer range). */
+       uint16_t off;
+       union {
+               struct {
+                       grant_ref_t gref;
+               } receiver_copy;
+       };
+};
+struct netchannel2_msg_packet {
+       struct netchannel2_msg_hdr hdr;
+       uint32_t id; /* Opaque ID which is echoed into the finished
+                       packet message. */
+       uint8_t type;  /* NC2_PACKET_TYPE_* */
+       uint8_t flags; /* NC2_PACKET_FLAG_* */
+       uint8_t pad0;
+       uint8_t pad1;
+       uint16_t prefix_size; /* bytes of inline prefix after frags[] */
+       uint16_t pad2;
+       uint16_t pad3;
+       uint16_t pad4;
+       /* Variable-size array.  The number of elements is determined
+          by the size of the message. */
+       /* Until we support scatter-gather, this will be either 0 or 1
+          element. */
+       struct netchannel2_fragment frags[0];
+};
+
+/* If set, the transmitting domain requires an event urgently when
+ * this packet's finish message is sent.  Otherwise, the event can be
+ * delayed. */
+#define NC2_PACKET_FLAG_need_event 8
+
+/* The mechanism which should be used to receive the data part of
+ * a packet:
+ *
+ * receiver_copy -- The transmitting domain has granted the receiving
+ *                 domain access to the original RX buffers using
+ *                 copy-only grant references.  The receiving domain
+ *                 should copy the data out of the buffers and issue
+ *                 a FINISH message.
+ *
+ *                 Due to backend bugs, it is not safe to use this
+ *                 packet type except on bypass rings.
+ *
+ * small -- The packet does not have any fragment descriptors
+ *         (i.e. the entire thing is inline in the ring).  The receiving
+ *         domain should simply copy the packet out of the ring
+ *         into a locally allocated buffer.  No FINISH message is required
+ *         or allowed.
+ *
+ *         This packet type may be used on any ring.
+ *
+ * All endpoints must be able to receive all packet types, but note
+ * that it is correct to treat receiver_map and small packets as
+ * receiver_copy ones.
+ *
+ * NOTE(review): receiver_map and pre_posted are referred to here but
+ * not defined in this header — presumably added by later patches in
+ * the series; confirm. */
+#define NC2_PACKET_TYPE_receiver_copy 1
+#define NC2_PACKET_TYPE_small 4
+
+/* Tell the other end that we're finished with a message it sent us,
+   and it can release the transmit buffers etc.         This must be sent in
+   response to receiver_copy and receiver_map packets. It must not be
+   sent in response to pre_posted or small packets. */
+#define NETCHANNEL2_MSG_FINISH_PACKET 3
+struct netchannel2_msg_finish_packet {
+       struct netchannel2_msg_hdr hdr;
+       uint32_t id; /* Echo of netchannel2_msg_packet.id */
+};
+
+#endif /* !__NETCHANNEL2_H__ */
diff --git a/include/xen/interface/io/uring.h b/include/xen/interface/io/uring.h
new file mode 100644
index 0000000..663c3d7
--- /dev/null
+++ b/include/xen/interface/io/uring.h
@@ -0,0 +1,426 @@
+#ifndef __XEN_PUBLIC_IO_URING_H__
+#define __XEN_PUBLIC_IO_URING_H__
+
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <asm/system.h>
+
+/* Ring indexes are free-running byte counters; they are reduced
+ * modulo the ring size (a power of two — see the masking arithmetic
+ * below) when used to address the payload. */
+typedef unsigned RING_IDX;
+
+/* Message type for padding messages, inserted so that a real message
+ * never wraps around the end of the ring. */
+#define NETCHANNEL2_MSG_PAD 255
+
+/* The sring structures themselves.     The _cons and _prod variants are
+   different views of the same bit of shared memory, and are supposed
+   to provide better checking of the expected use patterns.     Fields in
+   the shared ring are owned by either the producer end or the
+   consumer end.  If a field is owned by your end, the other end will
+   never modify it.     If it's owned by the other end, the other end is
+   allowed to modify it whenever it likes, and you can never do so.
+
+   Fields owned by the other end are always const (because you can't
+   change them).  They're also volatile, because there are a bunch
+   of places where we go:
+
+   local_x = sring->x;
+   validate(local_x);
+   use(local_x);
+
+   and it would be very bad if the compiler turned that into:
+
+   local_x = sring->x;
+   validate(sring->x);
+   use(local_x);
+
+   because that contains a potential TOCTOU race (hard to exploit, but
+   still present).     The compiler is only allowed to do that
+   optimisation because it knows that local_x == sring->x at the start
+   of the call to validate(), and it only knows that if it can reorder
+   the read of sring->x over the sequence point at the end of the
+   first statement.     In other words, it can only do the bad
+   optimisation if it knows that reads of sring->x are side-effect
+   free.  volatile stops it from making that assumption.
+
+   We don't need a full memory barrier here, because it's sufficient
+   to copy the volatile data into stable guest-local storage, and
+   volatile achieves that.     i.e. we don't need local_x to be precisely
+   sring->x, but we do need it to be a stable snapshot of some
+   previous value of sring->x.
+
+   Note that there are still plenty of other places where we *do* need
+   full barriers.  volatile just deals with this one, specific, case.
+
+   We could also deal with it by putting compiler barriers in all over
+   the place.  The downside of that approach is that you need to put
+   the barrier()s in lots of different places (basically, everywhere
+   which needs to access these fields), and it's easy to forget one.
+   barrier()s also have somewhat heavier semantics than volatile
+   (because they prevent all reordering, rather than just reordering
+   on this one field), although that's pretty much irrelevant because
+   gcc usually treats pretty much any volatile access as a call to
+   barrier().
+*/
+
+/* Messages are sent over sring pairs. Each sring in a pair provides
+ * a unidirectional byte stream which can generate events when either
+ * the producer or consumer pointers cross a particular threshold.
+ *
+ * We define both sring_prod and sring_cons structures.         The two
+ * structures will always map onto the same physical bytes in memory,
+ * but they provide different views of that memory which are
+ * appropriate to either producers or consumers.
+ *
+ * Obviously, the endpoints need to agree on which end produces
+ * messages on which ring.     The endpoint which provided the memory
+ * backing the ring always produces on the first sring, and the one
+ * which just mapped the ring produces on the second.  By convention,
+ * these are known as the frontend and backend, respectively.
+ */
+
+/* For both rings, the producer (consumer) pointers point at the
+ * *next* byte which is going to be produced (consumed).  An endpoint
+ * must generate an event on the event channel port if it moves the
+ * producer pointer (consumer pointer) across prod_event (cons_event).
+ *
+ * i.e if an endpoint ever updates a pointer so that the old pointer
+ * is strictly less than the event, and the new pointer is greater
+ * than or equal to the event then the remote must be notified.         If
+ * the pointer overflows the ring, treat the new value as if it were
+ * (actual new value) + (1 << 32).
+ */
+/* Producer's view of the shared control fields: we own @prod and
+ * @cons_event; the remote end owns @cons and @prod_event (hence
+ * volatile const — see the discussion above). */
+struct netchannel2_sring_prod {
+       RING_IDX prod;
+       volatile const RING_IDX cons;
+       volatile const RING_IDX prod_event;
+       RING_IDX cons_event;
+       unsigned char pad[48];  /* pads the struct to 64 bytes;
+                                  presumably cache-line separation —
+                                  TODO confirm */
+};
+
+/* Consumer's view of the same memory: field ownership is reversed. */
+struct netchannel2_sring_cons {
+       volatile const RING_IDX prod;
+       RING_IDX cons;
+       RING_IDX prod_event;
+       volatile const RING_IDX cons_event;
+       unsigned char pad[48];
+};
+
+/* Shared-memory layout as seen by the frontend (the endpoint which
+ * provided the ring memory): it produces on the first sring. */
+struct netchannel2_frontend_shared {
+       struct netchannel2_sring_prod prod;
+       struct netchannel2_sring_cons cons;
+};
+
+/* The backend's view of the same memory: mirror image of the above. */
+struct netchannel2_backend_shared {
+       struct netchannel2_sring_cons cons;
+       struct netchannel2_sring_prod prod;
+};
+
+/* Local (unshared) state for the producing end of a ring. */
+struct netchannel2_prod_ring {
+       struct netchannel2_sring_prod *sring;
+       void *payload;
+       RING_IDX prod_pvt;
+       /* This is the number of bytes available after prod_pvt last
+          time we checked, minus the number of bytes which we've
+          consumed since then.  It's used to avoid a bunch of
+          memory barriers when checking for ring space. */
+       unsigned bytes_available;
+       /* Number of bytes reserved by nc2_reserve_payload_bytes() */
+       unsigned reserve;
+       /* Size of the payload area in bytes; the masking arithmetic
+          below assumes this is a power of two. */
+       size_t payload_bytes;
+};
+
+/* Local (unshared) state for the consuming end of a ring. */
+struct netchannel2_cons_ring {
+       struct netchannel2_sring_cons *sring;
+       const volatile void *payload;
+       RING_IDX cons_pvt;
+       size_t payload_bytes;
+};
+
+/* A message header.  There is one of these at the start of every
+ * message.     @type is one of the #define's below, and @size is the
+ * size of the message, including the header and any padding.
+ * size should be a multiple of 8 so we avoid unaligned memory copies.
+ * structs defining message formats should have sizes multiple of 8
+ * bytes and should use padding fields if needed.
+ */
+struct netchannel2_msg_hdr {
+       uint8_t type;
+       uint8_t flags;
+       uint16_t size;
+};
+
+/* Copy some bytes from the shared ring to a stable local buffer,
+ * starting at the private consumer pointer plus @off.  Does not
+ * update the private consumer pointer.  The copied range must not
+ * wrap around the end of the ring (the producer pads messages so
+ * that they never wrap).
+ */
+static inline void nc2_copy_from_ring_off(struct netchannel2_cons_ring *ring,
+                                         void *buf,
+                                         size_t nbytes,
+                                         unsigned off)
+{
+       unsigned start;
+
+       start = (ring->cons_pvt + off) & (ring->payload_bytes-1);
+       /* We cast away the volatile modifier to get rid of an
+          irritating compiler warning, and compensate with a
+          barrier() at the end. */
+       memcpy(buf, (const void *)ring->payload + start, nbytes);
+       barrier();
+}
+
+/* As nc2_copy_from_ring_off(), but start at the private consumer
+ * pointer itself (i.e. offset zero). */
+static inline void nc2_copy_from_ring(struct netchannel2_cons_ring *ring,
+                                     void *buf,
+                                     size_t nbytes)
+{
+       nc2_copy_from_ring_off(ring, buf, nbytes, 0);
+}
+
+
+/* Copy some bytes to the shared ring, starting at the private
+ * producer pointer plus @off.  Does not update the private pointer.
+ * The copied range must not wrap around the end of the ring; callers
+ * arrange this with __nc2_avoid_ring_wrap().
+ */
+static inline void nc2_copy_to_ring_off(struct netchannel2_prod_ring *ring,
+                                       const void *src,
+                                       unsigned nr_bytes,
+                                       unsigned off)
+{
+       unsigned start;
+
+       start = (ring->prod_pvt + off) & (ring->payload_bytes-1);
+       memcpy(ring->payload + start, src, nr_bytes);
+}
+
+/* As nc2_copy_to_ring_off(), but start at the private producer
+ * pointer itself (i.e. offset zero). */
+static inline void nc2_copy_to_ring(struct netchannel2_prod_ring *ring,
+                                   const void *src,
+                                   unsigned nr_bytes)
+{
+       nc2_copy_to_ring_off(ring, src, nr_bytes, 0);
+}
+
+/* Emit a PAD message covering @nr_bytes (header included) at the
+ * private producer pointer and advance past it.  The caller must
+ * already have established that the space is available (note that
+ * bytes_available is decremented without a check here — see the
+ * "times 2" allowance in nc2_can_send_payload_bytes()). */
+static inline void __nc2_send_pad(struct netchannel2_prod_ring *ring,
+                                 unsigned nr_bytes)
+{
+       struct netchannel2_msg_hdr msg;
+       msg.type = NETCHANNEL2_MSG_PAD;
+       msg.flags = 0;
+       msg.size = nr_bytes;
+       nc2_copy_to_ring(ring, &msg, sizeof(msg));
+       ring->prod_pvt += nr_bytes;
+       ring->bytes_available -= nr_bytes;
+}
+
+/* Would a message of @nr_bytes starting at the private producer
+ * pointer cross the end of the ring? */
+static inline int __nc2_ring_would_wrap(struct netchannel2_prod_ring *ring,
+                                       unsigned nr_bytes)
+{
+       const RING_IDX lap_mask = ~(ring->payload_bytes - 1);
+       RING_IDX start_lap = ring->prod_pvt & lap_mask;
+       RING_IDX end_lap = (ring->prod_pvt + nr_bytes) & lap_mask;
+
+       return start_lap != end_lap;
+}
+
+/* Number of bytes between the private producer pointer and the end
+ * of the ring, i.e. the size of pad message needed to skip to the
+ * next lap. */
+static inline unsigned __nc2_pad_needed(struct netchannel2_prod_ring *ring)
+{
+       unsigned used_this_lap = ring->prod_pvt & (ring->payload_bytes - 1);
+
+       return ring->payload_bytes - used_this_lap;
+}
+
+/* If a message of @nr_bytes would wrap around the end of the ring,
+ * emit a pad message so that it starts on the next lap instead. */
+static inline void __nc2_avoid_ring_wrap(struct netchannel2_prod_ring *ring,
+                                        unsigned nr_bytes)
+{
+       if (__nc2_ring_would_wrap(ring, nr_bytes))
+               __nc2_send_pad(ring, __nc2_pad_needed(ring));
+}
+
+/* Prepare a message for the other end and place it on the shared
+ * ring, updating the private producer pointer.         You need to call
+ * nc2_flush_messages() before the message is actually made visible to
+ * the other end.  It is permissible to send several messages in a
+ * batch and only flush them once.
+ *
+ * @msg must point at a buffer of @size bytes which begins with a
+ * struct netchannel2_msg_hdr; the header fields are filled in here.
+ * The caller must previously have checked for space, e.g. with
+ * nc2_can_send_payload_bytes() (which allows for the pad message
+ * possibly emitted below).
+ */
+static inline void nc2_send_message(struct netchannel2_prod_ring *ring,
+                                   unsigned type,
+                                   unsigned flags,
+                                   const void *msg,
+                                   size_t size)
+{
+       struct netchannel2_msg_hdr *hdr = (struct netchannel2_msg_hdr *)msg;
+
+       /* Messages may not wrap around the end of the ring; pad out
+          to the end of the current lap if necessary. */
+       __nc2_avoid_ring_wrap(ring, size);
+
+       hdr->type = type;
+       hdr->flags = flags;
+       hdr->size = size;
+
+       nc2_copy_to_ring(ring, msg, size);
+       ring->prod_pvt += size;
+       BUG_ON(ring->bytes_available < size);
+       ring->bytes_available -= size;
+}
+
+/* Return a pointer into the ring payload at the private producer
+ * pointer.  The result is volatile because the payload is shared
+ * with the other end. */
+static inline volatile void *__nc2_get_message_ptr(struct netchannel2_prod_ring *ncrp)
+{
+       return (volatile void *)ncrp->payload +
+               (ncrp->prod_pvt & (ncrp->payload_bytes-1));
+}
+
+/* Copy the private producer pointer to the shared producer pointer,
+ * with a suitable memory barrier such that all messages placed on the
+ * ring are stable before we do the copy.  This effectively pushes any
+ * messages which we've just sent out to the other end.         Returns 1 if
+ * we need to notify the other end and 0 otherwise.
+ */
+static inline int nc2_flush_ring(struct netchannel2_prod_ring *ring)
+{
+       RING_IDX old_prod, new_prod;
+
+       old_prod = ring->sring->prod;
+       new_prod = ring->prod_pvt;
+
+       /* Ring contents must be globally visible before the producer
+        * pointer advances past them. */
+       wmb();
+
+       ring->sring->prod = new_prod;
+
+       /* We need the update to prod to happen before we read
+        * event. */
+       mb();
+
+       /* We notify if the producer pointer moves across the event
+        * pointer.  The unsigned subtractions make the comparison
+        * safe across index wraparound. */
+       if ((RING_IDX)(new_prod - ring->sring->prod_event) <
+           (RING_IDX)(new_prod - old_prod))
+               return 1;
+       else
+               return 0;
+}
+
+/* Copy the private consumer pointer to the shared consumer pointer,
+ * with a memory barrier so that any previous reads from the ring
+ * complete before the pointer is updated.     This tells the other end
+ * that we're finished with the messages, and that it can re-use the
+ * ring space for more messages.  Returns 1 if we need to notify the
+ * other end and 0 otherwise.
+ */
+static inline int nc2_finish_messages(struct netchannel2_cons_ring *ring)
+{
+       RING_IDX old_cons, new_cons;
+
+       old_cons = ring->sring->cons;
+       new_cons = ring->cons_pvt;
+
+       /* Need to finish reading from the ring before updating
+          cons */
+       mb();
+       ring->sring->cons = ring->cons_pvt;
+
+       /* Need to publish our new consumer pointer before checking
+          event. */
+       mb();
+       /* Notify if the consumer pointer moved across cons_event;
+          the unsigned subtractions are wraparound-safe. */
+       if ((RING_IDX)(new_cons - ring->sring->cons_event) <
+           (RING_IDX)(new_cons - old_cons))
+               return 1;
+       else
+               return 0;
+}
+
+/* Check whether there are any unconsumed messages left on the shared
+ * ring.  Returns 1 if there are, and 0 if there aren't.  If there are
+ * no more messages, set the producer event so that we'll get a
+ * notification as soon as another one gets sent.  It is assumed that
+ * all messages up to @prod have been processed, and none of the ones
+ * after it have been. */
+static inline int nc2_final_check_for_messages(struct netchannel2_cons_ring *ring,
+                                              RING_IDX prod)
+{
+       if (prod != ring->sring->prod)
+               return 1;
+       /* Request an event when more stuff gets poked on the ring. */
+       ring->sring->prod_event = prod + 1;
+
+       /* Publish event before final check for responses. */
+       mb();
+       if (prod != ring->sring->prod)
+               return 1;
+       else
+               return 0;
+}
+
+/* Can we send a message with @nr_bytes payload bytes? Returns 1 if
+ * we can or 0 if we can't.     If there isn't space right now, set the
+ * consumer event so that we'll get notified when space is
+ * available. */
+static inline int nc2_can_send_payload_bytes(struct netchannel2_prod_ring *ring,
+                                            unsigned nr_bytes)
+{
+       unsigned space;
+       RING_IDX cons;
+       BUG_ON(ring->bytes_available > ring->payload_bytes);
+       /* Times 2 because we might need to send a pad message */
+       if (likely(ring->bytes_available > nr_bytes * 2 + ring->reserve))
+               return 1;
+       if (__nc2_ring_would_wrap(ring, nr_bytes))
+               nr_bytes += __nc2_pad_needed(ring);
+retry:
+       cons = ring->sring->cons;
+       space = ring->payload_bytes - (ring->prod_pvt - cons);
+       if (likely(space >= nr_bytes + ring->reserve)) {
+               /* We have enough space to send the message. */
+
+               /* Need to make sure that the read of cons happens
+                  before any following memory writes. */
+               mb();
+
+               ring->bytes_available = space;
+
+               return 1;
+       } else {
+               /* Not enough space available.  Set an event pointer
+                  when cons changes.  We need to be sure that the
+                  @cons used here is the same as the cons used to
+                  calculate @space above, and the volatile modifier
+                  on sring->cons achieves that. */
+               ring->sring->cons_event = cons + 1;
+
+               /* Check whether more space became available while we
+                  were messing about. */
+
+               /* Need the event pointer to be stable before we do
+                  the check. */
+               mb();
+               if (unlikely(cons != ring->sring->cons)) {
+                       /* Cons pointer changed.  Try again. */
+                       goto retry;
+               }
+
+               /* There definitely isn't space on the ring now, and
+                  an event has been set such that we'll be notified
+                  if more space becomes available. */
+               /* XXX we get a notification as soon as any more space
+                  becomes available.  We could maybe optimise by
+                  setting the event such that we only get notified
+                  when we know that enough space is available.  The
+                  main complication is handling the case where you
+                  try to send a message of size A, fail due to lack
+                  of space, and then try to send one of size B, where
+                  B < A.  It's not clear whether you want to set the
+                  event for A bytes or B bytes.  The obvious answer
+                  is B, but that means moving the event pointer
+                  backwards, and it's not clear that that's always
+                  safe.  Always setting for a single byte is safe, so
+                  stick with that for now. */
+               return 0;
+       }
+}
+
+/* Reserve @nr_bytes of ring payload for later transmission.  Returns
+ * 1 on success, or 0 if there is not currently enough space (in
+ * which case a consumer event has been requested). */
+static inline int nc2_reserve_payload_bytes(struct netchannel2_prod_ring *ring,
+                                           unsigned nr_bytes)
+{
+       if (!nc2_can_send_payload_bytes(ring, nr_bytes))
+               return 0;
+       ring->reserve += nr_bytes;
+       return 1;
+}
+
+#endif /* __XEN_PUBLIC_IO_URING_H__ */
-- 
1.6.3.1


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.