[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-changelog] [xen-unstable] Remus: move device handling into its own module
# HG changeset patch # User Keir Fraser <keir.fraser@xxxxxxxxxx> # Date 1272962063 -3600 # Node ID 321dddf767e2460458313d407fb45e0911f002aa # Parent 8559e324941fe191067c45ff7c3d20969ec7d141 Remus: move device handling into its own module Signed-off-by: Brendan Cully <brendan@xxxxxxxxx> --- tools/python/xen/remus/device.py | 140 +++++++++++++++++++++++++++++++++++++++ tools/remus/remus | 138 -------------------------------------- 2 files changed, 143 insertions(+), 135 deletions(-) diff -r 8559e324941f -r 321dddf767e2 tools/python/xen/remus/device.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/python/xen/remus/device.py Tue May 04 09:34:23 2010 +0100 @@ -0,0 +1,140 @@ +# Remus device interface +# +# Coordinates with devices at suspend, resume, and commit hooks + +import os + +import netlink, qdisc, util + +class CheckpointedDevice(object): + 'Base class for buffered devices' + + def postsuspend(self): + 'called after guest has suspended' + pass + + def preresume(self): + 'called before guest resumes' + pass + + def commit(self): + 'called when backup has acknowledged checkpoint reception' + pass + +class ReplicatedDiskException(Exception): pass + +class ReplicatedDisk(CheckpointedDevice): + """ + Send a checkpoint message to a replicated disk while the domain + is paused between epochs. + """ + FIFODIR = '/var/run/tap' + + def __init__(self, disk): + # look up disk, make sure it is tap:buffer, and set up socket + # to request commits. + self.ctlfd = None + + if not disk.uname.startswith('tap:remus:') and not disk.uname.startswith('tap:tapdisk:remus:'): + raise ReplicatedDiskException('Disk is not replicated: %s' % + str(disk)) + fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', '_') + absfifo = os.path.join(self.FIFODIR, fifo) + absmsgfifo = absfifo + '.msg' + + self.installed = False + self.ctlfd = open(absfifo, 'w+b') + self.msgfd = open(absmsgfifo, 'r+b') + + def __del__(self): + self.uninstall() + + def uninstall(self): + if self.ctlfd: + self.ctlfd.close() + self.ctlfd = None + + def postsuspend(self): + os.write(self.ctlfd.fileno(), 'flush') + + def commit(self): + msg = os.read(self.msgfd.fileno(), 4) + if msg != 'done': + print 'Unknown message: %s' % msg + +class BufferedNICException(Exception): pass + +class BufferedNIC(CheckpointedDevice): + """ + Buffer a protected domain's network output between rounds so that + nothing is issued that a failover might not know about. + """ + # shared rtnetlink handle + rth = None + + def __init__(self, domid): + self.installed = False + + if not self.rth: + self.rth = netlink.rtnl() + + self.devname = self._startimq(domid) + dev = self.rth.getlink(self.devname) + if not dev: + raise BufferedNICException('could not find device %s' % self.devname) + self.dev = dev['index'] + self.handle = qdisc.TC_H_ROOT + self.q = qdisc.QueueQdisc() + + def __del__(self): + self.uninstall() + + def postsuspend(self): + if not self.installed: + self._setup() + + self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT) + + def commit(self): + '''Called when checkpoint has been acknowledged by + the backup''' + self._sendqmsg(qdisc.TC_QUEUE_RELEASE) + + def _sendqmsg(self, action): + self.q.action = action + req = qdisc.changerequest(self.dev, self.handle, self.q) + self.rth.talk(req.pack()) + + def _setup(self): + q = self.rth.getqdisc(self.dev) + if q: + if q['kind'] == 'queue': + self.installed = True + return + if q['kind'] != 'pfifo_fast': + raise BufferedNICException('there is already a queueing ' + 'discipline on %s' % self.devname) + + print 'installing buffer on %s' % self.devname + req = qdisc.addrequest(self.dev, self.handle, self.q) + self.rth.talk(req.pack()) + self.installed = True + + def uninstall(self): + if self.installed: + req = qdisc.delrequest(self.dev, self.handle) + self.rth.talk(req.pack()) + self.installed = False + + def _startimq(self, domid): + # stopgap hack to set up IMQ for an interface. Wrong in many ways. + imqebt = '/usr/lib/xen/bin/imqebt' + imqdev = 'imq0' + vid = 'vif%d.0' % domid + for mod in ['sch_queue', 'imq', 'ebt_imq']: + util.runcmd(['modprobe', mod]) + util.runcmd("ip link set %s up" % (imqdev)) + util.runcmd("%s -F FORWARD" % (imqebt)) + util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev)) + + return imqdev diff -r 8559e324941f -r 321dddf767e2 tools/remus/remus --- a/tools/remus/remus Tue May 04 09:31:13 2010 +0100 +++ b/tools/remus/remus Tue May 04 09:34:23 2010 +0100 @@ -7,9 +7,10 @@ # TODO: fencing. import optparse, os, re, select, signal, sys, time -from xen.remus import save, vm + +from xen.remus import save, util, vm +from xen.remus.device import ReplicatedDisk, BufferedNIC from xen.xend import XendOptions -from xen.remus import netlink, qdisc, util class CfgException(Exception): pass @@ -57,139 +58,6 @@ class Cfg(object): self.domid = args[0] if (len(args) > 1): self.host = args[1] - -class ReplicatedDiskException(Exception): pass - -class BufferedDevice(object): - 'Base class for buffered devices' - - def postsuspend(self): - 'called after guest has suspended' - pass - - def preresume(self): - 'called before guest resumes' - pass - - def commit(self): - 'called when backup has acknowledged checkpoint reception' - pass - -class ReplicatedDisk(BufferedDevice): - """ - Send a checkpoint message to a replicated disk while the domain - is paused between epochs. - """ - FIFODIR = '/var/run/tap' - - def __init__(self, disk): - # look up disk, make sure it is tap:buffer, and set up socket - # to request commits. - self.ctlfd = None - - if not disk.uname.startswith('tap:remus:') and not disk.uname.startswith('tap:tapdisk:remus:'): - raise ReplicatedDiskException('Disk is not replicated: %s' % - str(disk)) - fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', '_') - absfifo = os.path.join(self.FIFODIR, fifo) - absmsgfifo = absfifo + '.msg' - - self.installed = False - self.ctlfd = open(absfifo, 'w+b') - self.msgfd = open(absmsgfifo, 'r+b') - - def __del__(self): - self.uninstall() - - def uninstall(self): - if self.ctlfd: - self.ctlfd.close() - self.ctlfd = None - - def postsuspend(self): - os.write(self.ctlfd.fileno(), 'flush') - - def commit(self): - msg = os.read(self.msgfd.fileno(), 4) - if msg != 'done': - print 'Unknown message: %s' % msg - -class NetbufferException(Exception): pass - -class Netbuffer(BufferedDevice): - """ - Buffer a protected domain's network output between rounds so that - nothing is issued that a failover might not know about. - """ - # shared rtnetlink handle - rth = None - - def __init__(self, domid): - self.installed = False - - if not self.rth: - self.rth = netlink.rtnl() - - self.devname = self._startimq(domid) - dev = self.rth.getlink(self.devname) - if not dev: - raise NetbufferException('could not find device %s' % self.devname) - self.dev = dev['index'] - self.handle = qdisc.TC_H_ROOT - self.q = qdisc.QueueQdisc() - - def __del__(self): - self.uninstall() - - def postsuspend(self): - if not self.installed: - self._setup() - - self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT) - - def commit(self): - '''Called when checkpoint has been acknowledged by - the backup''' - self._sendqmsg(qdisc.TC_QUEUE_RELEASE) - - def _sendqmsg(self, action): - self.q.action = action - req = qdisc.changerequest(self.dev, self.handle, self.q) - self.rth.talk(req.pack()) - - def _setup(self): - q = self.rth.getqdisc(self.dev) - if q: - if q['kind'] == 'queue': - self.installed = True - return - if q['kind'] != 'pfifo_fast': - raise NetbufferException('there is already a queueing ' - 'discipline on %s' % self.devname) - - print 'installing buffer on %s' % self.devname - req = qdisc.addrequest(self.dev, self.handle, self.q) - self.rth.talk(req.pack()) - self.installed = True - - def uninstall(self): - if self.installed: - req = qdisc.delrequest(self.dev, self.handle) - self.rth.talk(req.pack()) - self.installed = False - - def _startimq(self, domid): - # stopgap hack to set up IMQ for an interface. Wrong in many ways. - imqebt = '/usr/lib/xen/bin/imqebt' - imqdev = 'imq0' - vid = 'vif%d.0' % domid - for mod in ['sch_queue', 'imq', 'ebt_imq']: - util.runcmd(['modprobe', mod]) - util.runcmd("ip link set %s up" % (imqdev)) - util.runcmd("%s -F FORWARD" % (imqebt)) - util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev)) - - return imqdev class SignalException(Exception): pass _______________________________________________ Xen-changelog mailing list Xen-changelog@xxxxxxxxxxxxxxxxxxx http://lists.xensource.com/xen-changelog
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |