[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Xen-devel] [RFC Patch v2 15/16] xc_domain_save: implement save_callbacks for colo



On 11/07/13 09:35, Wen Congyang wrote:
> Add a new save callback:
> 1. post_sendstate(): SVM will run only when XC_SAVE_ID_LAST_CHECKPOINT is
>    sent to slaver. But we only send XC_SAVE_ID_LAST_CHECKPOINT when we do
>    live migration now. Add this callback, and we can send it in this
>    callback.
>
> Update some callbacks for colo:
> 1. suspend(): In colo mode, both PVM and SVM are running. So we should suspend
>         both PVM and SVM.
>         Communicate with slaver like this:
>         a. write "continue" to notify slaver to suspend SVM
>         b. suspend PVM and SVM
>         c. slaver writes "suspend" to tell master that SVM is suspended
> 2. postcopy(): In colo mode, both PVM and SVM are running, and we have suspended
>         both PVM and SVM. So we should resume PVM and SVM
>         Communicate with slaver like this:
>         a. write "resume" to notify slaver to resume SVM
>         b. resume PVM and SVM
>         c. slaver writes "resume" to tell master that SVM is resumed
> 3. checkpoint(): In colo mode, we do a new checkpoint only when an output packet
>     from PVM and SVM is different. We will block in this callback and return
>     when an output packet is different.
>
> Signed-off-by: Ye Wei <wei.ye1987@xxxxxxxxx>
> Signed-off-by: Jiang Yunhong <yunhong.jiang@xxxxxxxxx>
> Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
> ---
>  tools/libxc/xc_domain_save.c                      |   17 ++
>  tools/libxc/xenguest.h                            |    3 +
>  tools/python/xen/lowlevel/checkpoint/checkpoint.c |  302 ++++++++++++++++++++-
>  tools/python/xen/lowlevel/checkpoint/checkpoint.h |    1 +
>  4 files changed, 319 insertions(+), 4 deletions(-)
>
> diff --git a/tools/libxc/xc_domain_save.c b/tools/libxc/xc_domain_save.c
> index b477188..8f84c9b 100644
> --- a/tools/libxc/xc_domain_save.c
> +++ b/tools/libxc/xc_domain_save.c
> @@ -1785,6 +1785,23 @@ int xc_domain_save(xc_interface *xch, int io_fd, uint32_t dom, uint32_t max_iter
>          }
>      }
>  
> +    /* Flush last write and discard cache for file. */
> +    if ( outbuf_flush(xch, ob, io_fd) < 0 ) {
> +        PERROR("Error when flushing output buffer");
> +        rc = 1;
> +    }
> +
> +    discard_file_cache(xch, io_fd, 1 /* flush */);
> +
> +    if ( callbacks->post_sendstate )
> +    {
> +        if ( callbacks->post_sendstate(callbacks->data) < 0)
> +        {
> +            PERROR("Error: post_sendstate()\n");
> +            goto out;
> +        }
> +    }
> +
>      /* Zero terminate */
>      i = 0;
>      if ( wrexact(io_fd, &i, sizeof(int)) )
> diff --git a/tools/libxc/xenguest.h b/tools/libxc/xenguest.h
> index 4bb444a..9d7d03c 100644
> --- a/tools/libxc/xenguest.h
> +++ b/tools/libxc/xenguest.h
> @@ -72,6 +72,9 @@ struct save_callbacks {
>       */
>      int (*toolstack_save)(uint32_t domid, uint8_t **buf, uint32_t *len, void *data);
>  
> +    /* called before Zero terminate is sent */
> +    int (*post_sendstate)(void *data);
> +
>      /* to be provided as the last argument to each callback function */
>      void* data;
>  };
> diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.c b/tools/python/xen/lowlevel/checkpoint/checkpoint.c
> index ec14b27..28bdb23 100644
> --- a/tools/python/xen/lowlevel/checkpoint/checkpoint.c
> +++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.c
> @@ -1,14 +1,22 @@
>  /* python bridge to checkpointing API */
>  
>  #include <Python.h>
> +#include <sys/wait.h>

I can't see anything using this header file, which is good, as otherwise I
would still tell you that a Python module should not be using any of its
contents.

~Andrew

>  
>  #include <xenstore.h>
>  #include <xenctrl.h>
> +#include <xc_private.h>
> +#include <xg_save_restore.h>
>  
>  #include "checkpoint.h"
>  
>  #define PKG "xen.lowlevel.checkpoint"
>  
> +#define COMP_IOC_MAGIC    'k'
> +#define COMP_IOCTWAIT     _IO(COMP_IOC_MAGIC, 0)
> +#define COMP_IOCTFLUSH    _IO(COMP_IOC_MAGIC, 1)
> +#define COMP_IOCTRESUME   _IO(COMP_IOC_MAGIC, 2)
> +
>  static PyObject* CheckpointError;
>  
>  typedef struct {
> @@ -25,11 +33,15 @@ typedef struct {
>    PyObject* setup_cb;
>  
>    PyThreadState* threadstate;
> +  int colo;
> +  int first_time;
> +  int dev_fd;
>  } CheckpointObject;
>  
>  static int suspend_trampoline(void* data);
>  static int postcopy_trampoline(void* data);
>  static int checkpoint_trampoline(void* data);
> +static int post_sendstate_trampoline(void *data);
>  
>  static PyObject* Checkpoint_new(PyTypeObject* type, PyObject* args,
>                                 PyObject* kwargs)
> @@ -169,10 +181,17 @@ static PyObject* pycheckpoint_start(PyObject* obj, PyObject* args) {
>    } else
>      self->setup_cb = NULL;
>  
> +  if (flags & CHECKPOINT_FLAGS_COLO)
> +    self->colo = 1;
> +  else
> +    self->colo = 0;
> +  self->first_time = 1;
> +
>    memset(&callbacks, 0, sizeof(callbacks));
>    callbacks.suspend = suspend_trampoline;
>    callbacks.postcopy = postcopy_trampoline;
>    callbacks.checkpoint = checkpoint_trampoline;
> +  callbacks.post_sendstate = post_sendstate_trampoline;
>    callbacks.data = self;
>  
>    self->threadstate = PyEval_SaveThread();
> @@ -279,6 +298,196 @@ PyMODINIT_FUNC initcheckpoint(void) {
>    block_timer();
>  }
>  
> +/* colo functions */
> +
> +/* master                   slaver          comment
> + * "continue"   ===>
> + *              <===        "suspend"       guest is suspended
> + */
> +static int notify_slaver_suspend(CheckpointObject *self)
> +{
> +    int fd = self->cps.fd;
> +
> +    if (self->first_time == 1)
> +        return 0;
> +
> +    return write_exact(fd, "continue", 8);
> +}
> +
> +static int wait_slaver_suspend(CheckpointObject *self)
> +{
> +    int fd = self->cps.fd;
> +    xc_interface *xch = self->cps.xch;
> +    char buf[8];
> +
> +    if (self->first_time == 1)
> +        return 0;
> +
> +    if ( read_exact(fd, buf, 7) < 0) {
> +        PERROR("read: suspend");
> +        return -1;
> +    }
> +
> +    buf[7] = '\0';
> +    if (strcmp(buf, "suspend")) {
> +        PERROR("read \"%s\", expect \"suspend\"", buf);
> +        return -1;
> +    }
> +
> +    return 0;
> +}
> +
> +static int notify_slaver_start_checkpoint(CheckpointObject *self)
> +{
> +    int fd = self->cps.fd;
> +    xc_interface *xch = self->cps.xch;
> +
> +    if (self->first_time == 1)
> +        return 0;
> +
> +    if ( write_exact(fd, "start", 5) < 0) {
> +        PERROR("write start");
> +        return -1;
> +    }
> +
> +    return 0;
> +}
> +
> +/*
> + * master                       slaver
> + *                  <====       "finish"
> + * flush packets
> + * "resume"         ====>
> + * resume vm                    resume vm
> + *                  <====       "resume"
> + */
> +static int notify_slaver_resume(CheckpointObject *self)
> +{
> +    int fd = self->cps.fd;
> +    xc_interface *xch = self->cps.xch;
> +    char buf[7];
> +
> +    /* wait slaver to finish update memory, device state... */
> +    if ( read_exact(fd, buf, 6) < 0) {
> +        PERROR("read: finish");
> +        return -1;
> +    }
> +
> +    buf[6] = '\0';
> +    if (strcmp(buf, "finish")) {
> +        ERROR("read \"%s\", expect \"finish\"", buf);
> +        return -1;
> +    }
> +
> +    if (!self->first_time)
> +        /* flush queued packets now */
> +        ioctl(self->dev_fd, COMP_IOCTFLUSH);
> +
> +    /* notify slaver to resume vm*/
> +    if (write_exact(fd, "resume", 6) < 0) {
> +        PERROR("write: resume");
> +        return -1;
> +    }
> +
> +    return 0;
> +}
> +
> +static int install_fw_network(CheckpointObject *self)
> +{
> +    int rc;
> +    PyObject* result;
> +
> +    PyEval_RestoreThread(self->threadstate);
> +    result = PyObject_CallFunction(self->setup_cb, NULL);
> +    self->threadstate = PyEval_SaveThread();
> +
> +    if (!result)
> +        return -1;
> +
> +    if (result == Py_None || PyObject_IsTrue(result))
> +        rc = 0;
> +    else
> +        rc = -1;
> +
> +    Py_DECREF(result);
> +
> +    return rc;
> +}
> +
> +static int wait_slaver_resume(CheckpointObject *self)
> +{
> +    int fd = self->cps.fd;
> +    xc_interface *xch = self->cps.xch;
> +    char buf[7];
> +
> +    if (read_exact(fd, buf, 6) < 0) {
> +        PERROR("read resume");
> +        return -1;
> +    }
> +
> +    buf[6] = '\0';
> +    if (strcmp(buf, "resume")) {
> +        ERROR("read \"%s\", expect \"resume\"", buf);
> +        return -1;
> +    }
> +
> +    return 0;
> +}
> +
> +static int colo_postresume(CheckpointObject *self)
> +{
> +    int rc;
> +    int dev_fd = self->dev_fd;
> +
> +    rc = wait_slaver_resume(self);
> +    if (rc < 0)
> +        return rc;
> +
> +    if (self->first_time) {
> +        rc = install_fw_network(self);
> +        if (rc < 0) {
> +            fprintf(stderr, "install network fails\n");
> +            return rc;
> +        }
> +    } else {
> +        ioctl(dev_fd, COMP_IOCTRESUME);
> +    }
> +
> +    return 0;
> +}
> +
> +static int pre_checkpoint(CheckpointObject *self)
> +{
> +    xc_interface *xch = self->cps.xch;
> +
> +    if (!self->first_time)
> +        return 0;
> +
> +    self->dev_fd = open("/dev/HA_compare", O_RDWR);
> +    if (self->dev_fd < 0) {
> +        PERROR("opening /dev/HA_compare fails");
> +        return -1;
> +    }
> +
> +    return 0;
> +}
> +
> +static void wait_new_checkpoint(CheckpointObject *self)
> +{
> +    int dev_fd = self->dev_fd;
> +    int err;
> +
> +    while (1) {
> +        err = ioctl(dev_fd, COMP_IOCTWAIT);
> +        if (err == 0)
> +            break;
> +
> +        if (err == -1 && errno != ERESTART && errno != ETIME) {
> +            fprintf(stderr, "ioctl() returns -1, errno: %d\n", errno);
> +        }
> +    }
> +}
> +
>  /* private functions */
>  
>  /* bounce C suspend call into python equivalent.
> @@ -289,6 +498,13 @@ static int suspend_trampoline(void* data)
>  
>    PyObject* result;
>  
> +  if (self->colo) {
> +    if (notify_slaver_suspend(self) < 0) {
> +      fprintf(stderr, "nofitying slaver suspend fails\n");
> +      return 0;
> +    }
> +  }
> +
>    /* call default suspend function, then python hook if available */
>    if (self->armed) {
>      if (checkpoint_wait(&self->cps) < 0) {
> @@ -307,8 +523,16 @@ static int suspend_trampoline(void* data)
>      }
>    }
>  
> +  /* suspend_cb() should be called after both sides are suspended */
> +  if (self->colo) {
> +    if (wait_slaver_suspend(self) < 0) {
> +      fprintf(stderr, "waiting slaver suspend fails\n");
> +      return 0;
> +    }
> +  }
> +
>    if (!self->suspend_cb)
> -    return 1;
> +    goto start_checkpoint;
>  
>    PyEval_RestoreThread(self->threadstate);
>    result = PyObject_CallFunction(self->suspend_cb, NULL);
> @@ -319,12 +543,32 @@ static int suspend_trampoline(void* data)
>  
>    if (result == Py_None || PyObject_IsTrue(result)) {
>      Py_DECREF(result);
> -    return 1;
> +    goto start_checkpoint;
>    }
>  
>    Py_DECREF(result);
>  
>    return 0;
> +
> +start_checkpoint:
> +  if (self->colo) {
> +    if (notify_slaver_start_checkpoint(self) < 0) {
> +      fprintf(stderr, "nofitying slaver to start checkpoint fails\n");
> +      return 0;
> +    }
> +
> +    /* PVM is suspended first when doing live migration,
> +     * and then it is suspended for a new checkpoint.
> +     */
> +    if (self->first_time == 1)
> +        /* live migration */
> +        self->first_time = 2;
> +    else if (self->first_time == 2)
> +        /* the first checkpoint */
> +        self->first_time = 0;
> +  }
> +
> +  return 1;
>  }
>  
>  static int postcopy_trampoline(void* data)
> @@ -334,6 +578,13 @@ static int postcopy_trampoline(void* data)
>    PyObject* result;
>    int rc = 0;
>  
> +  if (self->colo) {
> +    if (notify_slaver_resume(self) < 0) {
> +      fprintf(stderr, "nofitying slaver resume fails\n");
> +      return 0;
> +    }
> +  }
> +
>    if (!self->postcopy_cb)
>      goto resume;
>  
> @@ -352,6 +603,13 @@ static int postcopy_trampoline(void* data)
>      return 0;
>    }
>  
> +  if (self->colo) {
> +    if (colo_postresume(self) < 0) {
> +      fprintf(stderr, "postresume fails\n");
> +      return 0;
> +    }
> +  }
> +
>    return rc;
>  }
>  
> @@ -366,8 +624,15 @@ static int checkpoint_trampoline(void* data)
>        return -1;
>    }
>  
> +  if (self->colo) {
> +    if (pre_checkpoint(self) < 0) {
> +      fprintf(stderr, "pre_checkpoint() fails\n");
> +      return -1;
> +    }
> +  }
> +
>    if (!self->checkpoint_cb)
> -    return 0;
> +    goto wait_checkpoint;
>  
>    PyEval_RestoreThread(self->threadstate);
>    result = PyObject_CallFunction(self->checkpoint_cb, NULL);
> @@ -378,10 +643,39 @@ static int checkpoint_trampoline(void* data)
>  
>    if (result == Py_None || PyObject_IsTrue(result)) {
>      Py_DECREF(result);
> -    return 1;
> +    goto wait_checkpoint;
>    }
>  
>    Py_DECREF(result);
>  
>    return 0;
> +
> +wait_checkpoint:
> +  if (self->colo) {
> +    wait_new_checkpoint(self);
> +  }
> +
> +  fprintf(stderr, "\n\nnew checkpoint..........\n");
> +
> +  return 1;
> +}
> +
> +static int post_sendstate_trampoline(void* data)
> +{
> +  CheckpointObject *self = data;
> +  int fd = self->cps.fd;
> +  int i = XC_SAVE_ID_LAST_CHECKPOINT;
> +
> +  if (!self->colo)
> +    return 0;
> +
> +  /* In colo mode, guest is running on slaver side, so we should
> +   * send XC_SAVE_ID_LAST_CHECKPOINT to slaver.
> +   */
> +  if (write_exact(fd, &i, sizeof(int)) < 0) {
> +    fprintf(stderr, "writing XC_SAVE_ID_LAST_CHECKPOINT fails\n");
> +    return -1;
> +  }
> +
> +  return 0;
>  }
> diff --git a/tools/python/xen/lowlevel/checkpoint/checkpoint.h b/tools/python/xen/lowlevel/checkpoint/checkpoint.h
> index 187d9d7..96fc949 100644
> --- a/tools/python/xen/lowlevel/checkpoint/checkpoint.h
> +++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.h
> @@ -41,6 +41,7 @@ typedef struct {
>  } checkpoint_state;
>  
>  #define CHECKPOINT_FLAGS_COMPRESSION 1
> +#define CHECKPOINT_FLAGS_COLO        2
>  char* checkpoint_error(checkpoint_state* s);
>  
>  void checkpoint_init(checkpoint_state* s);


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.