[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] Re: [PATCH 6/6] Under conditions of high load, it was possible for xenvif to queue up very large numbers of packets before pushing them all upstream in one go. This was partially ameliorated by use of the NDIS_RECEIVE_FLAGS_RESOURCES flag.
On 20/07/2021 14:29, Martin Harvey wrote:
> Unfortunately, this was not always a sufficient fix, and in some low memory cases, NDIS and higher level drivers could bugcheck attempting to copy MDLs and packet contents (NETIO!RtlCopyMdlToMdlIndirect). This patch adds backpressure to the Rx path, such that it is only possible to indicate a certain number of receive buffers in any one DPC.

I think you first ought to make a new version of VifReceiverQueuePacket() that can return a boolean value to say whether you should queue more or not. Then XENNET can be the one to apply the backpressure (as it should be).

> In such cases, another DPC is immediately scheduled, on the understanding that DPCs for higher level drivers are likely to consume those packets and free up resources in the interim. In such a backpressure state, the RX ring is not drained until all packets in the intermediate queue have been indicated up the stack. Signed-off-by: Martin Harvey <martin.harvey@xxxxxxxxxx> --- src/xenvif/receiver.c | 93 +++++++++++++++++++++++++++++++++---------- 1 file changed, 73 insertions(+), 20 deletions(-) diff --git a/src/xenvif/receiver.c b/src/xenvif/receiver.c index 7295646..964207b 100644 --- a/src/xenvif/receiver.c +++ b/src/xenvif/receiver.c @@ -99,11 +99,14 @@ typedef struct _XENVIF_RECEIVER_RING { BOOLEAN Connected; BOOLEAN Enabled; BOOLEAN Stopped; + BOOLEAN Backpressured; + BOOLEAN FinalFlush; XENVIF_VIF_OFFLOAD_OPTIONS OffloadOptions; ULONG BackfillSize; PXENBUS_DEBUG_CALLBACK DebugCallback; PXENVIF_THREAD WatchdogThread; PLIST_ENTRY PacketQueue; + LONG PacketQueueEstSize; //Conservative estimate. 
KDPC QueueDpc; ULONG QueueDpcs; LIST_ENTRY PacketComplete; @@ -147,6 +150,8 @@ struct _XENVIF_RECEIVER {#define XENVIF_RECEIVER_TAG 'ECER' +#define RX_BUFFERING_MAX 2048 //X-ref XenNet IN_NDIS_MAX (half in NDIS half elsewhere)+ static FORCEINLINE PVOID __ReceiverAllocate( IN ULONG Length @@ -1337,43 +1342,56 @@ __ReceiverRingSwizzle( PXENVIF_VIF_CONTEXT Context; LIST_ENTRY List; PLIST_ENTRY ListEntry; + BOOLEAN AllFlushed; + ULONG PushedUpstream;Receiver = Ring->Receiver;Frontend = Receiver->Frontend; Context = PdoGetVifContext(FrontendGetPdo(Frontend)); + AllFlushed = TRUE; + PushedUpstream = 0;InitializeListHead(&List); - ListEntry = InterlockedExchangePointer(&Ring->PacketQueue, NULL);+ if (IsListEmpty(&Ring->PacketComplete) || Ring->FinalFlush) + { + Ring->PacketQueueEstSize = 0; + ListEntry = InterlockedExchangePointer(&Ring->PacketQueue, NULL);- // Packets are held in the queue in reverse order so that the most- // recent is always head of the list. This is necessary to allow - // addition to the list to be done atomically. + // Packets are held in the queue in reverse order so that the most + // recent is always head of the list. 
This is necessary to allow + // addition to the list to be done atomically.- while (ListEntry != NULL) {- PLIST_ENTRY NextEntry; + while (ListEntry != NULL) { + PLIST_ENTRY NextEntry;- NextEntry = ListEntry->Blink;- ListEntry->Flink = ListEntry->Blink = ListEntry; + NextEntry = ListEntry->Blink; + ListEntry->Flink = ListEntry->Blink = ListEntry;- InsertHeadList(&List, ListEntry);+ InsertHeadList(&List, ListEntry);- ListEntry = NextEntry;- } + ListEntry = NextEntry; + }- while (!IsListEmpty(&List)) {- PXENVIF_RECEIVER_PACKET Packet; + while (!IsListEmpty(&List)) { + PXENVIF_RECEIVER_PACKET Packet;- ListEntry = RemoveHeadList(&List);- ASSERT3P(ListEntry, !=, &List); + ListEntry = RemoveHeadList(&List); + ASSERT3P(ListEntry, !=, &List);- RtlZeroMemory(ListEntry, sizeof (LIST_ENTRY));+ RtlZeroMemory(ListEntry, sizeof (LIST_ENTRY));- Packet = CONTAINING_RECORD(ListEntry, XENVIF_RECEIVER_PACKET, ListEntry);- ReceiverRingProcessPacket(Ring, Packet); + Packet = CONTAINING_RECORD(ListEntry, XENVIF_RECEIVER_PACKET, ListEntry); + ReceiverRingProcessPacket(Ring, Packet); + } + } + else + { + AllFlushed = FALSE; }

I think the part below should now be split out into a separate function; it's all getting a bit long to be inline. You might also consider a separate DPC that does only this second part, so you can use that in the case where you need to defer.

Paul

> - while (!IsListEmpty(&Ring->PacketComplete)) { + while ((!IsListEmpty(&Ring->PacketComplete)) + && ((PushedUpstream < RX_BUFFERING_MAX) || Ring->FinalFlush)) { PXENVIF_RECEIVER_PACKET Packet; PXENVIF_PACKET_INFO Info; PUCHAR BaseVa; @@ -1381,6 +1399,8 @@ __ReceiverRingSwizzle( PETHERNET_ADDRESS DestinationAddress; ETHERNET_ADDRESS_TYPE Type;+ PushedUpstream++;+ ListEntry = RemoveHeadList(&Ring->PacketComplete); ASSERT3P(ListEntry, !=, &Ring->PacketComplete);@@ -1536,9 +1556,33 @@ __ReceiverRingSwizzle(Packet->TagControlInformation, &Packet->Info, &Packet->Hash, - !IsListEmpty(&Ring->PacketComplete) ? 
TRUE : FALSE, + ((!IsListEmpty(&Ring->PacketComplete)) + && ((PushedUpstream < RX_BUFFERING_MAX) || Ring->FinalFlush)) ? TRUE : FALSE, Packet); } + + if (!IsListEmpty(&Ring->PacketComplete)) + AllFlushed = FALSE; + + if (!AllFlushed) + { + //Re-run remainder from back of DPC queue. + Ring->Backpressured = TRUE; + if (KeInsertQueueDpc(&Ring->QueueDpc, NULL, NULL)) + Ring->QueueDpcs++; + } + else + { + if ((Ring->Backpressured) && !Ring->FinalFlush) + { + //Not any more - restart dataflow from initial ring poll. + Ring->Backpressured = FALSE; + + //PollDpc zeroed before final flush, don't queue it here. + if (KeInsertQueueDpc(&Ring->PollDpc, NULL, NULL)) + Ring->PollDpcs++; + } + } }static FORCEINLINE VOID@@ -1973,6 +2017,8 @@ __ReceiverRingQueuePacket( ListEntry->Blink = Ring->PacketQueue; New = ListEntry; } while (InterlockedCompareExchangePointer(&Ring->PacketQueue, (PVOID)New, (PVOID)Old) != Old); + + InterlockedIncrement(&Ring->PacketQueueEstSize); }static DECLSPEC_NOINLINE ULONG@@ -1992,6 +2038,9 @@ ReceiverRingPoll( if (!Ring->Enabled) goto done;+ if ((Ring->PacketQueueEstSize >= RX_BUFFERING_MAX) && !Ring->FinalFlush)+ goto done; + for (;;) { BOOLEAN Error; BOOLEAN Extra; @@ -2949,8 +2998,12 @@ __ReceiverRingTeardown( Ring->BackfillSize = 0; Ring->OffloadOptions.Value = 0;+ Ring->FinalFlush = TRUE;+ KeInsertQueueDpc(&Ring->QueueDpc, NULL, NULL); KeFlushQueuedDpcs(); RtlZeroMemory(&Ring->QueueDpc, sizeof (KDPC)); + Ring->Backpressured = FALSE; + Ring->FinalFlush = FALSE;ThreadAlert(Ring->WatchdogThread);ThreadJoin(Ring->WatchdogThread);
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |