[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [PATCH v2 16/27] tools/libxl: Infrastructure for reading a libxl migration v2 stream



From: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx>

This contains the event machinery and state machines to read and act on a
non-checkpointed migration v2 stream (with the exception of the
xc_domain_restore() handling which is spliced later in a bisectable way).

It also contains some boilerplate to help support checkpointed streams, which
shall be introduced in a later patch.

Signed-off-by: Ross Lagerwall <ross.lagerwall@xxxxxxxxxx>
Signed-off-by: Andrew Cooper <andrew.cooper3@xxxxxxxxxx>
CC: Ian Campbell <Ian.Campbell@xxxxxxxxxx>
CC: Ian Jackson <Ian.Jackson@xxxxxxxxxxxxx>
CC: Wei Liu <wei.liu2@xxxxxxxxxx>

---

Large quantities of the logic here are completely overhauled since v1, mostly
as part of fixing the checkpoint buffering bug which was the cause of the
broken Remus failover.  The result is actually simpler overall; all
records are buffered in memory (there is no splicing of the emulator records
any more), with normal streams having exactly 0 or 1 records currently in the
buffer, before processing.  Remus support later will allow multiple buffered
records.
---
 tools/libxl/Makefile            |    1 +
 tools/libxl/libxl_internal.h    |   55 +++++
 tools/libxl/libxl_stream_read.c |  504 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 560 insertions(+)
 create mode 100644 tools/libxl/libxl_stream_read.c

diff --git a/tools/libxl/Makefile b/tools/libxl/Makefile
index cc9c152..c71c5fe 100644
--- a/tools/libxl/Makefile
+++ b/tools/libxl/Makefile
@@ -94,6 +94,7 @@ LIBXL_OBJS = flexarray.o libxl.o libxl_create.o libxl_dm.o 
libxl_pci.o \
                        libxl_dom.o libxl_exec.o libxl_xshelp.o libxl_device.o \
                        libxl_internal.o libxl_utils.o libxl_uuid.o \
                        libxl_json.o libxl_aoutils.o libxl_numa.o libxl_vnuma.o 
\
+                       libxl_stream_read.o \
                        libxl_save_callout.o _libxl_save_msgs_callout.o \
                        libxl_qmp.o libxl_event.o libxl_fork.o $(LIBXL_OBJS-y)
 LIBXL_OBJS += libxl_genid.o
diff --git a/tools/libxl/libxl_internal.h b/tools/libxl/libxl_internal.h
index 4bd6ea1..0f17e7c 100644
--- a/tools/libxl/libxl_internal.h
+++ b/tools/libxl/libxl_internal.h
@@ -19,6 +19,8 @@
 
 #include "libxl_osdeps.h" /* must come before any other headers */
 
+#include "libxl_sr_stream_format.h"
+
 #include <assert.h>
 #include <dirent.h>
 #include <errno.h>
@@ -3211,6 +3213,58 @@ typedef void libxl__domain_create_cb(libxl__egc *egc,
                                      libxl__domain_create_state*,
                                      int rc, uint32_t domid);
 
+/* State for manipulating a libxl migration v2 stream */
+typedef struct libxl__stream_read_state libxl__stream_read_state;
+
+/*
+ * One record of the stream held in memory: the record header plus a
+ * separately allocated body.
+ */
+typedef struct libxl__sr_record_buf {
+    /* private to stream read helper */
+    /* Linkage for libxl__stream_read_state.record_queue. */
+    LIBXL_STAILQ_ENTRY(struct libxl__sr_record_buf) entry;
+    libxl__sr_rec_hdr hdr;
+    /* Left NULL when hdr.length is 0; otherwise released with free(). */
+    void *body;
+} libxl__sr_record_buf;
+
+/*
+ * State for one read of a libxl migration v2 stream.  The caller fills in
+ * the first group of fields and then calls libxl__stream_read_start(); the
+ * remaining fields are managed by the stream reading code.
+ */
+struct libxl__stream_read_state {
+    /* filled by the user */
+    libxl__ao *ao;
+    int fd; /* descriptor the stream is read from */
+    /* Called from stream_done() with the final rc once the stream stops. */
+    void (*completion_callback)(libxl__egc *egc,
+                                libxl__stream_read_state *srs,
+                                int rc);
+    /* Private */
+    int rc; /* 0 on success, else the error given to stream_failed() */
+    bool running; /* set by libxl__stream_read_start(), cleared when done */
+
+    /* Main stream-reading data */
+    libxl__datacopier_state dc; /* reads the header and each record */
+    libxl__sr_hdr hdr; /* byte-swapped in place by stream_header_done() */
+    libxl__sr_record_buf *incoming_record; /* record currently being read */
+    /* Completely read records waiting for process_record(). */
+    LIBXL_STAILQ_HEAD(, libxl__sr_record_buf) record_queue;
+    enum {
+        SRS_PHASE_NORMAL,
+    } phase;
+    bool recursion_guard; /* true while stream_continue() is executing */
+
+    /* Emulator blob handling */
+    libxl__datacopier_state emu_dc; /* writes the emulator blob to a file */
+    libxl__carefd *emu_carefd; /* set while the emulator save file is open */
+};
+
+_hidden void libxl__stream_read_start(libxl__egc *egc,
+                                      libxl__stream_read_state *stream);
+
+_hidden void libxl__stream_read_continue(libxl__egc *egc,
+                                         libxl__stream_read_state *stream);
+
+_hidden void libxl__stream_read_abort(libxl__egc *egc,
+                                      libxl__stream_read_state *stream, int 
rc);
+
+/*
+ * Report whether the stream is currently marked as running, i.e. it has
+ * been started and its completion path has not yet run.
+ */
+static inline bool libxl__stream_read_inuse(
+    const libxl__stream_read_state *stream)
+{
+    return stream->running;
+}
+
+
 struct libxl__domain_create_state {
     /* filled in by user */
     libxl__ao *ao;
@@ -3227,6 +3281,7 @@ struct libxl__domain_create_state {
     libxl__stub_dm_spawn_state dmss;
         /* If we're not doing stubdom, we use only dmss.dm,
          * for the non-stubdom device model. */
+    libxl__stream_read_state srs;
     libxl__save_helper_state shs;
     /* necessary if the domain creation failed and we have to destroy it */
     libxl__domain_destroy_state dds;
diff --git a/tools/libxl/libxl_stream_read.c b/tools/libxl/libxl_stream_read.c
new file mode 100644
index 0000000..6f5d572
--- /dev/null
+++ b/tools/libxl/libxl_stream_read.c
@@ -0,0 +1,504 @@
+/*
+ * Copyright (C) 2015      Citrix Ltd.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; version 2.1 only. with the special
+ * exception on linking described in file LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ */
+
+#include "libxl_osdeps.h" /* must come before any other headers */
+
+#include "libxl_internal.h"
+
+/*
+ * Infrastructure for reading and acting on the contents of a libxl migration
+ * stream. There are a lot of moving parts here.
+ *
+ * Entry points from outside:
+ *  - libxl__stream_read_start()
+ *     - Set up reading a stream from the start.
+ *
+ * The crux of this logic revolves around the stream_continue() function.  It
+ * chooses the next action to perform.  Valid actions are:
+ *  - Read another record from the stream.
+ *  - Process a buffered record.
+ *
+ * There are several chains of events:
+ *
+ * 1) Starting a stream follows:
+ *    - libxl__stream_read_start()
+ *    - stream_header_done()
+ *    - stream_continue()
+ *
+ * 2) Reading a record follows:
+ *    - stream_continue()
+ *    - record_header_done()
+ *    - record_body_done()
+ *    - stream_continue()
+ *
+ * 3) Processing a record has several chains to follow, depending on the
+ *    record in question.
+ * 3a) "Simple" record:
+ *    - process_record()
+ *    - stream_continue()
+ * 3b) LIBXC record:
+ *    - process_record()
+ *    - libxl__xc_domain_restore()
+ *    - libxl__xc_domain_restore_done()
+ *    - stream_continue()
+ * 3c) EMULATOR record:
+ *    - process_record()
+ *    - write_emulator_blob()
+ *    - write_emulator_done()
+ *    - stream_continue()
+ */
+
+/* Stream error/success handling. */
+static void stream_success(libxl__egc *egc,
+                           libxl__stream_read_state *stream);
+static void stream_failed(libxl__egc *egc,
+                          libxl__stream_read_state *stream, int rc);
+static void stream_done(libxl__egc *egc,
+                        libxl__stream_read_state *stream);
+
+/* Event callbacks for main reading loop. */
+static void stream_header_done(libxl__egc *egc,
+                               libxl__datacopier_state *dc,
+                               int rc, int onwrite, int errnoval);
+
+/* Event chain for reading a record from the stream. */
+static void setup_read_record(libxl__egc *egc,
+                              libxl__stream_read_state *stream);
+static void record_header_done(libxl__egc *egc,
+                               libxl__datacopier_state *dc,
+                               int rc, int onwrite, int errnoval);
+static void record_body_done(libxl__egc *egc,
+                             libxl__datacopier_state *dc,
+                             int rc, int onwrite, int errnoval);
+
+static bool process_record(libxl__egc *egc,
+                           libxl__stream_read_state *stream);
+
+/* Event chain for writing an emulator blob. */
+static void write_emulator_blob(libxl__egc *egc,
+                                libxl__stream_read_state *stream,
+                                libxl__sr_record_buf *rec);
+static void write_emulator_done(libxl__egc *egc,
+                                libxl__datacopier_state *dc,
+                                int rc, int onwrite, int errnoval);
+
+/*
+ * Point the main datacopier at a caller-supplied buffer and start it.
+ *
+ *  what      description stored as the datacopier's readwhat
+ *  ptr       destination buffer
+ *  nr_bytes  number of bytes requested
+ *  cb        installed as the datacopier callback
+ *
+ * Returns whatever libxl__datacopier_start() returns.
+ */
+static int setup_read(libxl__stream_read_state *stream,
+                      const char *what, void *ptr, size_t nr_bytes,
+                      libxl__datacopier_callback cb)
+{
+    libxl__datacopier_state *reader = &stream->dc;
+
+    reader->callback      = cb;
+    reader->readwhat      = what;
+    reader->readbuf       = ptr;
+    reader->bytes_to_read = nr_bytes;
+    reader->used          = 0;
+
+    return libxl__datacopier_start(reader);
+}
+
+/*
+ * Begin reading a migration v2 stream from stream->fd.
+ *
+ * The caller must have filled in stream->ao, stream->fd and
+ * stream->completion_callback, and the stream must not already be running.
+ * The first action queued is a read of the stream header; the rest of the
+ * stream is driven from stream_header_done().
+ *
+ * A failure to queue that read is reported through completion_callback,
+ * in the same way as a failure later in the stream.
+ */
+void libxl__stream_read_start(libxl__egc *egc,
+                              libxl__stream_read_state *stream)
+{
+    libxl__datacopier_state *dc = &stream->dc;
+    int ret = 0;
+
+    assert(!stream->running);
+
+    /*
+     * Initialise all private state before anything can fail.
+     * stream_failed() only reaches the completion callback while the stream
+     * is marked as running, and stream_done() walks record_queue and looks
+     * at emu_carefd, so these must be valid on the error path below.
+     */
+    stream->rc = 0;
+    stream->running = true;
+    stream->phase = SRS_PHASE_NORMAL;
+    stream->incoming_record = NULL;
+    LIBXL_STAILQ_INIT(&stream->record_queue);
+    stream->recursion_guard = false;
+    stream->emu_carefd = NULL;
+
+    memset(dc, 0, sizeof(*dc));
+    dc->ao = stream->ao;
+    dc->readfd = stream->fd;
+    dc->writefd = -1;
+
+    /* Start reading the stream header. */
+    ret = setup_read(stream, "stream header",
+                     &stream->hdr, sizeof(stream->hdr),
+                     stream_header_done);
+    if (ret)
+        stream_failed(egc, stream, ret);
+}
+
+/*
+ * Stop the stream with the error rc, which must be nonzero (stream_failed()
+ * asserts this).  The completion callback is only invoked if the stream is
+ * currently running.
+ */
+void libxl__stream_read_abort(libxl__egc *egc,
+                              libxl__stream_read_state *stream, int rc)
+{
+    stream_failed(egc, stream, rc);
+}
+
+/* Record a successful result (rc 0) and run the common completion path. */
+static void stream_success(libxl__egc *egc, libxl__stream_read_state *stream)
+{
+    stream->rc = 0;
+
+    stream_done(egc, stream);
+}
+
+/*
+ * Record the nonzero error rc in the stream.  If the stream is running, go
+ * on to the common completion path; otherwise only the stored rc changes.
+ */
+static void stream_failed(libxl__egc *egc,
+                          libxl__stream_read_state *stream, int rc)
+{
+    assert(rc);
+    stream->rc = rc;
+
+    if (!stream->running)
+        return;
+
+    stream_done(egc, stream);
+}
+
+/*
+ * Common completion path for both success and failure.
+ *
+ * Marks the stream as no longer running, stops any outstanding I/O,
+ * releases every buffer and descriptor still owned by the stream, and then
+ * hands stream->rc to the completion callback.  The stream must be running
+ * on entry.
+ */
+static void stream_done(libxl__egc *egc,
+                        libxl__stream_read_state *stream)
+{
+    libxl__sr_record_buf *rec, *trec;
+
+    assert(stream->running);
+    stream->running = false;
+
+    /*
+     * A failure or abort can arrive while a read is still queued.  Stop the
+     * datacopier before releasing the buffer it is filling, so it cannot
+     * write into freed memory or call back into a finished stream.
+     */
+    libxl__datacopier_kill(&stream->dc);
+
+    /* A record whose header or body was still being read. */
+    if (stream->incoming_record) {
+        free(stream->incoming_record->body);
+        free(stream->incoming_record);
+        stream->incoming_record = NULL;
+    }
+
+    if (stream->emu_carefd) {
+        /*
+         * emu_carefd is only set by write_emulator_blob(), immediately
+         * before it sets up and starts emu_dc; stop that copier before
+         * closing the descriptor it writes to.
+         */
+        libxl__datacopier_kill(&stream->emu_dc);
+        libxl__carefd_close(stream->emu_carefd);
+        stream->emu_carefd = NULL;
+    }
+
+    LIBXL_STAILQ_FOREACH_SAFE(rec, &stream->record_queue, entry, trec) {
+        free(rec->body);
+        free(rec);
+    }
+    /* Do not leave the queue head pointing at freed records. */
+    LIBXL_STAILQ_INIT(&stream->record_queue);
+
+    stream->completion_callback(egc, stream, stream->rc);
+}
+
+/*
+ * Pick the next action for the stream: either queue a read of another
+ * record, or process the record at the head of the queue.
+ */
+static void stream_continue(libxl__egc *egc,
+                            libxl__stream_read_state *stream)
+{
+    STATE_AO_GC(stream->ao);
+
+    /* Must not mutually recurse with process_record() */
+    assert(stream->recursion_guard == false);
+    stream->recursion_guard = true;
+
+    /* SRS_PHASE_NORMAL is the only phase understood here. */
+    if (stream->phase != SRS_PHASE_NORMAL)
+        abort();
+
+    /*
+     * Normal phase of the stream.  We arrive here in several scenarios.
+     * Alternate between reading another record from the stream, and
+     * processing the record.  At no point should there ever be two
+     * records in the queue.
+     *
+     * With nothing buffered, read a record.  Otherwise process the buffered
+     * one, and read again only if process_record() asks for a further
+     * action.
+     */
+    if (LIBXL_STAILQ_EMPTY(&stream->record_queue) ||
+        process_record(egc, stream))
+        setup_read_record(egc, stream);
+
+    assert(stream->recursion_guard == true);
+    stream->recursion_guard = false;
+}
+
+/*
+ * Datacopier callback for the stream header read.
+ *
+ * Converts ident, version and options from big-endian in place, rejects
+ * headers with the wrong ident or version or with the big-endian option
+ * set, and on success starts the record loop via stream_continue().  Every
+ * rejection fails the stream with ERROR_FAIL.
+ */
+static void stream_header_done(libxl__egc *egc,
+                               libxl__datacopier_state *dc,
+                               int rc, int onwrite, int errnoval)
+{
+    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
+    libxl__sr_hdr *hdr = &stream->hdr;
+    STATE_AO_GC(dc->ao);
+    bool acceptable = false;
+
+    if (rc || onwrite || errnoval) {
+        LOG(ERROR, "rc %d, onwrite %d, errnoval %d", rc, onwrite, errnoval);
+    } else {
+        hdr->ident   = be64toh(hdr->ident);
+        hdr->version = be32toh(hdr->version);
+        hdr->options = be32toh(hdr->options);
+
+        if (hdr->ident != RESTORE_STREAM_IDENT) {
+            LOG(ERROR,
+                "Invalid ident: expected 0x%016"PRIx64", got 0x%016"PRIx64,
+                RESTORE_STREAM_IDENT, hdr->ident);
+        } else if (hdr->version != RESTORE_STREAM_VERSION) {
+            LOG(ERROR, "Unexpected Version: expected %u, got %u",
+                RESTORE_STREAM_VERSION, hdr->version);
+        } else if (hdr->options & RESTORE_OPT_BIG_ENDIAN) {
+            LOG(ERROR, "Unable to handle big endian streams");
+        } else {
+            acceptable = true;
+        }
+    }
+
+    if (!acceptable) {
+        stream_failed(egc, stream, ERROR_FAIL);
+        return;
+    }
+
+    LOG(DEBUG, "Stream v%u%s", hdr->version,
+        hdr->options & RESTORE_OPT_LEGACY ? " (from legacy)" : "");
+
+    stream_continue(egc, stream);
+}
+
+/*
+ * Allocate a zeroed record buffer, publish it as stream->incoming_record
+ * and queue a read of the record header into it.  No other record may be
+ * in flight.  If the read cannot be queued the stream is failed.
+ */
+static void setup_read_record(libxl__egc *egc,
+                              libxl__stream_read_state *stream)
+{
+    STATE_AO_GC(stream->ao);
+    libxl__sr_record_buf *incoming;
+    int ret;
+
+    assert(stream->incoming_record == NULL);
+
+    incoming = libxl__zalloc(NOGC, sizeof(*incoming));
+    stream->incoming_record = incoming;
+
+    ret = setup_read(stream, "record header",
+                     &incoming->hdr, sizeof(incoming->hdr),
+                     record_header_done);
+    if (ret)
+        stream_failed(egc, stream, ret);
+}
+
+/*
+ * Datacopier callback for a record header read.
+ *
+ * A record with a zero-length body goes straight to record_body_done().
+ * Otherwise a body buffer of hdr.length rounded up by REC_ALIGN_ORDER is
+ * allocated and a read of that many bytes is queued.
+ *
+ * NOTE(review): hdr.length is taken from the stream as-is; no upper bound
+ * is applied here before allocating.
+ */
+static void record_header_done(libxl__egc *egc,
+                               libxl__datacopier_state *dc,
+                               int rc, int onwrite, int errnoval)
+{
+    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
+    libxl__sr_record_buf *rec = stream->incoming_record;
+    STATE_AO_GC(dc->ao);
+    size_t body_size;
+    int ret;
+
+    if (rc || onwrite || errnoval) {
+        LOG(ERROR, "rc %d, onwrite %d, errnoval %d", rc, onwrite, errnoval);
+        stream_failed(egc, stream, ERROR_FAIL);
+        return;
+    }
+
+    /* No body? All done. */
+    if (rec->hdr.length == 0) {
+        record_body_done(egc, dc, 0, 0, 0);
+        return;
+    }
+
+    body_size = ROUNDUP(rec->hdr.length, REC_ALIGN_ORDER);
+    rec->body = libxl__malloc(NOGC, body_size);
+
+    ret = setup_read(stream, "record body",
+                     rec->body, body_size,
+                     record_body_done);
+    if (ret)
+        stream_failed(egc, stream, ret);
+}
+
+/*
+ * Datacopier callback for a record body read (also called directly for
+ * records with no body).  On success the incoming record is appended to
+ * the record queue and the stream continues; on any reported error the
+ * stream is failed with ERROR_FAIL.
+ */
+static void record_body_done(libxl__egc *egc,
+                             libxl__datacopier_state *dc,
+                             int rc, int onwrite, int errnoval)
+{
+    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
+    libxl__sr_record_buf *complete = stream->incoming_record;
+    STATE_AO_GC(dc->ao);
+
+    if (rc || onwrite || errnoval) {
+        LOG(ERROR, "rc %d, onwrite %d, errnoval %d", rc, onwrite, errnoval);
+        stream_failed(egc, stream, ERROR_FAIL);
+        return;
+    }
+
+    /* Ownership of the record moves from incoming_record to the queue. */
+    LIBXL_STAILQ_INSERT_TAIL(&stream->record_queue, complete, entry);
+    stream->incoming_record = NULL;
+
+    stream_continue(egc, stream);
+}
+
+/*
+ * Take the record at the head of the queue, act on it according to its
+ * type, and free it.
+ *
+ * Returns true when the record was handled synchronously and the caller
+ * should set up the next action itself; false when the stream has ended,
+ * has failed, or an asynchronous operation has been started.  Returning a
+ * flag rather than calling back avoids mutual recursion with
+ * stream_continue().
+ */
+static bool process_record(libxl__egc *egc,
+                           libxl__stream_read_state *stream)
+{
+    STATE_AO_GC(stream->ao);
+    libxl__domain_create_state *dcs = CONTAINER_OF(stream, *dcs, srs);
+    libxl__sr_record_buf *rec;
+    bool caller_must_continue = false;
+    int ret = 0;
+
+    /* Pop a record from the head of the queue. */
+    assert(!LIBXL_STAILQ_EMPTY(&stream->record_queue));
+    rec = LIBXL_STAILQ_FIRST(&stream->record_queue);
+    LIBXL_STAILQ_REMOVE_HEAD(&stream->record_queue, entry);
+
+    LOG(DEBUG, "Record: %u, length %u", rec->hdr.type, rec->hdr.length);
+
+    switch (rec->hdr.type) {
+
+    case REC_TYPE_END:
+        stream_success(egc, stream);
+        break;
+
+    case REC_TYPE_XENSTORE_DATA:
+        ret = libxl__toolstack_restore(dcs->guest_domid, rec->body,
+                                       rec->hdr.length, &dcs->shs);
+        /*
+         * libxl__toolstack_restore() is a synchronous function.  On
+         * success, request that our caller queues another action for us.
+         */
+        if (!ret)
+            caller_must_continue = true;
+        break;
+
+    case REC_TYPE_EMULATOR_CONTEXT:
+        write_emulator_blob(egc, stream, rec);
+        break;
+
+    default:
+        LOG(ERROR, "Unrecognised record 0x%08x", rec->hdr.type);
+        ret = ERROR_FAIL;
+        break;
+    }
+
+    /* The record is finished with on every path. */
+    free(rec->body);
+    free(rec);
+
+    if (ret) {
+        stream_failed(egc, stream, ret);
+        return false;
+    }
+
+    return caller_must_continue;
+}
+
+/*
+ * Start writing the payload of an EMULATOR_CONTEXT record to the device
+ * model restore file for the guest.
+ *
+ * The record body is a libxl__sr_emulator_hdr followed by the blob; only
+ * the blob is written.  The file is opened (created/truncated, mode 0600),
+ * tracked in stream->emu_carefd, and the data is queued on stream->emu_dc;
+ * write_emulator_done() runs when the copy finishes.  Any failure here
+ * fails the stream.
+ */
+static void write_emulator_blob(libxl__egc *egc,
+                                libxl__stream_read_state *stream,
+                                libxl__sr_record_buf *rec)
+{
+    libxl__domain_create_state *dcs = CONTAINER_OF(stream, *dcs, srs);
+    libxl__datacopier_state *dc = &stream->emu_dc;
+    libxl__sr_emulator_hdr *emu_hdr;
+    STATE_AO_GC(stream->ao);
+    char path[256];
+    int ret = 0, writefd = -1, len;
+
+    if (rec->hdr.length < sizeof(*emu_hdr)) {
+        ret = ERROR_FAIL;
+        LOG(ERROR, "Emulator record too short to contain header");
+        goto err;
+    }
+    emu_hdr = rec->body;
+
+    /* Bounded formatting: never write past the end of path[]. */
+    len = snprintf(path, sizeof(path),
+                   XC_DEVICE_MODEL_RESTORE_FILE".%u", dcs->guest_domid);
+    if (len < 0 || (size_t)len >= sizeof(path)) {
+        ret = ERROR_FAIL;
+        LOG(ERROR, "Emulator restore file path too long");
+        goto err;
+    }
+
+    libxl__carefd_begin();
+    writefd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0600);
+    if (writefd == -1) {
+        ret = ERROR_FAIL;
+        LOGE(ERROR, "unable to open %s", path);
+        libxl__carefd_unlock();
+        goto err;
+    }
+    stream->emu_carefd = libxl__carefd_record(CTX, writefd);
+    libxl__carefd_unlock();
+
+    memset(dc, 0, sizeof(*dc));
+    dc->ao = stream->ao;
+    dc->writewhat = "qemu save file";
+    /*
+     * All data comes from the prefixdata call below.  The memset() above
+     * leaves readfd as 0, which is a valid descriptor (stdin), so mark it
+     * explicitly as "no read side".
+     */
+    dc->readfd = -1;
+    dc->writefd = writefd;
+    dc->maxsz = -1;
+    dc->callback = write_emulator_done;
+
+    ret = libxl__datacopier_start(dc);
+    if (ret)
+        goto err;
+
+    /* Skip the emulator header; byte arithmetic needs a char pointer. */
+    libxl__datacopier_prefixdata(egc, dc,
+                                 (char *)rec->body + sizeof(*emu_hdr),
+                                 rec->hdr.length - sizeof(*emu_hdr));
+    return;
+
+ err:
+    assert(ret);
+    stream_failed(egc, stream, ret);
+}
+
+/*
+ * Datacopier callback for the emulator blob write.  The save file is closed
+ * in every case; the stream then continues, or is failed with ERROR_FAIL
+ * when rc or errnoval report a problem.
+ */
+static void write_emulator_done(libxl__egc *egc,
+                                libxl__datacopier_state *dc,
+                                int rc, int onwrite, int errnoval)
+{
+    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, emu_dc);
+    STATE_AO_GC(dc->ao);
+
+    /* Finished with the file whether or not the write succeeded. */
+    libxl__carefd_close(stream->emu_carefd);
+    stream->emu_carefd = NULL;
+
+    if (!rc && !errnoval) {
+        stream_continue(egc, stream);
+        return;
+    }
+
+    LOG(ERROR, "rc %d, onwrite %d, errnoval %d", rc, onwrite, errnoval);
+    stream_failed(egc, stream, ERROR_FAIL);
+}
+
+/*
+ * Local variables:
+ * mode: C
+ * c-basic-offset: 4
+ * indent-tabs-mode: nil
+ * End:
+ */
-- 
1.7.10.4


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.