[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-changelog] [xen-unstable] Remus: add python control extensions
# HG changeset patch # User Keir Fraser <keir.fraser@xxxxxxxxxx> # Date 1258126417 0 # Node ID ea0e302362bb49c679e203bc7f0d8c9165c6f9d9 # Parent 64599a2d310d9d7b7cd48fcb09393f47f1d26026 Remus: add python control extensions Signed-off-by: Brendan Cully <brendan@xxxxxxxxx> --- tools/python/setup.py | 25 tools/python/xen/lowlevel/checkpoint/checkpoint.c | 363 ++++++++ tools/python/xen/lowlevel/checkpoint/checkpoint.h | 59 + tools/python/xen/lowlevel/checkpoint/libcheckpoint.c | 782 +++++++++++++++++++ tools/python/xen/lowlevel/netlink/libnetlink.c | 585 ++++++++++++++ tools/python/xen/lowlevel/netlink/libnetlink.h | 58 + tools/python/xen/lowlevel/netlink/netlink.c | 211 +++++ tools/python/xen/remus/blkdev.py | 31 tools/python/xen/remus/image.py | 227 +++++ tools/python/xen/remus/netlink.py | 314 +++++++ tools/python/xen/remus/profile.py | 56 + tools/python/xen/remus/qdisc.py | 178 ++++ tools/python/xen/remus/save.py | 172 ++++ tools/python/xen/remus/tapdisk.py | 4 tools/python/xen/remus/util.py | 31 tools/python/xen/remus/vbd.py | 9 tools/python/xen/remus/vdi.py | 121 ++ tools/python/xen/remus/vif.py | 14 tools/python/xen/remus/vm.py | 156 +++ 19 files changed, 3393 insertions(+), 3 deletions(-) diff -r 64599a2d310d -r ea0e302362bb tools/python/setup.py --- a/tools/python/setup.py Fri Nov 13 15:31:45 2009 +0000 +++ b/tools/python/setup.py Fri Nov 13 15:33:37 2009 +0000 @@ -67,10 +67,28 @@ ptsname = Extension("ptsname", libraries = libraries, sources = [ "ptsname/ptsname.c" ]) +checkpoint = Extension("checkpoint", + extra_compile_args = extra_compile_args, + include_dirs = include_dirs, + library_dirs = library_dirs, + libraries = libraries + [ "rt" ], + sources = [ "xen/lowlevel/checkpoint/checkpoint.c", + "xen/lowlevel/checkpoint/libcheckpoint.c"]) + +netlink = Extension("netlink", + extra_compile_args = extra_compile_args, + include_dirs = include_dirs, + library_dirs = library_dirs, + libraries = libraries, + sources = [ "xen/lowlevel/netlink/netlink.c", + 
"xen/lowlevel/netlink/libnetlink.c"]) + modules = [ xc, xs, ptsname, acm, flask ] -if os.uname()[0] == 'SunOS': - modules.append(scf) - modules.append(process) +plat = os.uname()[0] +if plat == 'SunOS': + modules.extend([ scf, process ]) +if plat == 'Linux': + modules.extend([ checkpoint, netlink ]) setup(name = 'xen', version = '3.0', @@ -89,6 +107,7 @@ setup(name = 'xen', 'xen.web', 'xen.sv', 'xen.xsview', + 'xen.remus', 'xen.xend.tests', 'xen.xend.server.tests', diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/lowlevel/checkpoint/checkpoint.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.c Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,363 @@ +/* python bridge to checkpointing API */ + +#include <Python.h> + +#include <xs.h> +#include <xenctrl.h> + +#include "checkpoint.h" + +#define PKG "xen.lowlevel.checkpoint" + +static PyObject* CheckpointError; + +typedef struct { + PyObject_HEAD + checkpoint_state cps; + + /* milliseconds between checkpoints */ + unsigned int interval; + int armed; + + PyObject* suspend_cb; + PyObject* postcopy_cb; + PyObject* checkpoint_cb; + + PyThreadState* threadstate; +} CheckpointObject; + +static int suspend_trampoline(void* data); +static int postcopy_trampoline(void* data); +static int checkpoint_trampoline(void* data); + +static PyObject* Checkpoint_new(PyTypeObject* type, PyObject* args, + PyObject* kwargs) +{ + CheckpointObject* self = (CheckpointObject*)type->tp_alloc(type, 0); + + if (!self) + return NULL; + + checkpoint_init(&self->cps); + self->suspend_cb = NULL; + self->armed = 0; + + return (PyObject*)self; +} + +static int Checkpoint_init(PyObject* obj, PyObject* args, PyObject* kwargs) +{ + return 0; +} + +static void Checkpoint_dealloc(CheckpointObject* self) +{ + checkpoint_close(&self->cps); + + self->ob_type->tp_free((PyObject*)self); +} + +static PyObject* pycheckpoint_open(PyObject* obj, PyObject* args) +{ + CheckpointObject* self = (CheckpointObject*)obj; + 
checkpoint_state* cps = &self->cps; + unsigned int domid; + + if (!PyArg_ParseTuple(args, "I", &domid)) + return NULL; + + if (checkpoint_open(cps, domid) < 0) { + PyErr_SetString(CheckpointError, checkpoint_error(cps)); + + return NULL; + } + + Py_RETURN_NONE; +} + +static PyObject* pycheckpoint_close(PyObject* obj, PyObject* args) +{ + CheckpointObject* self = (CheckpointObject*)obj; + + checkpoint_close(&self->cps); + + Py_XDECREF(self->suspend_cb); + self->suspend_cb = NULL; + Py_XDECREF(self->postcopy_cb); + self->postcopy_cb = NULL; + Py_XDECREF(self->checkpoint_cb); + self->checkpoint_cb = NULL; + + Py_RETURN_NONE; +} + +static PyObject* pycheckpoint_start(PyObject* obj, PyObject* args) { + CheckpointObject* self = (CheckpointObject*)obj; + + PyObject* iofile; + PyObject* suspend_cb = NULL; + PyObject* postcopy_cb = NULL; + PyObject* checkpoint_cb = NULL; + unsigned int interval = 0; + + int fd; + struct save_callbacks callbacks; + int rc; + + if (!PyArg_ParseTuple(args, "O|OOOI", &iofile, &suspend_cb, &postcopy_cb, + &checkpoint_cb, &interval)) + return NULL; + + self->interval = interval; + + Py_INCREF(iofile); + Py_XINCREF(suspend_cb); + Py_XINCREF(postcopy_cb); + Py_XINCREF(checkpoint_cb); + + fd = PyObject_AsFileDescriptor(iofile); + Py_DECREF(iofile); + if (fd < 0) { + PyErr_SetString(PyExc_TypeError, "invalid file handle"); + return NULL; + } + + if (suspend_cb && suspend_cb != Py_None) { + if (!PyCallable_Check(suspend_cb)) { + PyErr_SetString(PyExc_TypeError, "suspend callback not callable"); + goto err; + } + self->suspend_cb = suspend_cb; + } else + self->suspend_cb = NULL; + + if (postcopy_cb && postcopy_cb != Py_None) { + if (!PyCallable_Check(postcopy_cb)) { + PyErr_SetString(PyExc_TypeError, "postcopy callback not callable"); + return NULL; + } + self->postcopy_cb = postcopy_cb; + } else + self->postcopy_cb = NULL; + + if (checkpoint_cb && checkpoint_cb != Py_None) { + if (!PyCallable_Check(checkpoint_cb)) { + PyErr_SetString(PyExc_TypeError, 
"checkpoint callback not callable"); + return NULL; + } + self->checkpoint_cb = checkpoint_cb; + } else + self->checkpoint_cb = NULL; + + callbacks.suspend = suspend_trampoline; + callbacks.postcopy = postcopy_trampoline; + callbacks.checkpoint = checkpoint_trampoline; + callbacks.data = self; + + self->threadstate = PyEval_SaveThread(); + rc = checkpoint_start(&self->cps, fd, &callbacks); + PyEval_RestoreThread(self->threadstate); + + if (rc < 0) { + PyErr_SetString(CheckpointError, checkpoint_error(&self->cps)); + goto err; + } + + Py_RETURN_NONE; + + err: + self->suspend_cb = NULL; + Py_XDECREF(suspend_cb); + self->postcopy_cb = NULL; + Py_XDECREF(postcopy_cb); + self->checkpoint_cb = NULL; + Py_XDECREF(checkpoint_cb); + + return NULL; +} + +static PyMethodDef Checkpoint_methods[] = { + { "open", pycheckpoint_open, METH_VARARGS, + "open connection to xen" }, + { "close", pycheckpoint_close, METH_NOARGS, + "close connection to xen" }, + { "start", pycheckpoint_start, METH_VARARGS | METH_KEYWORDS, + "begin a checkpoint" }, + { NULL, NULL, 0, NULL } +}; + +static PyTypeObject CheckpointType = { + PyObject_HEAD_INIT(NULL) + 0, /* ob_size */ + PKG ".checkpointer", /* tp_name */ + sizeof(CheckpointObject), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor)Checkpoint_dealloc, /* tp_dealloc */ + NULL, /* tp_print */ + NULL, /* tp_getattr */ + NULL, /* tp_setattr */ + NULL, /* tp_compare */ + NULL, /* tp_repr */ + NULL, /* tp_as_number */ + NULL, /* tp_as_sequence */ + NULL, /* tp_as_mapping */ + NULL, /* tp_hash */ + NULL, /* tp_call */ + NULL, /* tp_str */ + NULL, /* tp_getattro */ + NULL, /* tp_setattro */ + NULL, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + "Checkpoint object", /* tp_doc */ + NULL, /* tp_traverse */ + NULL, /* tp_clear */ + NULL, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + NULL, /* tp_iter */ + NULL, /* tp_iternext */ + Checkpoint_methods, /* tp_methods */ + NULL, /* tp_members */ + NULL, /* tp_getset */ + NULL, /* tp_base 
*/ + NULL, /* tp_dict */ + NULL, /* tp_descr_get */ + NULL, /* tp_descr_set */ + 0, /* tp_dictoffset */ + (initproc)Checkpoint_init, /* tp_init */ + NULL, /* tp_alloc */ + Checkpoint_new, /* tp_new */ +}; + +static PyMethodDef methods[] = { + { NULL } +}; + +static char doc[] = "checkpoint API"; + +PyMODINIT_FUNC initcheckpoint(void) { + PyObject *m; + + if (PyType_Ready(&CheckpointType) < 0) + return; + + m = Py_InitModule3(PKG, methods, doc); + + if (!m) + return; + + Py_INCREF(&CheckpointType); + PyModule_AddObject(m, "checkpointer", (PyObject*)&CheckpointType); + + CheckpointError = PyErr_NewException(PKG ".error", NULL, NULL); + Py_INCREF(CheckpointError); + PyModule_AddObject(m, "error", CheckpointError); + + block_timer(); +} + +/* private functions */ + +/* bounce C suspend call into python equivalent. + * returns 1 on success or 0 on failure */ +static int suspend_trampoline(void* data) +{ + CheckpointObject* self = (CheckpointObject*)data; + + PyObject* result; + + /* call default suspend function, then python hook if available */ + if (self->armed) { + if (checkpoint_wait(&self->cps) < 0) { + fprintf(stderr, "%s\n", checkpoint_error(&self->cps)); + return 0; + } + } else { + if (self->interval) { + self->armed = 1; + checkpoint_settimer(&self->cps, self->interval); + } + + if (!checkpoint_suspend(&self->cps)) { + fprintf(stderr, "%s\n", checkpoint_error(&self->cps)); + return 0; + } + } + + if (!self->suspend_cb) + return 1; + + PyEval_RestoreThread(self->threadstate); + result = PyObject_CallFunction(self->suspend_cb, NULL); + self->threadstate = PyEval_SaveThread(); + + if (!result) + return 0; + + if (result == Py_None || PyObject_IsTrue(result)) { + Py_DECREF(result); + return 1; + } + + Py_DECREF(result); + + return 0; +} + +static int postcopy_trampoline(void* data) +{ + CheckpointObject* self = (CheckpointObject*)data; + + PyObject* result; + int rc = 0; + + if (!self->postcopy_cb) + goto resume; + + PyEval_RestoreThread(self->threadstate); + 
result = PyObject_CallFunction(self->postcopy_cb, NULL); + + if (result && (result == Py_None || PyObject_IsTrue(result))) + rc = 1; + + Py_XDECREF(result); + self->threadstate = PyEval_SaveThread(); + + resume: + if (checkpoint_resume(&self->cps) < 0) { + fprintf(stderr, "%s\n", checkpoint_error(&self->cps)); + return 0; + } + + return rc; +} + +static int checkpoint_trampoline(void* data) +{ + CheckpointObject* self = (CheckpointObject*)data; + + PyObject* result; + + if (checkpoint_postflush(&self->cps) < 0) { + fprintf(stderr, "%s\n", checkpoint_error(&self->cps)); + return -1; + } + + if (!self->checkpoint_cb) + return 0; + + PyEval_RestoreThread(self->threadstate); + result = PyObject_CallFunction(self->checkpoint_cb, NULL); + self->threadstate = PyEval_SaveThread(); + + if (!result) + return 0; + + if (result == Py_None || PyObject_IsTrue(result)) { + Py_DECREF(result); + return 1; + } + + Py_DECREF(result); + + return 0; +} diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/lowlevel/checkpoint/checkpoint.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/lowlevel/checkpoint/checkpoint.h Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,59 @@ +/* API for checkpointing */ + +#ifndef _CHECKPOINT_H_ +#define _CHECKPOINT_H_ 1 + +#include <pthread.h> +#include <semaphore.h> +#include <time.h> + +#include <xenguest.h> +#include <xs.h> + +typedef enum { + dt_unknown, + dt_pv, + dt_hvm, + dt_pvhvm /* HVM with PV drivers */ +} checkpoint_domtype; + +typedef struct { + int xch; /* xc handle */ + int xce; /* event channel handle */ + struct xs_handle* xsh; /* xenstore handle */ + int watching_shutdown; /* state of watch on @releaseDomain */ + + unsigned int domid; + checkpoint_domtype domtype; + int fd; + + int suspend_evtchn; + + char* errstr; + + /* suspend deadline thread support */ + volatile int suspended; + volatile int done; + pthread_t suspend_thr; + sem_t suspended_sem; + sem_t resumed_sem; + timer_t timer; +} checkpoint_state; + +char* 
checkpoint_error(checkpoint_state* s); + +void checkpoint_init(checkpoint_state* s); +int checkpoint_open(checkpoint_state* s, unsigned int domid); +void checkpoint_close(checkpoint_state* s); +int checkpoint_start(checkpoint_state* s, int fd, + struct save_callbacks* callbacks); +int checkpoint_suspend(checkpoint_state* s); +int checkpoint_resume(checkpoint_state* s); +int checkpoint_postflush(checkpoint_state* s); + +int checkpoint_settimer(checkpoint_state* s, int millis); +int checkpoint_wait(checkpoint_state* s); +void block_timer(void); +void unblock_timer(void); + +#endif diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/lowlevel/checkpoint/libcheckpoint.c --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/lowlevel/checkpoint/libcheckpoint.c Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,782 @@ +/* API for checkpointing */ + +#include <fcntl.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <signal.h> +#include <sys/stat.h> + +#include <xenctrl.h> +#include <xenguest.h> +#include <xs.h> + +#include "checkpoint.h" + +static char errbuf[256]; + +static int setup_suspend_evtchn(checkpoint_state* s); +static void release_suspend_evtchn(checkpoint_state *s); +static int setup_shutdown_watch(checkpoint_state* s); +static int check_shutdown_watch(checkpoint_state* s); +static void release_shutdown_watch(checkpoint_state* s); +static int poll_evtchn(checkpoint_state* s); + +static int switch_qemu_logdirty(checkpoint_state* s, int enable); +static int suspend_hvm(checkpoint_state* s); +static int suspend_qemu(checkpoint_state* s); +static int resume_qemu(checkpoint_state* s); +static int send_qemu(checkpoint_state* s); + +static int create_suspend_timer(checkpoint_state* s); +static int delete_suspend_timer(checkpoint_state* s); +static int create_suspend_thread(checkpoint_state* s); +static void stop_suspend_thread(checkpoint_state* s); + +/* Returns a string describing the most recent error returned by + * 
a checkpoint function. Static -- do not free. */ +char* checkpoint_error(checkpoint_state* s) +{ + return s->errstr; +} + +void checkpoint_init(checkpoint_state* s) +{ + s->xch = -1; + s->xce = -1; + s->xsh = NULL; + s->watching_shutdown = 0; + + s->domid = 0; + s->domtype = dt_unknown; + s->fd = -1; + + s->suspend_evtchn = -1; + + s->errstr = NULL; + + s->suspended = 0; + s->done = 0; + s->suspend_thr = 0; + s->timer = 0; +} + +/* open a checkpoint session to guest domid */ +int checkpoint_open(checkpoint_state* s, unsigned int domid) +{ + xc_dominfo_t dominfo; + unsigned long pvirq; + + s->domid = domid; + + s->xch = xc_interface_open(); + if (s->xch < 0) { + s->errstr = "could not open control interface (are you root?)"; + + return -1; + } + + s->xsh = xs_daemon_open(); + if (!s->xsh) { + checkpoint_close(s); + s->errstr = "could not open xenstore handle"; + + return -1; + } + + s->xce = xc_evtchn_open(); + if (s->xce < 0) { + checkpoint_close(s); + s->errstr = "could not open event channel handle"; + + return -1; + } + + if (xc_domain_getinfo(s->xch, s->domid, 1, &dominfo) < 0) { + checkpoint_close(s); + s->errstr = "could not get domain info"; + + return -1; + } + if (dominfo.hvm) { + if (xc_get_hvm_param(s->xch, s->domid, HVM_PARAM_CALLBACK_IRQ, &pvirq)) { + checkpoint_close(s); + s->errstr = "could not get HVM callback IRQ"; + + return -1; + } + s->domtype = pvirq ? 
dt_pvhvm : dt_hvm; + } else + s->domtype = dt_pv; + + if (setup_shutdown_watch(s) < 0) { + checkpoint_close(s); + + return -1; + } + + if (s->domtype == dt_pv) { + if (setup_suspend_evtchn(s) < 0) { + checkpoint_close(s); + + return -1; + } + } else if (s->domtype == dt_pvhvm) { + checkpoint_close(s); + s->errstr = "PV-on-HVM is unsupported"; + + return -1; + } + + return 0; +} + +void checkpoint_close(checkpoint_state* s) +{ + if (s->timer) + delete_suspend_timer(s); + if (s->suspend_thr) + stop_suspend_thread(s); + + release_shutdown_watch(s); + release_suspend_evtchn(s); + + if (s->xch >= 0) { + xc_interface_close(s->xch); + s->xch = -1; + } + if (s->xce >= 0) { + xc_evtchn_close(s->xce); + s->xce = -1; + } + if (s->xsh) { + xs_daemon_close(s->xsh); + s->xsh = NULL; + } + + s->domid = 0; + s->fd = -1; + s->suspend_evtchn = -1; +} + +/* we toggle logdirty ourselves around the xc_domain_save call -- + * it avoids having to pass around checkpoint_state */ +static void noop_switch_logdirty(int domid, unsigned enable) +{ + return; +} + +int checkpoint_start(checkpoint_state* s, int fd, + struct save_callbacks* callbacks) +{ + int hvm, rc; + int flags = XCFLAGS_LIVE; + + if (!s->domid) { + s->errstr = "checkpoint state not opened"; + return -1; + } + + s->fd = fd; + + hvm = s->domtype > dt_pv; + if (hvm) { + flags |= XCFLAGS_HVM; + if ((rc = switch_qemu_logdirty(s, 1))) + return rc; + } + + rc = xc_domain_save(s->xch, fd, s->domid, 0, 0, flags, callbacks, hvm, + noop_switch_logdirty); + + if (hvm) + switch_qemu_logdirty(s, 0); + + return rc; +} + +/* suspend the domain. Returns 0 on failure, 1 on success */ +int checkpoint_suspend(checkpoint_state* s) +{ + struct timeval tv; + int rc; + + gettimeofday(&tv, NULL); + fprintf(stderr, "PROF: suspending at %lu.%06lu\n", (unsigned long)tv.tv_sec, + (unsigned long)tv.tv_usec); + + if (s->domtype == dt_hvm) { + return suspend_hvm(s) < 0 ? 
0 : 1; + } + + rc = xc_evtchn_notify(s->xce, s->suspend_evtchn); + if (rc < 0) { + snprintf(errbuf, sizeof(errbuf), + "failed to notify suspend event channel: %d", rc); + s->errstr = errbuf; + + return 0; + } + + do { + rc = poll_evtchn(s); + } while (rc >= 0 && rc != s->suspend_evtchn); + if (rc <= 0) { + snprintf(errbuf, sizeof(errbuf), + "failed to receive suspend notification: %d", rc); + s->errstr = errbuf; + + return 0; + } + if (xc_evtchn_unmask(s->xce, s->suspend_evtchn) < 0) { + snprintf(errbuf, sizeof(errbuf), + "failed to unmask suspend notification channel: %d", rc); + s->errstr = errbuf; + + return 0; + } + + return 1; +} + +/* wait for a suspend to be triggered by another thread */ +int checkpoint_wait(checkpoint_state* s) +{ + int rc; + + if (!s->suspend_thr) { + s->errstr = "checkpoint timer is not active\n"; + return -1; + } + + do { + rc = sem_wait(&s->suspended_sem); + if (rc < 0 && errno != EINTR) { + snprintf(errbuf, sizeof(errbuf), + "error waiting for suspend semaphore: %d %d\n", rc, errno); + s->errstr = errbuf; + return -1; + } + } while (rc < 0); + + if (!s->suspended) { + snprintf(errbuf, sizeof(errbuf), "domain not suspended?\n"); + s->errstr = errbuf; + return -1; + } + + return 0; +} + +/* let guest execution resume */ +int checkpoint_resume(checkpoint_state* s) +{ + struct timeval tv; + int rc; + + if (xc_domain_resume(s->xch, s->domid, 1)) { + snprintf(errbuf, sizeof(errbuf), "error resuming domain: %d", errno); + s->errstr = errbuf; + + return -1; + } + + gettimeofday(&tv, NULL); + fprintf(stderr, "PROF: resumed at %lu.%06lu\n", (unsigned long)tv.tv_sec, + (unsigned long)tv.tv_usec); + + if (s->domtype > dt_pv && resume_qemu(s) < 0) + return -1; + + /* restore watchability in xenstore */ + if (xs_resume_domain(s->xsh, s->domid) < 0) + fprintf(stderr, "error resuming domain in xenstore\n"); + + s->suspended = 0; + + if (s->suspend_thr) { + if ((rc = sem_post(&s->resumed_sem))) + fprintf(stderr, "error posting resume semaphore\n"); + 
} + + return 0; +} + +/* called after xc_domain_save has flushed its buffer */ +int checkpoint_postflush(checkpoint_state *s) +{ + if (s->domtype > dt_pv && send_qemu(s) < 0) + return -1; + + return 0; +} + +/* force suspend within millis ms if copy hasn't completed yet */ +int checkpoint_settimer(checkpoint_state* s, int millis) +{ + struct itimerspec t; + int err; + + if (!s->suspend_thr) { + if (create_suspend_timer(s) < 0) + return -1; + + if (create_suspend_thread(s) < 0) { + delete_suspend_timer(s); + return -1; + } + } + + t.it_value.tv_sec = millis / 1000; + t.it_value.tv_nsec = (millis % 1000) * 1000000L; + t.it_interval.tv_sec = t.it_value.tv_sec; + t.it_interval.tv_nsec = t.it_value.tv_nsec; + + if ((err = timer_settime(s->timer, 0, &t, NULL))) { + fprintf(stderr, "Error arming timer: %d\n", err); + return -1; + } + + return 0; +} + +int delete_suspend_timer(checkpoint_state* s) +{ + int rc = 0; + + if (s->timer) { + if ((rc = timer_delete(s->timer))) + fprintf(stderr, "Error deleting timer: %s\n", strerror(errno)); + s->timer = NULL; + } + + return rc; +} + +/* Set up event channel used to signal a guest to suspend itself */ +static int setup_suspend_evtchn(checkpoint_state* s) +{ + int port; + + port = xs_suspend_evtchn_port(s->domid); + if (port < 0) { + s->errstr = "failed to read suspend event channel"; + return -1; + } + + s->suspend_evtchn = xc_suspend_evtchn_init(s->xch, s->xce, s->domid, port); + if (s->suspend_evtchn < 0) { + snprintf(errbuf, sizeof(errbuf), "failed to bind suspend event channel"); + s->errstr = errbuf; + + return -1; + } + + fprintf(stderr, "bound to suspend event channel %u:%d as %d\n", s->domid, port, + s->suspend_evtchn); + + return 0; +} + +/* release suspend event channels bound to guest */ +static void release_suspend_evtchn(checkpoint_state *s) +{ + /* TODO: teach xen to clean up if port is unbound */ + if (s->xce >= 0 && s->suspend_evtchn > 0) { + xc_suspend_evtchn_release(s->xce, s->suspend_evtchn); + 
s->suspend_evtchn = 0; + } +} + +static int setup_shutdown_watch(checkpoint_state* s) +{ + char buf[16]; + + /* write domain ID to watch so we can ignore other domain shutdowns */ + snprintf(buf, sizeof(buf), "%u", s->domid); + if ( !xs_watch(s->xsh, "@releaseDomain", buf) ) { + fprintf(stderr, "Could not bind to shutdown watch\n"); + return -1; + } + /* watch fires once on registration */ + s->watching_shutdown = 1; + check_shutdown_watch(s); + + return 0; +} + +static int check_shutdown_watch(checkpoint_state* s) { + unsigned int count; + char **vec; + char buf[16]; + + vec = xs_read_watch(s->xsh, &count); + if (s->watching_shutdown == 1) { + s->watching_shutdown = 2; + return 0; + } + if (!vec) { + fprintf(stderr, "empty watch fired\n"); + return 0; + } + snprintf(buf, sizeof(buf), "%d", s->domid); + if (!strcmp(vec[XS_WATCH_TOKEN], buf)) { + fprintf(stderr, "domain %d shut down\n", s->domid); + return -1; + } + + return 0; +} + +static void release_shutdown_watch(checkpoint_state* s) { + char buf[16]; + + if (!s->xsh) + return; + + if (!s->watching_shutdown) + return; + + snprintf(buf, sizeof(buf), "%u", s->domid); + if (!xs_unwatch(s->xsh, "@releaseDomain", buf)) + fprintf(stderr, "Could not release shutdown watch\n"); +} + +/* wrapper around xc_evtchn_pending which detects errors */ +static int poll_evtchn(checkpoint_state* s) +{ + int fd, xsfd, maxfd; + fd_set rfds, efds; + struct timeval tv; + int rc; + + fd = xc_evtchn_fd(s->xce); + xsfd = xs_fileno(s->xsh); + maxfd = fd > xsfd ? 
fd : xsfd; + FD_ZERO(&rfds); + FD_ZERO(&efds); + FD_SET(fd, &rfds); + FD_SET(xsfd, &rfds); + FD_SET(fd, &efds); + FD_SET(xsfd, &efds); + + /* give it 500 ms to respond */ + tv.tv_sec = 0; + tv.tv_usec = 500000; + + rc = select(maxfd + 1, &rfds, NULL, &efds, &tv); + if (rc < 0) + fprintf(stderr, "error polling event channel: %s\n", strerror(errno)); + else if (!rc) + fprintf(stderr, "timeout waiting for event channel\n"); + else if (FD_ISSET(fd, &rfds)) + return xc_evtchn_pending(s->xce); + else if (FD_ISSET(xsfd, &rfds)) + return check_shutdown_watch(s); + + return -1; +} + +/* adapted from the eponymous function in xc_save */ +static int switch_qemu_logdirty(checkpoint_state *s, int enable) +{ + char path[128]; + char *tail, *cmd, *response; + char **vec; + unsigned int len; + + sprintf(path, "/local/domain/0/device-model/%u/logdirty/", s->domid); + tail = path + strlen(path); + + strcpy(tail, "ret"); + if (!xs_watch(s->xsh, path, "qemu-logdirty-ret")) { + s->errstr = "error watching qemu logdirty return"; + return -1; + } + /* null fire. XXX unify with shutdown watch! */ + vec = xs_read_watch(s->xsh, &len); + free(vec); + + strcpy(tail, "cmd"); + cmd = enable ? 
"enable" : "disable"; + if (!xs_write(s->xsh, XBT_NULL, path, cmd, strlen(cmd))) { + s->errstr = "error signalling qemu logdirty"; + return -1; + } + + vec = xs_read_watch(s->xsh, &len); + free(vec); + + strcpy(tail, "ret"); + xs_unwatch(s->xsh, path, "qemu-logdirty-ret"); + + response = xs_read(s->xsh, XBT_NULL, path, &len); + if (!len || strcmp(response, cmd)) { + if (len) + free(response); + s->errstr = "qemu logdirty command failed"; + return -1; + } + free(response); + fprintf(stderr, "qemu logdirty mode: %s\n", cmd); + + return 0; +} + +static int suspend_hvm(checkpoint_state *s) +{ + int rc = -1; + + fprintf(stderr, "issuing HVM suspend hypercall\n"); + rc = xc_domain_shutdown(s->xch, s->domid, SHUTDOWN_suspend); + if (rc < 0) { + s->errstr = "shutdown hypercall failed"; + return -1; + } + fprintf(stderr, "suspend hypercall returned %d\n", rc); + + if (check_shutdown_watch(s) >= 0) + return -1; + + rc = suspend_qemu(s); + + return rc; +} + +static int suspend_qemu(checkpoint_state *s) +{ + char path[128]; + + fprintf(stderr, "pausing QEMU\n"); + + sprintf(path, "/local/domain/0/device-model/%d/command", s->domid); + if (!xs_write(s->xsh, XBT_NULL, path, "save", 4)) { + fprintf(stderr, "error signalling QEMU to save\n"); + return -1; + } + + sprintf(path, "/local/domain/0/device-model/%d/state", s->domid); + + do { + char* state; + unsigned int len; + + state = xs_read(s->xsh, XBT_NULL, path, &len); + if (!state) { + s->errstr = "error reading QEMU state"; + return -1; + } + + if (!strcmp(state, "paused")) { + free(state); + return 0; + } + + free(state); + usleep(1000); + } while(1); + + return -1; +} + +static int resume_qemu(checkpoint_state *s) +{ + char path[128]; + fprintf(stderr, "resuming QEMU\n"); + + sprintf(path, "/local/domain/0/device-model/%d/command", s->domid); + if (!xs_write(s->xsh, XBT_NULL, path, "continue", 8)) { + fprintf(stderr, "error signalling QEMU to resume\n"); + return -1; + } + + return 0; +} + +static int 
send_qemu(checkpoint_state *s) +{ + char buf[8192]; + char path[128]; + struct stat sb; + uint32_t qlen = 0; + int qfd; + int rc; + + if (s->fd < 0) + return -1; + + sprintf(path, "/var/lib/xen/qemu-save.%d", s->domid); + + if (stat(path, &sb) < 0) { + snprintf(errbuf, sizeof(errbuf), + "error getting QEMU state file status: %s", strerror(errno)); + s->errstr = errbuf; + return -1; + } + + qlen = sb.st_size; + qfd = open(path, O_RDONLY); + if (qfd < 0) { + snprintf(errbuf, sizeof(errbuf), "error opening QEMU state file: %s", + strerror(errno)); + s->errstr = errbuf; + return -1; + } + + fprintf(stderr, "Sending %u bytes of QEMU state\n", qlen); + if (write(s->fd, "RemusDeviceModelState", 21) != 21) { + s->errstr = "error writing QEMU header"; + close(qfd); + return -1; + } + if (write(s->fd, &qlen, sizeof(qlen)) != sizeof(qlen)) { + s->errstr = "error writing QEMU size"; + close(qfd); + return -1; + } + + while ((rc = read(qfd, buf, qlen > sizeof(buf) ? sizeof(buf) : qlen)) > 0) { + qlen -= rc; + if (write(s->fd, buf, rc) != rc) { + rc = -1; + break; + } + } + if (rc < 0) { + snprintf(errbuf, sizeof(errbuf), "error writing QEMU state: %s", + strerror(errno)); + s->errstr = errbuf; + } + + close(qfd); + + return rc; +} + +/*thread responsible to suspend the domain early if necessary*/ +static void *suspend_thread(void *arg) +{ + checkpoint_state* s = (checkpoint_state*)arg; + sigset_t tss; + int rc; + int sig; + + fprintf(stderr, "Suspend thread started\n"); + + sigemptyset(&tss); + sigaddset(&tss, SIGRTMIN); + + while (1) { + /* wait for checkpoint thread to signal resume */ + if ((rc = sem_wait(&s->resumed_sem))) + fprintf(stderr, "Error waiting on resume semaphore\n"); + + if ((rc = sigwait(&tss, &sig))) { + fprintf(stderr, "sigwait failed: %d %d\n", rc, errno); + break; + } + if (sig != SIGRTMIN) + fprintf(stderr, "received unexpected signal %d\n", sig); + + if (s->done) + break; + + if (s->suspended) { + fprintf(stderr, "domain already suspended?\n"); + } else 
{ + rc = checkpoint_suspend(s); + if (rc) + s->suspended = 1; + else + fprintf(stderr, "checkpoint_suspend failed\n"); + } + + if ((rc = sem_post(&s->suspended_sem))) + fprintf(stderr, "Error posting suspend semaphore\n"); + } + + fprintf(stderr, "Suspend thread exiting\n"); + + return NULL; +} + +static int create_suspend_timer(checkpoint_state* s) +{ + struct sigevent event; + int err; + + event.sigev_notify = SIGEV_SIGNAL; + event.sigev_signo = SIGRTMIN; + event.sigev_value.sival_int = 0; + + if ((err = timer_create(CLOCK_REALTIME, &event, &s->timer))) { + snprintf(errbuf, sizeof(errbuf), "Error creating timer: %d\n", err); + s->errstr = errbuf; + return -1; + } + + return 0; +} + +void block_timer(void) +{ + sigset_t tss; + + sigemptyset(&tss); + sigaddset(&tss, SIGRTMIN); + + pthread_sigmask(SIG_BLOCK, &tss, NULL); +} + +void unblock_timer(void) +{ + sigset_t tss; + + sigemptyset(&tss); + sigaddset(&tss, SIGRTMIN); + + pthread_sigmask(SIG_UNBLOCK, &tss, NULL); +} + +static int create_suspend_thread(checkpoint_state* s) +{ + int err; + + if ((err = sem_init(&s->suspended_sem, 0, 0))) { + snprintf(errbuf, sizeof(errbuf), + "Error initializing suspend semaphore: %d\n", err); + s->errstr = errbuf; + return -1; + } + + if ((err = sem_init(&s->resumed_sem, 0, 0))) { + snprintf(errbuf, sizeof(errbuf), + "Error initializing resume semaphore: %d\n", err); + s->errstr = errbuf; + return -1; + } + + /* signal mask should be inherited */ + block_timer(); + + if ((err = pthread_create(&s->suspend_thr, NULL, suspend_thread, s))) { + snprintf(errbuf, sizeof(errbuf), "Error creating suspend thread: %d\n", err); + s->errstr = errbuf; + return -1; + } + + return 0; +} + +static void stop_suspend_thread(checkpoint_state* s) +{ + int err; + + s->done = 1; + + err = sem_post(&s->resumed_sem); + + err = pthread_join(s->suspend_thr, NULL); + s->suspend_thr = 0; +} diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/lowlevel/netlink/libnetlink.c --- /dev/null Thu Jan 01 00:00:00 
1970 +0000 +++ b/tools/python/xen/lowlevel/netlink/libnetlink.c Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,585 @@ +/* + * libnetlink.c RTnetlink service routines. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + * + * Authors: Alexey Kuznetsov, <kuznet@xxxxxxxxxxxxx> + * + */ + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <syslog.h> +#include <fcntl.h> +#include <net/if_arp.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <string.h> +#include <errno.h> +#include <time.h> +#include <sys/uio.h> + +#include "libnetlink.h" + +void rtnl_close(struct rtnl_handle *rth) +{ + close(rth->fd); +} + +int rtnl_open_byproto(struct rtnl_handle *rth, unsigned subscriptions, + int protocol) +{ + socklen_t addr_len; + int sndbuf = 32768; + int rcvbuf = 32768; + + memset(rth, 0, sizeof(rth)); + + rth->fd = socket(AF_NETLINK, SOCK_RAW, protocol); + if (rth->fd < 0) { + perror("Cannot open netlink socket"); + return -1; + } + + if (setsockopt(rth->fd,SOL_SOCKET,SO_SNDBUF,&sndbuf,sizeof(sndbuf)) < 0) { + perror("SO_SNDBUF"); + return -1; + } + + if (setsockopt(rth->fd,SOL_SOCKET,SO_RCVBUF,&rcvbuf,sizeof(rcvbuf)) < 0) { + perror("SO_RCVBUF"); + return -1; + } + + memset(&rth->local, 0, sizeof(rth->local)); + rth->local.nl_family = AF_NETLINK; + rth->local.nl_groups = subscriptions; + + if (bind(rth->fd, (struct sockaddr*)&rth->local, sizeof(rth->local)) < 0) { + perror("Cannot bind netlink socket"); + return -1; + } + addr_len = sizeof(rth->local); + if (getsockname(rth->fd, (struct sockaddr*)&rth->local, &addr_len) < 0) { + perror("Cannot getsockname"); + return -1; + } + if (addr_len != sizeof(rth->local)) { + fprintf(stderr, "Wrong address length %d\n", addr_len); + return -1; + } + if (rth->local.nl_family != AF_NETLINK) { + 
fprintf(stderr, "Wrong address family %d\n", rth->local.nl_family); + return -1; + } + rth->seq = time(NULL); + return 0; +} + +int rtnl_open(struct rtnl_handle *rth, unsigned subscriptions) +{ + return rtnl_open_byproto(rth, subscriptions, NETLINK_ROUTE); +} + +int rtnl_wilddump_request(struct rtnl_handle *rth, int family, int type) +{ + struct { + struct nlmsghdr nlh; + struct rtgenmsg g; + } req; + struct sockaddr_nl nladdr; + + memset(&nladdr, 0, sizeof(nladdr)); + nladdr.nl_family = AF_NETLINK; + + memset(&req, 0, sizeof(req)); + req.nlh.nlmsg_len = sizeof(req); + req.nlh.nlmsg_type = type; + req.nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST; + req.nlh.nlmsg_pid = 0; + req.nlh.nlmsg_seq = rth->dump = ++rth->seq; + req.g.rtgen_family = family; + + return sendto(rth->fd, (void*)&req, sizeof(req), 0, + (struct sockaddr*)&nladdr, sizeof(nladdr)); +} + +int rtnl_send(struct rtnl_handle *rth, const char *buf, int len) +{ + struct sockaddr_nl nladdr; + + memset(&nladdr, 0, sizeof(nladdr)); + nladdr.nl_family = AF_NETLINK; + + return sendto(rth->fd, buf, len, 0, (struct sockaddr*)&nladdr, sizeof(nladdr)); +} + +int rtnl_dump_request(struct rtnl_handle *rth, int type, void *req, int len) +{ + struct nlmsghdr nlh; + struct sockaddr_nl nladdr; + struct iovec iov[2] = { + { .iov_base = &nlh, .iov_len = sizeof(nlh) }, + { .iov_base = req, .iov_len = len } + }; + struct msghdr msg = { + .msg_name = &nladdr, + .msg_namelen = sizeof(nladdr), + .msg_iov = iov, + .msg_iovlen = 2, + }; + + memset(&nladdr, 0, sizeof(nladdr)); + nladdr.nl_family = AF_NETLINK; + + nlh.nlmsg_len = NLMSG_LENGTH(len); + nlh.nlmsg_type = type; + nlh.nlmsg_flags = NLM_F_ROOT|NLM_F_MATCH|NLM_F_REQUEST; + nlh.nlmsg_pid = 0; + nlh.nlmsg_seq = rth->dump = ++rth->seq; + + return sendmsg(rth->fd, &msg, 0); +} + +int rtnl_dump_filter(struct rtnl_handle *rth, + rtnl_filter_t filter, + void *arg1, + rtnl_filter_t junk, + void *arg2) +{ + struct sockaddr_nl nladdr; + struct iovec iov; + struct msghdr msg 
= { + .msg_name = &nladdr, + .msg_namelen = sizeof(nladdr), + .msg_iov = &iov, + .msg_iovlen = 1, + }; + char buf[16384]; + + iov.iov_base = buf; + while (1) { + int status; + struct nlmsghdr *h; + + iov.iov_len = sizeof(buf); + status = recvmsg(rth->fd, &msg, 0); + + if (status < 0) { + if (errno == EINTR) + continue; + perror("OVERRUN"); + continue; + } + + if (status == 0) { + fprintf(stderr, "EOF on netlink\n"); + return -1; + } + + h = (struct nlmsghdr*)buf; + while (NLMSG_OK(h, status)) { + int err; + + if (nladdr.nl_pid != 0 || + h->nlmsg_pid != rth->local.nl_pid || + h->nlmsg_seq != rth->dump) { + if (junk) { + err = junk(&nladdr, h, arg2); + if (err < 0) + return err; + } + goto skip_it; + } + + if (h->nlmsg_type == NLMSG_DONE) + return 0; + if (h->nlmsg_type == NLMSG_ERROR) { + struct nlmsgerr *err = (struct nlmsgerr*)NLMSG_DATA(h); + if (h->nlmsg_len < NLMSG_LENGTH(sizeof(struct nlmsgerr))) { + fprintf(stderr, "ERROR truncated\n"); + } else { + errno = -err->error; + perror("RTNETLINK answers"); + } + return -1; + } + err = filter(&nladdr, h, arg1); + if (err < 0) + return err; + +skip_it: + h = NLMSG_NEXT(h, status); + } + if (msg.msg_flags & MSG_TRUNC) { + fprintf(stderr, "Message truncated\n"); + continue; + } + if (status) { + fprintf(stderr, "!!!Remnant of size %d\n", status); + exit(1); + } + } +} + +int rtnl_talk(struct rtnl_handle *rtnl, struct nlmsghdr *n, pid_t peer, + unsigned groups, struct nlmsghdr *answer, + rtnl_filter_t junk, + void *jarg) +{ + int status; + unsigned seq; + struct nlmsghdr *h; + struct sockaddr_nl nladdr; + struct iovec iov = { + .iov_base = (void*) n, + .iov_len = n->nlmsg_len + }; + struct msghdr msg = { + .msg_name = &nladdr, + .msg_namelen = sizeof(nladdr), + .msg_iov = &iov, + .msg_iovlen = 1, + }; + char buf[16384]; + + memset(&nladdr, 0, sizeof(nladdr)); + nladdr.nl_family = AF_NETLINK; + nladdr.nl_pid = peer; + nladdr.nl_groups = groups; + + n->nlmsg_seq = seq = ++rtnl->seq; + + if (answer == NULL) + 
n->nlmsg_flags |= NLM_F_ACK; + + status = sendmsg(rtnl->fd, &msg, 0); + + if (status < 0) { + perror("Cannot talk to rtnetlink"); + return -1; + } + + memset(buf,0,sizeof(buf)); + + iov.iov_base = buf; + + while (1) { + iov.iov_len = sizeof(buf); + status = recvmsg(rtnl->fd, &msg, 0); + + if (status < 0) { + if (errno == EINTR) + continue; + perror("OVERRUN"); + continue; + } + if (status == 0) { + fprintf(stderr, "EOF on netlink\n"); + return -1; + } + if (msg.msg_namelen != sizeof(nladdr)) { + fprintf(stderr, "sender address length == %d\n", msg.msg_namelen); + exit(1); + } + for (h = (struct nlmsghdr*)buf; status >= sizeof(*h); ) { + int err; + int len = h->nlmsg_len; + int l = len - sizeof(*h); + + if (l<0 || len>status) { + if (msg.msg_flags & MSG_TRUNC) { + fprintf(stderr, "Truncated message\n"); + return -1; + } + fprintf(stderr, "!!!malformed message: len=%d\n", len); + exit(1); + } + + if (nladdr.nl_pid != peer || + h->nlmsg_pid != rtnl->local.nl_pid || + h->nlmsg_seq != seq) { + if (junk) { + err = junk(&nladdr, h, jarg); + if (err < 0) + return err; + } + continue; + } + + if (h->nlmsg_type == NLMSG_ERROR) { + struct nlmsgerr *err = (struct nlmsgerr*)NLMSG_DATA(h); + if (l < sizeof(struct nlmsgerr)) { + fprintf(stderr, "ERROR truncated\n"); + } else { + errno = -err->error; + if (errno == 0) { + if (answer) + memcpy(answer, h, h->nlmsg_len); + return 0; + } + perror("RTNETLINK answers"); + } + return -1; + } + if (answer) { + memcpy(answer, h, h->nlmsg_len); + return 0; + } + + fprintf(stderr, "Unexpected reply!!!\n"); + + status -= NLMSG_ALIGN(len); + h = (struct nlmsghdr*)((char*)h + NLMSG_ALIGN(len)); + } + if (msg.msg_flags & MSG_TRUNC) { + fprintf(stderr, "Message truncated\n"); + continue; + } + if (status) { + fprintf(stderr, "!!!Remnant of size %d\n", status); + exit(1); + } + } +} + +int rtnl_listen(struct rtnl_handle *rtnl, + rtnl_filter_t handler, + void *jarg) +{ + int status; + struct nlmsghdr *h; + struct sockaddr_nl nladdr; + struct iovec 
iov; + struct msghdr msg = { + .msg_name = &nladdr, + .msg_namelen = sizeof(nladdr), + .msg_iov = &iov, + .msg_iovlen = 1, + }; + char buf[8192]; + + memset(&nladdr, 0, sizeof(nladdr)); + nladdr.nl_family = AF_NETLINK; + nladdr.nl_pid = 0; + nladdr.nl_groups = 0; + + iov.iov_base = buf; + while (1) { + iov.iov_len = sizeof(buf); + status = recvmsg(rtnl->fd, &msg, 0); + + if (status < 0) { + if (errno == EINTR) + continue; + perror("OVERRUN"); + continue; + } + if (status == 0) { + fprintf(stderr, "EOF on netlink\n"); + return -1; + } + if (msg.msg_namelen != sizeof(nladdr)) { + fprintf(stderr, "Sender address length == %d\n", msg.msg_namelen); + exit(1); + } + for (h = (struct nlmsghdr*)buf; status >= sizeof(*h); ) { + int err; + int len = h->nlmsg_len; + int l = len - sizeof(*h); + + if (l<0 || len>status) { + if (msg.msg_flags & MSG_TRUNC) { + fprintf(stderr, "Truncated message\n"); + return -1; + } + fprintf(stderr, "!!!malformed message: len=%d\n", len); + exit(1); + } + + err = handler(&nladdr, h, jarg); + if (err < 0) + return err; + + status -= NLMSG_ALIGN(len); + h = (struct nlmsghdr*)((char*)h + NLMSG_ALIGN(len)); + } + if (msg.msg_flags & MSG_TRUNC) { + fprintf(stderr, "Message truncated\n"); + continue; + } + if (status) { + fprintf(stderr, "!!!Remnant of size %d\n", status); + exit(1); + } + } +} + +int rtnl_from_file(FILE *rtnl, rtnl_filter_t handler, + void *jarg) +{ + int status; + struct sockaddr_nl nladdr; + char buf[8192]; + struct nlmsghdr *h = (void*)buf; + + memset(&nladdr, 0, sizeof(nladdr)); + nladdr.nl_family = AF_NETLINK; + nladdr.nl_pid = 0; + nladdr.nl_groups = 0; + + while (1) { + int err, len, type; + int l; + + status = fread(&buf, 1, sizeof(*h), rtnl); + + if (status < 0) { + if (errno == EINTR) + continue; + perror("rtnl_from_file: fread"); + return -1; + } + if (status == 0) + return 0; + + len = h->nlmsg_len; + type= h->nlmsg_type; + l = len - sizeof(*h); + + if (l<0 || len>sizeof(buf)) { + fprintf(stderr, "!!!malformed message: 
len=%d @%lu\n", + len, ftell(rtnl)); + return -1; + } + + status = fread(NLMSG_DATA(h), 1, NLMSG_ALIGN(l), rtnl); + + if (status < 0) { + perror("rtnl_from_file: fread"); + return -1; + } + if (status < l) { + fprintf(stderr, "rtnl-from_file: truncated message\n"); + return -1; + } + + err = handler(&nladdr, h, jarg); + if (err < 0) + return err; + } +} + +int addattr32(struct nlmsghdr *n, int maxlen, int type, __u32 data) +{ + int len = RTA_LENGTH(4); + struct rtattr *rta; + if (NLMSG_ALIGN(n->nlmsg_len) + len > maxlen) { + fprintf(stderr,"addattr32: Error! max allowed bound %d exceeded\n",maxlen); + return -1; + } + rta = NLMSG_TAIL(n); + rta->rta_type = type; + rta->rta_len = len; + memcpy(RTA_DATA(rta), &data, 4); + n->nlmsg_len = NLMSG_ALIGN(n->nlmsg_len) + len; + return 0; +} + +int addattr_l(struct nlmsghdr *n, int maxlen, int type, const void *data, + int alen) +{ + int len = RTA_LENGTH(alen); + struct rtattr *rta; + + if (NLMSG_ALIGN(n->nlmsg_len) + RTA_ALIGN(len) > maxlen) { + fprintf(stderr, "addattr_l ERROR: message exceeded bound of %d\n",maxlen); + return -1; + } + rta = NLMSG_TAIL(n); + rta->rta_type = type; + rta->rta_len = len; + memcpy(RTA_DATA(rta), data, alen); + n->nlmsg_len = NLMSG_ALIGN(n->nlmsg_len) + RTA_ALIGN(len); + return 0; +} + +int addraw_l(struct nlmsghdr *n, int maxlen, const void *data, int len) +{ + if (NLMSG_ALIGN(n->nlmsg_len) + NLMSG_ALIGN(len) > maxlen) { + fprintf(stderr, "addraw_l ERROR: message exceeded bound of %d\n",maxlen); + return -1; + } + + memcpy(NLMSG_TAIL(n), data, len); + memset((void *) NLMSG_TAIL(n) + len, 0, NLMSG_ALIGN(len) - len); + n->nlmsg_len = NLMSG_ALIGN(n->nlmsg_len) + NLMSG_ALIGN(len); + return 0; +} + +int rta_addattr32(struct rtattr *rta, int maxlen, int type, __u32 data) +{ + int len = RTA_LENGTH(4); + struct rtattr *subrta; + + if (RTA_ALIGN(rta->rta_len) + len > maxlen) { + fprintf(stderr,"rta_addattr32: Error! 
max allowed bound %d exceeded\n",maxlen); + return -1; + } + subrta = (struct rtattr*)(((char*)rta) + RTA_ALIGN(rta->rta_len)); + subrta->rta_type = type; + subrta->rta_len = len; + memcpy(RTA_DATA(subrta), &data, 4); + rta->rta_len = NLMSG_ALIGN(rta->rta_len) + len; + return 0; +} + +int rta_addattr_l(struct rtattr *rta, int maxlen, int type, + const void *data, int alen) +{ + struct rtattr *subrta; + int len = RTA_LENGTH(alen); + + if (RTA_ALIGN(rta->rta_len) + RTA_ALIGN(len) > maxlen) { + fprintf(stderr,"rta_addattr_l: Error! max allowed bound %d exceeded\n",maxlen); + return -1; + } + subrta = (struct rtattr*)(((char*)rta) + RTA_ALIGN(rta->rta_len)); + subrta->rta_type = type; + subrta->rta_len = len; + memcpy(RTA_DATA(subrta), data, alen); + rta->rta_len = NLMSG_ALIGN(rta->rta_len) + RTA_ALIGN(len); + return 0; +} + +int parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len) +{ + memset(tb, 0, sizeof(struct rtattr *) * (max + 1)); + while (RTA_OK(rta, len)) { + if (rta->rta_type <= max) + tb[rta->rta_type] = rta; + rta = RTA_NEXT(rta,len); + } + if (len) + fprintf(stderr, "!!!Deficit %d, rta_len=%d\n", len, rta->rta_len); + return 0; +} + +int parse_rtattr_byindex(struct rtattr *tb[], int max, struct rtattr *rta, int len) +{ + int i = 0; + + memset(tb, 0, sizeof(struct rtattr *) * max); + while (RTA_OK(rta, len)) { + if (rta->rta_type <= max && i < max) + tb[i++] = rta; + rta = RTA_NEXT(rta,len); + } + if (len) + fprintf(stderr, "!!!Deficit %d, rta_len=%d\n", len, rta->rta_len); + return i; +} diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/lowlevel/netlink/libnetlink.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/lowlevel/netlink/libnetlink.h Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,58 @@ +#ifndef __LIBNETLINK_H__ +#define __LIBNETLINK_H__ 1 + +#include <netinet/in.h> +#include <asm/types.h> +#include <linux/netlink.h> +#include <linux/rtnetlink.h> + +struct rtnl_handle +{ + int fd; + struct sockaddr_nl local; + 
struct sockaddr_nl peer; + __u32 seq; + __u32 dump; +}; + +extern int rtnl_open(struct rtnl_handle *rth, unsigned subscriptions); +extern int rtnl_open_byproto(struct rtnl_handle *rth, unsigned subscriptions, int protocol); +extern void rtnl_close(struct rtnl_handle *rth); +extern int rtnl_wilddump_request(struct rtnl_handle *rth, int fam, int type); +extern int rtnl_dump_request(struct rtnl_handle *rth, int type, void *req, int len); + +typedef int (*rtnl_filter_t)(const struct sockaddr_nl *, + struct nlmsghdr *n, void *); +extern int rtnl_dump_filter(struct rtnl_handle *rth, rtnl_filter_t filter, + void *arg1, + rtnl_filter_t junk, + void *arg2); +extern int rtnl_talk(struct rtnl_handle *rtnl, struct nlmsghdr *n, pid_t peer, + unsigned groups, struct nlmsghdr *answer, + rtnl_filter_t junk, + void *jarg); +extern int rtnl_send(struct rtnl_handle *rth, const char *buf, int); + + +extern int addattr32(struct nlmsghdr *n, int maxlen, int type, __u32 data); +extern int addattr_l(struct nlmsghdr *n, int maxlen, int type, const void *data, int alen); +extern int addraw_l(struct nlmsghdr *n, int maxlen, const void *data, int len); +extern int rta_addattr32(struct rtattr *rta, int maxlen, int type, __u32 data); +extern int rta_addattr_l(struct rtattr *rta, int maxlen, int type, const void *data, int alen); + +extern int parse_rtattr(struct rtattr *tb[], int max, struct rtattr *rta, int len); +extern int parse_rtattr_byindex(struct rtattr *tb[], int max, struct rtattr *rta, int len); + +#define parse_rtattr_nested(tb, max, rta) \ + (parse_rtattr((tb), (max), RTA_DATA(rta), RTA_PAYLOAD(rta))) + +extern int rtnl_listen(struct rtnl_handle *, rtnl_filter_t handler, + void *jarg); +extern int rtnl_from_file(FILE *, rtnl_filter_t handler, + void *jarg); + +#define NLMSG_TAIL(nmsg) \ + ((struct rtattr *) (((void *) (nmsg)) + NLMSG_ALIGN((nmsg)->nlmsg_len))) + +#endif /* __LIBNETLINK_H__ */ + diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/lowlevel/netlink/netlink.c --- 
/dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/lowlevel/netlink/netlink.c Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,211 @@ +/* python binding to libnetlink */ + +#include <Python.h> +#include "libnetlink.h" + +#define PKG "xen.lowlevel.netlink" + +typedef struct { + PyObject_HEAD + int opened; + struct rtnl_handle rth; +} PyRtnlObject; + +/* todo: subscriptions? */ +static PyObject* PyRtnl_new(PyTypeObject* type, PyObject* args, + PyObject* kwargs) +{ + return type->tp_alloc(type, 0); +} + +static int PyRtnl_init(PyObject* obj, PyObject* args, PyObject* kwargs) +{ + PyRtnlObject* self = (PyRtnlObject*)obj; + + if (rtnl_open(&self->rth, 0) < 0) { + PyErr_SetString(PyExc_IOError, "could not open rtnl handle"); + return -1; + } + + return 0; +} + +static void PyRtnl_dealloc(PyRtnlObject* obj) +{ + PyRtnlObject* self = (PyRtnlObject*)obj; + + rtnl_close(&self->rth); +} + +static PyObject* pyrtnl_talk(PyObject* obj, PyObject* args) +{ + PyRtnlObject* self = (PyRtnlObject*)obj; + char* msg; + int len; + int peer = 0; + int groups = 0; + + if (!PyArg_ParseTuple(args, "s#|ii", &msg, &len, &peer, &groups)) + return NULL; + + if (rtnl_talk(&self->rth, (struct nlmsghdr*)msg, peer, groups, NULL, NULL, + NULL) < 0) + { + PyErr_SetString(PyExc_IOError, "error sending message"); + return NULL; + } + + Py_RETURN_NONE; +} + +static PyObject* pyrtnl_wilddump_request(PyObject* obj, PyObject* args) +{ + PyRtnlObject* self = (PyRtnlObject*)obj; + int family, type; + + if (!PyArg_ParseTuple(args, "ii", &family, &type)) + return NULL; + + if (rtnl_wilddump_request(&self->rth, family, type) < 0) { + PyErr_SetString(PyExc_IOError, "could not send dump request"); + return NULL; + } + + Py_RETURN_NONE; +} + +static PyObject* pyrtnl_dump_request(PyObject* obj, PyObject* args) +{ + PyRtnlObject* self = (PyRtnlObject*)obj; + int type; + char* req; + int len; + + if (!PyArg_ParseTuple(args, "is#", &type, &req, &len)) + return NULL; + + if (rtnl_dump_request(&self->rth, type, 
req, len) < 0) { + PyErr_SetString(PyExc_IOError, "could not send dump request"); + return NULL; + } + + Py_RETURN_NONE; +} + +/* translate args to python and call python callback */ +static int dump_filter_helper(const struct sockaddr_nl *who, + struct nlmsghdr *n, void *arg) +{ + PyObject* filter = arg; + PyObject* args; + PyObject* result; + + args = Py_BuildValue("s#s#", who, sizeof(*who), n, n->nlmsg_len); + result = PyObject_CallObject(filter, args); + Py_DECREF(args); + if (!result) + return -1; + + /* result is ignored as long as an exception isn't raised */ + Py_DECREF(result); + return 0; +} + +static PyObject* pyrtnl_dump_filter(PyObject* obj, PyObject* args) +{ + PyRtnlObject* self = (PyRtnlObject*)obj; + PyObject *filter; + + if (!PyArg_ParseTuple(args, "O:dump_filter", &filter)) + return NULL; + + if (!PyCallable_Check(filter)) { + PyErr_SetString(PyExc_TypeError, "parameter must be callable"); + return NULL; + } + + Py_INCREF(filter); + if (rtnl_dump_filter(&self->rth, dump_filter_helper, filter, NULL, + NULL) < 0) + { + Py_DECREF(filter); + return NULL; + } + Py_DECREF(filter); + + Py_RETURN_NONE; +} + +static PyMethodDef PyRtnl_methods[] = { + { "talk", pyrtnl_talk, METH_VARARGS, + "send a message to rtnetlink and receive a response.\n" }, + { "wilddump_request", pyrtnl_wilddump_request, METH_VARARGS, + "dump objects.\n" }, + { "dump_request", pyrtnl_dump_request, METH_VARARGS, + "start a dump of a particular netlink type.\n" }, + { "dump_filter", pyrtnl_dump_filter, METH_VARARGS, + "iterate over an rtnl dump.\n" }, + { NULL } +}; + +static PyTypeObject PyRtnlType = { + PyObject_HEAD_INIT(NULL) + 0, /* ob_size */ + PKG ".rtnl", /* tp_name */ + sizeof(PyRtnlObject), /* tp_basicsize */ + 0, /* tp_itemsize */ + (destructor)PyRtnl_dealloc, /* tp_dealloc */ + NULL, /* tp_print */ + NULL, /* tp_getattr */ + NULL, /* tp_setattr */ + NULL, /* tp_compare */ + NULL, /* tp_repr */ + NULL, /* tp_as_number */ + NULL, /* tp_as_sequence */ + NULL, /* 
tp_as_mapping */ + NULL, /* tp_hash */ + NULL, /* tp_call */ + NULL, /* tp_str */ + NULL, /* tp_getattro */ + NULL, /* tp_setattro */ + NULL, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT, /* tp_flags */ + "rtnetlink handle", /* tp_doc */ + NULL, /* tp_traverse */ + NULL, /* tp_clear */ + NULL, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + NULL, /* tp_iter */ + NULL, /* tp_iternext */ + PyRtnl_methods, /* tp_methods */ + NULL, /* tp_members */ + NULL, /* tp_getset */ + NULL, /* tp_base */ + NULL, /* tp_dict */ + NULL, /* tp_descr_get */ + NULL, /* tp_descr_set */ + 0, /* tp_dictoffset */ + PyRtnl_init, /* tp_init */ + NULL, /* tp_alloc */ + PyRtnl_new, /* tp_new */ +}; + +static PyMethodDef methods[] = { + { NULL } +}; + +static char doc[] = "libnetlink wrapper"; + +PyMODINIT_FUNC initnetlink(void) +{ + PyObject *mod; + + if (PyType_Ready(&PyRtnlType) == -1) + return; + + if (!(mod = Py_InitModule3(PKG, methods, doc))) + return; + + Py_INCREF(&PyRtnlType); + PyModule_AddObject(mod, "rtnl", (PyObject *)&PyRtnlType); +} diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/blkdev.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/remus/blkdev.py Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,31 @@ +handlers = [] + +class BlkDevException(Exception): pass + +class BlkDev(object): + "Object representing a VM block device" + def __init__(self, **props): + self.uname = '' + if 'dev' not in props: + raise BlkDevException('no device') + #if 'uname' not in props: + #raise BlkDevException('no uname') + if 'mode' not in props: + raise BlkDevException('no mode') + self.__dict__.update(props) + self.dev = props['dev'].rstrip(':disk') + + def __str__(self): + return '%s,%s,%s' % (self.uname, self.dev, self.mode) + +def register(handler): + "register a block device class with parser" + if handler not in handlers: + handlers.insert(0, handler) + +def parse(props): + "turn a vm device dictionary into a blkdev object" + for handler in handlers: + if 
handler.handles(**props): + return handler(**props) + return BlkDev(**props) diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/image.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/remus/image.py Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,227 @@ +# VM image file manipulation + +import logging, struct + +import vm + +SIGNATURE = 'LinuxGuestRecord' +LONGLEN = struct.calcsize('L') +INTLEN = struct.calcsize('i') +PAGE_SIZE = 4096 +# ~0L +P2M_EXT_SIG = 4294967295L +# frames per page +FPP = 1024 +LTAB_MASK = 0xf << 28 +BATCH_SIZE = 1024 +IDXLEN = INTLEN + BATCH_SIZE * LONGLEN + +logging.basicConfig(level=logging.DEBUG) +log = logging.getLogger() + +class VMParseException(Exception): pass + +class VMImage(object): + def __init__(self, img=None): + """img may be a path or a file object. + If compact is True, apply checkpoints to base image instead + of simply concatenating them. + """ + self.img = img + + self.dom = None + self.fd = None + self.header = None + self.nr_pfns = 0 + # p2m extension header (unparsed) + self.p2mext = None + + if self.img: + self.open(self.img) + + def open(self, img): + if isinstance(img, str): + self.fd = file(img, 'rb') + else: + self.fd = img + + self.readheader() + + def readheader(self): + sig = self.fd.read(len(SIGNATURE)) + if sig != SIGNATURE: + raise VMParseException("Bad signature in image") + + hlen = self.fd.read(INTLEN) + hlen, = struct.unpack('!i', hlen) + + self.header = self.fd.read(hlen) + self.dom = parseheader(self.header) + + def readp2mfl(self): + "read the P2M frame list" + pfnlen = self.fd.read(LONGLEN) + self.nr_pfns, = struct.unpack('L', pfnlen) + p2m0 = self.fd.read(LONGLEN) + + p2mhdr = p2m0 + p2m0, = struct.unpack('L', p2m0) + if p2m0 == P2M_EXT_SIG: + elen = self.fd.read(INTLEN) + elen, = struct.unpack('I', elen) + + self.p2mext = self.fd.read(elen) + + p2m0 = self.fd.read(LONGLEN) + p2m0, = struct.unpack('L', p2m0) + p2mfl = [p2m0] + + p2mfle = (self.nr_pfns + FPP - 1)/FPP - 1 + p2ms 
= self.fd.read(LONGLEN * p2mfle) + p2mfl.extend(struct.unpack('%dL' % p2mfle, p2ms)) + + self.p2mfl = p2mfl + + def flush(self): + self.ofd.write(self.tail) + +class Writer(object): + """compress a stream of checkpoints into a single image of the + last checkpoint""" + def __init__(self, fd, compact=False): + self.fd = fd + self.compact = compact + + self.vm = None + self.tail = None + # offset to first batch of pages + self.imgstart = 0 + # PFN mappings + self.pfns = [] + + def __del__(self): + self.close() + + def writeheader(self): + hlen = struct.pack('!i', len(self.vm.header)) + header = ''.join([SIGNATURE, hlen, self.vm.header]) + self.fd.write(header) + + def writep2mfl(self): + p2m = [struct.pack('L', self.vm.nr_pfns)] + if self.vm.p2mext: + p2m.extend([struct.pack('L', P2M_EXT_SIG), self.vm.p2mext]) + p2m.append(struct.pack('%dL' % len(self.vm.p2mfl), *self.vm.p2mfl)) + self.fd.write(''.join(p2m)) + + def writebatch(self, batch): + def offset(pfn): + isz = (pfn / BATCH_SIZE + 1) * IDXLEN + return self.imgstart + isz + pfn * PAGE_SIZE + + if not self.compact: + return self.fd.write(batch) + + batch = parsebatch(batch) + # sort pages for better disk seek behaviour + batch.sort(lambda x, y: cmp(x[0] & ~LTAB_MASK, y[0] & ~LTAB_MASK)) + + for pfndesc, page in batch: + pfn = pfndesc & ~LTAB_MASK + if pfn > self.vm.nr_pfns: + log.error('INVALID PFN: %d' % pfn) + if len(self.pfns) <= pfn: + self.pfns.extend([0] * (pfn - len(self.pfns) + 1)) + self.pfns[pfn] = pfndesc + self.fd.seek(offset(pfn)) + self.fd.write(page) + + #print "max offset: %d, %d" % (len(self.pfns), offset(self.pfns[-1])) + + def writeindex(self): + "Write batch header in front of each page" + hdrlen = INTLEN + BATCH_SIZE * LONGLEN + batches = (len(self.pfns) + BATCH_SIZE - 1) / BATCH_SIZE + + for i in xrange(batches): + offset = self.imgstart + i * (hdrlen + (PAGE_SIZE * BATCH_SIZE)) + pfnoff = i * BATCH_SIZE + # python auto-clamps overreads + pfns = self.pfns[pfnoff:pfnoff + BATCH_SIZE] + + 
self.fd.seek(offset) + self.fd.write(struct.pack('i', len(pfns))) + self.fd.write(struct.pack('%dL' % len(pfns), *pfns)) + + def slurp(self, ifd): + """Apply an incremental checkpoint to a loaded image. + accepts a path or a file object.""" + if isinstance(ifd, str): + ifd = file(ifd, 'rb') + + if not self.vm: + self.vm = VMImage(ifd) + self.writeheader() + + self.vm.readp2mfl() + self.writep2mfl() + self.imgstart = self.fd.tell() + + while True: + l, batch = readbatch(ifd) + if l <= 0: + break + self.writebatch(batch) + self.tail = batch + ifd.read() + + def flush(self): + if self.tail: + self.fd.seek(0, 2) + self.fd.write(self.tail) + if self.compact: + self.writeindex() + self.tail = None + + def close(self): + self.flush() + +def parseheader(header): + "parses a header sexpression" + return vm.parsedominfo(vm.strtosxpr(header)) + +def makeheader(dominfo): + "create an image header from a VM dominfo sxpr" + items = [SIGNATURE] + sxpr = vm.sxprtostr(dominfo) + items.append(struct.pack('!i', len(sxpr))) + items.append(sxpr) + return ''.join(items) + +def readbatch(fd): + batch = [] + batchlen = fd.read(INTLEN) + batch.append(batchlen) + batchlen, = struct.unpack('i', batchlen) + log.info("batch length: %d" % batchlen) + if batchlen <= 0: + return (batchlen, batch[0]) + + batchfns = fd.read(LONGLEN * batchlen) + batch.append(batchfns) + pages = fd.read(PAGE_SIZE * batchlen) + if len(pages) != PAGE_SIZE * batchlen: + log.error('SHORT READ: %d' % len(pages)) + batch.append(pages) + + return (batchlen, ''.join(batch)) + +def parsebatch(batch): + "parse a batch string into pages" + batchlen, batch = batch[:INTLEN], batch[INTLEN:] + batchlen, = struct.unpack('i', batchlen) + #print 'batch length: %d' % batchlen + pfnlen = batchlen * LONGLEN + pfns = struct.unpack('%dL' % batchlen, batch[:pfnlen]) + pagebuf = batch[pfnlen:] + pages = [pagebuf[i*PAGE_SIZE:(i+1)*PAGE_SIZE] for i in xrange(batchlen)] + return zip(pfns, pages) diff -r 64599a2d310d -r ea0e302362bb 
tools/python/xen/remus/netlink.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/remus/netlink.py Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,314 @@ +# netlink wrappers + +import socket, struct +import xen.lowlevel.netlink + +NETLINK_ROUTE = 0 + +NLM_F_REQUEST = 1 # It is request message. +NLM_F_MULTI = 2 # Multipart message, terminated by NLMSG_DONE +NLM_F_ACK = 4 # Reply with ack, with zero or error code +NLM_F_ECHO = 8 # Echo this request + +# Modifiers to GET request +NLM_F_ROOT = 0x100 # specify tree root +NLM_F_MATCH = 0x200 # return all matching +NLM_F_ATOMIC = 0x400 # atomic GET +NLM_F_DUMP = NLM_F_ROOT|NLM_F_MATCH + +# Modifiers to NEW request +NLM_F_REPLACE = 0x100 # Override existing +NLM_F_EXCL = 0x200 # Do not touch, if it exists +NLM_F_CREATE = 0x400 # Create, if it does not exist +NLM_F_APPEND = 0x800 # Add to end of list + +RTM_NEWLINK = 16 +RTM_GETLINK = 18 +RTM_NEWQDISC = 36 +RTM_DELQDISC = 37 +RTM_GETQDISC = 38 + +IFLA_UNSPEC = 0 +IFLA_ADDRESS = 1 +IFLA_BROADCAST = 2 +IFLA_IFNAME = 3 +IFLA_MTU = 4 +IFLA_LINK = 5 +IFLA_QDISC = 6 +IFLA_STATS = 7 +IFLA_COST = 8 +IFLA_PRIORITY = 9 +IFLA_MASTER = 10 +IFLA_WIRELESS = 11 +IFLA_PROTINFO = 12 +IFLA_TXQLEN = 13 +IFLA_MAP = 14 +IFLA_WEIGHT = 15 + +TCA_UNSPEC = 0 +TCA_KIND = 1 +TCA_OPTIONS = 2 +TCA_STATS = 3 +TCA_XSTATS = 4 +TCA_RATE = 5 +TCA_FCNT = 6 +TCA_STATS2 = 7 + +class RTNLException(Exception): pass + +def align(l, alignto=4): + return (l + alignto - 1) & ~(alignto - 1) + +class rtattr(object): + "rtattribute" + fmt = "HH" + fmtlen = struct.calcsize(fmt) + + def __init__(self, msg=None): + if msg: + self.unpack(msg) + else: + self.rta_len = 0 + self.rta_type = 0 + + self.body = '' + + def __len__(self): + return align(self.rta_len) + + def pack(self): + self.rta_len = align(self.fmtlen + len(self.body)) + s = struct.pack(self.fmt, self.rta_len, self.rta_type) + self.body + pad = self.rta_len - len(s) + if pad: + s += '\0' * pad + return s + + def unpack(self, msg): + args = 
struct.unpack(self.fmt, msg[:self.fmtlen]) + self.rta_len, self.rta_type = args + + self.body = msg[align(self.fmtlen):self.rta_len] + +class rtattrlist(object): + def __init__(self, msg): + self.start = msg + + def __iter__(self): + body = self.start + while len(body) > rtattr.fmtlen: + rta = rtattr(body) + yield rta + body = body[len(rta):] + +class nlmsg(object): + "netlink message header" + fmt = "IHHII" + fmtlen = struct.calcsize(fmt) + + def __init__(self, msg=None): + if msg: + self.unpack(msg) + else: + self.nlmsg_len = 0 + self.nlmsg_type = 0 + self.nlmsg_flags = 0 + self.nlmsg_seq = 0 + self.nlmsg_pid = 0 + + self.rta = '' + self.body = '' + + def __len__(self): + return align(self.fmtlen + len(self.body) + len(self.rta)) + + def addattr(self, type, data): + attr = rtattr() + attr.rta_type = type + attr.body = data + self.rta += attr.pack() + + def settype(self, cmd): + self.nlmsg_type = cmd + + def pack(self): + return struct.pack(self.fmt, len(self), self.nlmsg_type, + self.nlmsg_flags, self.nlmsg_seq, + self.nlmsg_pid) + self.body + self.rta + + def unpack(self, msg): + args = struct.unpack(self.fmt, msg[:self.fmtlen]) + self.nlmsg_len, self.nlmsg_type, self.nlmsg_flags = args[:3] + self.nlmsg_seq, self.nlmsg_pid = args[3:] + + self.body = msg[align(self.fmtlen):] + self.rta = '' + + def __str__(self): + return '<netlink message, len %d, type %d>' % \ + (self.nlmsg_len, self.nlmsg_type) + +class ifinfomsg(object): + "interface info message" + fmt = "BxHiII" + fmtlen = struct.calcsize(fmt) + + def __init__(self, msg=None): + if msg: + self.unpack(msg) + else: + self.ifi_family = 0 + self.ifi_type = 0 + self.ifi_index = 0 + self.ifi_flags = 0 + self.ifi_change = 0 + + self.body = '' + + def unpack(self, msg): + args = struct.unpack(self.fmt, msg[:self.fmtlen]) + self.ifi_family, self.ifi_type, self.ifi_index= args[:3] + self.ifi_flags, self.ifi_change = args[3:] + + self.body = msg[align(self.fmtlen):] + + def __str__(self): + return '<ifinfo message, 
family %d, type %d, index %d>' % \ + (self.ifi_family, self.ifi_type, self.ifi_index) + +class tcmsg(object): + "TC message" + fmt = "BxxxiIII" + fmtlen = struct.calcsize(fmt) + + def __init__(self, msg=None): + if msg: + self.unpack(msg) + else: + self.tcm_family = socket.AF_UNSPEC + self.tcm_ifindex = 0 + self.tcm_handle = 0 + self.tcm_parent = 0 + self.tcm_info = 0 + + self.rta = '' + + def unpack(self, msg): + args = struct.unpack(self.fmt, msg[:self.fmtlen]) + self.tcm_family, self.tcm_ifindex, self.tcm_handle = args[:3] + self.tcm_parent, self.tcm_info = args[3:] + + self.rta = msg[align(self.fmtlen):] + + def pack(self): + return struct.pack(self.fmt, self.tcm_family, self.tcm_ifindex, + self.tcm_handle, self.tcm_parent, self.tcm_info) + + def __str__(self): + return '<tc message, family %d, index %d>' % \ + (self.tcm_family, self.tcm_ifindex) + +class newlinkmsg(object): + def __init__(self, nlmsg): + if nlmsg.nlmsg_type != RTM_NEWLINK: + raise RTNLException("wrong message type") + self.nlmsg = nlmsg + self.ifi = ifinfomsg(self.nlmsg.body) + + self.rtattrs = {} + for rta in rtattrlist(self.ifi.body): + self.rtattrs[rta.rta_type] = rta.body + +class newqdiscmsg(object): + def __init__(self, nlmsg): + if nlmsg.nlmsg_type != RTM_NEWQDISC: + raise RTNLException("wrong message type") + self.nlmsg = nlmsg + self.t = tcmsg(self.nlmsg.body) + + self.rtattrs = {} + for rta in rtattrlist(self.t.rta): + self.rtattrs[rta.rta_type] = rta.body + +class rtnl(object): + def __init__(self): + self._rth = xen.lowlevel.netlink.rtnl() + self._linkcache = None + + def getlink(self, key, cached=False): + """returns the interface object corresponding to the key, which + may be an index number or device name.""" + if not cached: + self._linkcache = None + if self._linkcache is None: + self._linkcache = self.getlinks() + + if isinstance(key, int): + return self._linkcache.get(key) + + for k, v in self._linkcache.iteritems(): + if v['name'] == key: + return v + + return None + + def 
getlinks(self): + """returns a dictionary of interfaces keyed by kernel + interface index""" + links = {} + def dumpfilter(addr, msgstr): + msg = newlinkmsg(nlmsg(msgstr)) + idx = msg.ifi.ifi_index + ifname = msg.rtattrs[IFLA_IFNAME].strip('\0') + address = msg.rtattrs.get(IFLA_ADDRESS) + + link = {'index': idx, + 'type': msg.ifi.ifi_type, + 'name': ifname, + 'address': address} + links[idx] = link + + self._rth.wilddump_request(socket.AF_UNSPEC, RTM_GETLINK) + self._rth.dump_filter(dumpfilter) + + return links + + def getqdisc(self, dev): + """returns the queueing discipline on device dev, which may be + specified by kernel index or device name""" + qdiscs = self.getqdiscs(dev) + if qdiscs: + return qdiscs.values()[0] + return None + + def getqdiscs(self, dev=None): + """returns a dictionary of queueing disciplines keyed by kernel + interface index""" + qdiscs = {} + def dumpfilter(addr, msgstr): + msg = newqdiscmsg(nlmsg(msgstr)) + idx = msg.t.tcm_ifindex + handle = msg.t.tcm_handle + kind = msg.rtattrs[TCA_KIND].strip('\0') + opts = msg.rtattrs.get(TCA_OPTIONS) + + qdisc = {'index': idx, + 'handle': handle, + 'kind': kind, + 'options': opts} + qdiscs[idx] = qdisc + + tcm = tcmsg() + if dev: + link = self.getlink(dev) + if not link: + raise QdiscException('device %s not found' % dev) + tcm.tcm_ifindex = link['index'] + + msg = tcm.pack() + self._rth.dump_request(RTM_GETQDISC, msg) + self._rth.dump_filter(dumpfilter) + return qdiscs + + def talk(self, req): + self._rth.talk(req) diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/profile.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/remus/profile.py Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,56 @@ +"""Simple profiling module +""" + +import time + +class ProfileBlock(object): + """A section of code to be profiled""" + def __init__(self, name): + self.name = name + + def enter(self): + print "PROF: entered %s at %f" % (self.name, time.time()) + + def exit(self): + print "PROF: exited 
%s at %f" % (self.name, time.time()) + +class NullProfiler(object): + def enter(self, name): + pass + + def exit(self, name=None): + pass + +class Profiler(object): + def __init__(self): + self.blocks = {} + self.running = [] + + def enter(self, name): + try: + block = self.blocks[name] + except KeyError: + block = ProfileBlock(name) + self.blocks[name] = block + + block.enter() + self.running.append(block) + + def exit(self, name=None): + if name is not None: + block = None + while self.running: + tmp = self.running.pop() + if tmp.name == name: + block = tmp + break + tmp.exit() + if not block: + raise KeyError('block %s not running' % name) + else: + try: + block = self.running.pop() + except IndexError: + raise KeyError('no block running') + + block.exit() diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/qdisc.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/remus/qdisc.py Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,178 @@ +import socket, struct + +import netlink + +qdisc_kinds = {} + +TC_H_ROOT = 0xFFFFFFFF + +class QdiscException(Exception): pass + +class request(object): + "qdisc request message" + def __init__(self, cmd, flags=0, dev=None, handle=0): + self.n = netlink.nlmsg() + self.t = netlink.tcmsg() + + self.n.nlmsg_flags = netlink.NLM_F_REQUEST|flags + self.n.nlmsg_type = cmd + self.t.tcm_family = socket.AF_UNSPEC + + if not handle: + handle = TC_H_ROOT + self.t.tcm_parent = handle + + if dev: + self.t.tcm_ifindex = dev + + def pack(self): + t = self.t.pack() + self.n.body = t + return self.n.pack() + +class addrequest(request): + def __init__(self, dev, handle, qdisc): + flags = netlink.NLM_F_EXCL|netlink.NLM_F_CREATE + super(addrequest, self).__init__(netlink.RTM_NEWQDISC, flags=flags, + dev=dev, handle=handle) + self.n.addattr(netlink.TCA_KIND, qdisc.kind) + opts = qdisc.pack() + if opts: + self.n.addattr(netlink.TCA_OPTIONS, opts) + +class delrequest(request): + def __init__(self, dev, handle): + super(delrequest, 
self).__init__(netlink.RTM_DELQDISC, dev=dev, + handle=handle) + +class changerequest(request): + def __init__(self, dev, handle, qdisc): + super(changerequest, self).__init__(netlink.RTM_NEWQDISC, + dev=dev, handle=handle) + self.n.addattr(netlink.TCA_KIND, qdisc.kind) + opts = qdisc.pack() + if opts: + self.n.addattr(netlink.TCA_OPTIONS, opts) + +class Qdisc(object): + def __new__(cls, qdict=None, *args, **opts): + if qdict: + kind = qdict.get('kind') + cls = qdisc_kinds.get(kind, cls) + obj = super(Qdisc, cls).__new__(cls, qdict=qdict, *args, **opts) + return obj + + def __init__(self, qdict): + self._qdict = qdict + self.kind = qdict['kind'] + self.handle = qdict['handle'] >> 16 + + def parse(self, opts): + if opts: + raise QdiscException('cannot parse qdisc parameters') + + def optstr(self): + if self._qdict['options']: + return '[cannot parse qdisc parameters]' + else: + return '' + + def pack(self): + return '' + +TC_PRIO_MAX = 15 +class PrioQdisc(Qdisc): + fmt = 'i%sB' % (TC_PRIO_MAX + 1) + + def __init__(self, qdict): + super(PrioQdisc, self).__init__(qdict) + + if qdict.get('options'): + self.unpack(qdict['options']) + else: + self.bands = 3 + self.priomap = [1, 2, 2, 2, 1, 2, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1] + + def pack(self): + #return struct.pack(self.fmt, self.bands, *self.priomap) + return '' + + def unpack(self, opts): + args = struct.unpack(self.fmt, opts) + self.bands = args[0] + self.priomap = args[1:] + + def optstr(self): + mapstr = ' '.join([str(p) for p in self.priomap]) + return 'bands %d priomap %s' % (self.bands, mapstr) + +qdisc_kinds['prio'] = PrioQdisc +qdisc_kinds['pfifo_fast'] = PrioQdisc + +class CfifoQdisc(Qdisc): + fmt = 'II' + + def __init__(self, qdict): + super(CfifoQdisc, self).__init__(qdict) + + if qdict.get('options'): + self.unpack(qdict['options']) + else: + self.epoch = 0 + self.vmid = 0 + + def pack(self): + return struct.pack(self.fmt, self.epoch, self.vmid) + + def unpack(self, opts): + self.epoch, self.vmid = 
struct.unpack(self.fmt, opts) + + def parse(self, opts): + args = list(opts) + try: + while args: + arg = args.pop(0) + if arg == 'epoch': + self.epoch = int(args.pop(0)) + continue + if arg.lower() == 'vmid': + self.vmid = int(args.pop(0)) + continue + except Exception, inst: + raise QdiscException(str(inst)) + + def optstr(self): + return 'epoch %d vmID %d' % (self.epoch, self.vmid) + +qdisc_kinds['cfifo'] = CfifoQdisc + +TC_QUEUE_CHECKPOINT = 0 +TC_QUEUE_RELEASE = 1 + +class QueueQdisc(Qdisc): + fmt = 'I' + + def __init__(self, qdict=None): + if not qdict: + qdict = {'kind': 'queue', + 'handle': TC_H_ROOT} + super(QueueQdisc, self).__init__(qdict) + + self.action = 0 + + def pack(self): + return struct.pack(self.fmt, self.action) + + def parse(self, args): + if not args: + raise QdiscException('no action given') + arg = args[0] + + if arg == 'checkpoint': + self.action = TC_QUEUE_CHECKPOINT + elif arg == 'release': + self.action = TC_QUEUE_RELEASE + else: + raise QdiscException('unknown action') + +qdisc_kinds['queue'] = QueueQdisc diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/save.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/remus/save.py Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,172 @@ +#!/usr/bin/env python + +import os, select, socket, threading, time, signal, xmlrpclib + +from xen.xend.XendClient import server +from xen.xend.xenstore.xswatch import xswatch + +import xen.lowlevel.xc +from xen.xend.xenstore import xsutil +xc = xen.lowlevel.xc.xc() + +import xen.lowlevel.checkpoint + +import vm, image + +XCFLAGS_LIVE = 1 + +xcsave = '/usr/lib/xen/bin/xc_save' + +class _proxy(object): + "proxy simulates an object without inheritance" + def __init__(self, obj): + self._obj = obj + + def __getattr__(self, name): + return getattr(self._obj, name) + + def proxy(self, obj): + self._obj = obj + +class CheckpointError(Exception): pass + +class CheckpointingFile(_proxy): + """Tee writes into separate file objects for each round. 
+ This is necessary because xc_save gets a single file descriptor + for the duration of checkpointing. + """ + def __init__(self, path): + self.path = path + + self.round = 0 + self.rfd, self.wfd = os.pipe() + self.fd = file(path, 'wb') + + # this pipe is used to notify the writer thread of checkpoints + self.cprfd, self.cpwfd = os.pipe() + + super(CheckpointingFile, self).__init__(self.fd) + + wt = threading.Thread(target=self._wrthread, name='disk-write-thread') + wt.setDaemon(True) + wt.start() + self.wt = wt + + def fileno(self): + return self.wfd + + def close(self): + os.close(self.wfd) + # closing wfd should signal writer to stop + self.wt.join() + os.close(self.rfd) + os.close(self.cprfd) + os.close(self.cpwfd) + self.fd.close() + self.wt = None + + def checkpoint(self): + os.write(self.cpwfd, '1') + + def _wrthread(self): + while True: + r, o, e = select.select((self.rfd, self.cprfd), (), ()) + if self.rfd in r: + data = os.read(self.rfd, 256 * 1024) + if not data: + break + self.fd.write(data) + if self.cprfd in r: + junk = os.read(self.cprfd, 1) + self.round += 1 + self.fd = file('%s.%d' % (self.path, self.round), 'wb') + self.proxy(self.fd) + +class MigrationSocket(_proxy): + def __init__(self, address): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(address) + + sock.send("receive\n") + sock.recv(80) + + fd = os.fdopen(sock.fileno(), 'w+') + + self.sock = sock + super(MigrationSocket, self).__init__(fd) + +class Keepalive(object): + "Call a keepalive method at intervals" + def __init__(self, method, interval=0.1): + self.keepalive = method + self.interval = interval + + self.thread = None + self.running = False + + def start(self): + if not self.interval: + return + self.thread = threading.Thread(target=self.run, name='keepalive-thread') + self.thread.setDaemon(True) + self.running = True + self.thread.start() + + def stop(self): + if not self.thread: + return + self.running = False + self.thread.join() + self.thread = None + 
+ def run(self): + while self.running: + self.keepalive() + time.sleep(self.interval) + self.keepalive(stop=True) + +class Saver(object): + def __init__(self, domid, fd, suspendcb=None, resumecb=None, + checkpointcb=None, interval=0): + """Create a Saver object for taking guest checkpoints. + domid: name, number or UUID of a running domain + fd: a stream to which checkpoint data will be written. + suspendcb: callback invoked after guest is suspended + resumecb: callback invoked before guest resumes + checkpointcb: callback invoked when a checkpoint is complete. Return + True to take another checkpoint, or False to stop. + """ + self.fd = fd + self.suspendcb = suspendcb + self.resumecb = resumecb + self.checkpointcb = checkpointcb + self.interval = interval + + self.vm = vm.VM(domid) + + self.checkpointer = None + + def start(self): + vm.getshadowmem(self.vm) + + hdr = image.makeheader(self.vm.dominfo) + self.fd.write(hdr) + self.fd.flush() + + self.checkpointer = xen.lowlevel.checkpoint.checkpointer() + try: + self.checkpointer.open(self.vm.domid) + self.checkpointer.start(self.fd, self.suspendcb, self.resumecb, + self.checkpointcb, self.interval) + self.checkpointer.close() + except xen.lowlevel.checkpoint.error, e: + raise CheckpointError(e) + + def _resume(self): + """low-overhead version of XendDomainInfo.resumeDomain""" + # TODO: currently assumes SUSPEND_CANCEL is available + if True: + xc.domain_resume(self.vm.domid, 1) + xsutil.ResumeDomain(self.vm.domid) + else: + server.xend.domain.resumeDomain(self.vm.domid) diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/tapdisk.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/remus/tapdisk.py Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,4 @@ +import blkdev + +class TapDisk(blkdev.BlkDev): + pass diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/util.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/remus/util.py Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,31 @@ +# 
utility functions + +import os, subprocess + +class PipeException(Exception): + def __init__(self, message, errno): + self.errno = errno + message = '%s: %d, %s' % (message, errno, os.strerror(errno)) + Exception.__init__(self, message) + +def canonifymac(mac): + return ':'.join(['%02x' % int(field, 16) for field in mac.split(':')]) + +def runcmd(args, cwd=None): + # TODO: stdin handling + if type(args) == str: + args = args.split(' ') + try: + proc = subprocess.Popen(args, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, close_fds=True, + cwd=cwd) + stdout = proc.stdout.read() + stderr = proc.stderr.read() + proc.wait() + if proc.returncode: + print ' '.join(args) + print stderr.strip() + raise PipeException('%s failed' % args[0], proc.returncode) + return stdout + except (OSError, IOError), inst: + raise PipeException('could not run %s' % args[0], inst.errno) diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/vbd.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/remus/vbd.py Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,9 @@ +import blkdev + +class VBD(blkdev.BlkDev): + def handles(self, **props): + uname = props.get('uname', '') + return uname.startswith('phy:') + handles = classmethod(handles) + +blkdev.register(VBD) diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/vdi.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/remus/vdi.py Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,121 @@ +#code to play with vdis and snapshots + +import os + +def run(cmd): + fd = os.popen(cmd) + res = [l for l in fd if l.rstrip()] + return not fd.close(), res + + +_blockstore = '/blockstore.dat' + +def set_blockstore(blockstore): + global _blockstore + _blockstore = blockstore + + +class SnapShot: + def __init__(self, vdi, block, index): + self.__vdi = vdi + self.__block = block + self.__index = index + + #TODO add snapshot date and radix + + def __str__(self): + return '%d %d %d' % (self.__vdi.id(), self.__block, self.__index) 
+ + def vdi(self): + return self.__vdi + + def block(self): + return self.__block + + def index(self): + return self.__index + + def match(self, block, index): + return self.__block == block and self.__index == index + + +class VDIException(Exception): + pass + + +class VDI: + def __init__(self, id, name): + self.__id = id + self.__name = name + + def __str__(self): + return 'vdi: %d %s' % (self.__id, self.__name) + + def id(self): + return self.__id + + def name(self): + return self.__name + + def list_snapshots(self): + res, ls = run('vdi_snap_list %s %d' % (_blockstore, self.__id)) + if res: + return [SnapShot(self, int(l[0]), int(l[1])) for l in [l.split() for l in ls[1:]]] + else: + raise VDIException("Error reading snapshot list") + + def snapshot(self): + res, ls = run('vdi_checkpoint %s %d' % (_blockstore, self.__id)) + if res: + _, block, idx = ls[0].split() + return SnapShot(self, int(block), int(idx)) + else: + raise VDIException("Error taking vdi snapshot") + + +def create(name, snap): + res, _ = run('vdi_create %s %s %d %d' + % (_blockstore, name, snap.block(), snap.index())) + if res: + return lookup_by_name(name) + else: + raise VDIException('Unable to create vdi from snapshot') + + +def fill(name, img_file): + res, _ = run('vdi_create %s %s' % (_blockstore, name)) + + if res: + vdi = lookup_by_name(name) + res, _ = run('vdi_fill %d %s' % (vdi.id(), img_file)) + if res: + return vdi + raise VDIException('Unable to create vdi from disk img file') + + +def list_vdis(): + vdis = [] + res, lines = run('vdi_list %s' % _blockstore) + if res: + for l in lines: + r = l.split() + vdis.append(VDI(int(r[0]), r[1])) + return vdis + else: + raise VDIException("Error doing vdi list") + + +def lookup_by_id(id): + vdis = list_vdis() + for v in vdis: + if v.id() == id: + return v + raise VDIException("No match from vdi id") + + +def lookup_by_name(name): + vdis = list_vdis() + for v in vdis: + if v.name() == name: + return v + raise VDIException("No match for vdi 
name") diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/vif.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/remus/vif.py Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,14 @@ +from xen.remus.util import canonifymac + +class VIF(object): + def __init__(self, **props): + self.__dict__.update(props) + if 'mac' in props: + self.mac = canonifymac(props['mac']) + + def __str__(self): + return self.mac + +def parse(props): + "turn a vm device dictionary into a vif object" + return VIF(**props) diff -r 64599a2d310d -r ea0e302362bb tools/python/xen/remus/vm.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/remus/vm.py Fri Nov 13 15:33:37 2009 +0000 @@ -0,0 +1,156 @@ +#!/usr/bin/env python + +import xmlrpclib + +from xen.xend.XendClient import server +from xen.xend import sxp +# XXX XendDomain is voodoo to let balloon import succeed +from xen.xend import XendDomain, balloon + +import vif +import blkdev +# need a nicer way to load disk drivers +import vbd + +class VMException(Exception): pass + +class VM(object): + "Representation of a virtual machine" + def __init__(self, domid=None, dominfo=None): + self.dominfo = dominfo + + self.domid = -1 + self.name = 'unknown' + self.dom = {} + self.disks = [] + self.vifs = [] + + if domid: + try: + self.dominfo = server.xend.domain(domid, 'all') + except xmlrpclib.Fault: + raise VMException('error looking up domain %s' % str(domid)) + + if self.dominfo: + self.loaddominfo() + + def loaddominfo(self): + self.dom = parsedominfo(self.dominfo) + self.domid = self.dom['domid'] + self.name = self.dom['name'] + + self.disks = getdisks(self.dom) + self.vifs = getvifs(self.dom) + + def __str__(self): + return 'VM %d (%s), MACs: [%s], disks: [%s]' % \ + (self.domid, self.name, ', '.join([str(v) for v in self.vifs]), + ', '.join([str(d) for d in self.disks])) + +def parsedominfo(dominfo): + "parses a dominfo sexpression in the form of python lists of lists" + def s2d(s): + r = {} + for elem in s: + if 
len(elem) == 0: + continue + name = elem[0] + if len(elem) == 1: + val = None + else: + val = elem[1] + if isinstance(val, list): + val = s2d(elem[1:]) + if isinstance(name, list): + # hack for ['cpus', [[1]]] + return s2d(elem) + if name in r: + for k, v in val.iteritems(): + if k in r[name]: + if not isinstance(r[name][k], list): + r[name][k] = [r[name][k]] + r[name][k].append(v) + else: + r[name][k] = v + else: + r[name] = val + return r + + return s2d(dominfo[1:]) + +def domtosxpr(dom): + "convert a dominfo into a python sxpr" + def d2s(d): + r = [] + for k, v in d.iteritems(): + elem = [k] + if isinstance(v, dict): + elem.extend(d2s(v)) + else: + if v is None: + v = '' + elem.append(v) + r.append(elem) + return r + + sxpr = ['domain'] + sxpr.extend(d2s(dom)) + return sxpr + +def strtosxpr(s): + "convert a string to a python sxpr" + p = sxp.Parser() + p.input(s) + return p.get_val() + +def sxprtostr(sxpr): + "convert an sxpr to string" + return sxp.to_string(sxpr) + +def getvifs(dom): + "return vif objects for devices in dom" + vifs = dom['device'].get('vif', []) + if type(vifs) != list: + vifs = [vifs] + + return [vif.parse(v) for v in vifs] + +def getdisks(dom): + "return block device objects for devices in dom" + disks = dom['device'].get('vbd', []) + if type(disks) != list: + disks = [disks] + + # tapdisk1 devices + tap1s = dom['device'].get('tap', []) + if type(tap1s) != list: + disks.append(tap1s) + else: + disks.extend(tap1s) + + # tapdisk2 devices + tap2s = dom['device'].get('tap2', []) + if type(tap2s) != list: + disks.append(tap2s) + else: + disks.extend(tap2s) + + return [blkdev.parse(disk) for disk in disks] + +def fromxend(domid): + "create a VM object from xend information" + return VM(domid) + +def getshadowmem(vm): + "Balloon down domain0 to create free memory for shadow paging." 
+ maxmem = int(vm.dom['maxmem']) + shadow = int(vm.dom['shadow_memory']) + vcpus = int(vm.dom['vcpus']) + + # from XendDomainInfo.checkLiveMigrateMemory: + # 1MB per vcpu plus 4Kib/Mib of RAM. This is higher than + # the minimum that Xen would allocate if no value were given. + needed = vcpus * 1024 + maxmem * 4 - shadow * 1024 + if needed > 0: + print "Freeing %d kB for shadow mode" % needed + balloon.free(needed, vm.dominfo) _______________________________________________ Xen-changelog mailing list Xen-changelog@xxxxxxxxxxxxxxxxxxx http://lists.xensource.com/xen-changelog
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |