[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [RFC] native python Xenstore module



The current Python interface to Xenstore is just a thin binding to the
C libxenstore library. This means that it is architecture-specific and
makes it awkward to use in platform-independent code like the XenServer
guest agent.

The Xenstore protocol is simple and quite straightforward to implement
natively in Python.


Design & Implementation

Users create an instance of the Xenstore object. This is a singleton
that holds all the global state (xenbus fd etc).

Users invoke methods e.g. read() that block awaiting a response from
xenstored. Methods return either the result or throw XenstoreException
with a string representation of the error e.g. 'ENOENT'.

The methods attempt to present complex results using native datatypes
(lists, dictionaries) rather the usual Xenstore string values e.g.

    {6: 'read-write', 9: 'none'}

rather than

    'b6n9'


Transactions are managed using instances of the Transaction object. Each
instance must be utilized thus, either

    t.start()
    # zero or more operations
    t.commit()

or

    t.start()
    # zero or more operations
    t.abort()


Users should handle commit() raising XenstoreException('EAGAIN')
appropriately, e.g. replaying the whole start(), operations, commit()
sequence.

Watches are complicated because they are not synchronous. When the first
watch is registered a background thread is started. This thread takes
responsibility for the reading of all data from the xenbus fd. Responses
for operations are forwarded to the foreground thread using a Queue.

Users of watch() supply the path to watch, a callback function and
(optionally) parameters. The callback function is invoked with the path
and the optional parameters. watch() returns a string token that is
passed to unwatch() to remove a watch.


#!/bin/env python

import os
import Queue
import select
import struct
import tempfile
import threading

class XenstoreException(StandardError):
    pass

"""
Background thread that reads from xenbus, acting on watch events and forwarding
operation responses to the foreground thread.
"""
def handle_watches(xs):
    while not xs.shutdown_event.isSet():
        r, _, __ = select.select([xs.pipe_r, xs.xb_fd], [], [])
        if xs.xb_fd in r:
            # read and unpack header
            hdr = xs.xb_fd.read(16)
            op, _, __, l = struct.unpack('=IIII', hdr)
            value = None
            if l > 0:
                value = xs.xb_fd.read(l)
            if op == xs.XS_WATCH_EVENT:
                path, token, _ = value.split('\0', 2)
                if token in xs.watches:
                    # invoke user callback
xs.watches[token]['cb'](path, *xs.watches[token]['cbargs'])
            else:
                # response to an operation, handle in main thread
                xs.queue.put(hdr)
                if value:
                    xs.queue.put(value)

class Xenstore(object):
    """
    Parent Xenstore handle (singleton).
    """

    # from xen/include/public/io/xs_wire.h
    XS_READ = 2
    XS_WATCH = 4
    XS_GET_PERMS = 3
    XS_UNWATCH = 5
    XS_TRANSACTION_START = 6
    XS_TRANSACTION_END = 7
    XS_WRITE = 11
    XS_MKDIR = 12
    XS_RM = 13
    XS_SET_PERMS = 14
    XS_WATCH_EVENT = 15
    XS_ERROR = 16

    if os.uname()[0] == 'Linux':
        DEV_PATH = '/proc/xen/xenbus'
    elif os.uname()[0] == 'NetBSD':
        DEV_PATH = '/kern/xen/xenbus'
    else:
        DEV_PATH = '/dev/xen/xenbus'

    PERM_NONE, PERM_READ, PERM_WRITE, PERM_READ_WRITE = range(4)

    __single = None

    def __new__(classtype, *args, **kwargs):
        if classtype != type(classtype.__single):
            classtype.__single = object.__new__(classtype, *args, **kwargs)
        return classtype.__single

    def __init__(self):
        self.xb_fd = open(self.DEV_PATH, 'r+', 0)
        self.watch_thread = None
        self.queue = None
        self.watches = {}

    def __del__(self):
        self.xb_fd.close()

    def do_op(self, ctxt, op, value = '', req = 0):
        """
        Perform Xenstore operation $op and return response.
        """

        if ctxt:
            tx_id = ctxt.tx_id
        else:
            tx_id = 0

        ret = None
        self.xb_fd.write(struct.pack('=IIII', op, req, tx_id, len(value)))
        if len(value):
            self.xb_fd.write(value)
        if self.queue:
            hdr = self.queue.get()
        else:
            hdr = self.xb_fd.read(16)
        r_op, req, tx, l = struct.unpack('=IIII', hdr)
        if l > 0:
            if self.queue:
                ret = self.queue.get()
            else:
                ret = self.xb_fd.read(l)
            if r_op == self.XS_ERROR:
                raise XenstoreException, ret[:-1]
        return ret

    def _read(self, ctxt, *vars):
        d = {}
        for var in vars:
            d[var] = self.do_op(ctxt, self.XS_READ, var + '\0')
            if len(vars) == 1:
                return d[var]
        return d

    def read(self, *vars):
        """
Return a dictionary of Xenstore values for $vars (or a string if only a
        single key is passed in).
        """

        return self._read(None, *vars)

    def _mkdir(self, ctxt, *vars):
        for var in vars:
            self.do_op(ctxt, self.XS_MKDIR, var + '\0')

    def mkdir(self, *vars):
        """
        Create empty directories in Xenstore.
        """

        self._mkdir(None, *vars)

    def _rm(self, ctxt, *vars):
        for var in vars:
            self.do_op(ctxt, self.XS_RM, var + '\0')

    def rm(self, *vars):
        """
        Remove all keys in $vars from Xenstore.
        """
        self._rm(None, *vars)

    def write(self, vars, ctxt = None):
        """
        Update Xenstore with the keys and values in dictionary $vars.
        """

        for k, v in vars.items():
            self.do_op(ctxt, self.XS_WRITE, k + '\0' + v)

perm_map = {'n': PERM_NONE, 'r': PERM_READ, 'w': PERM_WRITE, 'b': PERM_READ_WRITE} rev_perm_map = {PERM_NONE: 'n', PERM_READ: 'r', PERM_WRITE: 'w', PERM_READ_WRITE: 'b'}

    def _get_permissions(self, ctxt, *paths):
        d = {}
        for path in paths:
            t = self.do_op(ctxt, self.XS_GET_PERMS, path + '\0')
            perm_d = {}
            for e in t[:-1].split('\0'):
                perm_d[int(e[1:])] = self.perm_map[e[0]]
            d[path] = perm_d
            if len(paths) == 1:
                return d[path]
        return d

    def get_permissions(self, *paths):
        """
        Return a list of permission dictionaries (or a single if only a
        single key is passed in.
        """
        return self._get_permissions(None, *paths)

    def set_permissions(self, path, perms, ctxt = None):
        """
        Set the permissions of $path based on the dictionary $perms.
        """
        perm_l = []
        for k, v in perms.items():
            perm_l.append("%c%d" % (self.rev_perm_map.get(v, 'n'), k))
self.do_op(None, self.XS_SET_PERMS, path + '\0' + '\0'.join(perm_l) + '\0')

    def watch(self, path, callback, *callback_args):
        """
Watch $path (and its subordinates), invoking $callback($path, $callback_args)
        in a background thread.

        Returns a token to be used by unwatch().
        """
        if not self.watch_thread:
self.watch_thread = threading.Thread(target=handle_watches, args=(self,))
            self.watch_thread.setDaemon(True)
            self.shutdown_event = threading.Event()
            self.queue = Queue.Queue(2)
            self.pipe_r, self.pipe_w = os.pipe()
            self.watches = {}
            self.watch_thread.start()

        fh = tempfile.NamedTemporaryFile(prefix = 'xs')
        token = fh.name
self.watches[token] = {'fh': fh, 'path': path, 'cb': callback, 'cbargs': callback_args}
        self.do_op(None, self.XS_WATCH, path + '\0' + token + '\0')
        return token

    def unwatch(self, path, token):
        """
Stop watching the path monitored by the watch() call that returned $token.
        """
        if token in self.watches:
            self.do_op(None, self.XS_UNWATCH, path + '\0' + token + '\0')
            self.watches[token]['fh'].close()
            del self.watches[token]

    def unwatch_all(self):
        for k, v in self.watches.items():
            self.unwatch(v['path'], k)

    def watch_stop(self):
        if self.watch_thread:
            self.shutdown_event.set()
            os.write(self.pipe_w, 'x')
            self.watch_thread.join()
            self.watch_thread = None
            self.queue = None
            os.close(self.pipe_r)
            os.close(self.pipe_w)

class Transaction(object):
    """
    Xenstore transaction instance.
    """

    def __init__(self):
        self.xs = Xenstore()
        self.tx_id = 0

    def start(self):
        """
        Start a new transaction.
        """

        self.tx_id = 0
self.tx_id = int(self.xs.do_op(self, self.xs.XS_TRANSACTION_START, '\0')[:-1])

    def commit(self):
        """
        Commit all modifications in this transaction to Xenstore.
        """

        self.xs.do_op(self, self.xs.XS_TRANSACTION_END, 'T\0')

    def abort(self):
        """
        Discard all modifications in this transaction.
        """

        self.xs.do_op(self, self.xs.XS_TRANSACTION_END, 'F\0')

    def read(self, *vars):
        return self.xs._read(self, *vars)

    def mkdir(self, *vars):
        self.xs._mkdir(self, *vars)

    def rm(self, *vars):
        self.xs._read(self, *vars)

    def write(self, vars):
        self.xs.write(vars, self)

    def get_permissions(self, *paths):
        return self.xs_get_permissions(self, *paths)

    def set_permissions(self, path, perms):
        self.xs.set_permissions(perms, self)


# example to code to dev test

if __name__ == '__main__':
    def watch_cb(key, n):
        print "Watch fired:", key, n

    repeat_count = 5

    # create xenstore handle
    xs = Xenstore()
    # read three keys, returns three values
    print xs.read('domid', 'vm', 'name')
    # write a key/value
    xs.write({'new': 'stuff'})
    # verify contents were written
    assert xs.read('new') == 'stuff'
    print xs.get_permissions('new')
xs.set_permissions('new', {0: xs.PERM_READ_WRITE, os.getpid(): xs.PERM_READ})
    print xs.get_permissions('new')
    xs.mkdir('another-new')
    # write another key
    xs.write({'new/path': 'more stuff'})
    xs.read('new/path')
    # delete key
    xs.rm('new/path')
    # attempt to read deleted key
    try:
        print xs.read('new/path')
    except XenstoreException, e:
        print "Failed to read (expected)", e

    # create and start transaction
    tx = Transaction()
    tx.start()
    tx.write({'tnew': 'committed'})
    # commit and verify value is updated
    tx.commit()
    assert xs.read('tnew') == 'committed'

    # start new transaction
    tx.start()
    tx.write({'tnew': 'aborted'})
    # abort and verify value is NOT updated
    tx.abort()
    assert xs.read('tnew') != 'aborted'

    # watch key
    xs.watch('new', watch_cb, 7)

    d = {}
# repeatedly run a large transaction to cause collisions between two program instances
    for n in range(1, 200):
        d["collision/key"+str(n)] = "value"+str(n)
    while True:
        try:
            tx.start()
            tx.write(d)
            tx.commit()
        except XenstoreException, e:
            if str(e) == 'EAGAIN':
                repeat_count -= 1
                if repeat_count == 0:
                    break
                print "Collision on commit, repeating"
            else:
                print "Unexpected Xenstore error", e
                break

    print "End of test"
    xs.unwatch_all()
    # stop background watch thread
    xs.watch_stop()


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.