[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-changelog] xsnode.py:



ChangeSet 1.1713.1.21, 2005/06/17 17:23:55+01:00, cl349@xxxxxxxxxxxxxxxxxxxx

        xsnode.py:
          Updated watches/event code from Mike Wray.
        Signed-off-by: Mike Wray <mike.wray@xxxxxx>
        Signed-off-by: Christian Limpach <Christian.Limpach@xxxxxxxxxxxx>



 xsnode.py |  219 +++++++++++++++++++++++++++++++++++++++-----------------------
 1 files changed, 141 insertions(+), 78 deletions(-)


diff -Nru a/tools/python/xen/xend/xenstore/xsnode.py b/tools/python/xen/xend/xenstore/xsnode.py
--- a/tools/python/xen/xend/xenstore/xsnode.py  2005-06-17 21:03:36 -04:00
+++ b/tools/python/xen/xend/xenstore/xsnode.py  2005-06-17 21:03:36 -04:00
@@ -2,7 +2,9 @@
 import os
 import os.path
 import select
+import socket
 import sys
+import threading
 import time
 
 from xen.lowlevel import xs
@@ -12,18 +14,26 @@
 SELECT_TIMEOUT = 2.0
 
 def getEventPath(event):
-    return os.path.join("/_event", event)
+    if event and event.startswith("/"):
+        event = event[1:]
+    return os.path.join("/event", event)
 
 def getEventIdPath(event):
-    return os.path.join(eventPath(event), "@eid")
+    return os.path.join(getEventPath(event), "@eid")
 
 class Subscription:
 
-    def __init__(self, event, fn, id):
-        self.event = event
+    def __init__(self, path, fn, sid):
+        self.path = path
         self.watcher = None
         self.fn = fn
-        self.id = id
+        self.sid = sid
+
+    def getPath(self):
+        return self.path
+
+    def getSid(self):
+        return self.sid
 
     def watch(self, watcher):
         self.watcher = watcher
@@ -34,10 +44,11 @@
         if watcher:
             self.watcher = None
             watcher.delSubs(self)
+        return watcher
 
-    def notify(self, event):
+    def notify(self, path, val):
         try:
-            self.fn(event, id)
+            self.fn(self, path, val)
         except SystemExitException:
             raise
         except:
@@ -45,45 +56,45 @@
 
 class Watcher:
 
-    def __init__(self, store, event):
-        self.path = getEventPath(event)
-        self.eidPath = getEventIdPath(event)
+    def __init__(self, store, path):
+        self.path = path
         store.mkdirs(self.path)
-        if not store.exists(self.eidPath):
-            store.writeInt(self.eidPath, 0)
         self.xs = None
-        self.subs = []
+        self.subscriptions = []
 
-    def __getattr__(self, k, v):
-        if k == "fileno":
-            if self.xs:
-                return self.xs.fileno
-            else:
-                return -1
+    def fileno(self):
+        if self.xs:
+            return self.xs.fileno
         else:
-            return self.__dict__.get(k, v)
+            return -1
+
+    def getPath(self):
+        return self.path
 
     def addSubs(self, subs):
-        self.subs.append(subs)
+        self.subscriptions.append(subs)
         self.watch()
 
     def delSubs(self, subs):
-        self.subs.remove(subs)
-        if len(self.subs) == 0:
+        self.subscriptions.remove(subs)
+        if len(self.subscriptions) == 0:
             self.unwatch()
 
-    def getEvent(self):
-        return self.event
-
     def watch(self):
         if self.xs: return
         self.xs = xs.open()
-        self.xs.watch(path)
+        self.xs.watch(self.path)
 
     def unwatch(self):
         if self.xs:
-            self.xs.unwatch(self.path)
-            self.xs.close()
+            try:
+                self.xs.unwatch(self.path)
+            except Exception, ex:
+                print 'Watcher>unwatch>', ex
+            try:
+                self.xs.close()
+            except Exception, ex:
+                pass
             self.xs = None
             
     def watching(self):
@@ -92,22 +103,38 @@
     def getNotification(self):
         p = self.xs.read_watch()
         self.xs.acknowledge_watch()
-        eid = self.xs.readInt(self.eidPath)
         return p
 
-    def notify(self, subs):
-        p = self.getNotification()
-        for s in subs:
-            s.notify(p)
-            
+    def notify(self):
+        try:
+            p = self.getNotification()
+            v = self.xs.read(p)
+            for s in subscriptions:
+                s.notify(p, v)
+        except Exception, ex:
+            print 'Notify exception:', ex
+
+class EventWatcher(Watcher):
+
+    def __init__(self, store, path, event):
+        Watcher.__init__(self, store, path)
+        self.event = event
+        self.eidPath = getEventIdPath(event)
+        if not store.exists(self.eidPath):
+            store.write(self.eidPath, str(0))
+
+    def getEvent(self):
+        return self.event
+
 class XenStore:
 
+    xs = None
+    watchThread = None
+    subscription_id = 1
+    
     def __init__(self):
-        self.xs = None
-        #self.xs = xs.open()
-        self.subscription = {}
-        self.subscription_id = 0
-        self.events = {}
+        self.subscriptions = {}
+        self.watchers = {}
         self.write("/", "")
 
     def getxs(self):
@@ -119,8 +146,8 @@
                     ex = None
                     break
                 except Exception, ex:
-                    print >>stderr, "Exception connecting to xsdaemon:", ex
-                    print >>stderr, "Trying again..."
+                    print >>sys.stderr, "Exception connecting to xsdaemon:", ex
+                    print >>sys.stderr, "Trying again..."
                     time.sleep(1)
             else:
                 raise ex
@@ -217,70 +244,85 @@
         self.getxs().write(path, data, create=create, excl=excl)
 
     def begin(self, path):
-        self.getxs().begin_transaction(path)
+        self.getxs().transaction_start(path)
 
     def commit(self, abandon=False):
-        self.getxs().end_transaction(abort=abandon)
+        self.getxs().transaction_end(abort=abandon)
+
+    def watch(self, path, fn):
+        watcher = self.watchers.get(path)
+        if not watcher:
+            watcher = self.addWatcher(Watcher(self, path))
+        return self.addSubscription(watcher, fn)
+        
+    def unwatch(self, sid):
+        s = self.subscriptions.get(sid)
+        if not s: return
+        del self.subscriptions[s.sid]
+        watcher = s.unwatch()
+        if watcher and not watcher.watching():
+            del self.watchers[path]
 
     def subscribe(self, event, fn):
-        watcher = self.watchEvent(event)
-        self.subscription_id += 1
-        subs = Subscription(event, fn, self.subscription_id)
-        self.subscription[subs.id] = subs
-        subs.watch(watcher)
-        return subs.id
+        path = getEventPath(event)
+        watcher = self.watchers.get(path)
+        if not watcher:
+            watcher = self.addWatcher(EventWatcher(self, path, event))
+        return self.addSubscription(watcher, fn)
 
-    def unsubscribe(self, sid):
-        s = self.subscription.get(sid)
-        if not s: return
-        del self.subscription[s.id]
-        s.unwatch()
-        unwatchEvent(s.event)
+    unsubscribe = unwatch
 
     def sendEvent(self, event, data):
         eventPath = getEventPath(event)
         eidPath = getEventIdPath(event)
         try:
-            self.begin(eventPath)
+            #self.begin(eventPath)
             self.mkdirs(eventPath)
+            eid = 1
             if self.exists(eidPath):
-                eid = self.readInt(eidPath)
-                eid += 1
-            else:
-                eid = 1
-            self.writeInt(eidPath, eid)
+                data = self.read(eidPath)
+                print 'sendEvent>', 'data=', data, type(data)

_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.