
[PATCH v2] Limit the amount of work done for each Receiver DPC



From: Owen Smith <owen.smith@xxxxxxxxxx>

Change the receive algorithm such that, for any particular DPC, only a
fixed number of packets can be pushed up the stack. Under higher load,
another DPC is scheduled to process the remainder before any more
packets are removed from the descriptor ring (hence back-pressuring
the network backend).

This functionality is currently local to XENVIF. A more complete
solution, where XENNET determines when to finish a batch of packets,
will be included in a subsequent patch.
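
In outline, the per-DPC batching pattern looks like the sketch below.
This is illustrative only, not the driver code: RX_RING, IndicatePacket()
and MAX_BATCH are placeholder names standing in for the xenvif
equivalents (XENVIF_RECEIVER_RING, VifReceiverQueuePacket() and
RX_BUFFERING_MAX).

    #include <wdm.h>

    /* Placeholder ring: completed packets waiting to be indicated. */
    typedef struct _RX_RING {
        LIST_ENTRY  Completed;
        KDPC        Dpc;
    } RX_RING, *PRX_RING;

    #define MAX_BATCH 2048

    /* Supplied elsewhere: hands one packet up the stack. */
    extern VOID IndicatePacket(PRX_RING Ring, PLIST_ENTRY ListEntry);

    VOID
    RxBatchDpc(
        IN  PKDPC   Dpc,
        IN  PVOID   Context,
        IN  PVOID   Argument1,
        IN  PVOID   Argument2
        )
    {
        PRX_RING    Ring = Context;
        ULONG       Queued = 0;

        UNREFERENCED_PARAMETER(Argument1);
        UNREFERENCED_PARAMETER(Argument2);

        /* Indicate at most MAX_BATCH packets in any one DPC. */
        while (Queued < MAX_BATCH && !IsListEmpty(&Ring->Completed)) {
            PLIST_ENTRY ListEntry = RemoveHeadList(&Ring->Completed);

            IndicatePacket(Ring, ListEntry);
            Queued++;
        }

        if (!IsListEmpty(&Ring->Completed)) {
            /* Work remains: run again from the back of the DPC queue
             * rather than touching the descriptor ring, so the backend
             * sees back-pressure until the backlog drains.
             */
            KeInsertQueueDpc(Dpc, NULL, NULL);
        }
    }

In the patch below, 'More' caps the batch at RX_BUFFERING_MAX,
'BackPressured' stops ReceiverRingPoll() taking more descriptors off
the shared ring, and QueueDpc is re-inserted to drain the remainder.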

Signed-off-by: Martin Harvey <Martin.Harvey@xxxxxxxxxx>
Ported-by: Owen Smith <owen.smith@xxxxxxxxxx>

Some re-work.

Signed-off-by: Paul Durrant <paul@xxxxxxx>
---

v2:
 - Cosmetic tweaks
 - Got rid of 'AllFlushed' in __ReceiverRingSwizzle() and introduced
   'More'
---
 src/xenvif/receiver.c | 80 +++++++++++++++++++++++++++++++------------
 1 file changed, 59 insertions(+), 21 deletions(-)

diff --git a/src/xenvif/receiver.c b/src/xenvif/receiver.c
index 505505ee886e..050c86f0d1a8 100644
--- a/src/xenvif/receiver.c
+++ b/src/xenvif/receiver.c
@@ -99,6 +99,8 @@ typedef struct _XENVIF_RECEIVER_RING {
     BOOLEAN                     Connected;
     BOOLEAN                     Enabled;
     BOOLEAN                     Stopped;
+    BOOLEAN                     BackPressured;
+    BOOLEAN                     Flush;
     XENVIF_VIF_OFFLOAD_OPTIONS  OffloadOptions;
     ULONG                       BackfillSize;
     PXENBUS_DEBUG_CALLBACK      DebugCallback;
@@ -148,6 +150,9 @@ struct _XENVIF_RECEIVER {
 
 #define XENVIF_RECEIVER_TAG 'ECER'
 
+// Half of the maximum number of packets that XENNET will send up to NDIS
+#define RX_BUFFERING_MAX 2048
+
 static FORCEINLINE PVOID
 __ReceiverAllocate(
     IN  ULONG   Length
@@ -1338,43 +1343,54 @@ __ReceiverRingSwizzle(
     PXENVIF_VIF_CONTEXT         Context;
     LIST_ENTRY                  List;
     PLIST_ENTRY                 ListEntry;
+    ULONG                       Queued;
+    BOOLEAN                     More;
 
     Receiver = Ring->Receiver;
     Frontend = Receiver->Frontend;
     Context = PdoGetVifContext(FrontendGetPdo(Frontend));
+    Queued = 0;
 
     InitializeListHead(&List);
 
-    ListEntry = InterlockedExchangePointer(&Ring->PacketQueue, NULL);
+    //
+    // Only process the PacketQueue if there are no completed packets
+    // left to queue to XENNET, unless we are flushing the ring.
+    //
+    More = !IsListEmpty(&Ring->PacketComplete) ? TRUE : FALSE;
+    if (!More || Ring->Flush)
+    {
+        ListEntry = InterlockedExchangePointer(&Ring->PacketQueue, NULL);
 
-    // Packets are held in the queue in reverse order so that the most
-    // recent is always head of the list. This is necessary to allow
-    // addition to the list to be done atomically.
+        // Packets are held in the queue in reverse order so that the most
+        // recent is always head of the list. This is necessary to allow
+        // addition to the list to be done atomically.
 
-    while (ListEntry != NULL) {
-        PLIST_ENTRY NextEntry;
+        while (ListEntry != NULL) {
+            PLIST_ENTRY NextEntry;
 
-        NextEntry = ListEntry->Blink;
-        ListEntry->Flink = ListEntry->Blink = ListEntry;
+            NextEntry = ListEntry->Blink;
+            ListEntry->Flink = ListEntry->Blink = ListEntry;
 
-        InsertHeadList(&List, ListEntry);
+            InsertHeadList(&List, ListEntry);
 
-        ListEntry = NextEntry;
-    }
+            ListEntry = NextEntry;
+        }
 
-    while (!IsListEmpty(&List)) {
-        PXENVIF_RECEIVER_PACKET Packet;
+        while (!IsListEmpty(&List)) {
+            PXENVIF_RECEIVER_PACKET Packet;
 
-        ListEntry = RemoveHeadList(&List);
-        ASSERT3P(ListEntry, !=, &List);
+            ListEntry = RemoveHeadList(&List);
+            ASSERT3P(ListEntry, !=, &List);
 
-        RtlZeroMemory(ListEntry, sizeof (LIST_ENTRY));
+            RtlZeroMemory(ListEntry, sizeof (LIST_ENTRY));
 
-        Packet = CONTAINING_RECORD(ListEntry, XENVIF_RECEIVER_PACKET, ListEntry);
-        ReceiverRingProcessPacket(Ring, Packet);
+            Packet = CONTAINING_RECORD(ListEntry, XENVIF_RECEIVER_PACKET, ListEntry);
+            ReceiverRingProcessPacket(Ring, Packet);
+        }
     }
 
-    while (!IsListEmpty(&Ring->PacketComplete)) {
+    while (More) {
         PXENVIF_RECEIVER_PACKET Packet;
         PXENVIF_PACKET_INFO     Info;
         PUCHAR                  BaseVa;
@@ -1382,6 +1398,8 @@ __ReceiverRingSwizzle(
         PETHERNET_ADDRESS       DestinationAddress;
         ETHERNET_ADDRESS_TYPE   Type;
 
+        Queued++;
+
         ListEntry = RemoveHeadList(&Ring->PacketComplete);
         ASSERT3P(ListEntry, !=, &Ring->PacketComplete);
 
@@ -1527,6 +1545,10 @@ __ReceiverRingSwizzle(
 
         (VOID) InterlockedIncrement(&Receiver->Loaned);
 
+        More = !IsListEmpty(&Ring->PacketComplete) ? TRUE : FALSE;
+        if (!Ring->Flush && Queued == RX_BUFFERING_MAX)
+            More = FALSE;
+
         VifReceiverQueuePacket(Context,
                                Ring->Index,
                                &Packet->Mdl,
@@ -1537,9 +1559,22 @@ __ReceiverRingSwizzle(
                                Packet->TagControlInformation,
                                &Packet->Info,
                                &Packet->Hash,
-                               !IsListEmpty(&Ring->PacketComplete) ? TRUE : FALSE,
+                               More,
                                Packet);
     }
+
+    if (!IsListEmpty(&Ring->PacketComplete)) {
+        // Re-run for the remainder from the back of the DPC queue.
+        Ring->BackPressured = TRUE;
+        if (KeInsertQueueDpc(&Ring->QueueDpc, NULL, NULL))
+            Ring->QueueDpcs++;
+    } else {
+        Ring->BackPressured = FALSE;
+
+        // PollDpc is zeroed before the final flush, so don't queue it here.
+        if (!Ring->Flush && KeInsertQueueDpc(&Ring->PollDpc, NULL, NULL))
+            Ring->PollDpcs++;
+    }
 }
 
 static FORCEINLINE VOID
@@ -1990,7 +2025,7 @@ ReceiverRingPoll(
 
     Count = 0;
 
-    if (!Ring->Enabled)
+    if (!Ring->Enabled || (Ring->BackPressured && !Ring->Flush))
         goto done;
 
     for (;;) {
@@ -2963,8 +2998,11 @@ __ReceiverRingTeardown(
     Ring->BackfillSize = 0;
     Ring->OffloadOptions.Value = 0;
 
+    Ring->Flush = TRUE;
+    KeInsertQueueDpc(&Ring->QueueDpc, NULL, NULL);
     KeFlushQueuedDpcs();
     RtlZeroMemory(&Ring->QueueDpc, sizeof (KDPC));
+    Ring->Flush = FALSE;
 
     ThreadAlert(Ring->WatchdogThread);
     ThreadJoin(Ring->WatchdogThread);
-- 
2.17.1