[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [xen stable-4.13] tools/ocaml/xb: Add BoundedQueue
commit 723e24871833406093b6a8e2e5b9a3d8de2e5e04 Author: Edwin Török <edvin.torok@xxxxxxxxxx> AuthorDate: Wed Oct 12 19:13:03 2022 +0100 Commit: Andrew Cooper <andrew.cooper3@xxxxxxxxxx> CommitDate: Tue Nov 1 15:25:15 2022 +0000 tools/ocaml/xb: Add BoundedQueue Ensures we cannot store more than [capacity] elements in a [Queue]. Replacing all Queue with this module will then ensure at compile time that all Queues are correctly bound checked. Each element in the queue has a class with its own limits. This, in a subsequent change, will ensure that command responses can proceed during a flood of watch events. No functional change. This is part of XSA-326. Signed-off-by: Edwin Török <edvin.torok@xxxxxxxxxx> Acked-by: Christian Lindig <christian.lindig@xxxxxxxxxx> (cherry picked from commit 19171fb5d888b4467a7073e8febc5e05540956e9) --- tools/ocaml/libs/xb/xb.ml | 92 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/tools/ocaml/libs/xb/xb.ml b/tools/ocaml/libs/xb/xb.ml index 165fd4a1ed..4197a3888a 100644 --- a/tools/ocaml/libs/xb/xb.ml +++ b/tools/ocaml/libs/xb/xb.ml @@ -17,6 +17,98 @@ module Op = struct include Op end module Packet = struct include Packet end +module BoundedQueue : sig + type ('a, 'b) t + + (** [create ~capacity ~classify ~limit] creates a queue with maximum [capacity] elements. + This is burst capacity, each element is further classified according to [classify], + and each class can have its own [limit]. + [capacity] is enforced as an overall limit. + The [limit] can be dynamic, and can be smaller than the number of elements already queued of that class, + in which case those elements are considered to use "burst capacity". + *) + val create: capacity:int -> classify:('a -> 'b) -> limit:('b -> int) -> ('a, 'b) t + + (** [clear q] discards all elements from [q] *) + val clear: ('a, 'b) t -> unit + + (** [can_push q] when [length q < capacity]. *) + val can_push: ('a, 'b) t -> 'b -> bool + + (** [push e q] adds [e] at the end of queue [q] if [can_push q], or returns [None]. *) + val push: 'a -> ('a, 'b) t -> unit option + + (** [pop q] removes and returns first element in [q], or raises [Queue.Empty]. *) + val pop: ('a, 'b) t -> 'a + + (** [peek q] returns the first element in [q], or raises [Queue.Empty]. *) + val peek : ('a, 'b) t -> 'a + + (** [length q] returns the current number of elements in [q] *) + val length: ('a, 'b) t -> int + + (** [debug string_of_class q] prints queue usage statistics in an unspecified internal format. *) + val debug: ('b -> string) -> (_, 'b) t -> string +end = struct + type ('a, 'b) t = + { q: 'a Queue.t + ; capacity: int + ; classify: 'a -> 'b + ; limit: 'b -> int + ; class_count: ('b, int) Hashtbl.t + } + + let create ~capacity ~classify ~limit = + { capacity; q = Queue.create (); classify; limit; class_count = Hashtbl.create 3 } + + let get_count t classification = try Hashtbl.find t.class_count classification with Not_found -> 0 + + let can_push_internal t classification class_count = + Queue.length t.q < t.capacity && class_count < t.limit classification + + let ok = Some () + + let push e t = + let classification = t.classify e in + let class_count = get_count t classification in + if can_push_internal t classification class_count then begin + Queue.push e t.q; + Hashtbl.replace t.class_count classification (class_count + 1); + ok + end + else + None + + let can_push t classification = + can_push_internal t classification @@ get_count t classification + + let clear t = + Queue.clear t.q; + Hashtbl.reset t.class_count + + let pop t = + let e = Queue.pop t.q in + let classification = t.classify e in + let () = match get_count t classification - 1 with + | 0 -> Hashtbl.remove t.class_count classification (* reduces memusage *) + | n -> Hashtbl.replace t.class_count classification n + in + e + + let peek t = Queue.peek t.q + let length t = Queue.length t.q + + let debug string_of_class t = + let b = Buffer.create 128 in + Printf.bprintf b "BoundedQueue capacity: %d, used: {" t.capacity; + Hashtbl.iter (fun packet_class count -> + Printf.bprintf b " %s: %d" (string_of_class packet_class) count + ) t.class_count; + Printf.bprintf b "}"; + Buffer.contents b +end + + exception End_of_file exception Eagain exception Noent -- generated by git-patchbot for /home/xen/git/xen.git#stable-4.13
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |