[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-changelog] [xen-unstable] Remus: move device handling into its own module



# HG changeset patch
# User Keir Fraser <keir.fraser@xxxxxxxxxx>
# Date 1272962063 -3600
# Node ID 321dddf767e2460458313d407fb45e0911f002aa
# Parent  8559e324941fe191067c45ff7c3d20969ec7d141
Remus: move device handling into its own module

Signed-off-by: Brendan Cully <brendan@xxxxxxxxx>
---
 tools/python/xen/remus/device.py |  140 +++++++++++++++++++++++++++++++++++++++
 tools/remus/remus                |  138 --------------------------------------
 2 files changed, 143 insertions(+), 135 deletions(-)

diff -r 8559e324941f -r 321dddf767e2 tools/python/xen/remus/device.py
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/python/xen/remus/device.py  Tue May 04 09:34:23 2010 +0100
@@ -0,0 +1,140 @@
+# Remus device interface
+#
+# Coordinates with devices at suspend, resume, and commit hooks
+
+import os
+
+import netlink, qdisc, util
+
+class CheckpointedDevice(object):
+    'Base class for buffered devices'
+
+    def postsuspend(self):
+        'called after guest has suspended'
+        pass
+
+    def preresume(self):
+        'called before guest resumes'
+        pass
+
+    def commit(self):
+        'called when backup has acknowledged checkpoint reception'
+        pass
+
+class ReplicatedDiskException(Exception): pass
+
+class ReplicatedDisk(CheckpointedDevice):
+    """
+    Send a checkpoint message to a replicated disk while the domain
+    is paused between epochs.
+    """
+    FIFODIR = '/var/run/tap'
+
+    def __init__(self, disk):
+        # look up disk, make sure it is tap:buffer, and set up socket
+        # to request commits.
+        self.ctlfd = None
+
+        if not disk.uname.startswith('tap:remus:') and not disk.uname.startswith('tap:tapdisk:remus:'):
+            raise ReplicatedDiskException('Disk is not replicated: %s' %
+                                        str(disk))
+        fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', '_')
+        absfifo = os.path.join(self.FIFODIR, fifo)
+        absmsgfifo = absfifo + '.msg'
+
+        self.installed = False
+        self.ctlfd = open(absfifo, 'w+b')
+        self.msgfd = open(absmsgfifo, 'r+b')
+
+    def __del__(self):
+        self.uninstall()
+
+    def uninstall(self):
+        if self.ctlfd:
+            self.ctlfd.close()
+            self.ctlfd = None
+
+    def postsuspend(self):
+        os.write(self.ctlfd.fileno(), 'flush')
+
+    def commit(self):
+        msg = os.read(self.msgfd.fileno(), 4)
+        if msg != 'done':
+            print 'Unknown message: %s' % msg
+
+class BufferedNICException(Exception): pass
+
+class BufferedNIC(CheckpointedDevice):
+    """
+    Buffer a protected domain's network output between rounds so that
+    nothing is issued that a failover might not know about.
+    """
+    # shared rtnetlink handle
+    rth = None
+
+    def __init__(self, domid):
+        self.installed = False
+
+        if not self.rth:
+            self.rth = netlink.rtnl()
+
+        self.devname = self._startimq(domid)
+        dev = self.rth.getlink(self.devname)
+        if not dev:
+            raise BufferedNICException('could not find device %s' % self.devname)
+        self.dev = dev['index']
+        self.handle = qdisc.TC_H_ROOT
+        self.q = qdisc.QueueQdisc()
+
+    def __del__(self):
+        self.uninstall()
+
+    def postsuspend(self):
+        if not self.installed:
+            self._setup()
+
+        self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT)
+
+    def commit(self):
+        '''Called when checkpoint has been acknowledged by
+        the backup'''
+        self._sendqmsg(qdisc.TC_QUEUE_RELEASE)
+
+    def _sendqmsg(self, action):
+        self.q.action = action
+        req = qdisc.changerequest(self.dev, self.handle, self.q)
+        self.rth.talk(req.pack())
+
+    def _setup(self):
+        q = self.rth.getqdisc(self.dev)
+        if q:
+            if q['kind'] == 'queue':
+                self.installed = True
+                return
+            if q['kind'] != 'pfifo_fast':
+                raise BufferedNICException('there is already a queueing '
+                                           'discipline on %s' % self.devname)
+
+        print 'installing buffer on %s' % self.devname
+        req = qdisc.addrequest(self.dev, self.handle, self.q)
+        self.rth.talk(req.pack())
+        self.installed = True
+
+    def uninstall(self):
+        if self.installed:
+            req = qdisc.delrequest(self.dev, self.handle)
+            self.rth.talk(req.pack())
+            self.installed = False
+
+    def _startimq(self, domid):
+        # stopgap hack to set up IMQ for an interface. Wrong in many ways.
+        imqebt = '/usr/lib/xen/bin/imqebt'
+        imqdev = 'imq0'
+        vid = 'vif%d.0' % domid
+        for mod in ['sch_queue', 'imq', 'ebt_imq']:
+            util.runcmd(['modprobe', mod])
+        util.runcmd("ip link set %s up" % (imqdev))
+        util.runcmd("%s -F FORWARD" % (imqebt))
+        util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev))
+
+        return imqdev
diff -r 8559e324941f -r 321dddf767e2 tools/remus/remus
--- a/tools/remus/remus Tue May 04 09:31:13 2010 +0100
+++ b/tools/remus/remus Tue May 04 09:34:23 2010 +0100
@@ -7,9 +7,10 @@
 # TODO: fencing.
 
 import optparse, os, re, select, signal, sys, time
-from xen.remus import save, vm
+
+from xen.remus import save, util, vm
+from xen.remus.device import ReplicatedDisk, BufferedNIC
 from xen.xend import XendOptions
-from xen.remus import netlink, qdisc, util
 
 class CfgException(Exception): pass
 
@@ -57,139 +58,6 @@ class Cfg(object):
         self.domid = args[0]
         if (len(args) > 1):
             self.host = args[1]
-
-class ReplicatedDiskException(Exception): pass
-
-class BufferedDevice(object):
-    'Base class for buffered devices'
-
-    def postsuspend(self):
-        'called after guest has suspended'
-        pass
-
-    def preresume(self):
-        'called before guest resumes'
-        pass
-
-    def commit(self):
-        'called when backup has acknowledged checkpoint reception'
-        pass
-
-class ReplicatedDisk(BufferedDevice):
-    """
-    Send a checkpoint message to a replicated disk while the domain
-    is paused between epochs.
-    """
-    FIFODIR = '/var/run/tap'
-
-    def __init__(self, disk):
-        # look up disk, make sure it is tap:buffer, and set up socket
-        # to request commits.
-        self.ctlfd = None
-
-        if not disk.uname.startswith('tap:remus:') and not disk.uname.startswith('tap:tapdisk:remus:'):
-            raise ReplicatedDiskException('Disk is not replicated: %s' %
-                                        str(disk))
-        fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', '_')
-        absfifo = os.path.join(self.FIFODIR, fifo)
-        absmsgfifo = absfifo + '.msg'
-
-        self.installed = False
-        self.ctlfd = open(absfifo, 'w+b')
-        self.msgfd = open(absmsgfifo, 'r+b')
-
-    def __del__(self):
-        self.uninstall()
-
-    def uninstall(self):
-        if self.ctlfd:
-            self.ctlfd.close()
-            self.ctlfd = None
-
-    def postsuspend(self):
-        os.write(self.ctlfd.fileno(), 'flush')
-
-    def commit(self):
-        msg = os.read(self.msgfd.fileno(), 4)
-        if msg != 'done':
-            print 'Unknown message: %s' % msg
-
-class NetbufferException(Exception): pass
-
-class Netbuffer(BufferedDevice):
-    """
-    Buffer a protected domain's network output between rounds so that
-    nothing is issued that a failover might not know about.
-    """
-    # shared rtnetlink handle
-    rth = None
-
-    def __init__(self, domid):
-        self.installed = False
-
-        if not self.rth:
-            self.rth = netlink.rtnl()
-
-        self.devname = self._startimq(domid)
-        dev = self.rth.getlink(self.devname)
-        if not dev:
-            raise NetbufferException('could not find device %s' % self.devname)
-        self.dev = dev['index']
-        self.handle = qdisc.TC_H_ROOT
-        self.q = qdisc.QueueQdisc()
-
-    def __del__(self):
-        self.uninstall()
-
-    def postsuspend(self):
-        if not self.installed:
-            self._setup()
-
-        self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT)
-
-    def commit(self):
-        '''Called when checkpoint has been acknowledged by
-        the backup'''
-        self._sendqmsg(qdisc.TC_QUEUE_RELEASE)
-
-    def _sendqmsg(self, action):
-        self.q.action = action
-        req = qdisc.changerequest(self.dev, self.handle, self.q)
-        self.rth.talk(req.pack())
-
-    def _setup(self):
-        q = self.rth.getqdisc(self.dev)
-        if q:
-            if q['kind'] == 'queue':
-                self.installed = True
-                return
-            if q['kind'] != 'pfifo_fast':
-                raise NetbufferException('there is already a queueing '
-                                         'discipline on %s' % self.devname)
-
-        print 'installing buffer on %s' % self.devname
-        req = qdisc.addrequest(self.dev, self.handle, self.q)
-        self.rth.talk(req.pack())
-        self.installed = True
-
-    def uninstall(self):
-        if self.installed:
-            req = qdisc.delrequest(self.dev, self.handle)
-            self.rth.talk(req.pack())
-            self.installed = False
-
-    def _startimq(self, domid):
-        # stopgap hack to set up IMQ for an interface. Wrong in many ways.
-        imqebt = '/usr/lib/xen/bin/imqebt'
-        imqdev = 'imq0'
-        vid = 'vif%d.0' % domid
-        for mod in ['sch_queue', 'imq', 'ebt_imq']:
-            util.runcmd(['modprobe', mod])
-        util.runcmd("ip link set %s up" % (imqdev))
-        util.runcmd("%s -F FORWARD" % (imqebt))
-        util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev))
-
-        return imqdev
 
 class SignalException(Exception): pass
 

_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.