[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-changelog] [xen-unstable] Remus: add python control extensions

# HG changeset patch
# User Keir Fraser <keir.fraser@xxxxxxxxxx>
# Date 1258126417 0
# Node ID ea0e302362bb49c679e203bc7f0d8c9165c6f9d9
# Parent  64599a2d310d9d7b7cd48fcb09393f47f1d26026
Remus: add python control extensions

Signed-off-by: Brendan Cully <brendan@xxxxxxxxx>
 tools/python/setup.py                                |   25 
 tools/python/xen/lowlevel/checkpoint/checkpoint.c    |  363 ++++++++
 tools/python/xen/lowlevel/checkpoint/checkpoint.h    |   59 +
 tools/python/xen/lowlevel/checkpoint/libcheckpoint.c |  782 +++++++++++++++++++
 tools/python/xen/lowlevel/netlink/libnetlink.c       |  585 ++++++++++++++
 tools/python/xen/lowlevel/netlink/libnetlink.h       |   58 +
 tools/python/xen/lowlevel/netlink/netlink.c          |  211 +++++
 tools/python/xen/remus/blkdev.py                     |   31 
 tools/python/xen/remus/image.py                      |  227 +++++
 tools/python/xen/remus/netlink.py                    |  314 +++++++
 tools/python/xen/remus/profile.py                    |   56 +
 tools/python/xen/remus/qdisc.py                      |  178 ++++
 tools/python/xen/remus/save.py                       |  172 ++++
 tools/python/xen/remus/tapdisk.py                    |    4 
 tools/python/xen/remus/util.py                       |   31 
 tools/python/xen/remus/vbd.py                        |    9 
 tools/python/xen/remus/vdi.py                        |  121 ++
 tools/python/xen/remus/vif.py                        |   14 
 tools/python/xen/remus/vm.py                         |  156 +++
 19 files changed, 3393 insertions(+), 3 deletions(-)

diff -r 64599a2d310d -r ea0e302362bb tools/python/setup.py
--- a/tools/python/setup.py     Fri Nov 13 15:31:45 2009 +0000
+++ b/tools/python/setup.py     Fri Nov 13 15:33:37 2009 +0000
@@ -67,10 +67,28 @@ ptsname = Extension("ptsname",
                libraries          = libraries,
                sources            = [ "ptsname/ptsname.c" ])
+checkpoint = Extension("checkpoint",
+                       extra_compile_args = extra_compile_args,
+                       include_dirs       = include_dirs,
+                       library_dirs       = library_dirs,
+                       libraries          = libraries + [ "rt" ],
+                       sources            = [ 
+netlink = Extension("netlink",
+                    extra_compile_args = extra_compile_args,
+                    include_dirs       = include_dirs,
+                    library_dirs       = library_dirs,
+                    libraries          = libraries,
+                    sources            = [ "xen/lowlevel/netlink/netlink.c",
 modules = [ xc, xs, ptsname, acm, flask ]
-if os.uname()[0] == 'SunOS':
-    modules.append(scf)
-    modules.append(process)
+plat = os.uname()[0]
+if plat == 'SunOS':
+    modules.extend([ scf, process ])
+if plat == 'Linux':
+    modules.extend([ checkpoint, netlink ])
 setup(name            = 'xen',
       version         = '3.0',
@@ -89,6 +107,7 @@ setup(name            = 'xen',
+                         'xen.remus',
diff -r 64599a2d310d -r ea0e302362bb 
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.c Fri Nov 13 15:33:37 
2009 +0000
@@ -0,0 +1,363 @@
+/* python bridge to checkpointing API */
+#include <Python.h>
+#include <xs.h>
+#include <xenctrl.h>
+#include "checkpoint.h"
+#define PKG "xen.lowlevel.checkpoint"
+static PyObject* CheckpointError;
+typedef struct {
+  PyObject_HEAD
+  checkpoint_state cps;
+  /* milliseconds between checkpoints */
+  unsigned int interval;
+  int armed;
+  PyObject* suspend_cb;
+  PyObject* postcopy_cb;
+  PyObject* checkpoint_cb;
+  PyThreadState* threadstate;
+} CheckpointObject;
+static int suspend_trampoline(void* data);
+static int postcopy_trampoline(void* data);
+static int checkpoint_trampoline(void* data);
+static PyObject* Checkpoint_new(PyTypeObject* type, PyObject* args,
+                               PyObject* kwargs)
+  CheckpointObject* self = (CheckpointObject*)type->tp_alloc(type, 0);
+  if (!self)
+    return NULL;
+  checkpoint_init(&self->cps);
+  self->suspend_cb = NULL;
+  self->armed = 0;
+  return (PyObject*)self;
+static int Checkpoint_init(PyObject* obj, PyObject* args, PyObject* kwargs)
+  return 0;
+static void Checkpoint_dealloc(CheckpointObject* self)
+  checkpoint_close(&self->cps);
+  self->ob_type->tp_free((PyObject*)self);
+static PyObject* pycheckpoint_open(PyObject* obj, PyObject* args)
+  CheckpointObject* self = (CheckpointObject*)obj;
+  checkpoint_state* cps = &self->cps;
+  unsigned int domid;
+  if (!PyArg_ParseTuple(args, "I", &domid))
+    return NULL;
+  if (checkpoint_open(cps, domid) < 0) {
+    PyErr_SetString(CheckpointError, checkpoint_error(cps));
+    return NULL;
+  }
+static PyObject* pycheckpoint_close(PyObject* obj, PyObject* args)
+  CheckpointObject* self = (CheckpointObject*)obj;
+  checkpoint_close(&self->cps);
+  Py_XDECREF(self->suspend_cb);
+  self->suspend_cb = NULL;
+  Py_XDECREF(self->postcopy_cb);
+  self->postcopy_cb = NULL;
+  Py_XDECREF(self->checkpoint_cb);
+  self->checkpoint_cb = NULL;
+static PyObject* pycheckpoint_start(PyObject* obj, PyObject* args) {
+  CheckpointObject* self = (CheckpointObject*)obj;
+  PyObject* iofile;
+  PyObject* suspend_cb = NULL;
+  PyObject* postcopy_cb = NULL;
+  PyObject* checkpoint_cb = NULL;
+  unsigned int interval = 0;
+  int fd;
+  struct save_callbacks callbacks;
+  int rc;
+  if (!PyArg_ParseTuple(args, "O|OOOI", &iofile, &suspend_cb, &postcopy_cb,
+                       &checkpoint_cb, &interval))
+    return NULL;
+  self->interval = interval;
+  Py_INCREF(iofile);
+  Py_XINCREF(suspend_cb);
+  Py_XINCREF(postcopy_cb);
+  Py_XINCREF(checkpoint_cb);
+  fd = PyObject_AsFileDescriptor(iofile);
+  Py_DECREF(iofile);
+  if (fd < 0) {
+    PyErr_SetString(PyExc_TypeError, "invalid file handle");
+    return NULL;
+  }
+  if (suspend_cb && suspend_cb != Py_None) {
+    if (!PyCallable_Check(suspend_cb)) {
+      PyErr_SetString(PyExc_TypeError, "suspend callback not callable");
+      goto err;
+    }
+    self->suspend_cb = suspend_cb;
+  } else
+    self->suspend_cb = NULL;
+  if (postcopy_cb && postcopy_cb != Py_None) {
+    if (!PyCallable_Check(postcopy_cb)) {
+      PyErr_SetString(PyExc_TypeError, "postcopy callback not callable");
+      return NULL;
+    }
+    self->postcopy_cb = postcopy_cb;
+  } else
+    self->postcopy_cb = NULL;
+  if (checkpoint_cb && checkpoint_cb != Py_None) {
+    if (!PyCallable_Check(checkpoint_cb)) {
+      PyErr_SetString(PyExc_TypeError, "checkpoint callback not callable");
+      return NULL;
+    }
+    self->checkpoint_cb = checkpoint_cb;
+  } else
+    self->checkpoint_cb = NULL;
+  callbacks.suspend = suspend_trampoline;
+  callbacks.postcopy = postcopy_trampoline;
+  callbacks.checkpoint = checkpoint_trampoline;
+  callbacks.data = self;
+  self->threadstate = PyEval_SaveThread();
+  rc = checkpoint_start(&self->cps, fd, &callbacks);
+  PyEval_RestoreThread(self->threadstate);
+  if (rc < 0) {
+    PyErr_SetString(CheckpointError, checkpoint_error(&self->cps));
+    goto err;
+  }
+  err:
+  self->suspend_cb = NULL;
+  Py_XDECREF(suspend_cb);
+  self->postcopy_cb = NULL;
+  Py_XDECREF(postcopy_cb);
+  self->checkpoint_cb = NULL;
+  Py_XDECREF(checkpoint_cb);
+  return NULL;
+static PyMethodDef Checkpoint_methods[] = {
+  { "open", pycheckpoint_open, METH_VARARGS,
+    "open connection to xen" },
+  { "close", pycheckpoint_close, METH_NOARGS,
+    "close connection to xen" },
+  { "start", pycheckpoint_start, METH_VARARGS | METH_KEYWORDS,
+    "begin a checkpoint" },
+  { NULL, NULL, 0, NULL }
+static PyTypeObject CheckpointType = {
+  0,                          /* ob_size           */
+  PKG ".checkpointer",   /* tp_name           */
+  sizeof(CheckpointObject),   /* tp_basicsize      */
+  0,                          /* tp_itemsize       */
+  (destructor)Checkpoint_dealloc, /* tp_dealloc        */
+  NULL,                       /* tp_print          */
+  NULL,                       /* tp_getattr        */
+  NULL,                       /* tp_setattr        */
+  NULL,                       /* tp_compare        */
+  NULL,                       /* tp_repr           */
+  NULL,                       /* tp_as_number      */
+  NULL,                       /* tp_as_sequence    */
+  NULL,                       /* tp_as_mapping     */
+  NULL,                       /* tp_hash           */
+  NULL,                       /* tp_call           */
+  NULL,                       /* tp_str            */
+  NULL,                       /* tp_getattro       */
+  NULL,                       /* tp_setattro       */
+  NULL,                       /* tp_as_buffer      */
+  Py_TPFLAGS_DEFAULT,         /* tp_flags          */
+  "Checkpoint object",        /* tp_doc            */
+  NULL,                       /* tp_traverse       */
+  NULL,                       /* tp_clear          */
+  NULL,                       /* tp_richcompare    */
+  0,                          /* tp_weaklistoffset */
+  NULL,                       /* tp_iter           */
+  NULL,                       /* tp_iternext       */
+  Checkpoint_methods,         /* tp_methods        */
+  NULL,                       /* tp_members        */
+  NULL,                       /* tp_getset         */
+  NULL,                       /* tp_base           */
+  NULL,                       /* tp_dict           */
+  NULL,                       /* tp_descr_get      */
+  NULL,                       /* tp_descr_set      */
+  0,                          /* tp_dictoffset     */
+  (initproc)Checkpoint_init,  /* tp_init           */
+  NULL,                       /* tp_alloc          */
+  Checkpoint_new,             /* tp_new            */
+static PyMethodDef methods[] = {
+  { NULL }
+static char doc[] = "checkpoint API";
+PyMODINIT_FUNC initcheckpoint(void) {
+  PyObject *m;
+  if (PyType_Ready(&CheckpointType) < 0)
+    return;
+  m = Py_InitModule3(PKG, methods, doc);
+  if (!m)
+    return;
+  Py_INCREF(&CheckpointType);
+  PyModule_AddObject(m, "checkpointer", (PyObject*)&CheckpointType);
+  CheckpointError = PyErr_NewException(PKG ".error", NULL, NULL);
+  Py_INCREF(CheckpointError);
+  PyModule_AddObject(m, "error", CheckpointError);
+  block_timer();
+/* private functions */
+/* bounce C suspend call into python equivalent.
+ * returns 1 on success or 0 on failure */
+static int suspend_trampoline(void* data)
+  CheckpointObject* self = (CheckpointObject*)data;
+  PyObject* result;
+  /* call default suspend function, then python hook if available */
+  if (self->armed) {
+    if (checkpoint_wait(&self->cps) < 0) {
+      fprintf(stderr, "%s\n", checkpoint_error(&self->cps));
+      return 0;
+    }
+  } else {
+    if (self->interval) {
+      self->armed = 1;
+      checkpoint_settimer(&self->cps, self->interval);
+    }
+    if (!checkpoint_suspend(&self->cps)) {
+      fprintf(stderr, "%s\n", checkpoint_error(&self->cps));
+      return 0;
+    }
+  }
+  if (!self->suspend_cb)
+    return 1;
+  PyEval_RestoreThread(self->threadstate);
+  result = PyObject_CallFunction(self->suspend_cb, NULL);
+  self->threadstate = PyEval_SaveThread();
+  if (!result)
+    return 0;
+  if (result == Py_None || PyObject_IsTrue(result)) {
+    Py_DECREF(result);
+    return 1;
+  }
+  Py_DECREF(result);
+  return 0;
+static int postcopy_trampoline(void* data)
+  CheckpointObject* self = (CheckpointObject*)data;
+  PyObject* result;
+  int rc = 0;
+  if (!self->postcopy_cb)
+    goto resume;
+  PyEval_RestoreThread(self->threadstate);
+  result = PyObject_CallFunction(self->postcopy_cb, NULL);
+  if (result && (result == Py_None || PyObject_IsTrue(result)))
+    rc = 1;
+  Py_XDECREF(result);
+  self->threadstate = PyEval_SaveThread();
+  resume:
+  if (checkpoint_resume(&self->cps) < 0) {
+    fprintf(stderr, "%s\n", checkpoint_error(&self->cps));
+    return 0;
+  }
+  return rc;
+static int checkpoint_trampoline(void* data)
+  CheckpointObject* self = (CheckpointObject*)data;
+  PyObject* result;
+  if (checkpoint_postflush(&self->cps) < 0) {
+      fprintf(stderr, "%s\n", checkpoint_error(&self->cps));
+      return -1;
+  }
+  if (!self->checkpoint_cb)
+    return 0;
+  PyEval_RestoreThread(self->threadstate);
+  result = PyObject_CallFunction(self->checkpoint_cb, NULL);
+  self->threadstate = PyEval_SaveThread();
+  if (!result)
+    return 0;
+  if (result == Py_None || PyObject_IsTrue(result)) {
+    Py_DECREF(result);
+    return 1;
+  }
+  Py_DECREF(result);
+  return 0;
diff -r 64599a2d310d -r ea0e302362bb 
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.h Fri Nov 13 15:33:37 
2009 +0000
@@ -0,0 +1,59 @@
+/* API for checkpointing */
+#ifndef _CHECKPOINT_H_
+#define _CHECKPOINT_H_ 1
+#include <pthread.h>
+#include <semaphore.h>
+#include <time.h>
+#include <xenguest.h>
+#include <xs.h>
+typedef enum {
+    dt_unknown,
+    dt_pv,
+    dt_hvm,
+    dt_pvhvm /* HVM with PV drivers */
+} checkpoint_domtype;
+typedef struct {
+    int xch;               /* xc handle */
+    int xce;               /* event channel handle */
+    struct xs_handle* xsh; /* xenstore handle */
+    int watching_shutdown; /* state of watch on @releaseDomain */
+    unsigned int domid;
+    checkpoint_domtype domtype;
+    int fd;
+    int suspend_evtchn;
+    char* errstr;
+    /* suspend deadline thread support */
+    volatile int suspended;
+    volatile int done;
+    pthread_t suspend_thr;
+    sem_t suspended_sem;
+    sem_t resumed_sem;
+    timer_t timer;
+} checkpoint_state;
+char* checkpoint_error(checkpoint_state* s);
+void checkpoint_init(checkpoint_state* s);
+int checkpoint_open(checkpoint_state* s, unsigned int domid);
+void checkpoint_close(checkpoint_state* s);
+int checkpoint_start(checkpoint_state* s, int fd,
+                    struct save_callbacks* callbacks);
+int checkpoint_suspend(checkpoint_state* s);
+int checkpoint_resume(checkpoint_state* s);
+int checkpoint_postflush(checkpoint_state* s);
+int checkpoint_settimer(checkpoint_state* s, int millis);
+int checkpoint_wait(checkpoint_state* s);
+void block_timer(void);
+void unblock_timer(void);
diff -r 64599a2d310d -r ea0e302362bb 
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/lowlevel/checkpoint/libcheckpoint.c      Fri Nov 13 
15:33:37 2009 +0000
@@ -0,0 +1,782 @@
+/* API for checkpointing */
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <signal.h>
+#include <sys/stat.h>
+#include <xenctrl.h>
+#include <xenguest.h>
+#include <xs.h>
+#include "checkpoint.h"
+static char errbuf[256];
+static int setup_suspend_evtchn(checkpoint_state* s);
+static void release_suspend_evtchn(checkpoint_state *s);
+static int setup_shutdown_watch(checkpoint_state* s);
+static int check_shutdown_watch(checkpoint_state* s);
+static void release_shutdown_watch(checkpoint_state* s);
+static int poll_evtchn(checkpoint_state* s);
+static int switch_qemu_logdirty(checkpoint_state* s, int enable);
+static int suspend_hvm(checkpoint_state* s);
+static int suspend_qemu(checkpoint_state* s);
+static int resume_qemu(checkpoint_state* s);
+static int send_qemu(checkpoint_state* s);
+static int create_suspend_timer(checkpoint_state* s);
+static int delete_suspend_timer(checkpoint_state* s);
+static int create_suspend_thread(checkpoint_state* s);
+static void stop_suspend_thread(checkpoint_state* s);
+/* Returns a string describing the most recent error returned by
+ * a checkpoint function. Static -- do not free. */
+char* checkpoint_error(checkpoint_state* s)
+    return s->errstr;
+void checkpoint_init(checkpoint_state* s)
+    s->xch = -1;
+    s->xce = -1;
+    s->xsh = NULL;
+    s->watching_shutdown = 0;
+    s->domid = 0;
+    s->domtype = dt_unknown;
+    s->fd = -1;
+    s->suspend_evtchn = -1;
+    s->errstr = NULL;
+    s->suspended = 0;
+    s->done = 0;
+    s->suspend_thr = 0;
+    s->timer = 0;
+/* open a checkpoint session to guest domid */
+int checkpoint_open(checkpoint_state* s, unsigned int domid)
+    xc_dominfo_t dominfo;
+    unsigned long pvirq;
+    s->domid = domid;
+    s->xch = xc_interface_open();
+    if (s->xch < 0) {
+       s->errstr = "could not open control interface (are you root?)";
+       return -1;
+    }
+    s->xsh = xs_daemon_open();
+    if (!s->xsh) {
+       checkpoint_close(s);
+       s->errstr = "could not open xenstore handle";
+       return -1;
+    }
+    s->xce = xc_evtchn_open();
+    if (s->xce < 0) {
+       checkpoint_close(s);
+       s->errstr = "could not open event channel handle";
+       return -1;
+    }
+    if (xc_domain_getinfo(s->xch, s->domid, 1, &dominfo) < 0) {
+       checkpoint_close(s);
+       s->errstr = "could not get domain info";
+       return -1;
+    }
+    if (dominfo.hvm) {
+       if (xc_get_hvm_param(s->xch, s->domid, HVM_PARAM_CALLBACK_IRQ, &pvirq)) 
+           checkpoint_close(s);
+           s->errstr = "could not get HVM callback IRQ";
+           return -1;
+       }
+       s->domtype = pvirq ? dt_pvhvm : dt_hvm;
+    } else
+       s->domtype = dt_pv;
+    if (setup_shutdown_watch(s) < 0) {
+       checkpoint_close(s);
+       return -1;
+    }
+    if (s->domtype == dt_pv) {
+       if (setup_suspend_evtchn(s) < 0) {
+           checkpoint_close(s);
+           return -1;
+       }
+    } else if (s->domtype == dt_pvhvm) {
+       checkpoint_close(s);
+       s->errstr = "PV-on-HVM is unsupported";
+       return -1;
+    }
+    return 0;
+void checkpoint_close(checkpoint_state* s)
+  if (s->timer)
+    delete_suspend_timer(s);
+  if (s->suspend_thr)
+    stop_suspend_thread(s);
+  release_shutdown_watch(s);
+  release_suspend_evtchn(s);
+  if (s->xch >= 0) {
+    xc_interface_close(s->xch);
+    s->xch = -1;
+  }
+  if (s->xce >= 0) {
+    xc_evtchn_close(s->xce);
+    s->xce = -1;
+  }
+  if (s->xsh) {
+    xs_daemon_close(s->xsh);
+    s->xsh = NULL;
+  }
+  s->domid = 0;
+  s->fd = -1;
+  s->suspend_evtchn = -1;
+/* we toggle logdirty ourselves around the xc_domain_save call --
+ * it avoids having to pass around checkpoint_state */
+static void noop_switch_logdirty(int domid, unsigned enable)
+    return;
+int checkpoint_start(checkpoint_state* s, int fd,
+                    struct save_callbacks* callbacks)
+    int hvm, rc;
+    int flags = XCFLAGS_LIVE;
+    if (!s->domid) {
+       s->errstr = "checkpoint state not opened";
+       return -1;
+    }
+    s->fd = fd;
+    hvm = s->domtype > dt_pv;
+    if (hvm) {
+       flags |= XCFLAGS_HVM;
+       if ((rc = switch_qemu_logdirty(s, 1)))
+           return rc;
+    }
+    rc = xc_domain_save(s->xch, fd, s->domid, 0, 0, flags, callbacks, hvm,
+       noop_switch_logdirty);
+    if (hvm)
+       switch_qemu_logdirty(s, 0);
+    return rc;
+/* suspend the domain. Returns 0 on failure, 1 on success */
+int checkpoint_suspend(checkpoint_state* s)
+  struct timeval tv;
+  int rc;
+  gettimeofday(&tv, NULL);
+  fprintf(stderr, "PROF: suspending at %lu.%06lu\n", (unsigned long)tv.tv_sec,
+         (unsigned long)tv.tv_usec);
+  if (s->domtype == dt_hvm) {
+      return suspend_hvm(s) < 0 ? 0 : 1;
+  }
+  rc = xc_evtchn_notify(s->xce, s->suspend_evtchn);
+  if (rc < 0) {
+    snprintf(errbuf, sizeof(errbuf),
+            "failed to notify suspend event channel: %d", rc);
+    s->errstr = errbuf;
+    return 0;
+  }
+  do {
+    rc = poll_evtchn(s);
+  } while (rc >= 0 && rc != s->suspend_evtchn);
+  if (rc <= 0) {
+    snprintf(errbuf, sizeof(errbuf),
+            "failed to receive suspend notification: %d", rc);
+    s->errstr = errbuf;
+    return 0;
+  }
+  if (xc_evtchn_unmask(s->xce, s->suspend_evtchn) < 0) {
+    snprintf(errbuf, sizeof(errbuf),
+            "failed to unmask suspend notification channel: %d", rc);
+    s->errstr = errbuf;
+    return 0;
+  }
+  return 1;
+/* wait for a suspend to be triggered by another thread */
+int checkpoint_wait(checkpoint_state* s)
+  int rc;
+  if (!s->suspend_thr) {
+    s->errstr = "checkpoint timer is not active\n";
+    return -1;
+  }
+  do {
+    rc = sem_wait(&s->suspended_sem);
+    if (rc < 0 && errno != EINTR) {
+      snprintf(errbuf, sizeof(errbuf),
+              "error waiting for suspend semaphore: %d %d\n", rc, errno);
+      s->errstr = errbuf;
+      return -1;
+    }
+  } while (rc < 0);
+  if (!s->suspended) {
+    snprintf(errbuf, sizeof(errbuf), "domain not suspended?\n");
+    s->errstr = errbuf;
+    return -1;
+  }
+  return 0;
+/* let guest execution resume */
+int checkpoint_resume(checkpoint_state* s)
+  struct timeval tv;
+  int rc;
+  if (xc_domain_resume(s->xch, s->domid, 1)) {
+    snprintf(errbuf, sizeof(errbuf), "error resuming domain: %d", errno);
+    s->errstr = errbuf;
+    return -1;
+  }
+  gettimeofday(&tv, NULL);
+  fprintf(stderr, "PROF: resumed at %lu.%06lu\n", (unsigned long)tv.tv_sec,
+         (unsigned long)tv.tv_usec);
+  if (s->domtype > dt_pv && resume_qemu(s) < 0)
+      return -1;
+  /* restore watchability in xenstore */
+  if (xs_resume_domain(s->xsh, s->domid) < 0)
+    fprintf(stderr, "error resuming domain in xenstore\n");
+  s->suspended = 0;
+  if (s->suspend_thr) {
+    if ((rc = sem_post(&s->resumed_sem)))
+      fprintf(stderr, "error posting resume semaphore\n");
+  }
+  return 0;
+/* called after xc_domain_save has flushed its buffer */
+int checkpoint_postflush(checkpoint_state *s)
+    if (s->domtype > dt_pv && send_qemu(s) < 0)
+       return -1;
+    return 0;
+/* force suspend within millis ms if copy hasn't completed yet */
+int checkpoint_settimer(checkpoint_state* s, int millis)
+  struct itimerspec t;
+  int err;
+  if (!s->suspend_thr) {
+    if (create_suspend_timer(s) < 0)
+      return -1;
+    if (create_suspend_thread(s) < 0) {
+      delete_suspend_timer(s);
+      return -1;
+    }
+  }
+  t.it_value.tv_sec = millis / 1000;
+  t.it_value.tv_nsec = (millis % 1000) * 1000000L;
+  t.it_interval.tv_sec = t.it_value.tv_sec;
+  t.it_interval.tv_nsec = t.it_value.tv_nsec;
+  if ((err = timer_settime(s->timer, 0, &t, NULL))) {
+    fprintf(stderr, "Error arming timer: %d\n", err);
+    return -1;
+  }
+  return 0;
+int delete_suspend_timer(checkpoint_state* s)
+  int rc = 0;
+  if (s->timer) {
+    if ((rc = timer_delete(s->timer)))
+      fprintf(stderr, "Error deleting timer: %s\n", strerror(errno));
+    s->timer = NULL;
+  }
+  return rc;
+/* Set up event channel used to signal a guest to suspend itself */
+static int setup_suspend_evtchn(checkpoint_state* s)
+  int port;
+  port = xs_suspend_evtchn_port(s->domid);
+  if (port < 0) {
+    s->errstr = "failed to read suspend event channel";
+    return -1;
+  }
+  s->suspend_evtchn = xc_suspend_evtchn_init(s->xch, s->xce, s->domid, port);
+  if (s->suspend_evtchn < 0) {
+    snprintf(errbuf, sizeof(errbuf), "failed to bind suspend event channel");
+    s->errstr = errbuf;
+    return -1;
+  }
+  fprintf(stderr, "bound to suspend event channel %u:%d as %d\n", s->domid, 
+    s->suspend_evtchn);
+  return 0;
+/* release suspend event channels bound to guest */
+static void release_suspend_evtchn(checkpoint_state *s)
+  /* TODO: teach xen to clean up if port is unbound */
+  if (s->xce >= 0 && s->suspend_evtchn > 0) {
+    xc_suspend_evtchn_release(s->xce, s->suspend_evtchn);
+    s->suspend_evtchn = 0;
+  }
+static int setup_shutdown_watch(checkpoint_state* s)
+  char buf[16];
+  /* write domain ID to watch so we can ignore other domain shutdowns */
+  snprintf(buf, sizeof(buf), "%u", s->domid);
+  if ( !xs_watch(s->xsh, "@releaseDomain", buf) ) {
+    fprintf(stderr, "Could not bind to shutdown watch\n");
+    return -1;
+  }
+  /* watch fires once on registration */
+  s->watching_shutdown = 1;
+  check_shutdown_watch(s);
+  return 0;
+static int check_shutdown_watch(checkpoint_state* s) {
+  unsigned int count;
+  char **vec;
+  char buf[16];
+  vec = xs_read_watch(s->xsh, &count);
+  if (s->watching_shutdown == 1) {
+      s->watching_shutdown = 2;
+      return 0;
+  }
+  if (!vec) {
+    fprintf(stderr, "empty watch fired\n");
+    return 0;
+  }
+  snprintf(buf, sizeof(buf), "%d", s->domid);
+  if (!strcmp(vec[XS_WATCH_TOKEN], buf)) {
+    fprintf(stderr, "domain %d shut down\n", s->domid);
+    return -1;
+  }
+  return 0;
+static void release_shutdown_watch(checkpoint_state* s) {
+  char buf[16];
+  if (!s->xsh)
+    return;
+  if (!s->watching_shutdown)
+      return;
+  snprintf(buf, sizeof(buf), "%u", s->domid);
+  if (!xs_unwatch(s->xsh, "@releaseDomain", buf))
+    fprintf(stderr, "Could not release shutdown watch\n");
+/* wrapper around xc_evtchn_pending which detects errors */
+static int poll_evtchn(checkpoint_state* s)
+  int fd, xsfd, maxfd;
+  fd_set rfds, efds;
+  struct timeval tv;
+  int rc;
+  fd = xc_evtchn_fd(s->xce);
+  xsfd = xs_fileno(s->xsh);
+  maxfd = fd > xsfd ? fd : xsfd;
+  FD_ZERO(&rfds);
+  FD_ZERO(&efds);
+  FD_SET(fd, &rfds);
+  FD_SET(xsfd, &rfds);
+  FD_SET(fd, &efds);
+  FD_SET(xsfd, &efds);
+  /* give it 500 ms to respond */
+  tv.tv_sec = 0;
+  tv.tv_usec = 500000;
+  rc = select(maxfd + 1, &rfds, NULL, &efds, &tv);
+  if (rc < 0)
+    fprintf(stderr, "error polling event channel: %s\n", strerror(errno));
+  else if (!rc)
+    fprintf(stderr, "timeout waiting for event channel\n");
+  else if (FD_ISSET(fd, &rfds))
+    return xc_evtchn_pending(s->xce);
+  else if (FD_ISSET(xsfd, &rfds))
+    return check_shutdown_watch(s);
+  return -1;
+/* adapted from the eponymous function in xc_save */
+static int switch_qemu_logdirty(checkpoint_state *s, int enable)
+    char path[128];
+    char *tail, *cmd, *response;
+    char **vec;
+    unsigned int len;
+    sprintf(path, "/local/domain/0/device-model/%u/logdirty/", s->domid);
+    tail = path + strlen(path);
+    strcpy(tail, "ret");
+    if (!xs_watch(s->xsh, path, "qemu-logdirty-ret")) {
+       s->errstr = "error watching qemu logdirty return";
+       return -1;
+    }
+    /* null fire. XXX unify with shutdown watch! */
+    vec = xs_read_watch(s->xsh, &len);
+    free(vec);
+    strcpy(tail, "cmd");
+    cmd = enable ? "enable" : "disable";
+    if (!xs_write(s->xsh, XBT_NULL, path, cmd, strlen(cmd))) {
+       s->errstr = "error signalling qemu logdirty";
+       return -1;
+    }
+    vec = xs_read_watch(s->xsh, &len);
+    free(vec);
+    strcpy(tail, "ret");
+    xs_unwatch(s->xsh, path, "qemu-logdirty-ret");
+    response = xs_read(s->xsh, XBT_NULL, path, &len);
+    if (!len || strcmp(response, cmd)) {
+       if (len)
+           free(response);
+       s->errstr = "qemu logdirty command failed";
+       return -1;
+    }
+    free(response);
+    fprintf(stderr, "qemu logdirty mode: %s\n", cmd);
+    return 0;
+static int suspend_hvm(checkpoint_state *s)
+    int rc = -1;
+    fprintf(stderr, "issuing HVM suspend hypercall\n");
+    rc = xc_domain_shutdown(s->xch, s->domid, SHUTDOWN_suspend);
+    if (rc < 0) {
+       s->errstr = "shutdown hypercall failed";
+       return -1;
+    }
+    fprintf(stderr, "suspend hypercall returned %d\n", rc);
+    if (check_shutdown_watch(s) >= 0)
+       return -1;
+    rc = suspend_qemu(s);
+    return rc;
+static int suspend_qemu(checkpoint_state *s)
+    char path[128];
+    fprintf(stderr, "pausing QEMU\n");
+    sprintf(path, "/local/domain/0/device-model/%d/command", s->domid);
+    if (!xs_write(s->xsh, XBT_NULL, path, "save", 4)) {
+       fprintf(stderr, "error signalling QEMU to save\n");
+       return -1;
+    }
+    sprintf(path, "/local/domain/0/device-model/%d/state", s->domid);
+    do {
+       char* state;
+       unsigned int len;
+       state = xs_read(s->xsh, XBT_NULL, path, &len);
+       if (!state) {
+           s->errstr = "error reading QEMU state";
+           return -1;
+       }
+       if (!strcmp(state, "paused")) {
+           free(state);
+           return 0;
+       }
+       free(state);
+       usleep(1000);
+    } while(1);
+    return -1;
+static int resume_qemu(checkpoint_state *s)
+    char path[128];
+    fprintf(stderr, "resuming QEMU\n");
+    sprintf(path, "/local/domain/0/device-model/%d/command", s->domid);
+    if (!xs_write(s->xsh, XBT_NULL, path, "continue", 8)) {
+       fprintf(stderr, "error signalling QEMU to resume\n");
+       return -1;
+    }
+    return 0;
+static int send_qemu(checkpoint_state *s)
+    char buf[8192];
+    char path[128];
+    struct stat sb;
+    uint32_t qlen = 0;
+    int qfd;
+    int rc;
+    if (s->fd < 0)
+       return -1;
+    sprintf(path, "/var/lib/xen/qemu-save.%d", s->domid);
+    if (stat(path, &sb) < 0) {
+       snprintf(errbuf, sizeof(errbuf),
+               "error getting QEMU state file status: %s", strerror(errno));
+       s->errstr = errbuf;
+       return -1;
+    }
+    qlen = sb.st_size;
+    qfd = open(path, O_RDONLY);
+    if (qfd < 0) {
+       snprintf(errbuf, sizeof(errbuf), "error opening QEMU state file: %s",
+                strerror(errno));
+       s->errstr = errbuf;
+       return -1;
+    }
+    fprintf(stderr, "Sending %u bytes of QEMU state\n", qlen);
+    if (write(s->fd, "RemusDeviceModelState", 21) != 21) {
+       s->errstr = "error writing QEMU header";
+       close(qfd);
+       return -1;
+    }
+    if (write(s->fd, &qlen, sizeof(qlen)) != sizeof(qlen)) {
+       s->errstr = "error writing QEMU size";
+       close(qfd);
+       return -1;
+    }
+    while ((rc = read(qfd, buf, qlen > sizeof(buf) ? sizeof(buf) : qlen)) > 0) 
+       qlen -= rc;
+       if (write(s->fd, buf, rc) != rc) {
+           rc = -1;
+           break;
+       }
+    }
+    if (rc < 0) {
+       snprintf(errbuf, sizeof(errbuf), "error writing QEMU state: %s",
+                strerror(errno));
+       s->errstr = errbuf;
+    }
+    close(qfd);
+    return rc;
+/*thread responsible to suspend the domain early if necessary*/
+static void *suspend_thread(void *arg)
+  checkpoint_state* s = (checkpoint_state*)arg;
+  sigset_t tss;
+  int rc;
+  int sig;
+  fprintf(stderr, "Suspend thread started\n");
+  sigemptyset(&tss);
+  sigaddset(&tss, SIGRTMIN);
+  while (1) {
+    /* wait for checkpoint thread to signal resume */
+    if ((rc = sem_wait(&s->resumed_sem)))
+      fprintf(stderr, "Error waiting on resume semaphore\n");
+    if ((rc = sigwait(&tss, &sig))) {
+      fprintf(stderr, "sigwait failed: %d %d\n", rc, errno);
+      break;
+    }
+    if (sig != SIGRTMIN)
+      fprintf(stderr, "received unexpected signal %d\n", sig);
+    if (s->done)
+      break;
+    if (s->suspended) {
+      fprintf(stderr, "domain already suspended?\n");
+    } else {
+      rc = checkpoint_suspend(s);
+      if (rc)
+       s->suspended = 1;
+      else
+       fprintf(stderr, "checkpoint_suspend failed\n");
+    }
+    if ((rc = sem_post(&s->suspended_sem)))
+      fprintf(stderr, "Error posting suspend semaphore\n");
+  }
+  fprintf(stderr, "Suspend thread exiting\n");
+  return NULL;
+static int create_suspend_timer(checkpoint_state* s)
+  struct sigevent event;
+  int err;
+  event.sigev_notify = SIGEV_SIGNAL;
+  event.sigev_signo = SIGRTMIN;
+  event.sigev_value.sival_int = 0;
+  if ((err = timer_create(CLOCK_REALTIME, &event, &s->timer))) {
+    snprintf(errbuf, sizeof(errbuf), "Error creating timer: %d\n", err);
+    s->errstr = errbuf;
+    return -1;
+  }
+  return 0;
+void block_timer(void)
+  sigset_t tss;
+  sigemptyset(&tss);
+  sigaddset(&tss, SIGRTMIN);
+  pthread_sigmask(SIG_BLOCK, &tss, NULL);
+void unblock_timer(void)
+  sigset_t tss;
+  sigemptyset(&tss);
+  sigaddset(&tss, SIGRTMIN);
+  pthread_sigmask(SIG_UNBLOCK, &tss, NULL);
+static int create_suspend_thread(checkpoint_state* s)
+  int err;
+  if ((err = sem_init(&s->suspended_sem, 0, 0))) {
+    snprintf(errbuf, sizeof(errbuf),
+            "Error initializing suspend semaphore: %d\n", err);
+    s->errstr = errbuf;
+    return -1;
+  }
+  if ((err = sem_init(&s->resumed_sem, 0, 0))) {
+    snprintf(errbuf, sizeof(errbuf),
+            "Error initializing resume semaphore: %d\n", err);
+    s->errstr = errbuf;
+    return -1;
+  }
+  /* signal mask should be inherited */
+  block_timer();
+  if ((err = pthread_create(&s->suspend_thr, NULL, suspend_thread, s))) {
+    snprintf(errbuf, sizeof(errbuf), "Error creating suspend thread: %d\n", 
+    s->errstr = errbuf;
+    return -1;
+  }
+  return 0;
+static void stop_suspend_thread(checkpoint_state* s)
+  int err;
+  s->done = 1;
+  err = sem_post(&s->resumed_sem);
+  err = pthread_join(s->suspend_thr, NULL);
+  s->suspend_thr = 0;
diff -r 64599a2d310d -r ea0e302362bb 
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/lowlevel/netlink/libnetlink.c    Fri Nov 13 15:33:37 
2009 +0000
@@ -0,0 +1,585 @@
+ * libnetlink.c        RTnetlink service routines.
+ *
+ *             This program is free software; you can redistribute it and/or
+ *             modify it under the terms of the GNU General Public License
+ *             as published by the Free Software Foundation; either version
+ *             2 of the License, or (at your option) any later version.
+ *
+ * Authors:    Alexey Kuznetsov, <kuznet@xxxxxxxxxxxxx>
+ *
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <syslog.h>
+#include <fcntl.h>
+#include <net/if_arp.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <string.h>
+#include <errno.h>
+#include <time.h>
+#include <sys/uio.h>
+#include "libnetlink.h"
+void rtnl_close(struct rtnl_handle *rth)
+       close(rth->fd);
+int rtnl_open_byproto(struct rtnl_handle *rth, unsigned subscriptions,
+                     int protocol)
+       socklen_t addr_len;
+       int sndbuf = 32768;
+       int rcvbuf = 32768;
+       memset(rth, 0, sizeof(rth));
+       rth->fd = socket(AF_NETLINK, SOCK_RAW, protocol);
+       if (rth->fd < 0) {
+               perror("Cannot open netlink socket");
+               return -1;
+       }
+       if (setsockopt(rth->fd,SOL_SOCKET,SO_SNDBUF,&sndbuf,sizeof(sndbuf)) < 
0) {
+               perror("SO_SNDBUF");
+               return -1;
+       }
+       if (setsockopt(rth->fd,SOL_SOCKET,SO_RCVBUF,&rcvbuf,sizeof(rcvbuf)) < 
0) {
+               perror("SO_RCVBUF");
+               return -1;
+       }
+       memset(&rth->local, 0, sizeof(rth->local));
+       rth->local.nl_family = AF_NETLINK;
+       rth->local.nl_groups = subscriptions;
+       if (bind(rth->fd, (struct sockaddr*)&rth->local, sizeof(rth->local)) < 
0) {
+               perror("Cannot bind netlink socket");
+               return -1;
+       }
+       addr_len = sizeof(rth->local);
+       if (getsockname(rth->fd, (struct sockaddr*)&rth->local, &addr_len) < 0) 
+               perror("Cannot getsockname");
+               return -1;
+       }
+       if (addr_len != sizeof(rth->local)) {
+               fprintf(stderr, "Wrong address length %d\n", addr_len);
+               return -1;
+       }
+       if (rth->local.nl_family != AF_NETLINK) {
+               fprintf(stderr, "Wrong address family %d\n", 
+               return -1;
+       }
+       rth->seq = time(NULL);
+       return 0;
+int rtnl_open(struct rtnl_handle *rth, unsigned subscriptions)
+       return rtnl_open_byproto(rth, subscriptions, NETLINK_ROUTE);
+int rtnl_wilddump_request(struct rtnl_handle *rth, int family, int type)
+       struct {
+               struct nlmsghdr nlh;
+               struct rtgenmsg g;
+       } req;
+       struct sockaddr_nl nladdr;
+       memset(&nladdr, 0, sizeof(nladdr));
+       nladdr.nl_family = AF_NETLINK;
+       memset(&req, 0, sizeof(req));
+       req.nlh.nlmsg_len = sizeof(req);
+       req.nlh.nlmsg_type = type;
+       req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST;
+       req.nlh.nlmsg_pid = 0;
+       req.nlh.nlmsg_seq = rth->dump = ++rth->seq;
+       req.g.rtgen_family = family;
+       return sendto(rth->fd, (void*)&req, sizeof(req), 0,
+                     (struct sockaddr*)&nladdr, sizeof(nladdr));
+int rtnl_send(struct rtnl_handle *rth, const char *buf, int len)
+       struct sockaddr_nl nladdr;
+       memset(&nladdr, 0, sizeof(nladdr));
+       nladdr.nl_family = AF_NETLINK;
+       return sendto(rth->fd, buf, len, 0, (struct sockaddr*)&nladdr, 
+int rtnl_dump_request(struct rtnl_handle *rth, int type, void *req, int len)
+       struct nlmsghdr nlh;
+       struct sockaddr_nl nladdr;
+       struct iovec iov[2] = {
+               { .iov_base = &nlh, .iov_len = sizeof(nlh) },
+               { .iov_base = req, .iov_len = len }
+       };
+       struct msghdr msg = {
+               .msg_name = &nladdr,
+               .msg_namelen =  sizeof(nladdr),
+               .msg_iov = iov,
+               .msg_iovlen = 2,
+       };
+       memset(&nladdr, 0, sizeof(nladdr));
+       nladdr.nl_family = AF_NETLINK;
+       nlh.nlmsg_len = NLMSG_LENGTH(len);
+       nlh.nlmsg_type = type;
+       nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST;
+       nlh.nlmsg_pid = 0;
+       nlh.nlmsg_seq = rth->dump = ++rth->seq;
+       return sendmsg(rth->fd, &msg, 0);
+int rtnl_dump_filter(struct rtnl_handle *rth,
+                    rtnl_filter_t filter,
+                    void *arg1,
+                    rtnl_filter_t junk,
+                    void *arg2)
+       struct sockaddr_nl nladdr;
+       struct iovec iov;
+       struct msghdr msg = {
+               .msg_name = &nladdr,
+               .msg_namelen = sizeof(nladdr),
+               .msg_iov = &iov,
+               .msg_iovlen = 1,
+       };
+       char buf[16384];
+       iov.iov_base = buf;
+       while (1) {
+               int status;
+               struct nlmsghdr *h;
+               iov.iov_len = sizeof(buf);
+               status = recvmsg(rth->fd, &msg, 0);
+               if (status < 0) {
+                       if (errno == EINTR)
+                               continue;
+                       perror("OVERRUN");
+                       continue;
+               }
+               if (status == 0) {
+                       fprintf(stderr, "EOF on netlink\n");
+                       return -1;
+               }
+               h = (struct nlmsghdr*)buf;
+               while (NLMSG_OK(h, status)) {
+                       int err;
+                       if (nladdr.nl_pid != 0 ||
+                           h->nlmsg_pid != rth->local.nl_pid ||
+                           h->nlmsg_seq != rth->dump) {
+                               if (junk) {
+                                       err = junk(&nladdr, h, arg2);
+                                       if (err < 0)
+                                               return err;
+                               }
+                               goto skip_it;
+                       }
+                       if (h->nlmsg_type == NLMSG_DONE)
+                               return 0;
+                       if (h->nlmsg_type == NLMSG_ERROR) {
+                               struct nlmsgerr *err = (struct 
+                               if (h->nlmsg_len < NLMSG_LENGTH(sizeof(struct 
nlmsgerr))) {
+                                       fprintf(stderr, "ERROR truncated\n");
+                               } else {
+                                       errno = -err->error;
+                                       perror("RTNETLINK answers");
+                               }
+                               return -1;
+                       }
+                       err = filter(&nladdr, h, arg1);
+                       if (err < 0)
+                               return err;
+                       h = NLMSG_NEXT(h, status);
+               }
+               if (msg.msg_flags & MSG_TRUNC) {
+                       fprintf(stderr, "Message truncated\n");
+                       continue;
+               }
+               if (status) {
+                       fprintf(stderr, "!!!Remnant of size %d\n", status);
+                       exit(1);
+               }
+       }
+int rtnl_talk(struct rtnl_handle *rtnl, struct nlmsghdr *n, pid_t peer,
+             unsigned groups, struct nlmsghdr *answer,
+             rtnl_filter_t junk,
+             void *jarg)
+       int status;
+       unsigned seq;
+       struct nlmsghdr *h;
+       struct sockaddr_nl nladdr;
+       struct iovec iov = {
+               .iov_base = (void*) n,
+               .iov_len = n->nlmsg_len
+       };
+       struct msghdr msg = {
+               .msg_name = &nladdr,
+               .msg_namelen = sizeof(nladdr),
+               .msg_iov = &iov,
+               .msg_iovlen = 1,
+       };
+       char   buf[16384];
+       memset(&nladdr, 0, sizeof(nladdr));
+       nladdr.nl_family = AF_NETLINK;
+       nladdr.nl_pid = peer;
+       nladdr.nl_groups = groups;
+       n->nlmsg_seq = seq = ++rtnl->seq;
+       if (answer == NULL)
+               n->nlmsg_flags |= NLM_F_ACK;
+       status = sendmsg(rtnl->fd, &msg, 0);
+       if (status < 0) {
+               perror("Cannot talk to rtnetlink");
+               return -1;
+       }
+       memset(buf,0,sizeof(buf));
+       iov.iov_base = buf;
+       while (1) {
+               iov.iov_len = sizeof(buf);
+               status = recvmsg(rtnl->fd, &msg, 0);
+               if (status < 0) {
+                       if (errno == EINTR)
+                               continue;
+                       perror("OVERRUN");
+                       continue;
+               }
+               if (status == 0) {
+                       fprintf(stderr, "EOF on netlink\n");
+                       return -1;
+               }
+               if (msg.msg_namelen != sizeof(nladdr)) {
+                       fprintf(stderr, "sender address length == %d\n", 
+                       exit(1);
+               }
+               for (h = (struct nlmsghdr*)buf; status >= sizeof(*h); ) {
+                       int err;
+                       int len = h->nlmsg_len;
+                       int l = len - sizeof(*h);
+                       if (l<0 || len>status) {
+                               if (msg.msg_flags & MSG_TRUNC) {
+                                       fprintf(stderr, "Truncated message\n");
+                                       return -1;
+                               }
+                               fprintf(stderr, "!!!malformed message: 
len=%d\n", len);
+                               exit(1);
+                       }
+                       if (nladdr.nl_pid != peer ||
+                           h->nlmsg_pid != rtnl->local.nl_pid ||
+                           h->nlmsg_seq != seq) {
+                               if (junk) {
+                                       err = junk(&nladdr, h, jarg);
+                                       if (err < 0)
+                                               return err;
+                               }
+                               continue;
+                       }
+                       if (h->nlmsg_type == NLMSG_ERROR) {
+                               struct nlmsgerr *err = (struct 
+                               if (l < sizeof(struct nlmsgerr)) {
+                                       fprintf(stderr, "ERROR truncated\n");
+                               } else {
+                                       errno = -err->error;
+                                       if (errno == 0) {
+                                               if (answer)
+                                                       memcpy(answer, h, 
+                                               return 0;
+                                       }
+                                       perror("RTNETLINK answers");
+                               }
+                               return -1;
+                       }
+                       if (answer) {
+                               memcpy(answer, h, h->nlmsg_len);
+                               return 0;
+                       }
+                       fprintf(stderr, "Unexpected reply!!!\n");
+                       status -= NLMSG_ALIGN(len);
+                       h = (struct nlmsghdr*)((char*)h + NLMSG_ALIGN(len));
+               }
+               if (msg.msg_flags & MSG_TRUNC) {
+                       fprintf(stderr, "Message truncated\n");
+                       continue;
+               }
+               if (status) {
+                       fprintf(stderr, "!!!Remnant of size %d\n", status);
+                       exit(1);
+               }
+       }
+int rtnl_listen(struct rtnl_handle *rtnl,
+               rtnl_filter_t handler,
+               void *jarg)
+       int status;
+       struct nlmsghdr *h;
+       struct sockaddr_nl nladdr;
+       struct iovec iov;
+       struct msghdr msg = {
+               .msg_name = &nladdr,
+               .msg_namelen = sizeof(nladdr),
+               .msg_iov = &iov,
+               .msg_iovlen = 1,
+       };
+       char   buf[8192];
+       memset(&nladdr, 0, sizeof(nladdr));
+       nladdr.nl_family = AF_NETLINK;
+       nladdr.nl_pid = 0;
+       nladdr.nl_groups = 0;
+       iov.iov_base = buf;
+       while (1) {
+               iov.iov_len = sizeof(buf);
+               status = recvmsg(rtnl->fd, &msg, 0);
+               if (status < 0) {
+                       if (errno == EINTR)
+                               continue;
+                       perror("OVERRUN");
+                       continue;
+               }
+               if (status == 0) {
+                       fprintf(stderr, "EOF on netlink\n");
+                       return -1;
+               }
+               if (msg.msg_namelen != sizeof(nladdr)) {
+                       fprintf(stderr, "Sender address length == %d\n", 
+                       exit(1);
+               }
+               for (h = (struct nlmsghdr*)buf; status >= sizeof(*h); ) {
+                       int err;
+                       int len = h->nlmsg_len;
+                       int l = len - sizeof(*h);
+                       if (l<0 || len>status) {
+                               if (msg.msg_flags & MSG_TRUNC) {
+                                       fprintf(stderr, "Truncated message\n");
+                                       return -1;
+                               }
+                               fprintf(stderr, "!!!malformed message: 
len=%d\n", len);
+                               exit(1);
+                       }
+                       err = handler(&nladdr, h, jarg);
+                       if (err < 0)
+                               return err;
+                       status -= NLMSG_ALIGN(len);
+                       h = (struct nlmsghdr*)((char*)h + NLMSG_ALIGN(len));
+               }
+               if (msg.msg_flags & MSG_TRUNC) {
+                       fprintf(stderr, "Message truncated\n");
+                       continue;
+               }
+               if (status) {
+                       fprintf(stderr, "!!!Remnant of size %d\n", status);
+                       exit(1);
+               }
+       }
+int rtnl_from_file(FILE *rtnl, rtnl_filter_t handler,
+                  void *jarg)
+       int status;
+       struct sockaddr_nl nladdr;
+       char   buf[8192];
+       struct nlmsghdr *h = (void*)buf;
+       memset(&nladdr, 0, sizeof(nladdr));
+       nladdr.nl_family = AF_NETLINK;
+       nladdr.nl_pid = 0;
+       nladdr.nl_groups = 0;
+       while (1) {
+               int err, len, type;
+               int l;
+               status = fread(&buf, 1, sizeof(*h), rtnl);
+               if (status < 0) {
+                       if (errno == EINTR)
+                               continue;
+                       perror("rtnl_from_file: fread");
+                       return -1;
+               }
+               if (status == 0)
+                       return 0;
+               len = h->nlmsg_len;
+               type= h->nlmsg_type;
+               l = len - sizeof(*h);
+               if (l<0 || len>sizeof(buf)) {
+                       fprintf(stderr, "!!!malformed message: len=%d @%lu\n",
+                               len, ftell(rtnl));
+                       return -1;
+               }
+               status = fread(NLMSG_DATA(h), 1, NLMSG_ALIGN(l), rtnl);
+               if (status < 0) {
+                       perror("rtnl_from_file: fread");
+                       return -1;
+               }
+               if (status < l) {
+                       fprintf(stderr, "rtnl-from_file: truncated message\n");
+                       return -1;
+               }
+               err = handler(&nladdr, h, jarg);
+               if (err < 0)
+                       return err;
+       }
+int addattr32(struct nlmsghdr *n, int maxlen, int type, __u32 data)
+       int len = RTA_LENGTH(4);
+       struct rtattr *rta;
+       if (NLMSG_ALIGN(n->nlmsg_len) + len > maxlen) {
+               fprintf(stderr,"addattr32: Error! max allowed bound %d 
+               return -1;
+       }
+       rta = NLMSG_TAIL(n);
+       rta->rta_type = type;
+       rta->rta_len = len;
+       memcpy(RTA_DATA(rta), &data, 4);
+       n->nlmsg_len = NLMSG_ALIGN(n->nlmsg_len) + len;
+       return 0;
+int addattr_l(struct nlmsghdr *n, int maxlen, int type, const void *data,
+             int alen)
+       int len = RTA_LENGTH(alen);
+       struct rtattr *rta;
+       if (NLMSG_ALIGN(n->nlmsg_len) + RTA_ALIGN(len) > maxlen) {
+               fprintf(stderr, "addattr_l ERROR: message exceeded bound of 
+               return -1;
+       }
+       rta = NLMSG_TAIL(n);
+       rta->rta_type = type;
+       rta->rta_len = len;
+       memcpy(RTA_DATA(rta), data, alen);
+       n->nlmsg_len = NLMSG_ALIGN(n->nlmsg_len) + RTA_ALIGN(len);
+       return 0;
+int addraw_l(struct nlmsghdr *n, int maxlen, const void *data, int len)
+       if (NLMSG_ALIGN(n->nlmsg_len) + NLMSG_ALIGN(len) > maxlen) {
+               fprintf(stderr, "addraw_l ERROR: message exceeded bound of 
+               return -1;
+       }
+       memcpy(NLMSG_TAIL(n), data, len);
+       memset((void *) NLMSG_TAIL(n) + len, 0, NLMSG_ALIGN(len) - len);
+       n->nlmsg_len = NLMSG_ALIGN(n->nlmsg_len) + NLMSG_ALIGN(len);
+       return 0;
+int rta_addattr32(struct rtattr *rta, int maxlen, int type, __u32 data)
+       int len = RTA_LENGTH(4);
+       struct rtattr *subrta;
+       if (RTA_ALIGN(rta->rta_len) + len > maxlen) {
+               fprintf(stderr,"rta_addattr32: Error! max allowed bound %d 
+               return -1;
+       }
+       subrta = (struct rtattr*)(((char*)rta) + RTA_ALIGN(rta->rta_len));
+       subrta->rta_type = type;
+       subrta->rta_len = len;
+       memcpy(RTA_DATA(subrta), &data, 4);
+       rta->rta_len = NLMSG_ALIGN(rta->rta_len) + len;
+       return 0;
+int rta_addattr_l(struct rtattr *rta, int maxlen, int type,
+                 const void *data, int alen)
+       struct rtattr *subrta;
+       int len = RTA_LENGTH(alen);
+       if (RTA_ALIGN(rta->rta_len) + RTA_ALIGN(len) > maxlen) {
+               fprintf(stderr,"rta_addattr_l: Error! max allowed bound %d 
+               return -1;
+       }
+       subrta = (struct rtattr*)(((char*)rta) + RTA_ALIGN(rta->rta_len));
+       subrta->rta_type = type;
+       subrta->rta_len = len;
+       memcpy(RTA_DATA(subrta), data, alen);
+       rta->rta_len = NLMSG_ALIGN(rta->rta_len) + RTA_ALIGN(len);
+       return 0;
+int parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len)
+       memset(tb, 0, sizeof(struct rtattr *) * (max + 1));
+       while (RTA_OK(rta, len)) {
+               if (rta->rta_type <= max)
+                       tb[rta->rta_type] = rta;
+               rta = RTA_NEXT(rta,len);
+       }
+       if (len)
+               fprintf(stderr, "!!!Deficit %d, rta_len=%d\n", len, 
+       return 0;
+int parse_rtattr_byindex(struct rtattr *tb[], int max, struct rtattr *rta, int 
+       int i = 0;
+       memset(tb, 0, sizeof(struct rtattr *) * max);
+       while (RTA_OK(rta, len)) {
+               if (rta->rta_type <= max && i < max)
+                       tb[i++] = rta;
+               rta = RTA_NEXT(rta,len);
+       }
+       if (len)
+               fprintf(stderr, "!!!Deficit %d, rta_len=%d\n", len, 
+       return i;
diff -r 64599a2d310d -r ea0e302362bb 
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/lowlevel/netlink/libnetlink.h    Fri Nov 13 15:33:37 
2009 +0000
@@ -0,0 +1,58 @@
+#ifndef __LIBNETLINK_H__
+#define __LIBNETLINK_H__ 1
+#include <netinet/in.h>
+#include <asm/types.h>
+#include <linux/netlink.h>
+#include <linux/rtnetlink.h>
+struct rtnl_handle
+       int                     fd;
+       struct sockaddr_nl      local;
+       struct sockaddr_nl      peer;
+       __u32                   seq;
+       __u32                   dump;
+extern int rtnl_open(struct rtnl_handle *rth, unsigned subscriptions);
+extern int rtnl_open_byproto(struct rtnl_handle *rth, unsigned subscriptions, 
int protocol);
+extern void rtnl_close(struct rtnl_handle *rth);
+extern int rtnl_wilddump_request(struct rtnl_handle *rth, int fam, int type);
+extern int rtnl_dump_request(struct rtnl_handle *rth, int type, void *req, int 
+typedef int (*rtnl_filter_t)(const struct sockaddr_nl *,
+                            struct nlmsghdr *n, void *);
+extern int rtnl_dump_filter(struct rtnl_handle *rth, rtnl_filter_t filter,
+                           void *arg1,
+                           rtnl_filter_t junk,
+                           void *arg2);
+extern int rtnl_talk(struct rtnl_handle *rtnl, struct nlmsghdr *n, pid_t peer,
+                    unsigned groups, struct nlmsghdr *answer,
+                    rtnl_filter_t junk,
+                    void *jarg);
+extern int rtnl_send(struct rtnl_handle *rth, const char *buf, int);
+extern int addattr32(struct nlmsghdr *n, int maxlen, int type, __u32 data);
+extern int addattr_l(struct nlmsghdr *n, int maxlen, int type, const void 
*data, int alen);
+extern int addraw_l(struct nlmsghdr *n, int maxlen, const void *data, int len);
+extern int rta_addattr32(struct rtattr *rta, int maxlen, int type, __u32 data);
+extern int rta_addattr_l(struct rtattr *rta, int maxlen, int type, const void 
*data, int alen);
+extern int parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int 
+extern int parse_rtattr_byindex(struct rtattr *tb[], int max, struct rtattr 
*rta, int len);
+#define parse_rtattr_nested(tb, max, rta) \
+       (parse_rtattr((tb), (max), RTA_DATA(rta), RTA_PAYLOAD(rta)))
+extern int rtnl_listen(struct rtnl_handle *, rtnl_filter_t handler,
+                      void *jarg);
+extern int rtnl_from_file(FILE *, rtnl_filter_t handler,
+                      void *jarg);
+#define NLMSG_TAIL(nmsg) \
+       ((struct rtattr *) (((void *) (nmsg)) + NLMSG_ALIGN((nmsg)->nlmsg_len)))
+#endif /* __LIBNETLINK_H__ */
diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/lowlevel/netlink/netlink.c
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/lowlevel/netlink/netlink.c       Fri Nov 13 15:33:37 
2009 +0000
@@ -0,0 +1,211 @@
+/* python binding to libnetlink */
+#include <Python.h>
+#include "libnetlink.h"
+#define PKG "xen.lowlevel.netlink"
+typedef struct {
+  PyObject_HEAD
+  int opened;
+  struct rtnl_handle rth;
+} PyRtnlObject;
+/* todo: subscriptions? */
+static PyObject* PyRtnl_new(PyTypeObject* type, PyObject* args,
+                            PyObject* kwargs)
+  return type->tp_alloc(type, 0);
+static int PyRtnl_init(PyObject* obj, PyObject* args, PyObject* kwargs)
+  PyRtnlObject* self = (PyRtnlObject*)obj;
+  if (rtnl_open(&self->rth, 0) < 0) {
+    PyErr_SetString(PyExc_IOError, "could not open rtnl handle");
+    return -1;
+  }
+  return 0;
+static void PyRtnl_dealloc(PyRtnlObject* obj)
+  PyRtnlObject* self = (PyRtnlObject*)obj;
+  rtnl_close(&self->rth);
+static PyObject* pyrtnl_talk(PyObject* obj, PyObject* args)
+  PyRtnlObject* self = (PyRtnlObject*)obj;
+  char* msg;
+  int len;
+  int peer = 0;
+  int groups = 0;
+  if (!PyArg_ParseTuple(args, "s#|ii", &msg, &len, &peer, &groups))
+    return NULL;
+  if (rtnl_talk(&self->rth, (struct nlmsghdr*)msg, peer, groups, NULL, NULL,
+                NULL) < 0)
+  {
+    PyErr_SetString(PyExc_IOError, "error sending message");
+    return NULL;
+  }
+static PyObject* pyrtnl_wilddump_request(PyObject* obj, PyObject* args)
+  PyRtnlObject* self = (PyRtnlObject*)obj;
+  int family, type;
+  if (!PyArg_ParseTuple(args, "ii", &family, &type))
+    return NULL;
+  if (rtnl_wilddump_request(&self->rth, family, type) < 0) {
+    PyErr_SetString(PyExc_IOError, "could not send dump request");
+    return NULL;
+  }
+static PyObject* pyrtnl_dump_request(PyObject* obj, PyObject* args)
+  PyRtnlObject* self = (PyRtnlObject*)obj;
+  int type;
+  char* req;
+  int len;
+  if (!PyArg_ParseTuple(args, "is#", &type, &req, &len))
+    return NULL;
+  if (rtnl_dump_request(&self->rth, type, req, len) < 0) {
+    PyErr_SetString(PyExc_IOError, "could not send dump request");
+    return NULL;
+  }
+/* translate args to python and call python callback */
+static int dump_filter_helper(const struct sockaddr_nl *who,
+                              struct nlmsghdr *n, void *arg)
+  PyObject* filter = arg;
+  PyObject* args;
+  PyObject* result;
+  args = Py_BuildValue("s#s#", who, sizeof(*who), n, n->nlmsg_len);
+  result = PyObject_CallObject(filter, args);
+  Py_DECREF(args);
+  if (!result)
+    return -1;
+  /* result is ignored as long as an exception isn't raised */
+  Py_DECREF(result);
+  return 0;
+static PyObject* pyrtnl_dump_filter(PyObject* obj, PyObject* args)
+  PyRtnlObject* self = (PyRtnlObject*)obj;
+  PyObject *filter;
+  if (!PyArg_ParseTuple(args, "O:dump_filter", &filter))
+    return NULL;
+  if (!PyCallable_Check(filter)) {
+    PyErr_SetString(PyExc_TypeError, "parameter must be callable");
+    return NULL;
+  }
+  Py_INCREF(filter);
+  if (rtnl_dump_filter(&self->rth, dump_filter_helper, filter, NULL,
+                       NULL) < 0)
+  {
+    Py_DECREF(filter);
+    return NULL;
+  }
+  Py_DECREF(filter);
+static PyMethodDef PyRtnl_methods[] = {
+  { "talk", pyrtnl_talk, METH_VARARGS,
+    "send a message to rtnetlink and receive a response.\n" },
+  { "wilddump_request", pyrtnl_wilddump_request, METH_VARARGS,
+    "dump objects.\n" },
+  { "dump_request", pyrtnl_dump_request, METH_VARARGS,
+    "start a dump of a particular netlink type.\n" },
+  { "dump_filter", pyrtnl_dump_filter, METH_VARARGS,
+    "iterate over an rtnl dump.\n" },
+  { NULL }
+static PyTypeObject PyRtnlType = {
+  0,                          /* ob_size           */
+  PKG ".rtnl",                /* tp_name           */
+  sizeof(PyRtnlObject),       /* tp_basicsize      */
+  0,                          /* tp_itemsize       */
+  (destructor)PyRtnl_dealloc, /* tp_dealloc        */
+  NULL,                       /* tp_print          */
+  NULL,                       /* tp_getattr        */
+  NULL,                       /* tp_setattr        */
+  NULL,                       /* tp_compare        */
+  NULL,                       /* tp_repr           */
+  NULL,                       /* tp_as_number      */
+  NULL,                       /* tp_as_sequence    */
+  NULL,                       /* tp_as_mapping     */
+  NULL,                       /* tp_hash           */
+  NULL,                       /* tp_call           */
+  NULL,                       /* tp_str            */
+  NULL,                       /* tp_getattro       */
+  NULL,                       /* tp_setattro       */
+  NULL,                       /* tp_as_buffer      */
+  Py_TPFLAGS_DEFAULT,         /* tp_flags          */
+  "rtnetlink handle",         /* tp_doc            */
+  NULL,                       /* tp_traverse       */
+  NULL,                       /* tp_clear          */
+  NULL,                       /* tp_richcompare    */
+  0,                          /* tp_weaklistoffset */
+  NULL,                       /* tp_iter           */
+  NULL,                       /* tp_iternext       */
+  PyRtnl_methods,             /* tp_methods        */
+  NULL,                       /* tp_members        */
+  NULL,                       /* tp_getset         */
+  NULL,                       /* tp_base           */
+  NULL,                       /* tp_dict           */
+  NULL,                       /* tp_descr_get      */
+  NULL,                       /* tp_descr_set      */
+  0,                          /* tp_dictoffset     */
+  PyRtnl_init,                /* tp_init           */
+  NULL,                       /* tp_alloc          */
+  PyRtnl_new,                 /* tp_new            */
+static PyMethodDef methods[] = {
+  { NULL }
+static char doc[] = "libnetlink wrapper";
+PyMODINIT_FUNC initnetlink(void)
+  PyObject *mod;
+  if (PyType_Ready(&PyRtnlType) == -1)
+    return;
+  if (!(mod = Py_InitModule3(PKG, methods, doc)))
+    return;
+  Py_INCREF(&PyRtnlType);
+  PyModule_AddObject(mod, "rtnl", (PyObject *)&PyRtnlType);
diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/blkdev.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/remus/blkdev.py  Fri Nov 13 15:33:37 2009 +0000
@@ -0,0 +1,31 @@
+handlers = []
+class BlkDevException(Exception): pass
+class BlkDev(object):
+    "Object representing a VM block device"
+    def __init__(self, **props):
+        self.uname = ''
+        if 'dev' not in props:
+            raise BlkDevException('no device')
+        #if 'uname' not in props:
+            #raise BlkDevException('no uname')
+        if 'mode' not in props:
+            raise BlkDevException('no mode')
+        self.__dict__.update(props)
+        self.dev = props['dev'].rstrip(':disk')
+    def __str__(self):
+        return '%s,%s,%s' % (self.uname, self.dev, self.mode)
+def register(handler):
+    "register a block device class with parser"
+    if handler not in handlers:
+        handlers.insert(0, handler)
+def parse(props):
+    "turn a vm device dictionary into a blkdev object"
+    for handler in handlers:
+        if handler.handles(**props):
+            return handler(**props)
+    return BlkDev(**props)
diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/image.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/remus/image.py   Fri Nov 13 15:33:37 2009 +0000
@@ -0,0 +1,227 @@
+# VM image file manipulation
+import logging, struct
+import vm
+SIGNATURE = 'LinuxGuestRecord'
+LONGLEN = struct.calcsize('L')
+INTLEN = struct.calcsize('i')
+PAGE_SIZE = 4096
+# ~0L
+P2M_EXT_SIG = 4294967295L
+# frames per page
+FPP = 1024
+LTAB_MASK = 0xf << 28
+BATCH_SIZE = 1024
+log = logging.getLogger()
+class VMParseException(Exception): pass
+class VMImage(object):
+    def __init__(self, img=None):
+        """img may be a path or a file object.
+        If compact is True, apply checkpoints to base image instead
+        of simply concatenating them.
+        """
+        self.img = img
+        self.dom = None
+        self.fd = None
+        self.header = None
+        self.nr_pfns = 0
+        # p2m extension header (unparsed)
+        self.p2mext = None
+        if self.img:
+            self.open(self.img)
+    def open(self, img):
+        if isinstance(img, str):
+            self.fd = file(img, 'rb')
+        else:
+            self.fd = img
+        self.readheader()
+    def readheader(self):
+        sig = self.fd.read(len(SIGNATURE))
+        if sig != SIGNATURE:
+            raise VMParseException("Bad signature in image")
+        hlen = self.fd.read(INTLEN)
+        hlen, = struct.unpack('!i', hlen)
+        self.header = self.fd.read(hlen)
+        self.dom = parseheader(self.header)
+    def readp2mfl(self):
+        "read the P2M frame list"
+        pfnlen = self.fd.read(LONGLEN)
+        self.nr_pfns, = struct.unpack('L', pfnlen)
+        p2m0 = self.fd.read(LONGLEN)
+        p2mhdr = p2m0
+        p2m0, = struct.unpack('L', p2m0)
+        if p2m0 == P2M_EXT_SIG:
+            elen = self.fd.read(INTLEN)
+            elen, = struct.unpack('I', elen)
+            self.p2mext = self.fd.read(elen)
+            p2m0 = self.fd.read(LONGLEN)
+            p2m0, = struct.unpack('L', p2m0)
+        p2mfl = [p2m0]
+        p2mfle = (self.nr_pfns + FPP - 1)/FPP - 1
+        p2ms = self.fd.read(LONGLEN * p2mfle)
+        p2mfl.extend(struct.unpack('%dL' % p2mfle, p2ms))
+        self.p2mfl = p2mfl
+    def flush(self):
+        self.ofd.write(self.tail)
+class Writer(object):
+    """compress a stream of checkpoints into a single image of the
+    last checkpoint"""
+    def __init__(self, fd, compact=False):
+        self.fd = fd
+        self.compact = compact
+        self.vm = None
+        self.tail = None
+        # offset to first batch of pages
+        self.imgstart = 0
+        # PFN mappings
+        self.pfns = []
+    def __del__(self):
+        self.close()
+    def writeheader(self):
+        hlen = struct.pack('!i', len(self.vm.header))
+        header = ''.join([SIGNATURE, hlen, self.vm.header])
+        self.fd.write(header)
+    def writep2mfl(self):
+        p2m = [struct.pack('L', self.vm.nr_pfns)]
+        if self.vm.p2mext:
+            p2m.extend([struct.pack('L', P2M_EXT_SIG), self.vm.p2mext])
+        p2m.append(struct.pack('%dL' % len(self.vm.p2mfl), *self.vm.p2mfl))
+        self.fd.write(''.join(p2m))
+    def writebatch(self, batch):
+        def offset(pfn):
+            isz = (pfn / BATCH_SIZE + 1) * IDXLEN
+            return self.imgstart + isz + pfn * PAGE_SIZE
+        if not self.compact:
+            return self.fd.write(batch)
+        batch = parsebatch(batch)
+        # sort pages for better disk seek behaviour
+        batch.sort(lambda x, y: cmp(x[0] & ~LTAB_MASK, y[0] & ~LTAB_MASK))
+        for pfndesc, page in batch:
+            pfn = pfndesc & ~LTAB_MASK
+            if pfn > self.vm.nr_pfns:
+                log.error('INVALID PFN: %d' % pfn)
+            if len(self.pfns) <= pfn:
+                self.pfns.extend([0] * (pfn - len(self.pfns) + 1))
+            self.pfns[pfn] = pfndesc
+            self.fd.seek(offset(pfn))
+            self.fd.write(page)
+        #print "max offset: %d, %d" % (len(self.pfns), offset(self.pfns[-1]))
+    def writeindex(self):
+        "Write batch header in front of each page"
+        hdrlen = INTLEN + BATCH_SIZE * LONGLEN
+        batches = (len(self.pfns) + BATCH_SIZE - 1) / BATCH_SIZE
+        for i in xrange(batches):
+            offset = self.imgstart + i * (hdrlen + (PAGE_SIZE * BATCH_SIZE))
+            pfnoff = i * BATCH_SIZE
+            # python auto-clamps overreads
+            pfns = self.pfns[pfnoff:pfnoff + BATCH_SIZE]
+            self.fd.seek(offset)
+            self.fd.write(struct.pack('i', len(pfns)))
+            self.fd.write(struct.pack('%dL' % len(pfns), *pfns))
+    def slurp(self, ifd):
+        """Apply an incremental checkpoint to a loaded image.
+        accepts a path or a file object."""
+        if isinstance(ifd, str):
+            ifd = file(ifd, 'rb')
+        if not self.vm:
+            self.vm = VMImage(ifd)
+            self.writeheader()
+            self.vm.readp2mfl()
+            self.writep2mfl()
+            self.imgstart = self.fd.tell()
+        while True:
+            l, batch = readbatch(ifd)
+            if l <= 0:
+                break
+            self.writebatch(batch)
+        self.tail = batch + ifd.read()
+    def flush(self):
+        if self.tail:
+            self.fd.seek(0, 2)
+            self.fd.write(self.tail)
+            if self.compact:
+                self.writeindex()
+        self.tail = None
+    def close(self):
+        self.flush()
+def parseheader(header):
+    "parses a header sexpression"
+    return vm.parsedominfo(vm.strtosxpr(header))
+def makeheader(dominfo):
+    "create an image header from a VM dominfo sxpr"
+    items = [SIGNATURE]
+    sxpr = vm.sxprtostr(dominfo)
+    items.append(struct.pack('!i', len(sxpr)))
+    items.append(sxpr)
+    return ''.join(items)
+def readbatch(fd):
+    batch = []
+    batchlen = fd.read(INTLEN)
+    batch.append(batchlen)
+    batchlen, = struct.unpack('i', batchlen)
+    log.info("batch length: %d" % batchlen)
+    if batchlen <= 0:
+        return (batchlen, batch[0])
+    batchfns = fd.read(LONGLEN * batchlen)
+    batch.append(batchfns)
+    pages = fd.read(PAGE_SIZE * batchlen)
+    if len(pages) != PAGE_SIZE * batchlen:
+        log.error('SHORT READ: %d' % len(pages))
+    batch.append(pages)
+    return (batchlen, ''.join(batch))
+def parsebatch(batch):
+    "parse a batch string into pages"
+    batchlen, batch = batch[:INTLEN], batch[INTLEN:]
+    batchlen, = struct.unpack('i', batchlen)
+    #print 'batch length: %d' % batchlen
+    pfnlen = batchlen * LONGLEN
+    pfns = struct.unpack('%dL' % batchlen, batch[:pfnlen])
+    pagebuf = batch[pfnlen:]
+    pages = [pagebuf[i*PAGE_SIZE:(i+1)*PAGE_SIZE] for i in xrange(batchlen)]
+    return zip(pfns, pages)
diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/netlink.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/remus/netlink.py Fri Nov 13 15:33:37 2009 +0000
@@ -0,0 +1,314 @@
+# netlink wrappers
+import socket, struct
+import xen.lowlevel.netlink
+NETLINK_ROUTE          = 0
+NLM_F_REQUEST = 1 # It is request message.
+NLM_F_MULTI   = 2 # Multipart message, terminated by NLMSG_DONE
+NLM_F_ACK     = 4 # Reply with ack, with zero or error code
+NLM_F_ECHO    = 8 # Echo this request
+# Modifiers to GET request
+NLM_F_ROOT   = 0x100 # specify tree root
+NLM_F_MATCH  = 0x200 # return all matching
+NLM_F_ATOMIC = 0x400 # atomic GET
+# Modifiers to NEW request
+NLM_F_REPLACE = 0x100 # Override existing
+NLM_F_EXCL    = 0x200 # Do not touch, if it exists
+NLM_F_CREATE  = 0x400 # Create, if it does not exist
+NLM_F_APPEND  = 0x800 # Add to end of list
+IFLA_MTU       = 4
+IFLA_LINK      = 5
+IFLA_QDISC     = 6
+IFLA_STATS     = 7
+IFLA_COST      = 8
+IFLA_MASTER    = 10
+IFLA_TXQLEN    = 13
+IFLA_MAP       = 14
+IFLA_WEIGHT    = 15
+TCA_KIND    = 1
+TCA_STATS   = 3
+TCA_RATE    = 5
+TCA_FCNT    = 6
+TCA_STATS2  = 7
+class RTNLException(Exception): pass
+def align(l, alignto=4):
+    return (l + alignto - 1) & ~(alignto - 1)
+class rtattr(object):
+    "rtattribute"
+    fmt = "HH"
+    fmtlen = struct.calcsize(fmt)
+    def __init__(self, msg=None):
+        if msg:
+            self.unpack(msg)
+        else:
+            self.rta_len = 0
+            self.rta_type = 0
+            self.body = ''
+    def __len__(self):
+        return align(self.rta_len)
+    def pack(self):
+        self.rta_len = align(self.fmtlen + len(self.body))
+        s = struct.pack(self.fmt, self.rta_len, self.rta_type) + self.body
+        pad = self.rta_len - len(s)
+        if pad:
+            s += '\0' * pad
+        return s
+    def unpack(self, msg):
+        args = struct.unpack(self.fmt, msg[:self.fmtlen])
+        self.rta_len, self.rta_type = args
+        self.body = msg[align(self.fmtlen):self.rta_len]
+class rtattrlist(object):
+    def __init__(self, msg):
+        self.start = msg
+    def __iter__(self):
+        body = self.start
+        while len(body) > rtattr.fmtlen:
+            rta = rtattr(body)
+            yield rta
+            body = body[len(rta):]
+class nlmsg(object):
+    "netlink message header"
+    fmt = "IHHII"
+    fmtlen = struct.calcsize(fmt)
+    def __init__(self, msg=None):
+        if msg:
+            self.unpack(msg)
+        else:
+            self.nlmsg_len = 0
+            self.nlmsg_type = 0
+            self.nlmsg_flags = 0
+            self.nlmsg_seq = 0
+            self.nlmsg_pid = 0
+            self.rta = ''
+            self.body = ''
+    def __len__(self):
+        return align(self.fmtlen + len(self.body) + len(self.rta))
+    def addattr(self, type, data):
+        attr = rtattr()
+        attr.rta_type = type
+        attr.body = data
+        self.rta += attr.pack()
+    def settype(self, cmd):
+        self.nlmsg_type = cmd
+    def pack(self):
+        return struct.pack(self.fmt, len(self), self.nlmsg_type,
+                           self.nlmsg_flags, self.nlmsg_seq,
+                           self.nlmsg_pid) + self.body + self.rta
+    def unpack(self, msg):
+        args = struct.unpack(self.fmt, msg[:self.fmtlen])
+        self.nlmsg_len, self.nlmsg_type, self.nlmsg_flags = args[:3]
+        self.nlmsg_seq, self.nlmsg_pid = args[3:]
+        self.body = msg[align(self.fmtlen):]
+        self.rta = ''
+    def __str__(self):
+        return '<netlink message, len %d, type %d>' % \
+            (self.nlmsg_len, self.nlmsg_type)
+class ifinfomsg(object):
+    "interface info message"
+    fmt = "BxHiII"
+    fmtlen = struct.calcsize(fmt)
+    def __init__(self, msg=None):
+        if msg:
+            self.unpack(msg)
+        else:
+            self.ifi_family = 0
+            self.ifi_type = 0
+            self.ifi_index = 0
+            self.ifi_flags = 0
+            self.ifi_change = 0
+            self.body = ''
+    def unpack(self, msg):
+        args = struct.unpack(self.fmt, msg[:self.fmtlen])
+        self.ifi_family, self.ifi_type, self.ifi_index= args[:3]
+        self.ifi_flags, self.ifi_change = args[3:]
+        self.body = msg[align(self.fmtlen):]
+    def __str__(self):
+        return '<ifinfo message, family %d, type %d, index %d>' % \
+            (self.ifi_family, self.ifi_type, self.ifi_index)
+class tcmsg(object):
+    "TC message"
+    fmt = "BxxxiIII"
+    fmtlen = struct.calcsize(fmt)
+    def __init__(self, msg=None):
+        if msg:
+            self.unpack(msg)
+        else:
+            self.tcm_family = socket.AF_UNSPEC
+            self.tcm_ifindex = 0
+            self.tcm_handle = 0
+            self.tcm_parent = 0
+            self.tcm_info = 0
+            self.rta = ''
+    def unpack(self, msg):
+        args = struct.unpack(self.fmt, msg[:self.fmtlen])
+        self.tcm_family, self.tcm_ifindex, self.tcm_handle = args[:3]
+        self.tcm_parent, self.tcm_info = args[3:]
+        self.rta = msg[align(self.fmtlen):]
+    def pack(self):
+        return struct.pack(self.fmt, self.tcm_family, self.tcm_ifindex,
+                           self.tcm_handle, self.tcm_parent, self.tcm_info)
+    def __str__(self):
+        return '<tc message, family %d, index %d>' % \
+            (self.tcm_family, self.tcm_ifindex)
+class newlinkmsg(object):
+    def __init__(self, nlmsg):
+        if nlmsg.nlmsg_type != RTM_NEWLINK:
+            raise RTNLException("wrong message type")
+        self.nlmsg = nlmsg
+        self.ifi = ifinfomsg(self.nlmsg.body)
+        self.rtattrs = {}
+        for rta in rtattrlist(self.ifi.body):
+            self.rtattrs[rta.rta_type] = rta.body
+class newqdiscmsg(object):
+    def __init__(self, nlmsg):
+        if nlmsg.nlmsg_type != RTM_NEWQDISC:
+            raise RTNLException("wrong message type")
+        self.nlmsg = nlmsg
+        self.t = tcmsg(self.nlmsg.body)
+        self.rtattrs = {}
+        for rta in rtattrlist(self.t.rta):
+            self.rtattrs[rta.rta_type] = rta.body
+class rtnl(object):
+    def __init__(self):
+        self._rth = xen.lowlevel.netlink.rtnl()
+        self._linkcache = None
+    def getlink(self, key, cached=False):
+        """returns the interface object corresponding to the key, which
+        may be an index number or device name."""
+        if not cached:
+            self._linkcache = None
+        if self._linkcache is None:
+            self._linkcache = self.getlinks()
+        if isinstance(key, int):
+            return self._linkcache.get(key)
+        for k, v in self._linkcache.iteritems():
+            if v['name'] == key:
+                return v
+        return None
+    def getlinks(self):
+        """returns a dictionary of interfaces keyed by kernel
+        interface index"""
+        links = {}
+        def dumpfilter(addr, msgstr):
+            msg = newlinkmsg(nlmsg(msgstr))
+            idx = msg.ifi.ifi_index
+            ifname = msg.rtattrs[IFLA_IFNAME].strip('\0')
+            address = msg.rtattrs.get(IFLA_ADDRESS)
+            link = {'index': idx,
+                    'type': msg.ifi.ifi_type,
+                    'name': ifname,
+                    'address': address}
+            links[idx] = link
+        self._rth.wilddump_request(socket.AF_UNSPEC, RTM_GETLINK)
+        self._rth.dump_filter(dumpfilter)
+        return links
+    def getqdisc(self, dev):
+        """returns the queueing discipline on device dev, which may be
+        specified by kernel index or device name"""
+        qdiscs = self.getqdiscs(dev)
+        if qdiscs:
+            return qdiscs.values()[0]
+        return None
+    def getqdiscs(self, dev=None):
+        """returns a dictionary of queueing disciplines keyed by kernel
+        interface index"""
+        qdiscs = {}
+        def dumpfilter(addr, msgstr):
+            msg = newqdiscmsg(nlmsg(msgstr))
+            idx = msg.t.tcm_ifindex
+            handle = msg.t.tcm_handle
+            kind = msg.rtattrs[TCA_KIND].strip('\0')
+            opts = msg.rtattrs.get(TCA_OPTIONS)
+            qdisc = {'index': idx,
+                     'handle': handle,
+                     'kind': kind,
+                     'options': opts}
+            qdiscs[idx] = qdisc
+        tcm = tcmsg()
+        if dev:
+            link = self.getlink(dev)
+            if not link:
+                raise QdiscException('device %s not found' % dev)
+            tcm.tcm_ifindex = link['index']
+        msg = tcm.pack()
+        self._rth.dump_request(RTM_GETQDISC, msg)
+        self._rth.dump_filter(dumpfilter)
+        return qdiscs
+    def talk(self, req):
+        self._rth.talk(req)
diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/profile.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/remus/profile.py Fri Nov 13 15:33:37 2009 +0000
@@ -0,0 +1,56 @@
+"""Simple profiling module
+import time
+class ProfileBlock(object):
+    """A section of code to be profiled"""
+    def __init__(self, name):
+        self.name = name
+    def enter(self):
+        print "PROF: entered %s at %f" % (self.name, time.time())
+    def exit(self):
+        print "PROF: exited %s at %f" % (self.name, time.time())
+class NullProfiler(object):
+    def enter(self, name):
+        pass
+    def exit(self, name=None):
+        pass
+class Profiler(object):
+    def __init__(self):
+        self.blocks = {}
+        self.running = []
+    def enter(self, name):
+        try:
+            block = self.blocks[name]
+        except KeyError:
+            block = ProfileBlock(name)
+            self.blocks[name] = block
+        block.enter()
+        self.running.append(block)
+    def exit(self, name=None):
+        if name is not None:
+            block = None
+            while self.running:
+                tmp = self.running.pop()
+                if tmp.name == name:
+                    block = tmp
+                    break
+                tmp.exit()
+            if not block:
+                raise KeyError('block %s not running' % name)
+        else:
+            try:
+                block = self.running.pop()
+            except IndexError:
+                raise KeyError('no block running')
+        block.exit()
diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/qdisc.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/remus/qdisc.py   Fri Nov 13 15:33:37 2009 +0000
@@ -0,0 +1,178 @@
+import socket, struct
+import netlink
+qdisc_kinds = {}
+class QdiscException(Exception): pass
+class request(object):
+    "qdisc request message"
+    def __init__(self, cmd, flags=0, dev=None, handle=0):
+        self.n = netlink.nlmsg()
+        self.t = netlink.tcmsg()
+        self.n.nlmsg_flags = netlink.NLM_F_REQUEST|flags
+        self.n.nlmsg_type = cmd
+        self.t.tcm_family = socket.AF_UNSPEC
+        if not handle:
+            handle = TC_H_ROOT
+        self.t.tcm_parent = handle
+        if dev:
+            self.t.tcm_ifindex = dev
+    def pack(self):
+        t = self.t.pack()
+        self.n.body = t
+        return self.n.pack()
+class addrequest(request):
+    def __init__(self, dev, handle, qdisc):
+        flags = netlink.NLM_F_EXCL|netlink.NLM_F_CREATE
+        super(addrequest, self).__init__(netlink.RTM_NEWQDISC, flags=flags,
+                                         dev=dev, handle=handle)
+        self.n.addattr(netlink.TCA_KIND, qdisc.kind)
+        opts = qdisc.pack()
+        if opts:
+            self.n.addattr(netlink.TCA_OPTIONS, opts)
+class delrequest(request):
+    def __init__(self, dev, handle):
+        super(delrequest, self).__init__(netlink.RTM_DELQDISC, dev=dev,
+                                         handle=handle)
+class changerequest(request):
+    def __init__(self, dev, handle, qdisc):
+        super(changerequest, self).__init__(netlink.RTM_NEWQDISC,
+                                            dev=dev, handle=handle)
+        self.n.addattr(netlink.TCA_KIND, qdisc.kind)
+        opts = qdisc.pack()
+        if opts:
+            self.n.addattr(netlink.TCA_OPTIONS, opts)
+class Qdisc(object):
+    def __new__(cls, qdict=None, *args, **opts):
+        if qdict:
+            kind = qdict.get('kind')
+            cls = qdisc_kinds.get(kind, cls)
+        obj = super(Qdisc, cls).__new__(cls, qdict=qdict, *args, **opts)
+        return obj
+    def __init__(self, qdict):
+        self._qdict = qdict
+        self.kind = qdict['kind']
+        self.handle = qdict['handle'] >> 16
+    def parse(self, opts):
+        if opts:
+            raise QdiscException('cannot parse qdisc parameters')
+    def optstr(self):
+        if self.qdict['options']:
+            return '[cannot parse qdisc parameters]'
+        else:
+            return ''
+    def pack(self):
+        return ''
+class PrioQdisc(Qdisc):
+    fmt = 'i%sB' % (TC_PRIO_MAX + 1)
+    def __init__(self, qdict):
+        super(PrioQdisc, self).__init__(qdict)
+        if qdict.get('options'):
+            self.unpack(qdict['options'])
+        else:
+            self.bands = 3
+            self.priomap = [1, 2, 2, 2, 1, 2, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1]
+    def pack(self):
+        #return struct.pack(self.fmt, self.bands, *self.priomap)
+        return ''
+    def unpack(self, opts):
+        args = struct.unpack(self.fmt, opts)
+        self.bands = args[0]
+        self.priomap = args[1:]
+    def optstr(self):
+        mapstr = ' '.join([str(p) for p in self.priomap])
+        return 'bands %d priomap  %s' % (self.bands, mapstr)
+qdisc_kinds['prio'] = PrioQdisc
+qdisc_kinds['pfifo_fast'] = PrioQdisc
+class CfifoQdisc(Qdisc):
+    fmt = 'II'
+    def __init__(self, qdict):
+        super(CfifoQdisc, self).__init__(qdict)
+        if qdict.get('options'):
+            self.unpack(qdict['options'])
+        else:
+            self.epoch = 0
+            self.vmid = 0
+    def pack(self):
+        return struct.pack(self.fmt, self.epoch, self.vmid)
+    def unpack(self, opts):
+        self.epoch, self.vmid = struct.unpack(self.fmt, opts)
+    def parse(self, opts):
+        args = list(opts)
+        try:
+            while args:
+                arg = args.pop(0)
+                if arg == 'epoch':
+                    self.epoch = int(args.pop(0))
+                    continue
+                if arg.lower() == 'vmid':
+                    self.vmid = int(args.pop(0))
+                    continue
+        except Exception, inst:
+            raise QdiscException(str(inst))
+    def optstr(self):
+        return 'epoch %d vmID %d' % (self.epoch, self.vmid)
+qdisc_kinds['cfifo'] = CfifoQdisc
+class QueueQdisc(Qdisc):
+    fmt = 'I'
+    def __init__(self, qdict=None):
+        if not qdict:
+            qdict = {'kind': 'queue',
+                     'handle': TC_H_ROOT}
+        super(QueueQdisc, self).__init__(qdict)
+        self.action = 0
+    def pack(self):
+        return struct.pack(self.fmt, self.action)
+    def parse(self, args):
+        if not args:
+            raise QdiscException('no action given')
+        arg = args[0]
+        if arg == 'checkpoint':
+            self.action = TC_QUEUE_CHECKPOINT
+        elif arg == 'release':
+            self.action = TC_QUEUE_RELEASE
+        else:
+            raise QdiscException('unknown action')
+qdisc_kinds['queue'] = QueueQdisc
diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/save.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/remus/save.py    Fri Nov 13 15:33:37 2009 +0000
@@ -0,0 +1,172 @@
+#!/usr/bin/env python
+import os, select, socket, threading, time, signal, xmlrpclib
+from xen.xend.XendClient import server
+from xen.xend.xenstore.xswatch import xswatch
+import xen.lowlevel.xc
+from xen.xend.xenstore import xsutil
+xc = xen.lowlevel.xc.xc()
+import xen.lowlevel.checkpoint
+import vm, image
+XCFLAGS_LIVE =      1
+xcsave = '/usr/lib/xen/bin/xc_save'
+class _proxy(object):
+    "proxy simulates an object without inheritance"
+    def __init__(self, obj):
+        self._obj = obj
+    def __getattr__(self, name):
+        return getattr(self._obj, name)
+    def proxy(self, obj):
+        self._obj = obj
+class CheckpointError(Exception): pass
+class CheckpointingFile(_proxy):
+    """Tee writes into separate file objects for each round.
+    This is necessary because xc_save gets a single file descriptor
+    for the duration of checkpointing.
+    """
+    def __init__(self, path):
+        self.path = path
+        self.round = 0
+        self.rfd, self.wfd = os.pipe()
+        self.fd = file(path, 'wb')
+        # this pipe is used to notify the writer thread of checkpoints
+        self.cprfd, self.cpwfd = os.pipe()
+        super(CheckpointingFile, self).__init__(self.fd)
+        wt = threading.Thread(target=self._wrthread, name='disk-write-thread')
+        wt.setDaemon(True)
+        wt.start()
+        self.wt = wt
+    def fileno(self):
+        return self.wfd
+    def close(self):
+        os.close(self.wfd)
+        # closing wfd should signal writer to stop
+        self.wt.join()
+        os.close(self.rfd)
+        os.close(self.cprfd)
+        os.close(self.cpwfd)
+        self.fd.close()
+        self.wt = None
+    def checkpoint(self):
+        os.write(self.cpwfd, '1')
+    def _wrthread(self):
+        while True:
+            r, o, e = select.select((self.rfd, self.cprfd), (), ())
+            if self.rfd in r:
+                data = os.read(self.rfd, 256 * 1024)
+                if not data:
+                    break
+                self.fd.write(data)
+            if self.cprfd in r:
+                junk = os.read(self.cprfd, 1)
+                self.round += 1
+                self.fd = file('%s.%d' % (self.path, self.round), 'wb')
+                self.proxy(self.fd)
+class MigrationSocket(_proxy):
+    def __init__(self, address):
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.connect(address)
+        sock.send("receive\n")
+        sock.recv(80)
+        fd = os.fdopen(sock.fileno(), 'w+')
+        self.sock = sock
+        super(MigrationSocket, self).__init__(fd)
+class Keepalive(object):
+    "Call a keepalive method at intervals"
+    def __init__(self, method, interval=0.1):
+        self.keepalive = method
+        self.interval = interval
+        self.thread = None
+        self.running = False
+    def start(self):
+        if not self.interval:
+            return
+        self.thread = threading.Thread(target=self.run, 
+        self.thread.setDaemon(True)
+        self.running = True
+        self.thread.start()
+    def stop(self):
+        if not self.thread:
+            return
+        self.running = False
+        self.thread.join()
+        self.thread = None
+    def run(self):
+        while self.running:
+            self.keepalive()
+            time.sleep(self.interval)
+        self.keepalive(stop=True)
+class Saver(object):
+    def __init__(self, domid, fd, suspendcb=None, resumecb=None,
+                 checkpointcb=None, interval=0):
+        """Create a Saver object for taking guest checkpoints.
+        domid:        name, number or UUID of a running domain
+        fd:           a stream to which checkpoint data will be written.
+        suspendcb:    callback invoked after guest is suspended
+        resumecb:     callback invoked before guest resumes
+        checkpointcb: callback invoked when a checkpoint is complete. Return
+                      True to take another checkpoint, or False to stop.
+        """
+        self.fd = fd
+        self.suspendcb = suspendcb
+        self.resumecb = resumecb
+        self.checkpointcb = checkpointcb
+        self.interval = interval
+        self.vm = vm.VM(domid)
+        self.checkpointer = None
+    def start(self):
+        vm.getshadowmem(self.vm)
+        hdr = image.makeheader(self.vm.dominfo)
+        self.fd.write(hdr)
+        self.fd.flush()
+        self.checkpointer = xen.lowlevel.checkpoint.checkpointer()
+        try:
+            self.checkpointer.open(self.vm.domid)
+            self.checkpointer.start(self.fd, self.suspendcb, self.resumecb,
+                                    self.checkpointcb, self.interval)
+            self.checkpointer.close()
+        except xen.lowlevel.checkpoint.error, e:
+            raise CheckpointError(e)
+    def _resume(self):
+        """low-overhead version of XendDomainInfo.resumeDomain"""
+        # TODO: currently assumes SUSPEND_CANCEL is available
+        if True:
+            xc.domain_resume(self.vm.domid, 1)
+            xsutil.ResumeDomain(self.vm.domid)
+        else:
+            server.xend.domain.resumeDomain(self.vm.domid)
diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/tapdisk.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/remus/tapdisk.py Fri Nov 13 15:33:37 2009 +0000
@@ -0,0 +1,4 @@
+import blkdev
+class TapDisk(BlkDev):
+    pass
diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/util.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/remus/util.py    Fri Nov 13 15:33:37 2009 +0000
@@ -0,0 +1,31 @@
+# utility functions
+import os, subprocess
+class PipeException(Exception):
+    def __init__(self, message, errno):
+        self.errno = errno
+        message = '%s: %d, %s' % (message, errno, os.strerror(errno))
+        Exception.__init__(self, message)
+def canonifymac(mac):
+    return ':'.join(['%02x' % int(field, 16) for field in mac.split(':')])
+def runcmd(args, cwd=None):
+    # TODO: stdin handling
+    if type(args) == str:
+        args = args.split(' ')
+    try:
+        proc = subprocess.Popen(args, stdout=subprocess.PIPE,
+                                stderr=subprocess.PIPE, close_fds=True,
+                                cwd=cwd)
+        stdout = proc.stdout.read()
+        stderr = proc.stderr.read()
+        proc.wait()
+        if proc.returncode:
+            print ' '.join(args)
+            print stderr.strip()
+            raise PipeException('%s failed' % args[0], proc.returncode)
+        return stdout
+    except (OSError, IOError), inst:
+        raise PipeException('could not run %s' % args[0], inst.errno)
diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/vbd.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/remus/vbd.py     Fri Nov 13 15:33:37 2009 +0000
@@ -0,0 +1,9 @@
+import blkdev
+class VBD(blkdev.BlkDev):
+    def handles(self, **props):
+        uname = props.get('uname', '')
+        return uname.startswith('phy:')
+    handles = classmethod(handles)
diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/vdi.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/remus/vdi.py     Fri Nov 13 15:33:37 2009 +0000
@@ -0,0 +1,121 @@
+#code to play with vdis and snapshots
+import os
+def run(cmd):
+    fd = os.popen(cmd)
+    res = [l for l in fd if l.rstrip()]
+    return not fd.close(), res
+_blockstore = '/blockstore.dat'
+def set_blockstore(blockstore):
+    global _blockstore
+    __blockstore = blockstore
+class SnapShot:
+    def __init__(self, vdi, block, index):
+       self.__vdi = vdi
+       self.__block = block
+       self.__index = index
+       #TODO add snapshot date and radix
+    def __str__(self):
+       return '%d %d %d' % (self.__vdi.id(), self.__block, self.__index)
+    def vdi(self):
+       return self.__vdi
+    def block(self):
+       return self.__block
+    def index(self):
+       return self.__index
+    def match(self, block, index):
+       return self.__block == block and self.__index == index
+class VDIException(Exception):
+       pass
+class VDI:
+    def __init__(self, id, name):
+       self.__id = id
+       self.__name = name
+    def __str__(self):
+       return 'vdi: %d %s' % (self.__id, self.__name)
+    def id(self):
+       return self.__id
+    def name(self):
+       return self.__name
+    def list_snapshots(self):
+       res, ls = run('vdi_snap_list %s %d' % (_blockstore, self.__id))
+       if res:
+           return [SnapShot(self, int(l[0]), int(l[1])) for l in [l.split() 
for l in ls[1:]]]
+       else:
+           raise VDIException("Error reading snapshot list")
+    def snapshot(self):
+       res, ls = run('vdi_checkpoint %s %d' % (_blockstore, self.__id))
+       if res:
+           _, block, idx = ls[0].split()
+           return SnapShot(self, int(block), int(idx))
+       else:
+           raise VDIException("Error taking vdi snapshot")
+def create(name, snap):
+    res, _ = run('vdi_create %s %s %d %d'
+                % (_blockstore, name, snap.block(), snap.index()))
+    if res:
+       return lookup_by_name(name)
+    else:
+       raise VDIException('Unable to create vdi from snapshot')
+def fill(name, img_file):
+    res, _ = run('vdi_create %s %s' % (_blockstore, name))
+    if res:
+       vdi = lookup_by_name(name)
+       res, _ = run('vdi_fill %d %s' % (vdi.id(), img_file))
+       if res:
+           return vdi
+    raise VDIException('Unable to create vdi from disk img file')
+def list_vdis():
+    vdis = []
+    res, lines = run('vdi_list %s' % _blockstore)
+    if res:
+       for l in lines:
+           r = l.split()
+           vdis.append(VDI(int(r[0]), r[1]))
+       return vdis
+    else:
+       raise VDIException("Error doing vdi list")
+def lookup_by_id(id):
+    vdis = list_vdis()
+    for v in vdis:
+       if v.id() == id:
+           return v
+    raise VDIException("No match from vdi id")
+def lookup_by_name(name):
+    vdis = list_vdis()
+    for v in vdis:
+       if v.name() == name:
+           return v
+    raise VDIException("No match for vdi name")
diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/vif.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/remus/vif.py     Fri Nov 13 15:33:37 2009 +0000
@@ -0,0 +1,14 @@
+from xen.remus.util import canonifymac
+class VIF(object):
+    def __init__(self, **props):
+        self.__dict__.update(props)
+        if 'mac' in props:
+            self.mac = canonifymac(props['mac'])
+    def __str__(self):
+        return self.mac
+def parse(props):
+    "turn a vm device dictionary into a vif object"
+    return VIF(**props)
diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/vm.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/remus/vm.py      Fri Nov 13 15:33:37 2009 +0000
@@ -0,0 +1,156 @@
+#!/usr/bin/env python
+import xmlrpclib
+from xen.xend.XendClient import server
+from xen.xend import sxp
+# XXX XendDomain is voodoo to let balloon import succeed
+from xen.xend import XendDomain, balloon
+import vif
+import blkdev
+# need a nicer way to load disk drivers
+import vbd
+class VMException(Exception): pass
+class VM(object):
+    "Representation of a virtual machine"
+    def __init__(self, domid=None, dominfo=None):
+        self.dominfo = dominfo
+        self.domid = -1
+        self.name = 'unknown'
+        self.dom = {}
+        self.disks = []
+        self.vifs = []
+        if domid:
+            try:
+                self.dominfo = server.xend.domain(domid, 'all')
+            except xmlrpclib.Fault:
+                raise VMException('error looking up domain %s' % str(domid))
+        if self.dominfo:
+            self.loaddominfo()
+    def loaddominfo(self):
+        self.dom = parsedominfo(self.dominfo)
+        self.domid = self.dom['domid']
+        self.name = self.dom['name']
+        self.disks = getdisks(self.dom)
+        self.vifs = getvifs(self.dom)
+    def __str__(self):
+        return 'VM %d (%s), MACs: [%s], disks: [%s]' % \
+               (self.domid, self.name, self.epoch, ', '.join(self.macs),
+                ', '.join([str(d) for d in self.disks]))
+def parsedominfo(dominfo):
+    "parses a dominfo sexpression in the form of python lists of lists"
+    def s2d(s):
+        r = {}
+        for elem in s:
+            if len(elem) == 0:
+                continue
+            name = elem[0]
+            if len(elem) == 1:
+                val = None
+            else:
+                val = elem[1]
+            if isinstance(val, list):
+                val = s2d(elem[1:])
+            if isinstance(name, list):
+                # hack for ['cpus', [[1]]]
+                return s2d(elem)
+            if name in r:
+                for k, v in val.iteritems():
+                    if k in r[name]:
+                        if not isinstance(r[name][k], list):
+                            r[name][k] = [r[name][k]]
+                        r[name][k].append(v)
+                    else:
+                        r[name][k] = v
+            else:
+                r[name] = val
+        return r
+    return s2d(dominfo[1:])
+def domtosxpr(dom):
+    "convert a dominfo into a python sxpr"
+    def d2s(d):
+        r = []
+        for k, v in d.iteritems():
+            elem = [k]
+            if isinstance(v, dict):
+                elem.extend(d2s(v))
+            else:
+                if v is None:
+                    v = ''
+                elem.append(v)
+            r.append(elem)
+        return r
+    sxpr = ['domain']
+    sxpr.extend(d2s(dom))
+    return sxpr
+def strtosxpr(s):
+    "convert a string to a python sxpr"
+    p = sxp.Parser()
+    p.input(s)
+    return p.get_val()
+def sxprtostr(sxpr):
+    "convert an sxpr to string"
+    return sxp.to_string(sxpr)
+def getvifs(dom):
+    "return vif objects for devices in dom"
+    vifs = dom['device'].get('vif', [])
+    if type(vifs) != list:
+        vifs = [vifs]
+    return [vif.parse(v) for v in vifs]
+def getdisks(dom):
+    "return block device objects for devices in dom"
+    disks = dom['device'].get('vbd', [])
+    if type(disks) != list:
+        disks = [disks]
+    # tapdisk1 devices
+    tap1s = dom['device'].get('tap', [])
+    if type(tap1s) != list:
+        disks.append(tap1s)
+    else:
+        disks.extend(tap1s)
+    # tapdisk2 devices
+    tap2s = dom['device'].get('tap2', [])
+    if type(tap2s) != list:
+        disks.append(tap2s)
+    else:
+        disks.extend(tap2s)
+    return [blkdev.parse(disk) for disk in disks]
+def fromxend(domid):
+    "create a VM object from xend information"
+    return VM(domid)
+def getshadowmem(vm):
+    "Balloon down domain0 to create free memory for shadow paging."
+    maxmem = int(vm.dom['maxmem'])
+    shadow = int(vm.dom['shadow_memory'])
+    vcpus = int(vm.dom['vcpus'])
+    # from XendDomainInfo.checkLiveMigrateMemory:
+    # 1MB per vcpu plus 4Kib/Mib of RAM.  This is higher than
+    # the minimum that Xen would allocate if no value were given.
+    needed = vcpus * 1024 + maxmem * 4 - shadow * 1024
+    if needed > 0:
+        print "Freeing %d kB for shadow mode" % needed
+        balloon.free(needed, vm.dominfo)

Xen-changelog mailing list



Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.