[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [RFC Patch v2 15/16] xc_domain_save: implement save_callbacks for colo



Add a new save callback:
1. post_sendstate(): the SVM only starts running after XC_SAVE_ID_LAST_CHECKPOINT
   has been sent to the slave, but currently XC_SAVE_ID_LAST_CHECKPOINT is only
   sent during live migration. This callback gives us a place to send it.

Update some callbacks for COLO:
1. suspend(): In COLO mode, both PVM and SVM are running, so we must suspend
        both of them.
        The master communicates with the slave like this:
        a. write "continue" to notify the slave to suspend the SVM
        b. suspend the PVM and SVM
        c. the slave writes "suspend" to tell the master that the SVM is suspended
        d. write "start" to tell the slave that the checkpoint begins
2. postcopy(): In COLO mode, both PVM and SVM have been suspended at this
        point, so we must resume both of them.
        The master communicates with the slave like this:
        a. the slave writes "finish" once it has updated memory, device state, etc.
        b. flush the queued output packets, then write "resume" to notify the
           slave to resume the SVM
        c. resume the PVM and SVM
        d. the slave writes "resume" to tell the master that the SVM is resumed
3. checkpoint(): In COLO mode, a new checkpoint is taken only when the output
        packets from the PVM and SVM differ. This callback blocks and returns
        once a differing output packet is detected.

Signed-off-by: Ye Wei <wei.ye1987@xxxxxxxxx>
Signed-off-by: Jiang Yunhong <yunhong.jiang@xxxxxxxxx>
Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
---
 tools/libxc/xc_domain_save.c                      |   17 ++
 tools/libxc/xenguest.h                            |    3 +
 tools/python/xen/lowlevel/checkpoint/checkpoint.c |  302 ++++++++++++++++++++-
 tools/python/xen/lowlevel/checkpoint/checkpoint.h |    1 +
 4 files changed, 319 insertions(+), 4 deletions(-)

diff --git a/tools/libxc/xc_domain_save.c b/tools/libxc/xc_domain_save.c
index b477188..8f84c9b 100644
--- a/tools/libxc/xc_domain_save.c
+++ b/tools/libxc/xc_domain_save.c
@@ -1785,6 +1785,23 @@ int xc_domain_save(xc_interface *xch, int io_fd, 
uint32_t dom, uint32_t max_iter
         }
     }
 
+    /* Flush last write and discard cache for file. */
+    if ( outbuf_flush(xch, ob, io_fd) < 0 ) {
+        PERROR("Error when flushing output buffer");
+        rc = 1;
+    }
+
+    discard_file_cache(xch, io_fd, 1 /* flush */);
+
+    if ( callbacks->post_sendstate )
+    {
+        if ( callbacks->post_sendstate(callbacks->data) < 0)
+        {
+            PERROR("Error: post_sendstate()\n");
+            goto out;
+        }
+    }
+
     /* Zero terminate */
     i = 0;
     if ( wrexact(io_fd, &i, sizeof(int)) )
diff --git a/tools/libxc/xenguest.h b/tools/libxc/xenguest.h
index 4bb444a..9d7d03c 100644
--- a/tools/libxc/xenguest.h
+++ b/tools/libxc/xenguest.h
@@ -72,6 +72,9 @@ struct save_callbacks {
      */
     int (*toolstack_save)(uint32_t domid, uint8_t **buf, uint32_t *len, void 
*data);
 
+    /* called before Zero terminate is sent */
+    int (*post_sendstate)(void *data);
+
     /* to be provided as the last argument to each callback function */
     void* data;
 };
diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.c 
b/tools/python/xen/lowlevel/checkpoint/checkpoint.c
index ec14b27..28bdb23 100644
--- a/tools/python/xen/lowlevel/checkpoint/checkpoint.c
+++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.c
@@ -1,14 +1,22 @@
 /* python bridge to checkpointing API */
 
 #include <Python.h>
+#include <sys/wait.h>
 
 #include <xenstore.h>
 #include <xenctrl.h>
+#include <xc_private.h>
+#include <xg_save_restore.h>
 
 #include "checkpoint.h"
 
 #define PKG "xen.lowlevel.checkpoint"
 
+/* ioctl() commands for the /dev/HA_compare device opened in pre_checkpoint():
+ *   COMP_IOCTWAIT   - issued by wait_new_checkpoint() to block until a new
+ *                     checkpoint is needed
+ *   COMP_IOCTFLUSH  - issued by notify_slaver_resume() to flush queued packets
+ *   COMP_IOCTRESUME - issued by colo_postresume() after the slave has resumed
+ */
+#define COMP_IOC_MAGIC    'k'
+#define COMP_IOCTWAIT     _IO(COMP_IOC_MAGIC, 0)
+#define COMP_IOCTFLUSH    _IO(COMP_IOC_MAGIC, 1)
+#define COMP_IOCTRESUME   _IO(COMP_IOC_MAGIC, 2)
+
 static PyObject* CheckpointError;
 
 typedef struct {
@@ -25,11 +33,15 @@ typedef struct {
   PyObject* setup_cb;
 
   PyThreadState* threadstate;
+  int colo;
+  int first_time;
+  int dev_fd;
 } CheckpointObject;
 
 static int suspend_trampoline(void* data);
 static int postcopy_trampoline(void* data);
 static int checkpoint_trampoline(void* data);
+static int post_sendstate_trampoline(void *data);
 
 static PyObject* Checkpoint_new(PyTypeObject* type, PyObject* args,
                                PyObject* kwargs)
@@ -169,10 +181,17 @@ static PyObject* pycheckpoint_start(PyObject* obj, 
PyObject* args) {
   } else
     self->setup_cb = NULL;
 
+  if (flags & CHECKPOINT_FLAGS_COLO)
+    self->colo = 1;
+  else
+    self->colo = 0;
+  self->first_time = 1;
+
   memset(&callbacks, 0, sizeof(callbacks));
   callbacks.suspend = suspend_trampoline;
   callbacks.postcopy = postcopy_trampoline;
   callbacks.checkpoint = checkpoint_trampoline;
+  callbacks.post_sendstate = post_sendstate_trampoline;
   callbacks.data = self;
 
   self->threadstate = PyEval_SaveThread();
@@ -279,6 +298,196 @@ PyMODINIT_FUNC initcheckpoint(void) {
   block_timer();
 }
 
+/* colo functions */
+
+/* master                   slaver          comment
+ * "continue"   ===>
+ *              <===        "suspend"       guest is suspended
+ */
+/*
+ * Ask the slave to suspend the SVM by sending the 8-byte token "continue".
+ * Nothing is sent while first_time == 1 (the live-migration pass, see the
+ * first_time bookkeeping in suspend_trampoline()).
+ * Returns whatever write_exact() returns (negative on failure), or 0 when
+ * nothing had to be sent.
+ */
+static int notify_slaver_suspend(CheckpointObject *self)
+{
+    if (self->first_time != 1)
+        return write_exact(self->cps.fd, "continue", 8);
+
+    return 0;
+}
+
+/*
+ * Wait for the slave to report that the SVM is suspended: it must send the
+ * 7-byte token "suspend". Skipped while first_time == 1.
+ * Returns 0 on success, -1 on a read failure or an unexpected token.
+ */
+static int wait_slaver_suspend(CheckpointObject *self)
+{
+    int fd = self->cps.fd;
+    xc_interface *xch = self->cps.xch;
+    char buf[8];
+
+    if (self->first_time == 1)
+        return 0;
+
+    if ( read_exact(fd, buf, 7) < 0) {
+        PERROR("read: suspend");
+        return -1;
+    }
+
+    buf[7] = '\0';
+    if (strcmp(buf, "suspend")) {
+        /* A protocol mismatch is not a syscall failure, so errno is
+         * meaningless here: report with ERROR (not PERROR), as the other
+         * token readers in this file do. */
+        ERROR("read \"%s\", expect \"suspend\"", buf);
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Send the 5-byte token "start" to the slave; suspend_trampoline() calls
+ * this once both sides are suspended. Skipped while first_time == 1.
+ * Returns 0 on success (or when skipped), -1 if the write fails.
+ */
+static int notify_slaver_start_checkpoint(CheckpointObject *self)
+{
+    xc_interface *xch = self->cps.xch;
+    int rc = 0;
+
+    if (self->first_time != 1 && write_exact(self->cps.fd, "start", 5) < 0) {
+        PERROR("write start");
+        rc = -1;
+    }
+
+    return rc;
+}
+
+/*
+ * master                       slaver
+ *                  <====       "finish"
+ * flush packets
+ * "resume"         ====>
+ * resume vm                    resume vm
+ *                  <====       "resume"
+ *
+ * Master side of the first three steps above:
+ * - wait for the slave's 6-byte "finish" token;
+ * - flush the packets queued in /dev/HA_compare. This is skipped while
+ *   first_time is non-zero, because the device is only opened later, by
+ *   pre_checkpoint();
+ * - send "resume".
+ * A failing flush ioctl is reported but, as before, is not fatal.
+ * Returns 0 on success, -1 on a read/write failure or an unexpected token.
+ */
+static int notify_slaver_resume(CheckpointObject *self)
+{
+    int fd = self->cps.fd;
+    xc_interface *xch = self->cps.xch;
+    char buf[7];
+
+    /* wait slaver to finish update memory, device state... */
+    if ( read_exact(fd, buf, 6) < 0) {
+        PERROR("read: finish");
+        return -1;
+    }
+
+    buf[6] = '\0';
+    if (strcmp(buf, "finish")) {
+        ERROR("read \"%s\", expect \"finish\"", buf);
+        return -1;
+    }
+
+    if (!self->first_time) {
+        /* flush queued packets now; previously a failure here was silent */
+        if (ioctl(self->dev_fd, COMP_IOCTFLUSH) < 0)
+            PERROR("ioctl COMP_IOCTFLUSH");
+    }
+
+    /* notify slaver to resume vm*/
+    if (write_exact(fd, "resume", 6) < 0) {
+        PERROR("write: resume");
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Invoke the optional Python setup callback (self->setup_cb) with the GIL
+ * re-acquired for the duration of the call; colo_postresume() calls this
+ * on the first round.
+ * pycheckpoint_start() stores NULL when no callback was given; that case
+ * is treated as "nothing to install", the same way NULL suspend_cb,
+ * postcopy_cb and checkpoint_cb are skipped, instead of passing NULL to
+ * PyObject_CallFunction().
+ * Returns 0 if the callback is absent or returns None or a true value,
+ * -1 if it returns a false value or raises.
+ */
+static int install_fw_network(CheckpointObject *self)
+{
+    int rc;
+    PyObject* result;
+
+    if (!self->setup_cb)
+        return 0;
+
+    PyEval_RestoreThread(self->threadstate);
+    result = PyObject_CallFunction(self->setup_cb, NULL);
+    self->threadstate = PyEval_SaveThread();
+
+    if (!result)
+        return -1;
+
+    if (result == Py_None || PyObject_IsTrue(result))
+        rc = 0;
+    else
+        rc = -1;
+
+    Py_DECREF(result);
+
+    return rc;
+}
+
+/*
+ * Read the slave's acknowledgement that the SVM has resumed; it must be
+ * exactly the 6-byte token "resume".
+ * Returns 0 on success, -1 on a read failure or any other token.
+ */
+static int wait_slaver_resume(CheckpointObject *self)
+{
+    xc_interface *xch = self->cps.xch;
+    char reply[7];
+
+    if (read_exact(self->cps.fd, reply, 6) < 0) {
+        PERROR("read resume");
+        return -1;
+    }
+    reply[6] = '\0';
+
+    if (strcmp(reply, "resume") != 0) {
+        ERROR("read \"%s\", expect \"resume\"", reply);
+        return -1;
+    }
+
+    return 0;
+}
+
+/*
+ * Finish the resume handshake. First wait for the slave's "resume" token.
+ * Then, on the first round (first_time non-zero), run the Python setup
+ * callback; on later rounds, issue COMP_IOCTRESUME on /dev/HA_compare.
+ * A failing COMP_IOCTRESUME is reported instead of being silently ignored;
+ * as before, it is not fatal.
+ * Returns 0 on success, -1 if the handshake or the setup callback fails.
+ */
+static int colo_postresume(CheckpointObject *self)
+{
+    int rc;
+    int dev_fd = self->dev_fd;
+    xc_interface *xch = self->cps.xch;
+
+    rc = wait_slaver_resume(self);
+    if (rc < 0)
+        return rc;
+
+    if (self->first_time) {
+        rc = install_fw_network(self);
+        if (rc < 0) {
+            fprintf(stderr, "install network fails\n");
+            return rc;
+        }
+    } else if (ioctl(dev_fd, COMP_IOCTRESUME) < 0) {
+        PERROR("ioctl COMP_IOCTRESUME");
+    }
+
+    return 0;
+}
+
+/*
+ * While first_time is non-zero, open /dev/HA_compare read-write and keep
+ * the descriptor in self->dev_fd for the ioctl()s issued later.
+ * NOTE(review): no close() of dev_fd is visible in this patch.
+ * Returns 0 on success or when nothing needs opening, -1 if open() fails.
+ */
+static int pre_checkpoint(CheckpointObject *self)
+{
+    xc_interface *xch = self->cps.xch;
+
+    if (self->first_time) {
+        self->dev_fd = open("/dev/HA_compare", O_RDWR);
+        if (self->dev_fd < 0) {
+            PERROR("opening /dev/HA_compare fails");
+            return -1;
+        }
+    }
+
+    return 0;
+}
+
+/*
+ * Block in COMP_IOCTWAIT until /dev/HA_compare returns 0, i.e. until a new
+ * checkpoint should be taken.
+ *
+ * ERESTART, ETIME and EINTR (a signal interrupting the ioctl) are treated
+ * as transient, and the wait is retried silently. Any other failure is
+ * reported and -1 is returned. Retrying forever instead would spin and
+ * flood stderr when the error is persistent (e.g. a bad descriptor).
+ * Non-zero returns other than -1 are retried, as before.
+ *
+ * NOTE(review): checkpoint_trampoline() currently ignores the return value.
+ *
+ * Returns 0 when a new checkpoint should be taken, -1 on error.
+ */
+static int wait_new_checkpoint(CheckpointObject *self)
+{
+    int dev_fd = self->dev_fd;
+    int err;
+
+    for (;;) {
+        err = ioctl(dev_fd, COMP_IOCTWAIT);
+        if (err == 0)
+            return 0;
+
+        if (err == -1 && errno != ERESTART && errno != ETIME &&
+            errno != EINTR) {
+            fprintf(stderr, "ioctl() returns -1, errno: %d\n", errno);
+            return -1;
+        }
+    }
+}
+
 /* private functions */
 
 /* bounce C suspend call into python equivalent.
@@ -289,6 +498,13 @@ static int suspend_trampoline(void* data)
 
   PyObject* result;
 
+  if (self->colo) {
+    if (notify_slaver_suspend(self) < 0) {
+      fprintf(stderr, "nofitying slaver suspend fails\n");
+      return 0;
+    }
+  }
+
   /* call default suspend function, then python hook if available */
   if (self->armed) {
     if (checkpoint_wait(&self->cps) < 0) {
@@ -307,8 +523,16 @@ static int suspend_trampoline(void* data)
     }
   }
 
+  /* suspend_cb() should be called after both sides are suspended */
+  if (self->colo) {
+    if (wait_slaver_suspend(self) < 0) {
+      fprintf(stderr, "waiting slaver suspend fails\n");
+      return 0;
+    }
+  }
+
   if (!self->suspend_cb)
-    return 1;
+    goto start_checkpoint;
 
   PyEval_RestoreThread(self->threadstate);
   result = PyObject_CallFunction(self->suspend_cb, NULL);
@@ -319,12 +543,32 @@ static int suspend_trampoline(void* data)
 
   if (result == Py_None || PyObject_IsTrue(result)) {
     Py_DECREF(result);
-    return 1;
+    goto start_checkpoint;
   }
 
   Py_DECREF(result);
 
   return 0;
+
+start_checkpoint:
+  if (self->colo) {
+    if (notify_slaver_start_checkpoint(self) < 0) {
+      fprintf(stderr, "nofitying slaver to start checkpoint fails\n");
+      return 0;
+    }
+
+    /* PVM is suspended first when doing live migration,
+     * and then it is suspended for a new checkpoint.
+     */
+    if (self->first_time == 1)
+        /* live migration */
+        self->first_time = 2;
+    else if (self->first_time == 2)
+        /* the first checkpoint */
+        self->first_time = 0;
+  }
+
+  return 1;
 }
 
 static int postcopy_trampoline(void* data)
@@ -334,6 +578,13 @@ static int postcopy_trampoline(void* data)
   PyObject* result;
   int rc = 0;
 
+  if (self->colo) {
+    if (notify_slaver_resume(self) < 0) {
+      fprintf(stderr, "nofitying slaver resume fails\n");
+      return 0;
+    }
+  }
+
   if (!self->postcopy_cb)
     goto resume;
 
@@ -352,6 +603,13 @@ static int postcopy_trampoline(void* data)
     return 0;
   }
 
+  if (self->colo) {
+    if (colo_postresume(self) < 0) {
+      fprintf(stderr, "postresume fails\n");
+      return 0;
+    }
+  }
+
   return rc;
 }
 
@@ -366,8 +624,15 @@ static int checkpoint_trampoline(void* data)
       return -1;
   }
 
+  if (self->colo) {
+    if (pre_checkpoint(self) < 0) {
+      fprintf(stderr, "pre_checkpoint() fails\n");
+      return -1;
+    }
+  }
+
   if (!self->checkpoint_cb)
-    return 0;
+    goto wait_checkpoint;
 
   PyEval_RestoreThread(self->threadstate);
   result = PyObject_CallFunction(self->checkpoint_cb, NULL);
@@ -378,10 +643,39 @@ static int checkpoint_trampoline(void* data)
 
   if (result == Py_None || PyObject_IsTrue(result)) {
     Py_DECREF(result);
-    return 1;
+    goto wait_checkpoint;
   }
 
   Py_DECREF(result);
 
   return 0;
+
+wait_checkpoint:
+  if (self->colo) {
+    wait_new_checkpoint(self);
+  }
+
+  fprintf(stderr, "\n\nnew checkpoint..........\n");
+
+  return 1;
+}
+
+/*
+ * save_callbacks.post_sendstate hook. Outside COLO mode this does nothing.
+ * In COLO mode the guest keeps running on the slave, so the int
+ * XC_SAVE_ID_LAST_CHECKPOINT is written straight to self->cps.fd.
+ * Returns 0 on success (or when not in COLO mode), -1 if the write fails.
+ */
+static int post_sendstate_trampoline(void* data)
+{
+  CheckpointObject *self = data;
+  int marker;
+
+  if (self->colo) {
+    marker = XC_SAVE_ID_LAST_CHECKPOINT;
+    if (write_exact(self->cps.fd, &marker, sizeof marker) < 0) {
+      fprintf(stderr, "writing XC_SAVE_ID_LAST_CHECKPOINT fails\n");
+      return -1;
+    }
+  }
+
+  return 0;
+}
diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.h 
b/tools/python/xen/lowlevel/checkpoint/checkpoint.h
index 187d9d7..96fc949 100644
--- a/tools/python/xen/lowlevel/checkpoint/checkpoint.h
+++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.h
@@ -41,6 +41,7 @@ typedef struct {
 } checkpoint_state;
 
 #define CHECKPOINT_FLAGS_COMPRESSION 1
+#define CHECKPOINT_FLAGS_COLO        2
 char* checkpoint_error(checkpoint_state* s);
 
 void checkpoint_init(checkpoint_state* s);
-- 
1.7.4


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.