
[xen staging] tools/ocaml: Limit maximum in-flight requests / outstanding replies



commit 9284ae0c40fb5b9606947eaaec23dc71d0540e96
Author:     Edwin Török <edvin.torok@xxxxxxxxxx>
AuthorDate: Wed Oct 12 19:13:04 2022 +0100
Commit:     Andrew Cooper <andrew.cooper3@xxxxxxxxxx>
CommitDate: Tue Nov 1 13:05:44 2022 +0000

    tools/ocaml: Limit maximum in-flight requests / outstanding replies
    
    Introduce a limit on the number of outstanding reply packets in the xenbus
    queue.  This limits the number of in-flight requests: when the output queue is
    full we'll stop processing inputs until the output queue has room again.
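    A minimal sketch of this idea follows. It assumes a simplified stand-in for
    the BoundedQueue module the patch relies on, whose real interface is not
    part of this diff. The output queue keeps separate reservations for command
    replies and watch events, and a request is read only when a reply slot is
    guaranteed. Bounded_queue_sketch, make_output_queue and can_input are
    illustrative names.

```ocaml
type packet_class = CommandReply | Watchevent

(* Hypothetical stand-in for BoundedQueue: a FIFO with an overall bound
   and a separate bound per packet class. *)
module Bounded_queue_sketch = struct
  type 'a t = {
    q : 'a Queue.t;
    capacity : int;                     (* bound on the total number of items *)
    classify : 'a -> packet_class;      (* which reservation an item counts against *)
    limit : packet_class -> int;        (* bound per packet class *)
    counts : (packet_class, int) Hashtbl.t;
  }

  let create ~capacity ~classify ~limit =
    { q = Queue.create (); capacity; classify; limit; counts = Hashtbl.create 2 }

  let count t cls = Option.value ~default:0 (Hashtbl.find_opt t.counts cls)

  (* Is there room for one more item of class [cls]? *)
  let can_push t cls =
    Queue.length t.q < t.capacity && count t cls < t.limit cls

  (* [Some ()] if the item was accepted, [None] if its class (or the queue) is full. *)
  let push item t =
    let cls = t.classify item in
    if can_push t cls then begin
      Queue.push item t.q;
      Hashtbl.replace t.counts cls (count t cls + 1);
      Some ()
    end else None

  (* Remove the oldest item and release its reservation; raises Queue.Empty. *)
  let pop t =
    let item = Queue.pop t.q in
    let cls = t.classify item in
    Hashtbl.replace t.counts cls (count t cls - 1);
    item

  let length t = Queue.length t.q
end

(* One connection's output queue; items are (payload, class) pairs. *)
let make_output_queue ~maxoutstanding ~maxwatchevents =
  Bounded_queue_sketch.create
    ~capacity:(maxoutstanding + maxwatchevents)
    ~classify:snd
    ~limit:(function
      | CommandReply -> maxoutstanding
      | Watchevent -> maxwatchevents)

(* Only read a request if its reply is guaranteed a slot. *)
let can_input out = Bounded_queue_sketch.can_push out CommandReply
```

    Because watch events have their own reservation, a flood of them cannot
    use up the slots needed for command replies.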
    
    To avoid a busy loop on the Unix socket we only add it to the watched input
    file descriptor set if we'd be able to call `input` on it.  Even though Dom0
    is trusted and exempt from quotas, a flood of events might cause a backlog
    where events are produced faster than daemons in Dom0 can consume them, which
    could lead to an unbounded queue size and OOM.
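    The fd-set rule can be sketched as follows. The conn record is a
    hypothetical stand-in for the two predicates that Connections.select
    consults in this patch, with an int in place of Unix.file_descr.

```ocaml
(* Hypothetical stand-in for a connection: only what the fd-set decision needs. *)
type conn = {
  fd : int;            (* stand-in for a Unix.file_descr *)
  can_input : bool;    (* the output queue has room for one more reply *)
  has_output : bool;   (* something is waiting to be written *)
}

(* Build the (read, write) fd lists for select.  A connection whose reply
   queue is full is left out of the read set, so select does not keep
   waking us up for input we would refuse to process. *)
let select_sets conns =
  List.fold_left
    (fun (ins, outs) c ->
      let ins = if c.can_input then c.fd :: ins else ins in
      let outs = if c.has_output then c.fd :: outs else outs in
      (ins, outs))
    ([], []) conns
```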
    
    Therefore the xenbus queue limit must apply to all connections: Dom0 is not
    exempt from it, although if everything works correctly it will eventually
    catch up.
    
    This prevents a malicious guest from sending more commands while it has
    outstanding watch events or command replies in its input ring.  However, if it
    can cause the generation of watch events by other means (e.g. by Dom0, or
    another cooperative guest) and stop reading its own ring, then watch events
    would've queued up without limit.
    
    The xenstore protocol doesn't have a back-pressure mechanism, and doesn't
    allow dropping watch events.  In fact, dropping watch events is known to break
    some pieces of normal functionality.  This leaves little choice to safely
    implement the xenstore protocol without exposing the xenstore daemon to
    out-of-memory attacks.
    
    Implement the fix as pipes with bounded buffers:
    * Use a bounded buffer for watch events
    * The watch structure will have a bounded receiving pipe of watch events
    * The source will have an "overflow" pipe of pending watch events it couldn't
      deliver
    
    Items are queued up on one end and are sent as far along the pipe as possible:
    
      source domain -> watch -> xenbus of target -> xenstore ring/socket of target
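    The pipe chain can be sketched with a small bounded pipe in the spirit of
    the BoundedPipe module in connection.ml.  The ring record, ring_sender and
    build_chain are hypothetical stand-ins for the target's xenbus queue and
    ring.  Each push flushes older items first, so items keep their order while
    travelling as far along the chain as room allows.

```ocaml
(* A bounded sender returns [Some ()] if it accepted the item, [None] if full. *)
type 'a bounded_sender = 'a -> unit option

module Pipe = struct
  type 'a t = { q : 'a Queue.t; capacity : int; destination : 'a bounded_sender }

  let create ~capacity ~destination = { q = Queue.create (); capacity; destination }

  (* Forward buffered items, oldest first, until the destination refuses one. *)
  let rec flush t =
    if not (Queue.is_empty t.q) then
      match t.destination (Queue.peek t.q) with
      | None -> ()
      | Some () -> ignore (Queue.pop t.q); flush t

  (* Flush first so older items leave before the new one, then buffer the
     new item if there is room and try to forward it straight away. *)
  let push t item =
    flush t;
    if Queue.length t.q < t.capacity then begin
      Queue.push item t.q;
      flush t;
      Some ()
    end else None

  let length t = Queue.length t.q
end

(* Hypothetical last stage standing in for the target's ring: it accepts
   an item only while [room] is positive. *)
type 'a ring = { delivered : 'a Queue.t; mutable room : int }

let ring_sender r : 'a bounded_sender = fun item ->
  if r.room > 0 then begin
    r.room <- r.room - 1;
    Queue.push item r.delivered;
    Some ()
  end else None

(* source -> watch -> target ring; the source's overflow pipe is
   effectively unbounded, as in the patch. *)
let build_chain ~ring_room ~watch_capacity =
  let ring = { delivered = Queue.create (); room = ring_room } in
  let watch = Pipe.create ~capacity:watch_capacity ~destination:(ring_sender ring) in
  let source = Pipe.create ~capacity:max_int ~destination:(Pipe.push watch) in
  (ring, watch, source)
```

    With a ring that accepts one item and a watch buffer of two, pushing four
    events at the source leaves one delivered, two buffered in the watch pipe
    and one in the source's overflow pipe.  Once the target reads its ring,
    flushing the watch and then the source delivers the rest in order.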
    
    If the pipe is "full" at any point then back-pressure is applied and we prevent
    more items from being queued up.  For the source domain this means that we'll
    stop accepting new commands as long as its pipe buffer is not empty.
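    For the source domain, back-pressure reduces to a single input predicate,
    mirroring Connection.can_input in this patch.  The sketch below uses a
    hypothetical source_state record in place of the real connection: a command
    is accepted only if a reply slot is free and the source's overflow pipe is
    empty.

```ocaml
(* Hypothetical stand-in for the per-connection state that gates input. *)
type source_state = {
  replies_queued : int;        (* replies not yet read by the client *)
  maxoutstanding : int;        (* cf. quota-maxoutstanding *)
  overflow : string Queue.t;   (* watch events this source could not deliver *)
}

(* Accept another command only if its reply has a slot and nothing this
   source triggered earlier is still stuck in its overflow pipe. *)
let can_input s =
  s.replies_queued < s.maxoutstanding && Queue.is_empty s.overflow
```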
    
    Before we try to enqueue an item we first check whether it is possible to send
    it further down the pipe, by attempting to recursively flush the pipes. This
    ensures that we retain the order of events as much as possible.
    
    We might break causality of watch events if the target domain's queue is full
    and we need to start using the watch's queue.  This is a breaking change in
    the xenstore protocol, but only for domains which are not processing their
    incoming ring as expected.
    
    When a watch is deleted its entire pending queue is dropped (no code is needed
    for that, because it is part of the 'watch' type).
    
    There is a cache of watches that have pending events that we attempt to flush
    at every cycle if possible.
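    A sketch of that per-cycle flush follows.  It assumes a simplified watch
    record (a token plus a queue of pending events) and a caller-supplied send
    function.  The patch itself keeps a Connection.Watch.Set in Connections and
    filters it with Connection.Watch.flush_events.  Watches whose buffers drain
    completely drop out of the cache.

```ocaml
(* Hypothetical watch: a key plus a buffer of events waiting for room. *)
type watch = { token : string; pending : string Queue.t }

module WatchSet = Set.Make (struct
  type t = watch
  (* order by token only; the patch compares token, then path *)
  let compare a b = String.compare a.token b.token
end)

(* Deliver pending events via [send] until it refuses one; return true
   if the watch still has events left (i.e. it stays in the cache). *)
let flush_events ~send w =
  let rec go () =
    if not (Queue.is_empty w.pending) then
      match send (Queue.peek w.pending) with
      | None -> ()
      | Some () -> ignore (Queue.pop w.pending); go ()
  in
  go ();
  not (Queue.is_empty w.pending)

(* Called once per main-loop iteration. *)
let flush_cycle ~send cache = WatchSet.filter (flush_events ~send) cache
```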
    
    Introduce 3 limits here:
    * quota-maxwatchevents on watch event destination: when this is hit the
      source will not be allowed to queue up more watch events.
    * quota-maxoutstanding which is the number of responses not read from the ring:
      once exceeded, no more inputs are processed until all outstanding replies
      are consumed by the client.
    * overflow queue on the watch event source: all watches that cannot be stored
      on the destination are queued up here; a single command can trigger multiple
      watches (e.g. due to recursion).
    
    The overflow queue currently doesn't have an upper bound; it is difficult to
    accurately calculate one as it depends on whether you are Dom0, how many
    watches each path has registered and how many watch events you can trigger
    with a single command (e.g. a commit).  However, these events were already
    using memory; this just moves them elsewhere, and as long as we correctly
    block a domain it shouldn't result in unbounded memory usage.
    
    Note that Dom0 is not excluded from these checks; it is especially important
    that Dom0 is not excluded when it is the source, since there are many ways in
    which a guest could trigger Dom0 to send it watch events.
    
    This should protect against malicious frontends as long as the backend follows
    the PV xenstore protocol and only exposes paths needed by the frontend, and
    changes those paths at most once as a reaction to guest events, or protocol
    state.
    
    The queue limits are per watch, and per domain-pair, so even if one
    communication channel would be "blocked", others would keep working, and the
    domain itself won't get blocked as long as it doesn't overflow the queue of
    watch events.
    
    Similarly a malicious backend could cause the frontend to get blocked, but
    this watch queue protects the frontend as well as long as it follows the PV
    protocol.  (Note, however, that protection against malicious backends is only
    a best effort at the moment.)
    
    This is part of XSA-326 / CVE-2022-42318.
    
    Signed-off-by: Edwin Török <edvin.torok@xxxxxxxxxx>
    Acked-by: Christian Lindig <christian.lindig@xxxxxxxxxx>
---
 tools/ocaml/libs/xb/xb.ml                |  61 ++++++++++--
 tools/ocaml/libs/xb/xb.mli               |  11 ++-
 tools/ocaml/libs/xs/queueop.ml           |  25 ++---
 tools/ocaml/libs/xs/xsraw.ml             |   4 +-
 tools/ocaml/xenstored/connection.ml      | 155 ++++++++++++++++++++++++++++---
 tools/ocaml/xenstored/connections.ml     |  57 +++++++++---
 tools/ocaml/xenstored/define.ml          |   7 ++
 tools/ocaml/xenstored/oxenstored.conf.in |   2 +
 tools/ocaml/xenstored/process.ml         |  31 +++++--
 tools/ocaml/xenstored/xenstored.ml       |   2 +
 10 files changed, 296 insertions(+), 59 deletions(-)

diff --git a/tools/ocaml/libs/xb/xb.ml b/tools/ocaml/libs/xb/xb.ml
index 4197a3888a..b292ed7a87 100644
--- a/tools/ocaml/libs/xb/xb.ml
+++ b/tools/ocaml/libs/xb/xb.ml
@@ -134,14 +134,44 @@ type backend = Fd of backend_fd | Xenmmap of backend_mmap
 
 type partial_buf = HaveHdr of Partial.pkt | NoHdr of int * bytes
 
+(*
+       separate capacity reservation for replies and watch events:
+       this allows a domain to keep working even when under a constant flood of
+       watch events
+*)
+type capacity = { maxoutstanding: int; maxwatchevents: int }
+
+module Queue = BoundedQueue
+
+type packet_class =
+       | CommandReply
+       | Watchevent
+
+let string_of_packet_class = function
+       | CommandReply -> "command_reply"
+       | Watchevent -> "watch_event"
+
 type t =
 {
        backend: backend;
-       pkt_out: Packet.t Queue.t;
+       pkt_out: (Packet.t, packet_class) Queue.t;
        mutable partial_in: partial_buf;
        mutable partial_out: string;
+       capacity: capacity
 }
 
+let to_read con =
+       match con.partial_in with
+               | HaveHdr partial_pkt -> Partial.to_complete partial_pkt
+               | NoHdr   (i, _)    -> i
+
+let debug t =
+       Printf.sprintf "XenBus state: partial_in: %d needed, partial_out: %d bytes, pkt_out: %d packets, %s"
+               (to_read t)
+               (String.length t.partial_out)
+               (Queue.length t.pkt_out)
+               (BoundedQueue.debug string_of_packet_class t.pkt_out)
+
 let init_partial_in () = NoHdr
        (Partial.header_size (), Bytes.make (Partial.header_size()) '\000')
 
@@ -199,7 +229,8 @@ let output con =
        let s = if String.length con.partial_out > 0 then
                        con.partial_out
                else if Queue.length con.pkt_out > 0 then
-                       Packet.to_string (Queue.pop con.pkt_out)
+                       let pkt = Queue.pop con.pkt_out in
+                       Packet.to_string pkt
                else
                        "" in
        (* send data from s, and save the unsent data to partial_out *)
@@ -212,12 +243,15 @@ let output con =
        (* after sending one packet, partial is empty *)
        con.partial_out = ""
 
+(* we can only process an input packet if we're guaranteed to have room
+   to store the response packet *)
+let can_input con = Queue.can_push con.pkt_out CommandReply
+
 (* NB: can throw Reconnect *)
 let input con =
-       let to_read =
-               match con.partial_in with
-               | HaveHdr partial_pkt -> Partial.to_complete partial_pkt
-               | NoHdr   (i, _)    -> i in
+       if not (can_input con) then None
+       else
+       let to_read = to_read con in
 
        (* try to get more data from input stream *)
        let b = Bytes.make to_read '\000' in
@@ -243,11 +277,22 @@ let input con =
                None
        )
 
-let newcon backend = {
+let classify t =
+       match t.Packet.ty with
+       | Op.Watchevent -> Watchevent
+       | _ -> CommandReply
+
+let newcon ~capacity backend =
+       let limit = function
+               | CommandReply -> capacity.maxoutstanding
+               | Watchevent -> capacity.maxwatchevents
+       in
+       {
        backend = backend;
-       pkt_out = Queue.create ();
+       pkt_out = Queue.create ~capacity:(capacity.maxoutstanding + capacity.maxwatchevents) ~classify ~limit;
        partial_in = init_partial_in ();
        partial_out = "";
+       capacity = capacity;
        }
 
 let open_fd fd = newcon (Fd { fd = fd; })
diff --git a/tools/ocaml/libs/xb/xb.mli b/tools/ocaml/libs/xb/xb.mli
index 91c682162c..71b2754ca7 100644
--- a/tools/ocaml/libs/xb/xb.mli
+++ b/tools/ocaml/libs/xb/xb.mli
@@ -66,10 +66,11 @@ type backend_mmap = {
 type backend_fd = { fd : Unix.file_descr; }
 type backend = Fd of backend_fd | Xenmmap of backend_mmap
 type partial_buf = HaveHdr of Partial.pkt | NoHdr of int * bytes
+type capacity = { maxoutstanding: int; maxwatchevents: int }
 type t
 val init_partial_in : unit -> partial_buf
 val reconnect : t -> unit
-val queue : t -> Packet.t -> unit
+val queue : t -> Packet.t -> unit option
 val read_fd : backend_fd -> 'a -> bytes -> int -> int
 val read_mmap : backend_mmap -> 'a -> bytes -> int -> int
 val read : t -> bytes -> int -> int
@@ -78,13 +79,14 @@ val write_mmap : backend_mmap -> 'a -> string -> int -> int
 val write : t -> string -> int -> int
 val output : t -> bool
 val input : t -> Packet.t option
-val newcon : backend -> t
-val open_fd : Unix.file_descr -> t
-val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> t
+val newcon : capacity:capacity -> backend -> t
+val open_fd : Unix.file_descr -> capacity:capacity -> t
+val open_mmap : Xenmmap.mmap_interface -> (unit -> unit) -> capacity:capacity -> t
 val close : t -> unit
 val is_fd : t -> bool
 val is_mmap : t -> bool
 val output_len : t -> int
+val can_input: t -> bool
 val has_new_output : t -> bool
 val has_old_output : t -> bool
 val has_output : t -> bool
@@ -93,3 +95,4 @@ val has_partial_input : t -> bool
 val has_more_input : t -> bool
 val is_selectable : t -> bool
 val get_fd : t -> Unix.file_descr
+val debug: t -> string
diff --git a/tools/ocaml/libs/xs/queueop.ml b/tools/ocaml/libs/xs/queueop.ml
index 9ff5bbd529..4e532cdaea 100644
--- a/tools/ocaml/libs/xs/queueop.ml
+++ b/tools/ocaml/libs/xs/queueop.ml
@@ -16,9 +16,10 @@
 open Xenbus
 
 let data_concat ls = (String.concat "\000" ls) ^ "\000"
+let queue con pkt = let r = Xb.queue con pkt in assert (r <> None)
 let queue_path ty (tid: int) (path: string) con =
        let data = data_concat [ path; ] in
-       Xb.queue con (Xb.Packet.create tid 0 ty data)
+       queue con (Xb.Packet.create tid 0 ty data)
 
 (* operations *)
 let directory tid path con = queue_path Xb.Op.Directory tid path con
@@ -27,48 +28,48 @@ let read tid path con = queue_path Xb.Op.Read tid path con
 let getperms tid path con = queue_path Xb.Op.Getperms tid path con
 
 let debug commands con =
-       Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Debug (data_concat commands))
+       queue con (Xb.Packet.create 0 0 Xb.Op.Debug (data_concat commands))
 
 let watch path data con =
        let data = data_concat [ path; data; ] in
-       Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Watch data)
+       queue con (Xb.Packet.create 0 0 Xb.Op.Watch data)
 
 let unwatch path data con =
        let data = data_concat [ path; data; ] in
-       Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Unwatch data)
+       queue con (Xb.Packet.create 0 0 Xb.Op.Unwatch data)
 
 let transaction_start con =
-       Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Transaction_start (data_concat []))
+       queue con (Xb.Packet.create 0 0 Xb.Op.Transaction_start (data_concat []))
 
 let transaction_end tid commit con =
        let data = data_concat [ (if commit then "T" else "F"); ] in
-       Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Transaction_end data)
+       queue con (Xb.Packet.create tid 0 Xb.Op.Transaction_end data)
 
 let introduce domid mfn port con =
        let data = data_concat [ Printf.sprintf "%u" domid;
                                 Printf.sprintf "%nu" mfn;
                                 string_of_int port; ] in
-       Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Introduce data)
+       queue con (Xb.Packet.create 0 0 Xb.Op.Introduce data)
 
 let release domid con =
        let data = data_concat [ Printf.sprintf "%u" domid; ] in
-       Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Release data)
+       queue con (Xb.Packet.create 0 0 Xb.Op.Release data)
 
 let resume domid con =
        let data = data_concat [ Printf.sprintf "%u" domid; ] in
-       Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Resume data)
+       queue con (Xb.Packet.create 0 0 Xb.Op.Resume data)
 
 let getdomainpath domid con =
        let data = data_concat [ Printf.sprintf "%u" domid; ] in
-       Xb.queue con (Xb.Packet.create 0 0 Xb.Op.Getdomainpath data)
+       queue con (Xb.Packet.create 0 0 Xb.Op.Getdomainpath data)
 
 let write tid path value con =
        let data = path ^ "\000" ^ value (* no NULL at the end *) in
-       Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Write data)
+       queue con (Xb.Packet.create tid 0 Xb.Op.Write data)
 
 let mkdir tid path con = queue_path Xb.Op.Mkdir tid path con
 let rm tid path con = queue_path Xb.Op.Rm tid path con
 
 let setperms tid path perms con =
        let data = data_concat [ path; perms ] in
-       Xb.queue con (Xb.Packet.create tid 0 Xb.Op.Setperms data)
+       queue con (Xb.Packet.create tid 0 Xb.Op.Setperms data)
diff --git a/tools/ocaml/libs/xs/xsraw.ml b/tools/ocaml/libs/xs/xsraw.ml
index 451f8b38db..cbd1728060 100644
--- a/tools/ocaml/libs/xs/xsraw.ml
+++ b/tools/ocaml/libs/xs/xsraw.ml
@@ -36,8 +36,10 @@ type con = {
 let close con =
        Xb.close con.xb
 
+let capacity = { Xb.maxoutstanding = 1; maxwatchevents = 0; }
+
 let open_fd fd = {
-       xb = Xb.open_fd fd;
+       xb = Xb.open_fd ~capacity fd;
        watchevents = Queue.create ();
 }
 
diff --git a/tools/ocaml/xenstored/connection.ml b/tools/ocaml/xenstored/connection.ml
index 3f6a8f1ad0..54f7f76516 100644
--- a/tools/ocaml/xenstored/connection.ml
+++ b/tools/ocaml/xenstored/connection.ml
@@ -20,12 +20,84 @@ open Stdext
 
 let xenstore_payload_max = 4096 (* xen/include/public/io/xs_wire.h *)
 
+type 'a bounded_sender = 'a -> unit option
+(** a bounded sender accepts an ['a] item and returns:
+    None - if there is no room to accept the item
+    Some () -  if it has successfully accepted/sent the item
+ *)
+
+module BoundedPipe : sig
+       type 'a t
+
+       (** [create ~capacity ~destination] creates a bounded pipe with a
+           local buffer holding at most [capacity] items.  Once the buffer is
+           full it will not accept further items.  items from the pipe are
+           flushed into [destination] as long as it accepts items.  The
+           destination could be another pipe.
+        *)
+       val create: capacity:int -> destination:'a bounded_sender -> 'a t
+
+       (** [is_empty t] returns whether the local buffer of [t] is empty. *)
+       val is_empty : _ t -> bool
+
+       (** [length t] the number of items in the internal buffer *)
+       val length: _ t -> int
+
+       (** [flush_pipe t] sends as many items from the local buffer as possible,
+                       which could be none. *)
+       val flush_pipe: _ t -> unit
+
+       (** [push t item] tries to [flush_pipe] and then push [item]
+           into the pipe if its [capacity] allows.
+           Returns [None] if there is no more room
+        *)
+       val push : 'a t -> 'a bounded_sender
+end = struct
+       (* items are enqueued in [q], and then flushed to [connect_to] *)
+       type 'a t =
+               { q: 'a Queue.t
+               ; destination: 'a bounded_sender
+               ; capacity: int
+               }
+
+       let create ~capacity ~destination =
+               { q = Queue.create (); capacity; destination }
+
+       let rec flush_pipe t =
+               if not Queue.(is_empty t.q) then
+                       let item = Queue.peek t.q in
+                       match t.destination item with
+                       | None -> () (* no room *)
+                       | Some () ->
+                               (* successfully sent item to next stage *)
+                               let _ = Queue.pop t.q in
+                               (* continue trying to send more items *)
+                               flush_pipe t
+
+       let push t item =
+               (* first try to flush as many items from this pipe as possible to make room,
+                  it is important to do this first to preserve the order of the items
+                *)
+               flush_pipe t;
+               if Queue.length t.q < t.capacity then begin
+                       (* enqueue, instead of sending directly.
+                          this ensures that [out] sees the items in the same order as we receive them
+                        *)
+                       Queue.push item t.q;
+                       Some (flush_pipe t)
+               end else None
+
+       let is_empty t = Queue.is_empty t.q
+       let length t = Queue.length t.q
+end
+
 type watch = {
        con: t;
        token: string;
        path: string;
        base: string;
        is_relative: bool;
+       pending_watchevents: Xenbus.Xb.Packet.t BoundedPipe.t;
 }
 
 and t = {
@@ -38,8 +110,36 @@ and t = {
        anonid: int;
        mutable stat_nb_ops: int;
        mutable perm: Perms.Connection.t;
+       pending_source_watchevents: (watch * Xenbus.Xb.Packet.t) BoundedPipe.t
 }
 
+module Watch = struct
+       module T = struct
+               type t = watch
+
+               let compare w1 w2 =
+                       (* cannot compare watches from different connections *)
+                       assert (w1.con == w2.con);
+                       match String.compare w1.token w2.token with
+                       | 0 -> String.compare w1.path w2.path
+                       | n -> n
+       end
+       module Set = Set.Make(T)
+
+       let flush_events t =
+               BoundedPipe.flush_pipe t.pending_watchevents;
+               not (BoundedPipe.is_empty t.pending_watchevents)
+
+       let pending_watchevents t =
+               BoundedPipe.length t.pending_watchevents
+end
+
+let source_flush_watchevents t =
+       BoundedPipe.flush_pipe t.pending_source_watchevents
+
+let source_pending_watchevents t =
+       BoundedPipe.length t.pending_source_watchevents
+
 let mark_as_bad con =
        match con.dom with
        |None -> ()
@@ -67,7 +167,8 @@ let watch_create ~con ~path ~token = {
        token = token;
        path = path;
        base = get_path con;
-       is_relative = path.[0] <> '/' && path.[0] <> '@'
+       is_relative = path.[0] <> '/' && path.[0] <> '@';
+       pending_watchevents = BoundedPipe.create ~capacity:!Define.maxwatchevents ~destination:(Xenbus.Xb.queue con.xb)
 }
 
 let get_con w = w.con
@@ -93,6 +194,9 @@ let make_perm dom =
        Perms.Connection.create ~perms:[Perms.READ; Perms.WRITE] domid
 
 let create xbcon dom =
+       let destination (watch, pkt) =
+               BoundedPipe.push watch.pending_watchevents pkt
+       in
        let id =
                match dom with
                | None -> let old = !anon_id_next in incr anon_id_next; old
@@ -109,6 +213,16 @@ let create xbcon dom =
        anonid = id;
        stat_nb_ops = 0;
        perm = make_perm dom;
+
+       (* the actual capacity will be lower, this is used as an overflow
+          buffer: anything that doesn't fit elsewhere gets put here, only
+          limited by the amount of watches that you can generate with a
+          single xenstore command (which is finite, although possibly very
+          large in theory for Dom0).  Once the pipe here has any contents the
+          domain is blocked from sending more commands until it is empty
+          again though.
+        *)
+       pending_source_watchevents = BoundedPipe.create ~capacity:Sys.max_array_length ~destination
        }
        in
        Logging.new_connection ~tid:Transaction.none ~con:(get_domstr con);
@@ -127,11 +241,17 @@ let set_target con target_domid =
 
 let is_backend_mmap con = Xenbus.Xb.is_mmap con.xb
 
-let send_reply con tid rid ty data =
+let packet_of con tid rid ty data =
        if (String.length data) > xenstore_payload_max && (is_backend_mmap con) then
-               Xenbus.Xb.queue con.xb (Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000")
+               Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000"
        else
-               Xenbus.Xb.queue con.xb (Xenbus.Xb.Packet.create tid rid ty data)
+               Xenbus.Xb.Packet.create tid rid ty data
+
+let send_reply con tid rid ty data =
+       let result = Xenbus.Xb.queue con.xb (packet_of con tid rid ty data) in
+       (* should never happen: we only process an input packet when there is room for an output packet *)
+       (* and the limit for replies is different from the limit for watch events *)
+       assert (result <> None)
 
 let send_error con tid rid err = send_reply con tid rid Xenbus.Xb.Op.Error (err ^ "\000")
 let send_ack con tid rid ty = send_reply con tid rid ty "OK\000"
@@ -181,11 +301,11 @@ let del_watch con path token =
        apath, w
 
 let del_watches con =
-  Hashtbl.clear con.watches;
+  Hashtbl.reset con.watches;
   con.nb_watches <- 0
 
 let del_transactions con =
-  Hashtbl.clear con.transactions
+  Hashtbl.reset con.transactions
 
 let list_watches con =
        let ll = Hashtbl.fold
@@ -208,21 +328,29 @@ let lookup_watch_perm path = function
 let lookup_watch_perms oldroot root path =
        lookup_watch_perm path oldroot @ lookup_watch_perm path (Some root)
 
-let fire_single_watch_unchecked watch =
+let fire_single_watch_unchecked source watch =
        let data = Utils.join_by_null [watch.path; watch.token; ""] in
-       send_reply watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent data
+       let pkt = packet_of watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent data in
+
+       match BoundedPipe.push source.pending_source_watchevents (watch, pkt) with
+       | Some () -> () (* packet queued *)
+       | None ->
+                       (* a well behaved Dom0 shouldn't be able to trigger this,
+                          if it happens it is likely a Dom0 bug causing runaway memory usage
+                        *)
+                       failwith "watch event overflow, cannot happen"
 
-let fire_single_watch (oldroot, root) watch =
+let fire_single_watch source (oldroot, root) watch =
        let abspath = get_watch_path watch.con watch.path |> Store.Path.of_string in
        let perms = lookup_watch_perms oldroot root abspath in
        if Perms.can_fire_watch watch.con.perm perms then
-               fire_single_watch_unchecked watch
+               fire_single_watch_unchecked source watch
        else
                let perms = perms |> List.map (Perms.Node.to_string ~sep:" ") |> String.concat ", " in
                let con = get_domstr watch.con in
                Logging.watch_not_fired ~con perms (Store.Path.to_string abspath)
 
-let fire_watch roots watch path =
+let fire_watch source roots watch path =
        let new_path =
                if watch.is_relative && path.[0] = '/'
                then begin
@@ -232,7 +360,7 @@ let fire_watch roots watch path =
                end else
                        path
        in
-       fire_single_watch roots { watch with path = new_path }
+       fire_single_watch source roots { watch with path = new_path }
 
 (* Search for a valid unused transaction id. *)
 let rec valid_transaction_id con proposed_id =
@@ -280,6 +408,7 @@ let do_input con = Xenbus.Xb.input con.xb
 let has_partial_input con = Xenbus.Xb.has_partial_input con.xb
 let has_more_input con = Xenbus.Xb.has_more_input con.xb
 
+let can_input con = Xenbus.Xb.can_input con.xb && BoundedPipe.is_empty con.pending_source_watchevents
 let has_output con = Xenbus.Xb.has_output con.xb
 let has_old_output con = Xenbus.Xb.has_old_output con.xb
 let has_new_output con = Xenbus.Xb.has_new_output con.xb
@@ -322,7 +451,7 @@ let prevents_live_update con = not (is_bad con)
        && (has_extra_connection_data con || has_transaction_data con)
 
 let has_more_work con =
-       has_more_input con || not (has_old_output con) && has_new_output con
+       (has_more_input con && can_input con) || not (has_old_output con) && has_new_output con
 
 let incr_ops con = con.stat_nb_ops <- con.stat_nb_ops + 1
 
diff --git a/tools/ocaml/xenstored/connections.ml b/tools/ocaml/xenstored/connections.ml
index 3c7429fe7f..7d68c583b4 100644
--- a/tools/ocaml/xenstored/connections.ml
+++ b/tools/ocaml/xenstored/connections.ml
@@ -22,22 +22,30 @@ type t = {
        domains: (int, Connection.t) Hashtbl.t;
        ports: (Xeneventchn.t, Connection.t) Hashtbl.t;
        mutable watches: Connection.watch list Trie.t;
+       mutable has_pending_watchevents: Connection.Watch.Set.t
 }
 
 let create () = {
        anonymous = Hashtbl.create 37;
        domains = Hashtbl.create 37;
        ports = Hashtbl.create 37;
-       watches = Trie.create ()
+       watches = Trie.create ();
+       has_pending_watchevents = Connection.Watch.Set.empty;
 }
 
+let get_capacity () =
+       (* not multiplied by maxwatch on purpose: 2nd queue in watch itself! *)
+       { Xenbus.Xb.maxoutstanding = !Define.maxoutstanding; maxwatchevents = !Define.maxwatchevents }
+
 let add_anonymous cons fd =
-       let xbcon = Xenbus.Xb.open_fd fd in
+       let capacity = get_capacity () in
+       let xbcon = Xenbus.Xb.open_fd fd ~capacity in
        let con = Connection.create xbcon None in
        Hashtbl.add cons.anonymous (Xenbus.Xb.get_fd xbcon) con
 
 let add_domain cons dom =
-       let xbcon = Xenbus.Xb.open_mmap (Domain.get_interface dom) (fun () -> Domain.notify dom) in
+       let capacity = get_capacity () in
+       let xbcon = Xenbus.Xb.open_mmap ~capacity (Domain.get_interface dom) (fun () -> Domain.notify dom) in
        let con = Connection.create xbcon (Some dom) in
        Hashtbl.add cons.domains (Domain.get_id dom) con;
        match Domain.get_port dom with
@@ -48,7 +56,9 @@ let select ?(only_if = (fun _ -> true)) cons =
        Hashtbl.fold (fun _ con (ins, outs) ->
                if (only_if con) then (
                        let fd = Connection.get_fd con in
-                       (fd :: ins,  if Connection.has_output con then fd :: outs else outs)
+                       let in_fds = if Connection.can_input con then fd :: ins else ins in
+                       let out_fds = if Connection.has_output con then fd :: outs else outs in
+                       in_fds, out_fds
                ) else (ins, outs)
        )
        cons.anonymous ([], [])
@@ -67,10 +77,17 @@ let del_watches_of_con con watches =
        | [] -> None
        | ws -> Some ws
 
+let del_watches cons con =
+       Connection.del_watches con;
+       cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+       cons.has_pending_watchevents <-
+               cons.has_pending_watchevents |> Connection.Watch.Set.filter @@ fun w ->
+               Connection.get_con w != con
+
 let del_anonymous cons con =
        try
                Hashtbl.remove cons.anonymous (Connection.get_fd con);
-               cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+               del_watches cons con;
                Connection.close con
        with exn ->
                debug "del anonymous %s" (Printexc.to_string exn)
@@ -85,7 +102,7 @@ let del_domain cons id =
                    | Some p -> Hashtbl.remove cons.ports p
                    | None -> ())
                 | None -> ());
-               cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
+               del_watches cons con;
                Connection.close con
        with exn ->
                debug "del domain %u: %s" id (Printexc.to_string exn)
@@ -136,31 +153,33 @@ let del_watch cons con path token =
                cons.watches <- Trie.set cons.watches key watches;
        watch
 
-let del_watches cons con =
-       Connection.del_watches con;
-       cons.watches <- Trie.map (del_watches_of_con con) cons.watches
-
 (* path is absolute *)
-let fire_watches ?oldroot root cons path recurse =
+let fire_watches ?oldroot source root cons path recurse =
        let key = key_of_path path in
        let path = Store.Path.to_string path in
        let roots = oldroot, root in
        let fire_watch _ = function
                | None         -> ()
-               | Some watches -> List.iter (fun w -> Connection.fire_watch roots w path) watches
+               | Some watches -> List.iter (fun w -> Connection.fire_watch source roots w path) watches
        in
        let fire_rec _x = function
                | None         -> ()
                | Some watches ->
-                       List.iter (Connection.fire_single_watch roots) watches
+                       List.iter (Connection.fire_single_watch source roots) watches
        in
        Trie.iter_path fire_watch cons.watches key;
        if recurse then
                Trie.iter fire_rec (Trie.sub cons.watches key)
 
+let send_watchevents cons con =
+       cons.has_pending_watchevents <-
+               cons.has_pending_watchevents |> Connection.Watch.Set.filter Connection.Watch.flush_events;
+       Connection.source_flush_watchevents con
+
 let fire_spec_watches root cons specpath =
+       let source = find_domain cons 0 in
        iter cons (fun con ->
-               List.iter (Connection.fire_single_watch (None, root)) (Connection.get_watches con specpath))
+               List.iter (Connection.fire_single_watch source (None, root)) (Connection.get_watches con specpath))
 
 let set_target cons domain target_domain =
        let con = find_domain cons domain in
@@ -197,6 +216,16 @@ let debug cons =
        let domains = Hashtbl.fold (fun _ con accu -> Connection.debug con :: accu) cons.domains [] in
        String.concat "" (domains @ anonymous)
 
+let debug_watchevents cons con =
+       (* == (physical equality)
+          has to be used here because w.con.xb.backend might contain a [unit->unit] value causing regular
+          comparison to fail due to having a 'functional value' which cannot be compared.
+        *)
+       let s = cons.has_pending_watchevents |> Connection.Watch.Set.filter (fun w -> w.con == con) in
+       let pending = s |> Connection.Watch.Set.elements
+               |> List.map (fun w -> Connection.Watch.pending_watchevents w) |> List.fold_left (+) 0 in
+       Printf.sprintf "Watches with pending events: %d, pending events total: 
%d" (Connection.Watch.Set.cardinal s) pending
+
 let filter ~f cons =
        let fold _ v acc = if f v then v :: acc else acc in
        []
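The comment in `debug_watchevents` relies on a property of OCaml's polymorphic equality: `=` walks into a value's fields and raises `Invalid_argument` when it reaches a closure, whereas `==` only compares identities. A minimal sketch, assuming a hypothetical `con` record whose `notify` field stands in for the `[unit->unit]` value that `w.con.xb.backend` may hold (this is not the real `Connection.t`):

```ocaml
(* Hypothetical stand-in for a connection whose backend holds a closure,
   similar in spirit to the [unit->unit] value mentioned in the comment.
   This is not the real Connection.t type. *)
type con = { domid : int; notify : unit -> unit }

(* Structural equality, returning None when the runtime refuses to compare
   (OCaml raises Invalid_argument on functional values). *)
let structural_equal (a : con) (b : con) : bool option =
  try Some (a = b) with Invalid_argument _ -> None

let () =
  let c1 = { domid = 1; notify = (fun () -> ()) } in
  let c2 = { domid = 1; notify = (fun () -> print_endline "c2") } in
  (* Physical equality compares identities and never inspects the closure. *)
  assert (c1 == c1);
  assert (not (c1 == c2));
  (* Structural equality reaches the notify field and raises. *)
  assert (structural_equal c1 c2 = None);
  print_endline "physical equality works, structural equality raises"
```

This is why the filter in `debug_watchevents` uses `w.con == con`: it is the only equality that is safe on a value that may contain a closure.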
diff --git a/tools/ocaml/xenstored/define.ml b/tools/ocaml/xenstored/define.ml
index ba63a8147e..327b6d795e 100644
--- a/tools/ocaml/xenstored/define.ml
+++ b/tools/ocaml/xenstored/define.ml
@@ -24,6 +24,13 @@ let default_config_dir = Paths.xen_config_dir
 let maxwatch = ref (100)
 let maxtransaction = ref (10)
 let maxrequests = ref (1024)   (* maximum requests per transaction *)
+let maxoutstanding = ref (1024) (* maximum outstanding requests, i.e. in-flight requests / domain *)
+let maxwatchevents = ref (1024)
+(*
+       maximum outstanding watch events per watch,
+       recommended >= maxoutstanding to avoid blocking backend transactions due to
+       malicious frontends
+ *)
 
 let gc_max_overhead = ref 120 (* 120% see comment in xenstored.ml *)
 let conflict_burst_limit = ref 5.0
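The `maxoutstanding` limit bounds how many replies may be queued for a connection, and the `Connection.can_input` check added to `do_input` in process.ml stops reading new requests once that queue is full. The sketch models that back-pressure with a hypothetical `Bounded_queue` module and hypothetical `process_input` and `drain` functions; none of them are part of oxenstored, where the limit is applied to the xenbus output queue instead.

```ocaml
(* Hypothetical bounded reply queue standing in for the xenbus output queue
   limited by maxoutstanding; not the real Xenbus.Xb API. *)
module Bounded_queue = struct
  type 'a t = { q : 'a Queue.t; capacity : int }

  let create capacity = { q = Queue.create (); capacity }
  let length t = Queue.length t.q
  let can_push t = Queue.length t.q < t.capacity

  let push t x =
    if not (can_push t) then invalid_arg "Bounded_queue.push: queue full";
    Queue.push x t.q

  let pop t = Queue.take_opt t.q
end

(* Mirrors the idea of the patched do_input: a request is only read when its
   reply is guaranteed to fit. Returns true if a request was processed. *)
let process_input (requests : string Queue.t) (replies : string Bounded_queue.t) =
  if not (Bounded_queue.can_push replies) then false
  else
    match Queue.take_opt requests with
    | None -> false
    | Some req ->
        Bounded_queue.push replies ("reply to " ^ req);
        true

(* Process requests until input runs dry or the reply queue is full. *)
let rec drain requests replies n =
  if process_input requests replies then drain requests replies (n + 1) else n

let () =
  let requests = Queue.create () in
  List.iter (fun r -> Queue.push r requests) [ "a"; "b"; "c"; "d"; "e" ];
  let replies = Bounded_queue.create 2 in
  (* Only two requests fit before the reply queue is full. *)
  let first = drain requests replies 0 in
  Printf.printf "processed %d, %d still waiting\n" first (Queue.length requests);
  (* The client consumes one reply, making room for one more request. *)
  ignore (Bounded_queue.pop replies);
  let second = drain requests replies 0 in
  Printf.printf "processed %d more\n" second
```

Unread requests simply stay in the input (here `requests`, in oxenstored the ring or socket), so a client that stops reading replies cannot make the daemon buffer an unbounded number of them.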
diff --git a/tools/ocaml/xenstored/oxenstored.conf.in b/tools/ocaml/xenstored/oxenstored.conf.in
index 4ae48e42d4..9d034e744b 100644
--- a/tools/ocaml/xenstored/oxenstored.conf.in
+++ b/tools/ocaml/xenstored/oxenstored.conf.in
@@ -62,6 +62,8 @@ quota-maxwatch = 100
 quota-transaction = 10
 quota-maxrequests = 1024
 quota-path-max = 1024
+quota-maxoutstanding = 1024
+quota-maxwatchevents = 1024
 
 # Activate filed base backend
 persistent = false
diff --git a/tools/ocaml/xenstored/process.ml b/tools/ocaml/xenstored/process.ml
index 50ef05be41..6781088387 100644
--- a/tools/ocaml/xenstored/process.ml
+++ b/tools/ocaml/xenstored/process.ml
@@ -57,7 +57,7 @@ let split_one_path data con =
        | path :: "" :: [] -> Store.Path.create path (Connection.get_path con)
        | _                -> raise Invalid_Cmd_Args
 
-let process_watch t cons =
+let process_watch source t cons =
        let oldroot = t.Transaction.oldroot in
        let newroot = Store.get_root t.Transaction.store in
        let ops = Transaction.get_paths t |> List.rev in
@@ -67,8 +67,9 @@ let process_watch t cons =
                | Xenbus.Xb.Op.Rm       -> true, None, oldroot
                | Xenbus.Xb.Op.Setperms -> false, Some oldroot, newroot
                | _              -> raise (Failure "huh ?") in
-               Connections.fire_watches ?oldroot root cons (snd op) recurse in
-       List.iter (fun op -> do_op_watch op cons) ops
+               Connections.fire_watches ?oldroot source root cons (snd op) recurse in
+       List.iter (fun op -> do_op_watch op cons) ops;
+       Connections.send_watchevents cons source
 
 let create_implicit_path t perm path =
        let dirname = Store.Path.get_parent path in
@@ -234,6 +235,20 @@ let do_debug con t _domains cons data =
        | "watches" :: _ ->
                let watches = Connections.debug cons in
                Some (watches ^ "\000")
+       | "xenbus" :: domid :: _ ->
+               let domid = int_of_string domid in
+               let con = Connections.find_domain cons domid in
+               let s = Printf.sprintf "xenbus: %s; overflow queue length: %d, can_input: %b, has_more_input: %b, has_old_output: %b, has_new_output: %b, has_more_work: %b. pending: %s"
+                       (Xenbus.Xb.debug con.xb)
+                       (Connection.source_pending_watchevents con)
+                       (Connection.can_input con)
+                       (Connection.has_more_input con)
+                       (Connection.has_old_output con)
+                       (Connection.has_new_output con)
+                       (Connection.has_more_work con)
+                       (Connections.debug_watchevents cons con)
+               in
+               Some s
        | "mfn" :: domid :: _ ->
                let domid = int_of_string domid in
                let con = Connections.find_domain cons domid in
@@ -342,7 +357,7 @@ let reply_ack fct con t doms cons data =
        fct con t doms cons data;
        Packet.Ack (fun () ->
                if Transaction.get_id t = Transaction.none then
-                       process_watch t cons
+                       process_watch con t cons
        )
 
 let reply_data fct con t doms cons data =
@@ -501,7 +516,7 @@ let do_watch con _t _domains cons data =
        Packet.Ack (fun () ->
                (* xenstore.txt says this watch is fired immediately,
                   implying even if path doesn't exist or is unreadable *)
-               Connection.fire_single_watch_unchecked watch)
+               Connection.fire_single_watch_unchecked con watch)
 
 let do_unwatch con _t _domains cons data =
        let (node, token) =
@@ -532,7 +547,7 @@ let do_transaction_end con t domains cons data =
        if not success then
                raise Transaction_again;
        if commit then begin
-               process_watch t cons;
+               process_watch con t cons;
                match t.Transaction.ty with
                | Transaction.No ->
                        () (* no need to record anything *)
@@ -701,7 +716,8 @@ let process_packet ~store ~cons ~doms ~con ~req =
 let do_input store cons doms con =
        let newpacket =
                try
-                       Connection.do_input con
+                       if Connection.can_input con then Connection.do_input con
+                       else None
                with Xenbus.Xb.Reconnect ->
                        info "%s requests a reconnect" (Connection.get_domstr con);
                        History.reconnect con;
@@ -729,6 +745,7 @@ let do_input store cons doms con =
                Connection.incr_ops con
 
 let do_output _store _cons _doms con =
+       Connection.source_flush_watchevents con;
        if Connection.has_output con then (
                if Connection.has_new_output con then (
                        let packet = Connection.peek_output con in
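`process_watch` now ends with `Connections.send_watchevents`, and `do_output` starts with `Connection.source_flush_watchevents`, so watch events that did not fit into a destination's output queue are retried later rather than dropped. `send_watchevents` keeps a watch in `has_pending_watchevents` only while `Connection.Watch.flush_events` returns true. This sketch assumes that means "events are still pending" and models it with hypothetical `output` and `watch` types and a hypothetical `make_watch` helper; it does not model the per-watch `maxwatchevents` limit.

```ocaml
(* Hypothetical destination output queue with a fixed capacity; a stand-in
   for a connection's bounded xenbus queue, not the real oxenstored type. *)
type output = { sent : string Queue.t; capacity : int }

(* Hypothetical watch: events that did not fit yet wait in [pending]. *)
type watch = { id : int; pending : string Queue.t; out : output }

(* Move as many pending events as fit into the destination queue.
   Returns true if events remain; this is the assumed meaning of
   flush_events, used as the Set.filter predicate. *)
let flush_events (w : watch) : bool =
  while (not (Queue.is_empty w.pending)) && Queue.length w.out.sent < w.out.capacity do
    Queue.push (Queue.take w.pending) w.out.sent
  done;
  not (Queue.is_empty w.pending)

(* Order watches by id so the set never compares the queues themselves. *)
module WatchSet = Set.Make (struct
  type t = watch
  let compare a b = Int.compare a.id b.id
end)

(* Shape of send_watchevents: flush every watch, keep those still pending. *)
let send_watchevents (pending_set : WatchSet.t) : WatchSet.t =
  WatchSet.filter flush_events pending_set

(* Hypothetical helper building a watch with some queued events. *)
let make_watch id out events =
  let pending = Queue.create () in
  List.iter (fun e -> Queue.push e pending) events;
  { id; pending; out }

let () =
  let out = { sent = Queue.create (); capacity = 2 } in
  let w1 = make_watch 1 out [ "a"; "b" ] and w2 = make_watch 2 out [ "c" ] in
  (* First flush: only two of the three events fit. *)
  let still_pending = send_watchevents (WatchSet.of_list [ w1; w2 ]) in
  Printf.printf "queued %d, watches still pending %d\n"
    (Queue.length out.sent) (WatchSet.cardinal still_pending);
  (* The guest reads its queue, then the next flush delivers the rest. *)
  Queue.clear out.sent;
  let still_pending = send_watchevents still_pending in
  Printf.printf "queued %d, watches still pending %d\n"
    (Queue.length out.sent) (WatchSet.cardinal still_pending)
```

Keeping only the watches that still have work in a set means each flush touches just those watches, instead of rescanning every watch on every output pass.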
diff --git a/tools/ocaml/xenstored/xenstored.ml b/tools/ocaml/xenstored/xenstored.ml
index 3a932f54a6..ffd43a4eee 100644
--- a/tools/ocaml/xenstored/xenstored.ml
+++ b/tools/ocaml/xenstored/xenstored.ml
@@ -103,6 +103,8 @@ let parse_config filename =
                ("quota-maxentity", Config.Set_int Quota.maxent);
                ("quota-maxsize", Config.Set_int Quota.maxsize);
                ("quota-maxrequests", Config.Set_int Define.maxrequests);
+               ("quota-maxoutstanding", Config.Set_int Define.maxoutstanding);
+               ("quota-maxwatchevents", Config.Set_int Define.maxwatchevents);
                ("quota-path-max", Config.Set_int Define.path_max);
                ("gc-max-overhead", Config.Set_int Define.gc_max_overhead);
                ("test-eagain", Config.Set_bool Transaction.test_eagain);
--
generated by git-patchbot for /home/xen/git/xen.git#staging



 

