[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [RFC] native python Xenstore module
The current Python interface to Xenstore is just a thin binding to the C libxenstore library. This means that it is architecture-specific, which makes it awkward to use in platform-independent code like the XenServer guest agent. The Xenstore protocol is simple and quite straightforward to implement natively in Python. Design & Implementation: Users create an instance of the Xenstore object. This is a singleton that holds all the global state (xenbus fd etc). Users invoke methods, e.g. read(), that block awaiting a response from xenstored. Methods either return the result or raise XenstoreException with a string representation of the error, e.g. 'ENOENT'. The methods attempt to present complex results using native datatypes (lists, dictionaries) rather than the usual Xenstore string values, e.g. {6: 'read-write', 9: 'none'} rather than 'b6n9'. Transactions are managed using instances of the Transaction object. Each instance must be utilized thus, either t.start() # zero or more operations t.commit() or t.start() # zero or more operations t.abort() Users should handle commit() raising XenstoreException('EAGAIN') appropriately, e.g. by replaying the whole start(), operations, commit() sequence. Watches are complicated because they are not synchronous. When the first watch is registered a background thread is started. This thread takes responsibility for the reading of all data from the xenbus fd. Responses for operations are forwarded to the foreground thread using a Queue. Users of watch() supply the path to watch, a callback function and (optionally) parameters. The callback function is invoked with the path and the optional parameters. watch() returns a string token that is passed, together with the path, to unwatch() to remove a watch. 
#!/bin/env python
"""Native Python implementation of the Xenstore wire protocol.

Talks to xenstored through the xenbus device node instead of binding to the
C libxenstore library.  Runs unchanged on Python 2.6+ and Python 3.
"""

import os
import platform
import select
import struct
import tempfile
import threading

try:
    import Queue  # Python 2
except ImportError:
    import queue as Queue  # Python 3

try:
    _BaseError = StandardError  # Python 2: keep the historical base class
except NameError:
    _BaseError = Exception  # Python 3 has no StandardError

# On Python 2 ``str`` already is the wire type; on Python 3 text has to be
# converted at the xenbus boundary.  ``surrogateescape`` lets arbitrary
# (non UTF-8) Xenstore values round-trip without raising.
_PY2 = str is bytes
_CODEC_ERRORS = 'strict' if _PY2 else 'surrogateescape'


def _to_wire(text):
    """Return $text as the byte string that is written to xenbus."""
    if isinstance(text, bytes):
        return text
    return text.encode('utf-8', _CODEC_ERRORS)


def _from_wire(raw):
    """Return the xenbus byte string $raw as a native ``str``."""
    if isinstance(raw, str):
        return raw
    return raw.decode('utf-8', _CODEC_ERRORS)


class XenstoreException(_BaseError):
    """Raised with the xenstored error name (e.g. 'ENOENT') as message."""
    pass


def handle_watches(xs):
    """Background thread that reads from xenbus, acting on watch events
    and forwarding operation responses to the foreground thread.

    $xs is the Xenstore handle; the loop ends once xs.shutdown_event is set
    (watch_stop() also writes to the wake-up pipe so select() returns).
    """
    while not xs.shutdown_event.is_set():
        readable, _, _ = select.select([xs.pipe_r, xs.xb_fd], [], [])
        if xs.xb_fd not in readable:
            # only the wake-up pipe fired: go back and re-check shutdown
            continue

        # read and unpack header
        hdr = xs.xb_fd.read(16)
        op, _, _, length = struct.unpack('=IIII', hdr)
        value = None
        if length > 0:
            value = xs.xb_fd.read(length)

        if op == xs.XS_WATCH_EVENT:
            # payload is "<path>\0<token>\0"
            path, token, _ = value.split(b'\0', 2)
            # single lookup so a concurrent unwatch() cannot remove the
            # entry between the membership test and the call
            watch = xs.watches.get(_from_wire(token))
            if watch is not None:
                # invoke user callback
                # NOTE(review): an exception raised by the callback ends this
                # thread, after which do_op() would block forever.
                watch['cb'](_from_wire(path), *watch['cbargs'])
        else:
            # response to an operation, handle in main thread
            xs.queue.put(hdr)
            if value:
                xs.queue.put(value)


class Xenstore(object):
    """Parent Xenstore handle (singleton)."""

    # from xen/include/public/io/xs_wire.h
    XS_READ = 2
    XS_WATCH = 4
    XS_GET_PERMS = 3
    XS_UNWATCH = 5
    XS_TRANSACTION_START = 6
    XS_TRANSACTION_END = 7
    XS_WRITE = 11
    XS_MKDIR = 12
    XS_RM = 13
    XS_SET_PERMS = 14
    XS_WATCH_EVENT = 15
    XS_ERROR = 16

    # platform.system() reports the same name as os.uname()[0] but is also
    # available where os.uname() is not, so importing the module never fails.
    DEV_PATH = {
        'Linux': '/proc/xen/xenbus',
        'NetBSD': '/kern/xen/xenbus',
    }.get(platform.system(), '/dev/xen/xenbus')

    PERM_NONE, PERM_READ, PERM_WRITE, PERM_READ_WRITE = range(4)

    perm_map = {'n': PERM_NONE,
                'r': PERM_READ,
                'w': PERM_WRITE,
                'b': PERM_READ_WRITE}
    rev_perm_map = {PERM_NONE: 'n',
                    PERM_READ: 'r',
                    PERM_WRITE: 'w',
                    PERM_READ_WRITE: 'b'}

    __single = None

    def __new__(classtype, *args, **kwargs):
        # object.__new__() rejects extra arguments on Python 3, so they are
        # accepted here for compatibility but not forwarded.
        if classtype != type(classtype.__single):
            classtype.__single = object.__new__(classtype)
        return classtype.__single

    def __init__(self):
        # __init__ runs on *every* Xenstore() call even though __new__ hands
        # back the same object (each Transaction() makes such a call).
        # Initialise only once so the fd, watch thread and registered watches
        # are not thrown away.
        if getattr(self, '_initialized', False):
            return
        # unbuffered binary access: every read/write maps to one syscall
        self.xb_fd = open(self.DEV_PATH, 'rb+', 0)
        self.watch_thread = None
        self.queue = None
        self.watches = {}
        self._initialized = True

    def __del__(self):
        # xb_fd is missing when open() failed in __init__
        fd = getattr(self, 'xb_fd', None)
        if fd is not None:
            fd.close()

    def do_op(self, ctxt, op, value='', req=0):
        """Perform Xenstore operation $op and return response.

        $ctxt is a Transaction (its tx_id is sent) or None for no
        transaction.  $value is the request payload and $req the request id.
        Returns the response payload as a string, or None when the response
        carries no payload.  Raises XenstoreException if xenstored answers
        with XS_ERROR.
        """
        tx_id = ctxt.tx_id if ctxt else 0
        payload = _to_wire(value)

        self.xb_fd.write(struct.pack('=IIII', op, req, tx_id, len(payload)))
        if payload:
            self.xb_fd.write(payload)

        # Once the watch thread runs it owns all reads from xenbus and hands
        # responses over through the queue (header first, then payload).
        via_queue = self.queue is not None
        if via_queue:
            hdr = self.queue.get()
        else:
            hdr = self.xb_fd.read(16)
        r_op, _, _, length = struct.unpack('=IIII', hdr)

        ret = None
        if length > 0:
            if via_queue:
                raw = self.queue.get()
            else:
                raw = self.xb_fd.read(length)
            ret = _from_wire(raw)

        if r_op == self.XS_ERROR:
            # strip the trailing NUL; tolerate an error without payload
            raise XenstoreException((ret or '')[:-1])
        return ret

    def _read(self, ctxt, *paths):
        values = {}
        for path in paths:
            values[path] = self.do_op(ctxt, self.XS_READ, path + '\0')
        if len(paths) == 1:
            return values[paths[0]]
        return values

    def read(self, *vars):
        """Return a dictionary of Xenstore values for $vars (or a string
        if only a single key is passed in).
        """
        return self._read(None, *vars)

    def _mkdir(self, ctxt, *paths):
        for path in paths:
            self.do_op(ctxt, self.XS_MKDIR, path + '\0')

    def mkdir(self, *vars):
        """Create empty directories in Xenstore."""
        self._mkdir(None, *vars)

    def _rm(self, ctxt, *paths):
        for path in paths:
            self.do_op(ctxt, self.XS_RM, path + '\0')

    def rm(self, *vars):
        """Remove all keys in $vars from Xenstore."""
        self._rm(None, *vars)

    def write(self, vars, ctxt=None):
        """Update Xenstore with the keys and values in dictionary $vars.

        $ctxt is the Transaction to write in, or None.
        """
        for key, val in vars.items():
            # assembled as bytes so binary values work on Python 3 as well
            self.do_op(ctxt, self.XS_WRITE,
                       _to_wire(key) + b'\0' + _to_wire(val))

    def _get_permissions(self, ctxt, *paths):
        result = {}
        for path in paths:
            reply = self.do_op(ctxt, self.XS_GET_PERMS, path + '\0')
            # reply is a NUL-terminated list of "<letter><domid>" entries
            result[path] = dict(
                (int(entry[1:]), self.perm_map[entry[0]])
                for entry in reply[:-1].split('\0'))
        if len(paths) == 1:
            return result[paths[0]]
        return result

    def get_permissions(self, *paths):
        """Return a dictionary mapping each path in $paths to its
        permission dictionary {domid: PERM_*} (or just that permission
        dictionary if only a single path is passed in).
        """
        return self._get_permissions(None, *paths)

    def set_permissions(self, path, perms, ctxt=None):
        """Set the permissions of $path based on the dictionary $perms
        ({domid: PERM_*}); unknown permission values are sent as 'n'.

        $ctxt is the Transaction to operate in, or None.
        """
        # NOTE(review): entries are sent in dict iteration order; confirm
        # whether xenstored attaches meaning to the first entry.
        perm_l = ["%c%d" % (self.rev_perm_map.get(perm, 'n'), domid)
                  for domid, perm in perms.items()]
        self.do_op(ctxt, self.XS_SET_PERMS,
                   path + '\0' + '\0'.join(perm_l) + '\0')

    def watch(self, path, callback, *callback_args):
        """Watch $path (and its subordinates), invoking
        $callback($path, $callback_args) in a background thread.
        Returns a token to be used by unwatch().
        """
        if not self.watch_thread:
            # first watch: start the reader thread and its plumbing
            self.watch_thread = threading.Thread(target=handle_watches,
                                                 args=(self,))
            self.watch_thread.daemon = True
            self.shutdown_event = threading.Event()
            # room for exactly one response: header + payload
            self.queue = Queue.Queue(2)
            # pipe used by watch_stop() to wake the thread out of select()
            self.pipe_r, self.pipe_w = os.pipe()
            self.watches = {}
            self.watch_thread.start()

        # the name of a temporary file serves as token; the file is kept
        # open in the watch entry until unwatch()
        fh = tempfile.NamedTemporaryFile(prefix='xs')
        token = fh.name
        self.watches[token] = {'fh': fh,
                               'path': path,
                               'cb': callback,
                               'cbargs': callback_args}
        try:
            self.do_op(None, self.XS_WATCH, path + '\0' + token + '\0')
        except Exception:
            # registration failed: do not leak the entry or the temp file
            del self.watches[token]
            fh.close()
            raise
        return token

    def unwatch(self, path, token):
        """Stop watching the path monitored by the watch() call that
        returned $token.
        """
        if token in self.watches:
            self.do_op(None, self.XS_UNWATCH, path + '\0' + token + '\0')
            self.watches[token]['fh'].close()
            del self.watches[token]

    def unwatch_all(self):
        """Remove every watch registered through watch()."""
        # iterate over a snapshot: unwatch() deletes from self.watches
        for token, watch in list(self.watches.items()):
            self.unwatch(watch['path'], token)

    def watch_stop(self):
        """Stop the background watch thread, if one is running."""
        if self.watch_thread:
            self.shutdown_event.set()
            # wake the thread out of select()
            os.write(self.pipe_w, b'x')
            self.watch_thread.join()
            self.watch_thread = None
            self.queue = None
            os.close(self.pipe_r)
            os.close(self.pipe_w)


class Transaction(object):
    """Xenstore transaction instance."""

    def __init__(self):
        self.xs = Xenstore()
        self.tx_id = 0

    def start(self):
        """Start a new transaction."""
        # the start request itself must be sent outside any transaction
        self.tx_id = 0
        reply = self.xs.do_op(self, self.xs.XS_TRANSACTION_START, '\0')
        self.tx_id = int(reply[:-1])

    def commit(self):
        """Commit all modifications in this transaction to Xenstore.

        Raises XenstoreException('EAGAIN') when the transaction has to be
        replayed.
        """
        self.xs.do_op(self, self.xs.XS_TRANSACTION_END, 'T\0')

    def abort(self):
        """Discard all modifications in this transaction."""
        self.xs.do_op(self, self.xs.XS_TRANSACTION_END, 'F\0')

    def read(self, *vars):
        """Transactional counterpart of Xenstore.read()."""
        return self.xs._read(self, *vars)

    def mkdir(self, *vars):
        """Transactional counterpart of Xenstore.mkdir()."""
        self.xs._mkdir(self, *vars)

    def rm(self, *vars):
        """Transactional counterpart of Xenstore.rm()."""
        self.xs._rm(self, *vars)

    def write(self, vars):
        """Transactional counterpart of Xenstore.write()."""
        self.xs.write(vars, self)

    def get_permissions(self, *paths):
        """Transactional counterpart of Xenstore.get_permissions()."""
        return self.xs._get_permissions(self, *paths)

    def set_permissions(self, path, perms):
        """Transactional counterpart of Xenstore.set_permissions()."""
        self.xs.set_permissions(path, perms, self)


# example code for dev testing
if __name__ == '__main__':

    def watch_cb(key, n):
        print("Watch fired: %s %s" % (key, n))

    repeat_count = 5

    # create xenstore handle
    xs = Xenstore()

    # read three keys, returns three values
    print(xs.read('domid', 'vm', 'name'))

    # write a key/value
    xs.write({'new': 'stuff'})

    # verify contents were written
    assert xs.read('new') == 'stuff'

    print(xs.get_permissions('new'))
    xs.set_permissions('new', {0: xs.PERM_READ_WRITE,
                               os.getpid(): xs.PERM_READ})
    print(xs.get_permissions('new'))

    xs.mkdir('another-new')

    # write another key
    xs.write({'new/path': 'more stuff'})
    xs.read('new/path')

    # delete key
    xs.rm('new/path')

    # attempt to read deleted key
    try:
        print(xs.read('new/path'))
    except XenstoreException as e:
        print("Failed to read (expected) %s" % (e,))

    # create and start transaction
    tx = Transaction()
    tx.start()
    tx.write({'tnew': 'committed'})

    # commit and verify value is updated
    tx.commit()
    assert xs.read('tnew') == 'committed'

    # start new transaction
    tx.start()
    tx.write({'tnew': 'aborted'})

    # abort and verify value is NOT updated
    tx.abort()
    assert xs.read('tnew') != 'aborted'

    # watch key
    xs.watch('new', watch_cb, 7)

    d = {}
    # repeatedly run a large transaction to cause collisions between two
    # program instances
    for n in range(1, 200):
        d["collision/key" + str(n)] = "value" + str(n)

    # runs until repeat_count collisions were seen (or an unexpected error)
    while True:
        try:
            tx.start()
            tx.write(d)
            tx.commit()
        except XenstoreException as e:
            if str(e) == 'EAGAIN':
                repeat_count -= 1
                if repeat_count == 0:
                    break
                print("Collision on commit, repeating")
            else:
                print("Unexpected Xenstore error %s" % (e,))
                break

    print("End of test")

    xs.unwatch_all()

    # stop background watch thread
    xs.watch_stop()

# _______________________________________________
# Xen-devel mailing list
# Xen-devel@xxxxxxxxxxxxx
# http://lists.xen.org/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |