[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-changelog] xsnode.py:
ChangeSet 1.1713.1.21, 2005/06/17 17:23:55+01:00, cl349@xxxxxxxxxxxxxxxxxxxx xsnode.py: Updated watches/event code from Mike Wray. Signed-off-by: Mike Wray <mike.wray@xxxxxx> Signed-off-by: Christian Limpach <Christian.Limpach@xxxxxxxxxxxx> xsnode.py | 219 +++++++++++++++++++++++++++++++++++++++----------------------- 1 files changed, 141 insertions(+), 78 deletions(-) diff -Nru a/tools/python/xen/xend/xenstore/xsnode.py b/tools/python/xen/xend/xenstore/xsnode.py --- a/tools/python/xen/xend/xenstore/xsnode.py 2005-06-17 21:03:36 -04:00 +++ b/tools/python/xen/xend/xenstore/xsnode.py 2005-06-17 21:03:36 -04:00 @@ -2,7 +2,9 @@ import os import os.path import select +import socket import sys +import threading import time from xen.lowlevel import xs @@ -12,18 +14,26 @@ SELECT_TIMEOUT = 2.0 def getEventPath(event): - return os.path.join("/_event", event) + if event and event.startswith("/"): + event = event[1:] + return os.path.join("/event", event) def getEventIdPath(event): - return os.path.join(eventPath(event), "@eid") + return os.path.join(getEventPath(event), "@eid") class Subscription: - def __init__(self, event, fn, id): - self.event = event + def __init__(self, path, fn, sid): + self.path = path self.watcher = None self.fn = fn - self.id = id + self.sid = sid + + def getPath(self): + return self.path + + def getSid(self): + return self.sid def watch(self, watcher): self.watcher = watcher @@ -34,10 +44,11 @@ if watcher: self.watcher = None watcher.delSubs(self) + return watcher - def notify(self, event): + def notify(self, path, val): try: - self.fn(event, id) + self.fn(self, path, val) except SystemExitException: raise except: @@ -45,45 +56,45 @@ class Watcher: - def __init__(self, store, event): - self.path = getEventPath(event) - self.eidPath = getEventIdPath(event) + def __init__(self, store, path): + self.path = path store.mkdirs(self.path) - if not store.exists(self.eidPath): - store.writeInt(self.eidPath, 0) self.xs = None - self.subs = [] + self.subscriptions = [] - def __getattr__(self, k, v): - if k == "fileno": - if self.xs: - return self.xs.fileno - else: - return -1 + def fileno(self): + if self.xs: + return self.xs.fileno else: - return self.__dict__.get(k, v) + return -1 + + def getPath(self): + return self.path def addSubs(self, subs): - self.subs.append(subs) + self.subscriptions.append(subs) self.watch() def delSubs(self, subs): - self.subs.remove(subs) - if len(self.subs) == 0: + self.subscriptions.remove(subs) + if len(self.subscriptions) == 0: self.unwatch() - def getEvent(self): - return self.event - def watch(self): if self.xs: return self.xs = xs.open() - self.xs.watch(path) + self.xs.watch(self.path) def unwatch(self): if self.xs: - self.xs.unwatch(self.path) - self.xs.close() + try: + self.xs.unwatch(self.path) + except Exception, ex: + print 'Watcher>unwatch>', ex + try: + self.xs.close() + except Exception, ex: + pass self.xs = None def watching(self): @@ -92,22 +103,38 @@ def getNotification(self): p = self.xs.read_watch() self.xs.acknowledge_watch() - eid = self.xs.readInt(self.eidPath) return p - def notify(self, subs): - p = self.getNotification() - for s in subs: - s.notify(p) - + def notify(self): + try: + p = self.getNotification() + v = self.xs.read(p) + for s in subscriptions: + s.notify(p, v) + except Exception, ex: + print 'Notify exception:', ex + +class EventWatcher(Watcher): + + def __init__(self, store, path, event): + Watcher.__init__(self, store, path) + self.event = event + self.eidPath = getEventIdPath(event) + if not store.exists(self.eidPath): + store.write(self.eidPath, str(0)) + + def getEvent(self): + return self.event + class XenStore: + xs = None + watchThread = None + subscription_id = 1 + def __init__(self): - self.xs = None - #self.xs = xs.open() - self.subscription = {} - self.subscription_id = 0 - self.events = {} + self.subscriptions = {} + self.watchers = {} self.write("/", "") def getxs(self): @@ -119,8 +146,8 @@ ex = None break except Exception, ex: - print >>stderr, "Exception connecting to xsdaemon:", ex - print >>stderr, "Trying again..." + print >>sys.stderr, "Exception connecting to xsdaemon:", ex + print >>sys.stderr, "Trying again..." time.sleep(1) else: raise ex @@ -217,70 +244,85 @@ self.getxs().write(path, data, create=create, excl=excl) def begin(self, path): - self.getxs().begin_transaction(path) + self.getxs().transaction_start(path) def commit(self, abandon=False): - self.getxs().end_transaction(abort=abandon) + self.getxs().transaction_end(abort=abandon) + + def watch(self, path, fn): + watcher = self.watchers.get(path) + if not watcher: + watcher = self.addWatcher(Watcher(self, path)) + return self.addSubscription(watcher, fn) + + def unwatch(self, sid): + s = self.subscriptions.get(sid) + if not s: return + del self.subscriptions[s.sid] + watcher = s.unwatch() + if watcher and not watcher.watching(): + del self.watchers[path] def subscribe(self, event, fn): - watcher = self.watchEvent(event) - self.subscription_id += 1 - subs = Subscription(event, fn, self.subscription_id) - self.subscription[subs.id] = subs - subs.watch(watcher) - return subs.id + path = getEventPath(event) + watcher = self.watchers.get(path) + if not watcher: + watcher = self.addWatcher(EventWatcher(self, path, event)) + return self.addSubscription(watcher, fn) - def unsubscribe(self, sid): - s = self.subscription.get(sid) - if not s: return - del self.subscription[s.id] - s.unwatch() - unwatchEvent(s.event) + unsubscribe = unwatch def sendEvent(self, event, data): eventPath = getEventPath(event) eidPath = getEventIdPath(event) try: - self.begin(eventPath) + #self.begin(eventPath) self.mkdirs(eventPath) + eid = 1 if self.exists(eidPath): - eid = self.readInt(eidPath) - eid += 1 - else: - eid = 1 - self.writeInt(eidPath, eid) + data = self.read(eidPath) + print 'sendEvent>', 'data=', data, type(data) _______________________________________________ Xen-changelog mailing list Xen-changelog@xxxxxxxxxxxxxxxxxxx http://lists.xensource.com/xen-changelog
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |