[xen master] tools/ocaml: Limit maximum in-flight requests / outstanding replies
commit 9284ae0c40fb5b9606947eaaec23dc71d0540e96
Author:     Edwin Török <edvin.torok@xxxxxxxxxx>
AuthorDate: Wed Oct 12 19:13:04 2022 +0100
Commit:     Andrew Cooper <andrew.cooper3@xxxxxxxxxx>
CommitDate: Tue Nov 1 13:05:44 2022 +0000

tools/ocaml: Limit maximum in-flight requests / outstanding replies

Introduce a limit on the number of outstanding reply packets in the
xenbus queue. This limits the number of in-flight requests: when the
output queue is full we'll stop processing inputs until the output
queue has room again.

To avoid a busy loop on the Unix socket we only add it to the watched
input file descriptor set if we'd be able to call `input` on it. Even
though Dom0 is trusted and exempt from quotas a flood of events might
cause a backlog where events are produced faster than daemons in Dom0
can consume them, which could lead to an unbounded queue size and OOM.

Therefore the xenbus queue limit must apply to all connections, Dom0 is
not exempt from it, although if everything works correctly it will
eventually catch up.

This prevents a malicious guest from sending more commands while it has
outstanding watch events or command replies in its input ring. However
if it can cause the generation of watch events by other means (e.g. by
Dom0, or another cooperative guest) and stop reading its own ring then
watch events would've queued up without limit.

The xenstore protocol doesn't have a back-pressure mechanism, and
doesn't allow dropping watch events. In fact, dropping watch events is
known to break some pieces of normal functionality. This leaves little
choice to safely implement the xenstore protocol without exposing the
xenstore daemon to out-of-memory attacks.

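The input gating described at the start of this message can be pictured
with a toy model. This is only a sketch: the `conn` record, the
`max_outstanding` field and `select_sets` are hypothetical names chosen
for illustration, and integers stand in for file descriptors. It shows
why a connection whose reply queue is full is left out of the read set,
so the main loop neither reads from it nor spins on it.

```ocaml
(* Toy model, not the oxenstored types: a connection with a bounded
   queue of replies that the client has not read yet. *)
type conn = {
  id : int;                  (* stands in for a file descriptor *)
  max_outstanding : int;     (* hypothetical per-connection limit *)
  replies : string Queue.t;  (* replies waiting to be read by the client *)
}

let make id max_outstanding =
  { id; max_outstanding; replies = Queue.create () }

(* A request may only be read if its reply is guaranteed to fit. *)
let can_input c = Queue.length c.replies < c.max_outstanding

let has_output c = not (Queue.is_empty c.replies)

(* Build the sets handed to select: a connection that cannot accept
   input is left out of the read set, so the loop does not spin on it. *)
let select_sets conns =
  List.fold_right
    (fun c (ins, outs) ->
      let ins = if can_input c then c.id :: ins else ins in
      let outs = if has_output c then c.id :: outs else outs in
      (ins, outs))
    conns ([], [])
```

With two connections, one idle and one whose single reply slot is
already taken, only the idle one ends up in the read set and only the
busy one in the write set.
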
Implement the fix as pipes with bounded buffers:
* Use a bounded buffer for watch events
* The watch structure will have a bounded receiving pipe of watch events
* The source will have an "overflow" pipe of pending watch events it
  couldn't deliver

Items are queued up on one end and are sent as far along the pipe as
possible:

  source domain -> watch -> xenbus of target -> xenstore ring/socket of target

If the pipe is "full" at any point then back-pressure is applied and we
prevent more items from being queued up. For the source domain this
means that we'll stop accepting new commands as long as its pipe buffer
is not empty.

Before we try to enqueue an item we first check whether it is possible
to send it further down the pipe, by attempting to recursively flush the
pipes. This ensures that we retain the order of events as much as
possible.

We might break causality of watch events if the target domain's queue is
full and we need to start using the watch's queue. This is a breaking
change in the xenstore protocol, but only for domains which are not
processing their incoming ring as expected.

When a watch is deleted its entire pending queue is dropped (no code is
needed for that, because it is part of the 'watch' type).

There is a cache of watches that have pending events that we attempt to
flush at every cycle if possible.

Introduce 3 limits here:
* quota-maxwatchevents on watch event destination: when this is hit the
  source will not be allowed to queue up more watch events.
* quota-maxoutstanding which is the number of responses not read from
  the ring: once exceeded, no more inputs are processed until all
  outstanding replies are consumed by the client.
* overflow queue on the watch event source: all watches that cannot be
  stored on destination are queued up here, a single command can
  trigger multiple watches (e.g. due to recursion).

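The chain of pipes and the resulting back-pressure can be tried out in
isolation. The sketch below follows the shape of the BoundedPipe module
in the patch, but it is a simplified stand-alone version: `ring`,
`ring_sender`, `watch_pipe`, `overflow` and `source_can_input` are
hypothetical names, the capacities (1 and 2) are picked only to make
the effect visible, and integers stand in for watch event packets.

```ocaml
(* A sender returns Some () if it accepted the item, None if it is full. *)
type 'a bounded_sender = 'a -> unit option

type 'a pipe = {
  q : 'a Queue.t;
  capacity : int;
  destination : 'a bounded_sender;
}

let create ~capacity ~destination =
  { q = Queue.create (); capacity; destination }

(* Move items downstream, oldest first, until the destination refuses. *)
let rec flush p =
  if not (Queue.is_empty p.q) then
    match p.destination (Queue.peek p.q) with
    | None -> ()
    | Some () ->
      ignore (Queue.pop p.q);
      flush p

(* Flush first, then enqueue behind whatever is still pending, so that
   delivery order matches arrival order. *)
let push p item =
  flush p;
  if Queue.length p.q < p.capacity then begin
    Queue.push item p.q;
    Some (flush p)
  end else None

(* Toy stand-in for the target's ring: holds at most [limit] events. *)
let ring : int Queue.t = Queue.create ()

let ring_sender limit item =
  if Queue.length ring < limit then Some (Queue.push item ring) else None

(* source overflow pipe -> per-watch pipe -> ring of the target *)
let watch_pipe = create ~capacity:2 ~destination:(ring_sender 1)
let overflow = create ~capacity:max_int ~destination:(push watch_pipe)

(* The source may send new commands only while its overflow pipe is empty. *)
let source_can_input () = Queue.is_empty overflow.q
```

Pushing four events into `overflow` leaves one in the ring, two in the
watch pipe and one in the overflow pipe, so `source_can_input ()` is
false. Once the target reads an event from the ring and `flush overflow`
runs, the remaining events move one stage along and the source is
unblocked again.
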
The overflow queue currently doesn't have an upper bound; it is
difficult to accurately calculate one as it depends on whether you are
Dom0 and how many watches each path has registered and how many watch
events you can trigger with a single command (e.g. a commit). However
these events were already using memory, this just moves them elsewhere,
and as long as we correctly block a domain it shouldn't result in
unbounded memory usage.

Note that Dom0 is not excluded from these checks; it is especially
important that Dom0 is not excluded when it is the source, since there
are many ways in which a guest could trigger Dom0 to send it watch
events.

This should protect against malicious frontends as long as the backend
follows the PV xenstore protocol and only exposes paths needed by the
frontend, and changes those paths at most once as a reaction to guest
events, or protocol state.

The queue limits are per watch, and per domain-pair, so even if one
communication channel would be "blocked", others would keep working,
and the domain itself won't get blocked as long as it doesn't overflow
the queue of watch events.

Similarly a malicious backend could cause the frontend to get blocked,
but this watch queue protects the frontend as well as long as it follows
the PV protocol. (Although note that protection against malicious
backends is only a best effort at the moment.)

This is part of XSA-326 / CVE-2022-42318.

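The patch keeps separate capacity reservations for command replies and
watch events in the xenbus output queue, so a flood of watch events does
not stop a domain from receiving replies. The BoundedQueue module it
relies on is not part of this diff, so the following is a hypothetical
stand-in written only to illustrate per-class accounting; `classed_queue`
and its fields are invented names.

```ocaml
type packet_class = CommandReply | Watchevent

(* One FIFO shared by both classes, with a counter and a limit per class. *)
type 'a classed_queue = {
  items : ('a * packet_class) Queue.t;
  mutable replies : int;   (* queued CommandReply items *)
  mutable events : int;    (* queued Watchevent items *)
  max_replies : int;
  max_events : int;
}

let create ~max_replies ~max_events =
  { items = Queue.create (); replies = 0; events = 0; max_replies; max_events }

(* Each class is checked against its own limit only. *)
let can_push q = function
  | CommandReply -> q.replies < q.max_replies
  | Watchevent -> q.events < q.max_events

let push q cls item =
  if can_push q cls then begin
    Queue.push (item, cls) q.items;
    (match cls with
     | CommandReply -> q.replies <- q.replies + 1
     | Watchevent -> q.events <- q.events + 1);
    Some ()
  end else None

(* Removing an item frees a slot in the class it was counted under. *)
let pop q =
  let item, cls = Queue.pop q.items in
  (match cls with
   | CommandReply -> q.replies <- q.replies - 1
   | Watchevent -> q.events <- q.events - 1);
  item
```

In this model a queue whose watch event slots are all taken still
accepts a command reply, which is the property the input gating depends
on: a request is only read when its reply is guaranteed to fit.
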
Signed-off-by: Edwin Török <edvin.torok@xxxxxxxxxx>
Acked-by: Christian Lindig <christian.lindig@xxxxxxxxxx>
---
 tools/ocaml/libs/xb/xb.ml                |  61 ++++++++++--
 tools/ocaml/libs/xb/xb.mli               |  11 ++-
 tools/ocaml/libs/xs/queueop.ml           |  25 ++---
 tools/ocaml/libs/xs/xsraw.ml             |   4 +-
 tools/ocaml/xenstored/connection.ml      | 155 ++++++++++++++++++++++++++++---
 tools/ocaml/xenstored/connections.ml     |  57 +++++++++---
 tools/ocaml/xenstored/define.ml          |   7 ++
 tools/ocaml/xenstored/oxenstored.conf.in |   2 +
 tools/ocaml/xenstored/process.ml         |  31 +++++--
 tools/ocaml/xenstored/xenstored.ml       |   2 +
 10 files changed, 296 insertions(+), 59 deletions(-)

diff --git a/tools/ocaml/libs/xb/xb.ml b/tools/ocaml/libs/xb/xb.ml
index 4197a3888a..b292ed7a87 100644
--- a/tools/ocaml/libs/xb/xb.ml
+++ b/tools/ocaml/libs/xb/xb.ml
@@ -134,14 +134,44 @@ type backend = Fd of backend_fd | Xenmmap of backend_mmap
 
 type partial_buf = HaveHdr of Partial.pkt | NoHdr of int * bytes
 
+(*
+	separate capacity reservation for replies and watch events:
+	this allows a domain to keep working even when under a constant flood of
+	watch events
+*)
+type capacity = { maxoutstanding: int; maxwatchevents: int }
+
+module Queue = BoundedQueue
+
+type packet_class =
+	| CommandReply
+	| Watchevent
+
+let string_of_packet_class = function
+	| CommandReply -> "command_reply"
+	| Watchevent -> "watch_event"
+
 type t =
 {
 	backend: backend;
-	pkt_out: Packet.t Queue.t;
+	pkt_out: (Packet.t, packet_class) Queue.t;
 	mutable partial_in: partial_buf;
 	mutable partial_out: string;
+	capacity: capacity
 }
 
+let to_read con =
+	match con.partial_in with
+		| HaveHdr partial_pkt -> Partial.to_complete partial_pkt
+		| NoHdr (i, _) -> i
+
+let debug t =
+	Printf.sprintf "XenBus state: partial_in: %d needed, partial_out: %d bytes, pkt_out: %d packets, %s"
+		(to_read t)
+		(String.length t.partial_out)
+		(Queue.length t.pkt_out)
+		(BoundedQueue.debug string_of_packet_class t.pkt_out)
+
 let init_partial_in () =
 	NoHdr (Partial.header_size (), Bytes.make (Partial.header_size()) '\000')
 
@@ -199,7 +229,8 @@ let output con =
 	let s = if String.length con.partial_out > 0 then
 			con.partial_out
 		else if Queue.length con.pkt_out > 0 then
-			Packet.to_string (Queue.pop con.pkt_out)
+			let pkt = Queue.pop con.pkt_out in
+			Packet.to_string pkt
 		else
 			"" in
 	(* send data from s, and save the unsent data to partial_out *)
@@ -212,12 +243,15 @@ let output con =
 	(* after sending one packet, partial is empty *)
 	con.partial_out = ""
 
+(* we can only process an input packet if we're guaranteed to have room
+   to store the response packet *)
+let can_input con = Queue.can_push con.pkt_out CommandReply
+
 (* NB: can throw Reconnect *)
 let input con =
-	let to_read =
-		match con.partial_in with
-		| HaveHdr partial_pkt -> Partial.to_complete partial_pkt
-		| NoHdr (i, _) -> i in
+	if not (can_input con) then None
+	else
+	let to_read = to_read con in
 
 	(* try to get more data from input stream *)
 	let b = Bytes.make to_read '\000' in
@@ -243,11 +277,22 @@ let input con =
 		None
 	)
 
-let newcon backend = {
+let classify t =
+	match t.Packet.ty with
+	| Op.Watchevent -> Watchevent
+	| _ -> CommandReply
+
+let newcon ~capacity backend =
+	let limit = function
+		| CommandReply -> capacity.maxoutstanding
+		| Watchevent -> capacity.maxwatchevents
+	in
+	{
 	backend = backend;
-	pkt_out = Queue.create ();
+	pkt_out = Queue.create ~capacity:(capacity.maxoutstanding + capacity.maxwatchevents) ~classify ~limit;
 	partial_in = init_partial_in ();
 	partial_out = "";
+	capacity = capacity;
 	}
 
 let open_fd fd = newcon (Fd { fd = fd; })
diff --git a/tools/ocaml/libs/xb/xb.mli b/tools/ocaml/libs/xb/xb.mli
index 91c682162c..71b2754ca7 100644
--- a/tools/ocaml/libs/xb/xb.mli
+++ b/tools/ocaml/libs/xb/xb.mli
@@ -66,10 +66,11 @@ type backend_mmap = {
 type backend_fd = { fd : Unix.file_descr; }
 type backend = Fd of backend_fd | Xenmmap of backend_mmap
 type partial_buf = HaveHdr of Partial.pkt | NoHdr of int * bytes
+type capacity = { maxoutstanding: int; maxwatchevents: int }
 type t
 val init_partial_in : unit -> partial_buf
 val reconnect : t -> unit
-val queue : t -> Packet.t -> unit
+val queue : t -> Packet.t -> unit option
 val read_fd : backend_fd -> 'a -> bytes -> int -> int
 val read_mmap : backend_mmap -> 'a -> bytes -> int -> int
 val read : t -> bytes -> int -> int
@@ -78,13 +79,14 @@ val write_mmap : backend_mmap -> 'a -> string -> int -> int
 val write : t -> string -> int -> int
 val output : t -> bool
 val input : t -> Packet.t option
-val newcon : backend -> t
-val open_fd : Unix.file_descr -> t
-val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> t
+val newcon : capacity:capacity -> backend -> t
+val open_fd : Unix.file_descr -> capacity:capacity -> t
+val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> capacity:capacity -> t
 val close : t -> unit
 val is_fd : t -> bool
 val is_mmap : t -> bool
 val output_len : t -> int
+val can_input: t -> bool
 val has_new_output : t -> bool
 val has_old_output : t -> bool
 val has_output : t -> bool
@@ -93,3 +95,4 @@ val has_partial_input : t -> bool
 val has_more_input : t -> bool
 val is_selectable : t -> bool
 val get_fd : t -> Unix.file_descr
+val debug: t -> string
diff --git a/tools/ocaml/libs/xs/queueop.ml b/tools/ocaml/libs/xs/queueop.ml
index 9ff5bbd529..4e532cdaea 100644
--- a/tools/ocaml/libs/xs/queueop.ml
+++ b/tools/ocaml/libs/xs/queueop.ml
@@ -16,9 +16,10 @@
 open Xenbus
 
 let data_concat ls = (String.concat "\000" ls) ^ "\000"
+let queue con pkt = let r = Xb.queue con pkt in assert (r <> None)
 let queue_path ty (tid: int) (path: string) con =
 	let data = data_concat [ path; ] in
-	Xb.queue con (Xb.Packet.create tid 0 ty data)
+	queue con (Xb.Packet.create tid 0 ty data)
 
 (* operations *)
 let directory tid path con = queue_path Xb.Op.Directory tid path con
@@ -27,48 +28,48 @@ let read tid path con = queue_path Xb.Op.Read tid path con
 let getperms tid path con = queue_path Xb.Op.Getperms tid path con
 
 let debug commands con =
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Debug (data_concat commands))
+	queue con (Xb.Packet.create 0 0 Xb.Op.Debug (data_concat commands))
 
 let watch path data con =
 	let data = data_concat [ path; data; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Watch data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Watch data)
 
 let unwatch path data con =
 	let data = data_concat [ path; data; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Unwatch data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Unwatch data)
 
 let transaction_start con =
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Transaction_start (data_concat []))
+	queue con (Xb.Packet.create 0 0 Xb.Op.Transaction_start (data_concat []))
 
 let transaction_end tid commit con =
 	let data = data_concat [ (if commit then "T" else "F"); ] in
-	Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Transaction_end data)
+	queue con (Xb.Packet.create tid 0 Xb.Op.Transaction_end data)
 
 let introduce domid mfn port con =
 	let data = data_concat [ Printf.sprintf "%u" domid;
 	                         Printf.sprintf "%nu" mfn;
 	                         string_of_int port; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Introduce data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Introduce data)
 
 let release domid con =
 	let data = data_concat [ Printf.sprintf "%u" domid; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Release data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Release data)
 
 let resume domid con =
 	let data = data_concat [ Printf.sprintf "%u" domid; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Resume data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Resume data)
 
 let getdomainpath domid con =
 	let data = data_concat [ Printf.sprintf "%u" domid; ] in
-	Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Getdomainpath data)
+	queue con (Xb.Packet.create 0 0 Xb.Op.Getdomainpath data)
 
 let write tid path value con =
 	let data = path ^ "\000" ^ value (* no NULL at the end *) in
-	Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Write data)
+	queue con (Xb.Packet.create tid 0 Xb.Op.Write data)
 
 let mkdir tid path con = queue_path Xb.Op.Mkdir tid path con
 let rm tid path con = queue_path Xb.Op.Rm tid path con
 
 let setperms tid path perms con =
 	let data = data_concat [ path; perms ] in
-	Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Setperms data)
+	queue con (Xb.Packet.create tid 0 Xb.Op.Setperms data)
diff --git a/tools/ocaml/libs/xs/xsraw.ml b/tools/ocaml/libs/xs/xsraw.ml
index 451f8b38db..cbd1728060 100644
--- a/tools/ocaml/libs/xs/xsraw.ml
+++ b/tools/ocaml/libs/xs/xsraw.ml
@@ -36,8 +36,10 @@ type con = {
 
 let close con = Xb.close con.xb
 
+let capacity = { Xb.maxoutstanding = 1; maxwatchevents = 0; }
+
 let open_fd fd = {
-	xb = Xb.open_fd fd;
+	xb = Xb.open_fd ~capacity fd;
 	watchevents = Queue.create ();
 }
 
diff --git a/tools/ocaml/xenstored/connection.ml b/tools/ocaml/xenstored/connection.ml
index 3f6a8f1ad0..54f7f76516 100644
--- a/tools/ocaml/xenstored/connection.ml
+++ b/tools/ocaml/xenstored/connection.ml
@@ -20,12 +20,84 @@ open Stdext
 
 let xenstore_payload_max = 4096 (* xen/include/public/io/xs_wire.h *)
 
+type 'a bounded_sender = 'a -> unit option
+(** a bounded sender accepts an ['a] item and returns:
+    None - if there is no room to accept the item
+    Some () - if it has successfully accepted/sent the item
+ *)
+
+module BoundedPipe : sig
+	type 'a t
+
+	(** [create ~capacity ~destination] creates a bounded pipe with a
+	    local buffer holding at most [capacity] items. Once the buffer is
+	    full it will not accept further items. items from the pipe are
+	    flushed into [destination] as long as it accepts items. The
+	    destination could be another pipe.
+	 *)
+	val create: capacity:int -> destination:'a bounded_sender -> 'a t
+
+	(** [is_empty t] returns whether the local buffer of [t] is empty. *)
+	val is_empty : _ t -> bool
+
+	(** [length t] the number of items in the internal buffer *)
+	val length: _ t -> int
+
+	(** [flush_pipe t] sends as many items from the local buffer as possible,
+	    which could be none. *)
+	val flush_pipe: _ t -> unit
+
+	(** [push t item] tries to [flush_pipe] and then push [item]
+	    into the pipe if its [capacity] allows.
+	    Returns [None] if there is no more room
+	 *)
+	val push : 'a t -> 'a bounded_sender
+end = struct
+	(* items are enqueued in [q], and then flushed to [connect_to] *)
+	type 'a t =
+		{ q: 'a Queue.t
+		; destination: 'a bounded_sender
+		; capacity: int
+		}
+
+	let create ~capacity ~destination =
+		{ q = Queue.create (); capacity; destination }
+
+	let rec flush_pipe t =
+		if not Queue.(is_empty t.q) then
+			let item = Queue.peek t.q in
+			match t.destination item with
+			| None -> () (* no room *)
+			| Some () ->
+				(* successfully sent item to next stage *)
+				let _ = Queue.pop t.q in
+				(* continue trying to send more items *)
+				flush_pipe t
+
+	let push t item =
+		(* first try to flush as many items from this pipe as possible to make room,
+		   it is important to do this first to preserve the order of the items
+		 *)
+		flush_pipe t;
+		if Queue.length t.q < t.capacity then begin
+			(* enqueue, instead of sending directly.
+			   this ensures that [out] sees the items in the same order as we receive them
+			 *)
+			Queue.push item t.q;
+			Some (flush_pipe t)
+		end else None
+
+	let is_empty t = Queue.is_empty t.q
+	let length t = Queue.length t.q
+end
+
 type watch = {
 	con: t;
 	token: string;
 	path: string;
 	base: string;
 	is_relative: bool;
+	pending_watchevents: Xenbus.Xb.Packet.t BoundedPipe.t;
 }
 
 and t = {
@@ -38,8 +110,36 @@ and t = {
 	anonid: int;
 	mutable stat_nb_ops: int;
 	mutable perm: Perms.Connection.t;
+	pending_source_watchevents: (watch * Xenbus.Xb.Packet.t) BoundedPipe.t
 }
 
+module Watch = struct
+	module T = struct
+		type t = watch
+
+		let compare w1 w2 =
+			(* cannot compare watches from different connections *)
+			assert (w1.con == w2.con);
+			match String.compare w1.token w2.token with
+			| 0 -> String.compare w1.path w2.path
+			| n -> n
+	end
+	module Set = Set.Make(T)
+
+	let flush_events t =
+		BoundedPipe.flush_pipe t.pending_watchevents;
+		not (BoundedPipe.is_empty t.pending_watchevents)
+
+	let pending_watchevents t =
+		BoundedPipe.length t.pending_watchevents
+end
+
+let source_flush_watchevents t =
+	BoundedPipe.flush_pipe t.pending_source_watchevents
+
+let source_pending_watchevents t =
+	BoundedPipe.length t.pending_source_watchevents
+
 let mark_as_bad con =
 	match con.dom with
 	|None -> ()
@@ -67,7 +167,8 @@ let watch_create ~con ~path ~token = {
 	token = token;
 	path = path;
 	base = get_path con;
-	is_relative = path.[0] <> '/' && path.[0] <> '@'
+	is_relative = path.[0] <> '/' && path.[0] <> '@';
+	pending_watchevents = BoundedPipe.create ~capacity:!Define.maxwatchevents ~destination:(Xenbus.Xb.queue con.xb)
 }
 
 let get_con w = w.con
@@ -93,6 +194,9 @@ let make_perm dom =
 	Perms.Connection.create ~perms:[Perms.READ; Perms.WRITE] domid
 
 let create xbcon dom =
+	let destination (watch, pkt) =
+		BoundedPipe.push watch.pending_watchevents pkt
+	in
 	let id =
 		match dom with
 		| None -> let old = !anon_id_next in incr anon_id_next; old
@@ -109,6 +213,16 @@ let create xbcon dom =
 	anonid = id;
 	stat_nb_ops = 0;
 	perm = make_perm dom;
+
+	(* the actual capacity will be lower, this is used as an overflow
+	   buffer: anything that doesn't fit elsewhere gets put here, only
+	   limited by the amount of watches that you can generate with a
+	   single xenstore command (which is finite, although possibly very
+	   large in theory for Dom0). Once the pipe here has any contents the
+	   domain is blocked from sending more commands until it is empty
+	   again though.
+	 *)
+	pending_source_watchevents = BoundedPipe.create ~capacity:Sys.max_array_length ~destination
 	}
 	in
 	Logging.new_connection ~tid:Transaction.none ~con:(get_domstr con);
@@ -127,11 +241,17 @@ let set_target con target_domid =
 
 let is_backend_mmap con = Xenbus.Xb.is_mmap con.xb
 
-let send_reply con tid rid ty data =
+let packet_of con tid rid ty data =
 	if (String.length data) > xenstore_payload_max && (is_backend_mmap con) then
-		Xenbus.Xb.queue con.xb (Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000")
+		Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000"
 	else
-		Xenbus.Xb.queue con.xb (Xenbus.Xb.Packet.create tid rid ty data)
+		Xenbus.Xb.Packet.create tid rid ty data
+
+let send_reply con tid rid ty data =
+	let result = Xenbus.Xb.queue con.xb (packet_of con tid rid ty data) in
+	(* should never happen: we only process an input packet when there is room for an output packet *)
+	(* and the limit for replies is different from the limit for watch events *)
+	assert (result <> None)
 
 let send_error con tid rid err = send_reply con tid rid Xenbus.Xb.Op.Error (err ^ "\000")
 let send_ack con tid rid ty = send_reply con tid rid ty "OK\000"
@@ -181,11 +301,11 @@ let del_watch con path token =
 	apath, w
 
 let del_watches con =
-	Hashtbl.clear con.watches;
+	Hashtbl.reset con.watches;
 	con.nb_watches <- 0
 
 let del_transactions con =
-	Hashtbl.clear con.transactions
+	Hashtbl.reset con.transactions
 
 let list_watches con =
 	let ll = Hashtbl.fold
@@ -208,21 +328,29 @@ let lookup_watch_perm path = function
 let lookup_watch_perms oldroot root path =
 	lookup_watch_perm path oldroot @ lookup_watch_perm path (Some root)
 
-let fire_single_watch_unchecked watch =
+let fire_single_watch_unchecked source watch =
 	let data = Utils.join_by_null [watch.path; watch.token; ""] in
-	send_reply watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent data
+	let pkt = packet_of watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent data in
+
+	match BoundedPipe.push source.pending_source_watchevents (watch, pkt) with
+	| Some () -> () (* packet queued *)
+	| None ->
+			(* a well behaved Dom0 shouldn't be able to trigger this,
+			   if it happens it is likely a Dom0 bug causing runaway memory usage
+			 *)
+			failwith "watch event overflow, cannot happen"
 
-let fire_single_watch (oldroot, root) watch =
+let fire_single_watch source (oldroot, root) watch =
 	let abspath = get_watch_path watch.con watch.path |> Store.Path.of_string in
 	let perms = lookup_watch_perms oldroot root abspath in
 	if Perms.can_fire_watch watch.con.perm perms then
-		fire_single_watch_unchecked watch
+		fire_single_watch_unchecked source watch
 	else
 		let perms = perms |> List.map (Perms.Node.to_string ~sep:" ") |> String.concat ", " in
 		let con = get_domstr watch.con in
 		Logging.watch_not_fired ~con perms (Store.Path.to_string abspath)
 
-let fire_watch roots watch path =
+let fire_watch source roots watch path =
 	let new_path =
 		if watch.is_relative && path.[0] = '/'
 		then begin
@@ -232,7 +360,7 @@ let fire_watch roots watch path =
 		end else
 			path
 	in
-	fire_single_watch roots { watch with path = new_path }
+	fire_single_watch source roots { watch with path = new_path }
 
 (* Search for a valid unused transaction id. *)
 let rec valid_transaction_id con proposed_id =
@@ -280,6 +408,7 @@ let do_input con = Xenbus.Xb.input con.xb
 let has_partial_input con = Xenbus.Xb.has_partial_input con.xb
 let has_more_input con = Xenbus.Xb.has_more_input con.xb
 
+let can_input con = Xenbus.Xb.can_input con.xb && BoundedPipe.is_empty con.pending_source_watchevents
 let has_output con = Xenbus.Xb.has_output con.xb
 let has_old_output con = Xenbus.Xb.has_old_output con.xb
 let has_new_output con = Xenbus.Xb.has_new_output con.xb
@@ -322,7 +451,7 @@ let prevents_live_update con = not (is_bad con)
 	&& (has_extra_connection_data con || has_transaction_data con)
 
 let has_more_work con =
-	has_more_input con || not (has_old_output con) && has_new_output con
+	(has_more_input con && can_input con) || not (has_old_output con) && has_new_output con
 
 let incr_ops con = con.stat_nb_ops <- con.stat_nb_ops + 1
 
diff --git a/tools/ocaml/xenstored/connections.ml b/tools/ocaml/xenstored/connections.ml
index 3c7429fe7f..7d68c583b4 100644
--- a/tools/ocaml/xenstored/connections.ml
+++ b/tools/ocaml/xenstored/connections.ml
@@ -22,22 +22,30 @@ type t = {
 	domains: (int, Connection.t) Hashtbl.t;
 	ports: (Xeneventchn.t, Connection.t) Hashtbl.t;
 	mutable watches: Connection.watch list Trie.t;
+	mutable has_pending_watchevents: Connection.Watch.Set.t
 }
 
 let create () = {
 	anonymous = Hashtbl.create 37;
 	domains = Hashtbl.create 37;
 	ports = Hashtbl.create 37;
-	watches = Trie.create ()
+	watches = Trie.create ();
+	has_pending_watchevents = Connection.Watch.Set.empty;
 }
 
+let get_capacity () =
+	(* not multiplied by maxwatch on purpose: 2nd queue in watch itself! *)
+	{ Xenbus.Xb.maxoutstanding = !Define.maxoutstanding; maxwatchevents = !Define.maxwatchevents }
+
 let add_anonymous cons fd =
-	let xbcon = Xenbus.Xb.open_fd fd in
+	let capacity = get_capacity () in
+	let xbcon = Xenbus.Xb.open_fd fd ~capacity in
 	let con = Connection.create xbcon None in
 	Hashtbl.add cons.anonymous (Xenbus.Xb.get_fd xbcon) con
 
 let add_domain cons dom =
-	let xbcon = Xenbus.Xb.open_mmap (Domain.get_interface dom) (fun () -> Domain.notify dom) in
+	let capacity = get_capacity () in
+	let xbcon = Xenbus.Xb.open_mmap ~capacity (Domain.get_interface dom) (fun () -> Domain.notify dom) in
 	let con = Connection.create xbcon (Some dom) in
 	Hashtbl.add cons.domains (Domain.get_id dom) con;
 	match Domain.get_port dom with
@@ -48,7 +56,9 @@ let select ?(only_if = (fun _ -> true)) cons =
 	Hashtbl.fold (fun _ con (ins, outs) ->
 		if (only_if con) then (
 			let fd = Connection.get_fd con in
-			(fd :: ins, if Connection.has_output con then fd :: outs else outs)
+			let in_fds = if Connection.can_input con then fd :: ins else ins in
+			let out_fds = if Connection.has_output con then fd :: outs else outs in
+			in_fds, out_fds
 		) else (ins, outs)
 	)
 	cons.anonymous ([], [])
@@ -67,10 +77,17 @@ let del_watches_of_con con watches =
 	| [] -> None
 	| ws -> Some ws
 
+let del_watches cons con =
+	Connection.del_watches con;
+	cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+	cons.has_pending_watchevents <-
+		cons.has_pending_watchevents |> Connection.Watch.Set.filter @@ fun w ->
+		Connection.get_con w != con
+
 let del_anonymous cons con =
 	try
 		Hashtbl.remove cons.anonymous (Connection.get_fd con);
-		cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+		del_watches cons con;
 		Connection.close con
 	with exn ->
 		debug "del anonymous %s" (Printexc.to_string exn)
@@ -85,7 +102,7 @@ let del_domain cons id =
 			| Some p -> Hashtbl.remove cons.ports p
 			| None -> ())
 		 | None -> ());
-		cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+		del_watches cons con;
 		Connection.close con
 	with exn ->
 		debug "del domain %u: %s" id (Printexc.to_string exn)
@@ -136,31 +153,33 @@ let del_watch cons con path token =
 	cons.watches <- Trie.set cons.watches key watches;
 	watch
 
-let del_watches cons con =
-	Connection.del_watches con;
-	cons.watches <- Trie.map (del_watches_of_con con) cons.watches
-
 (* path is absolute *)
-let fire_watches ?oldroot root cons path recurse =
+let fire_watches ?oldroot source root cons path recurse =
 	let key = key_of_path path in
 	let path = Store.Path.to_string path in
 	let roots = oldroot, root in
 	let fire_watch _ = function
 		| None -> ()
-		| Some watches -> List.iter (fun w -> Connection.fire_watch roots w path) watches
+		| Some watches -> List.iter (fun w -> Connection.fire_watch source roots w path) watches
 	in
 	let fire_rec _x = function
 		| None -> ()
 		| Some watches ->
-			List.iter (Connection.fire_single_watch roots) watches
+			List.iter (Connection.fire_single_watch source roots) watches
 	in
 	Trie.iter_path fire_watch cons.watches key;
 	if recurse then
 		Trie.iter fire_rec (Trie.sub cons.watches key)
 
+let send_watchevents cons con =
+	cons.has_pending_watchevents <-
+		cons.has_pending_watchevents |> Connection.Watch.Set.filter Connection.Watch.flush_events;
+	Connection.source_flush_watchevents con
+
 let fire_spec_watches root cons specpath =
+	let source = find_domain cons 0 in
 	iter cons (fun con ->
-		List.iter (Connection.fire_single_watch (None, root)) (Connection.get_watches con specpath))
+		List.iter (Connection.fire_single_watch source (None, root)) (Connection.get_watches con specpath))
 
 let set_target cons domain target_domain =
 	let con = find_domain cons domain in
@@ -197,6 +216,16 @@ let debug cons =
 	let domains = Hashtbl.fold (fun _ con accu -> Connection.debug con :: accu) cons.domains [] in
 	String.concat "" (domains @ anonymous)
 
+let debug_watchevents cons con =
+	(* == (physical equality)
+	   has to be used here because w.con.xb.backend might contain a [unit->unit] value causing regular
+	   comparison to fail due to having a 'functional value' which cannot be compared.
+	 *)
+	let s = cons.has_pending_watchevents |> Connection.Watch.Set.filter (fun w -> w.con == con) in
+	let pending = s |> Connection.Watch.Set.elements
+		|> List.map (fun w -> Connection.Watch.pending_watchevents w) |> List.fold_left (+) 0 in
+	Printf.sprintf "Watches with pending events: %d, pending events total: %d" (Connection.Watch.Set.cardinal s) pending
+
 let filter ~f cons =
 	let fold _ v acc = if f v then v :: acc else acc in
 	[]
diff --git a/tools/ocaml/xenstored/define.ml b/tools/ocaml/xenstored/define.ml
index ba63a8147e..327b6d795e 100644
--- a/tools/ocaml/xenstored/define.ml
+++ b/tools/ocaml/xenstored/define.ml
@@ -24,6 +24,13 @@ let default_config_dir = Paths.xen_config_dir
 let maxwatch = ref (100)
 let maxtransaction = ref (10)
 let maxrequests = ref (1024) (* maximum requests per transaction *)
+let maxoutstanding = ref (1024) (* maximum outstanding requests, i.e. in-flight requests / domain *)
+let maxwatchevents = ref (1024)
+(*
+	maximum outstanding watch events per watch,
+	recommended >= maxoutstanding to avoid blocking backend transactions due to
+	malicious frontends
+ *)
 
 let gc_max_overhead = ref 120 (* 120% see comment in xenstored.ml *)
 let conflict_burst_limit = ref 5.0
diff --git a/tools/ocaml/xenstored/oxenstored.conf.in b/tools/ocaml/xenstored/oxenstored.conf.in
index 4ae48e42d4..9d034e744b 100644
--- a/tools/ocaml/xenstored/oxenstored.conf.in
+++ b/tools/ocaml/xenstored/oxenstored.conf.in
@@ -62,6 +62,8 @@ quota-maxwatch = 100
 quota-transaction = 10
 quota-maxrequests = 1024
 quota-path-max = 1024
+quota-maxoutstanding = 1024
+quota-maxwatchevents = 1024
 
 # Activate filed base backend
 persistent = false
diff --git a/tools/ocaml/xenstored/process.ml b/tools/ocaml/xenstored/process.ml
index 50ef05be41..6781088387 100644
--- a/tools/ocaml/xenstored/process.ml
+++ b/tools/ocaml/xenstored/process.ml
@@ -57,7 +57,7 @@ let split_one_path data con =
 	| path :: "" :: [] -> Store.Path.create path (Connection.get_path con)
 	| _ -> raise Invalid_Cmd_Args
 
-let process_watch t cons =
+let process_watch source t cons =
 	let oldroot = t.Transaction.oldroot in
 	let newroot = Store.get_root t.Transaction.store in
 	let ops = Transaction.get_paths t |> List.rev in
@@ -67,8 +67,9 @@ let process_watch t cons =
 		| Xenbus.Xb.Op.Rm -> true, None, oldroot
 		| Xenbus.Xb.Op.Setperms -> false, Some oldroot, newroot
 		| _ -> raise (Failure "huh ?") in
-		Connections.fire_watches ?oldroot root cons (snd op) recurse in
-	List.iter (fun op -> do_op_watch op cons) ops
+		Connections.fire_watches ?oldroot source root cons (snd op) recurse in
+	List.iter (fun op -> do_op_watch op cons) ops;
+	Connections.send_watchevents cons source
 
 let create_implicit_path t perm path =
 	let dirname = Store.Path.get_parent path in
@@ -234,6 +235,20 @@ let do_debug con t _domains cons data =
 	| "watches" :: _ ->
 		let watches = Connections.debug cons in
 		Some (watches ^ "\000")
+	| "xenbus" :: domid :: _ ->
+		let domid = int_of_string domid in
+		let con = Connections.find_domain cons domid in
+		let s = Printf.sprintf "xenbus: %s; overflow queue length: %d, can_input: %b, has_more_input: %b, has_old_output: %b, has_new_output: %b, has_more_work: %b. pending: %s"
+			(Xenbus.Xb.debug con.xb)
+			(Connection.source_pending_watchevents con)
+			(Connection.can_input con)
+			(Connection.has_more_input con)
+			(Connection.has_old_output con)
+			(Connection.has_new_output con)
+			(Connection.has_more_work con)
+			(Connections.debug_watchevents cons con)
+		in
+		Some s
 	| "mfn" :: domid :: _ ->
 		let domid = int_of_string domid in
 		let con = Connections.find_domain cons domid in
@@ -342,7 +357,7 @@ let reply_ack fct con t doms cons data =
 	fct con t doms cons data;
 	Packet.Ack (fun () ->
 		if Transaction.get_id t = Transaction.none then
-			process_watch t cons
+			process_watch con t cons
 	)
 
 let reply_data fct con t doms cons data =
@@ -501,7 +516,7 @@ let do_watch con _t _domains cons data =
 	Packet.Ack (fun () ->
 		(* xenstore.txt says this watch is fired immediately,
 		   implying even if path doesn't exist or is unreadable *)
-		Connection.fire_single_watch_unchecked watch)
+		Connection.fire_single_watch_unchecked con watch)
 
 let do_unwatch con _t _domains cons data =
 	let (node, token) =
@@ -532,7 +547,7 @@ let do_transaction_end con t domains cons data =
 	if not success then
 		raise Transaction_again;
 	if commit then begin
-		process_watch t cons;
+		process_watch con t cons;
 		match t.Transaction.ty with
 		| Transaction.No ->
 			() (* no need to record anything *)
@@ -701,7 +716,8 @@ let process_packet ~store ~cons ~doms ~con ~req =
 let do_input store cons doms con =
 	let newpacket =
 		try
-			Connection.do_input con
+			if Connection.can_input con then Connection.do_input con
+			else None
 		with Xenbus.Xb.Reconnect ->
 			info "%s requests a reconnect" (Connection.get_domstr con);
 			History.reconnect con;
@@ -729,6 +745,7 @@ let do_input store cons doms con =
 		Connection.incr_ops con
 
 let do_output _store _cons _doms con =
+	Connection.source_flush_watchevents con;
 	if Connection.has_output con then (
 		if Connection.has_new_output con then (
 			let packet = Connection.peek_output con in
diff --git a/tools/ocaml/xenstored/xenstored.ml b/tools/ocaml/xenstored/xenstored.ml
index 3a932f54a6..ffd43a4eee 100644
--- a/tools/ocaml/xenstored/xenstored.ml
+++ b/tools/ocaml/xenstored/xenstored.ml
@@ -103,6 +103,8 @@ let parse_config filename =
 		("quota-maxentity", Config.Set_int Quota.maxent);
 		("quota-maxsize", Config.Set_int Quota.maxsize);
 		("quota-maxrequests", Config.Set_int Define.maxrequests);
+		("quota-maxoutstanding", Config.Set_int Define.maxoutstanding);
+		("quota-maxwatchevents", Config.Set_int Define.maxwatchevents);
 		("quota-path-max", Config.Set_int Define.path_max);
 		("gc-max-overhead", Config.Set_int Define.gc_max_overhead);
 		("test-eagain", Config.Set_bool Transaction.test_eagain);
--
generated by git-patchbot for /home/xen/git/xen.git#master