[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [PATCH RFC 17/20] libxl/libxl_stream_read.c: track callback chains with an explicit phase



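As the previous patch did for libxl_stream_write, do the same for
libxl_stream_read: track its callback chains with an explicit phase.
libxl_stream_read already has a notion of phase for its record-buffering
behaviour, so the callback chain phase is combined with it: the
in_checkpoint and in_checkpoint_state flags are replaced by values of
the existing phase enum.  Again, this is done to support the addition
of a new callback chain for postcopy live migration.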

No functional change.

Signed-off-by: Joshua Otto <jtotto@xxxxxxxxxxxx>
---
 tools/libxl/libxl_internal.h    |  7 ++--
 tools/libxl/libxl_stream_read.c | 83 +++++++++++++++++++++--------------------
 2 files changed, 45 insertions(+), 45 deletions(-)

diff --git a/tools/libxl/libxl_internal.h b/tools/libxl/libxl_internal.h
index e99d2ef..c754706 100644
--- a/tools/libxl/libxl_internal.h
+++ b/tools/libxl/libxl_internal.h
@@ -3123,9 +3123,7 @@ struct libxl__stream_read_state {
     /* Private */
     int rc;
     bool running;
-    bool in_checkpoint;
     bool sync_teardown; /* Only used to coordinate shutdown on error path. */
-    bool in_checkpoint_state;
     libxl__save_helper_state shs;
     libxl__conversion_helper_state chs;
 
@@ -3135,8 +3133,9 @@ struct libxl__stream_read_state {
     LIBXL_STAILQ_HEAD(, libxl__sr_record_buf) record_queue; /* NOGC */
     enum {
         SRS_PHASE_NORMAL,
-        SRS_PHASE_BUFFERING,
-        SRS_PHASE_UNBUFFERING,
+        SRS_PHASE_CHECKPOINT_BUFFERING,
+        SRS_PHASE_CHECKPOINT_UNBUFFERING,
+        SRS_PHASE_CHECKPOINT_STATE
     } phase;
     bool recursion_guard;
 
diff --git a/tools/libxl/libxl_stream_read.c b/tools/libxl/libxl_stream_read.c
index 89c2f21..4cb553e 100644
--- a/tools/libxl/libxl_stream_read.c
+++ b/tools/libxl/libxl_stream_read.c
@@ -29,14 +29,15 @@
  * processed, and all records will be processed in queue order.
  *
  * Internal states:
- *           running  phase       in_         record   incoming
- *                                checkpoint  _queue   _record
+ *           running  phase                   record   incoming
+ *                                            _queue   _record
  *
- * Undefined    undef  undef        undef       undef    undef
- * Idle         false  undef        false       0        0
- * Active       true   NORMAL       false       0/1      0/partial
- * Active       true   BUFFERING    true        any      0/partial
- * Active       true   UNBUFFERING  true        any      0
+ * Undefined    undef  undef                    undef    undef
+ * Idle         false  undef                    0        0
+ * Active       true   NORMAL                   0/1      0/partial
+ * Active       true   CHECKPOINT_BUFFERING     any      0/partial
+ * Active       true   CHECKPOINT_UNBUFFERING   any      0
+ * Active       true   CHECKPOINT_STATE         0/1      0/partial
  *
  * While reading data from the stream, 'dc' is active and a callback
  * is expected.  Most actions in process_record() start a callback of
@@ -48,12 +49,12 @@
  *   Records are read one at time and immediately processed.  (The
  *   record queue will not contain more than a single record.)
  *
- * PHASE_BUFFERING:
+ * PHASE_CHECKPOINT_BUFFERING:
  *   This phase is used in checkpointed streams, when libxc signals
  *   the presence of a checkpoint in the stream.  Records are read and
  *   buffered until a CHECKPOINT_END record has been read.
  *
- * PHASE_UNBUFFERING:
+ * PHASE_CHECKPOINT_UNBUFFERING:
  *   Once a CHECKPOINT_END record has been read, all buffered records
  *   are processed.
  *
@@ -172,6 +173,12 @@ static void checkpoint_state_done(libxl__egc *egc,
 
 /*----- Helpers -----*/
 
+static inline bool stream_in_checkpoint(libxl__stream_read_state *stream)
+{
+    return stream->phase == SRS_PHASE_CHECKPOINT_BUFFERING ||
+           stream->phase == SRS_PHASE_CHECKPOINT_UNBUFFERING;
+}
+
 /* Helper to set up reading some data from the stream. */
 static int setup_read(libxl__stream_read_state *stream,
                       const char *what, void *ptr, size_t nr_bytes,
@@ -210,7 +217,6 @@ void libxl__stream_read_init(libxl__stream_read_state *stream)
 
     stream->rc = 0;
     stream->running = false;
-    stream->in_checkpoint = false;
     stream->sync_teardown = false;
     FILLZERO(stream->dc);
     FILLZERO(stream->hdr);
@@ -297,10 +303,9 @@ void libxl__stream_read_start_checkpoint(libxl__egc *egc,
                                          libxl__stream_read_state *stream)
 {
     assert(stream->running);
-    assert(!stream->in_checkpoint);
+    assert(stream->phase == SRS_PHASE_NORMAL);
 
-    stream->in_checkpoint = true;
-    stream->phase = SRS_PHASE_BUFFERING;
+    stream->phase = SRS_PHASE_CHECKPOINT_BUFFERING;
 
     /*
      * Libxc has handed control of the fd to us.  Start reading some
@@ -392,6 +397,7 @@ static void stream_continue(libxl__egc *egc,
 
     switch (stream->phase) {
     case SRS_PHASE_NORMAL:
+    case SRS_PHASE_CHECKPOINT_STATE:
         /*
          * Normal phase (regular migration or restore from file):
          *
@@ -416,9 +422,9 @@ static void stream_continue(libxl__egc *egc,
         }
         break;
 
-    case SRS_PHASE_BUFFERING: {
+    case SRS_PHASE_CHECKPOINT_BUFFERING: {
         /*
-         * Buffering phase (checkpointed streams only):
+         * Buffering phase:
          *
          * logically:
          *   do { read_record(); } while ( not CHECKPOINT_END );
@@ -431,8 +437,6 @@ static void stream_continue(libxl__egc *egc,
         libxl__sr_record_buf *rec = LIBXL_STAILQ_LAST(
             &stream->record_queue, libxl__sr_record_buf, entry);
 
-        assert(stream->in_checkpoint);
-
         if (!rec || (rec->hdr.type != REC_TYPE_CHECKPOINT_END)) {
             setup_read_record(egc, stream);
             break;
@@ -442,19 +446,18 @@ static void stream_continue(libxl__egc *egc,
          * There are now some number of buffered records, with a
          * CHECKPOINT_END at the end. Start processing them all.
          */
-        stream->phase = SRS_PHASE_UNBUFFERING;
+        stream->phase = SRS_PHASE_CHECKPOINT_UNBUFFERING;
     }
         /* FALLTHROUGH */
-    case SRS_PHASE_UNBUFFERING:
+    case SRS_PHASE_CHECKPOINT_UNBUFFERING:
         /*
-         * Unbuffering phase (checkpointed streams only):
+         * Unbuffering phase:
          *
          * logically:
          *   do { process_record(); } while ( not CHECKPOINT_END );
          *
          * Process all records collected during the buffering phase.
          */
-        assert(stream->in_checkpoint);
 
         while (process_record(egc, stream))
             ; /*
@@ -625,7 +628,7 @@ static bool process_record(libxl__egc *egc,
         break;
 
     case REC_TYPE_CHECKPOINT_END:
-        if (!stream->in_checkpoint) {
+        if (!stream_in_checkpoint(stream)) {
             LOG(ERROR, "Unexpected CHECKPOINT_END record in stream");
             rc = ERROR_FAIL;
             goto err;
@@ -634,7 +637,7 @@ static bool process_record(libxl__egc *egc,
         break;
 
     case REC_TYPE_CHECKPOINT_STATE:
-        if (!stream->in_checkpoint_state) {
+        if (stream->phase != SRS_PHASE_CHECKPOINT_STATE) {
             LOG(ERROR, "Unexpected CHECKPOINT_STATE record in stream");
             rc = ERROR_FAIL;
             goto err;
@@ -743,7 +746,12 @@ static void stream_complete(libxl__egc *egc,
 {
     assert(stream->running);
 
-    if (stream->in_checkpoint) {
+    switch (stream->phase) {
+    case SRS_PHASE_NORMAL:
+        stream_done(egc, stream, rc);
+        break;
+    case SRS_PHASE_CHECKPOINT_BUFFERING:
+    case SRS_PHASE_CHECKPOINT_UNBUFFERING:
         assert(rc);
 
         /*
@@ -752,10 +760,8 @@ static void stream_complete(libxl__egc *egc,
          * libxl__xc_domain_restore_done()
          */
         checkpoint_done(egc, stream, rc);
-        return;
-    }
-
-    if (stream->in_checkpoint_state) {
+        break;
+    case SRS_PHASE_CHECKPOINT_STATE:
         assert(rc);
 
         /*
@@ -767,10 +773,8 @@ static void stream_complete(libxl__egc *egc,
          *    libxl__stream_read_abort()
          */
         checkpoint_state_done(egc, stream, rc);
-        return;
+        break;
     }
-
-    stream_done(egc, stream, rc);
 }
 
 static void checkpoint_done(libxl__egc *egc,
@@ -778,18 +782,17 @@ static void checkpoint_done(libxl__egc *egc,
 {
     int ret;
 
-    assert(stream->in_checkpoint);
+    assert(stream_in_checkpoint(stream));
 
     if (rc == 0)
         ret = XGR_CHECKPOINT_SUCCESS;
-    else if (stream->phase == SRS_PHASE_BUFFERING)
+    else if (stream->phase == SRS_PHASE_CHECKPOINT_BUFFERING)
         ret = XGR_CHECKPOINT_FAILOVER;
     else
         ret = XGR_CHECKPOINT_ERROR;
 
     stream->checkpoint_callback(egc, stream, ret);
 
-    stream->in_checkpoint = false;
     stream->phase = SRS_PHASE_NORMAL;
 }
 
@@ -799,8 +802,7 @@ static void stream_done(libxl__egc *egc,
     libxl__sr_record_buf *rec, *trec;
 
     assert(stream->running);
-    assert(!stream->in_checkpoint);
-    assert(!stream->in_checkpoint_state);
+    assert(stream->phase == SRS_PHASE_NORMAL);
     stream->running = false;
 
     if (stream->incoming_record)
@@ -955,9 +957,8 @@ void libxl__stream_read_checkpoint_state(libxl__egc *egc,
                                          libxl__stream_read_state *stream)
 {
     assert(stream->running);
-    assert(!stream->in_checkpoint);
-    assert(!stream->in_checkpoint_state);
-    stream->in_checkpoint_state = true;
+    assert(stream->phase == SRS_PHASE_NORMAL);
+    stream->phase = SRS_PHASE_CHECKPOINT_STATE;
 
     setup_read_record(egc, stream);
 }
@@ -965,8 +966,8 @@ void libxl__stream_read_checkpoint_state(libxl__egc *egc,
 static void checkpoint_state_done(libxl__egc *egc,
                                   libxl__stream_read_state *stream, int rc)
 {
-    assert(stream->in_checkpoint_state);
-    stream->in_checkpoint_state = false;
+    assert(stream->phase == SRS_PHASE_CHECKPOINT_STATE);
+    stream->phase = SRS_PHASE_NORMAL;
     stream->checkpoint_callback(egc, stream, rc);
 }
 
-- 
2.7.4


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
https://lists.xen.org/xen-devel

 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.