[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-changelog] Add some locking to console handling.
ChangeSet 1.1327.2.3, 2005/04/21 15:11:29+01:00, mjw@xxxxxxxxxxxxxxxxxxx Add some locking to console handling. Remove a dead file. Signed-off-by: Mike Wray <mike.wray@xxxxxx> b/tools/python/xen/xend/server/console.py | 202 ++++++++++++++++++------------ tools/python/xen/xend/EventTypes.py | 34 ----- 2 files changed, 126 insertions(+), 110 deletions(-) diff -Nru a/tools/python/xen/xend/EventTypes.py b/tools/python/xen/xend/EventTypes.py --- a/tools/python/xen/xend/EventTypes.py 2005-05-13 16:03:44 -04:00 +++ /dev/null Wed Dec 31 16:00:00 196900 @@ -1,34 +0,0 @@ -# Copyright (C) 2004 Mike Wray <mike.wray@xxxxxx> - -## XEND_DOMAIN_CREATE = "xend.domain.create": dom -## create: -## xend.domain.destroy: dom, reason:died/crashed -## xend.domain.up ? - -## xend.domain.unpause: dom -## xend.domain.pause: dom -## xend.domain.shutdown: dom -## xend.domain.destroy: dom - -## xend.domain.migrate.begin: dom, to -## Begin tells: src host, src domain uri, dst host. Dst id known? -## err: src host, src domain uri, dst host, dst id if known, status (of domain: ok, dead,...), reason -## end: src host, src domain uri, dst host, dst uri - -## Events for both ends of migrate: for exporter and importer? -## Include migrate id so can tie together. -## Have uri /xend/migrate/<id> for migrate info (migrations in progress). - -## (xend.domain.migrate.begin (src <host>) (src.domain <id>) -## (dst <host>) (id <migrate id>)) - -## xend.domain.migrate.end: -## (xend.domain.migrate.end (domain <id>) (to <host>) - -## xend.node.up: xend uri -## xend.node.down: xend uri - -## xend.error ? 
- -## format: - diff -Nru a/tools/python/xen/xend/server/console.py b/tools/python/xen/xend/server/console.py --- a/tools/python/xen/xend/server/console.py 2005-05-13 16:03:44 -04:00 +++ b/tools/python/xen/xend/server/console.py 2005-05-13 16:03:44 -04:00 @@ -1,6 +1,7 @@ # Copyright (C) 2004 Mike Wray <mike.wray@xxxxxx> import socket +import threading from xen.web import reactor, protocol @@ -86,6 +87,7 @@ def __init__(self, controller, id, config, recreate=False): Dev.__init__(self, controller, id, config) + self.lock = threading.RLock() self.status = self.STATUS_NEW self.addr = None self.conn = None @@ -107,9 +109,13 @@ [self.id, self.getDomain(), self.console_port]) def init(self, recreate=False, reboot=False): - self.destroyed = False - self.channel = self.getChannel() - self.listen() + try: + self.lock.acquire() + self.destroyed = False + self.channel = self.getChannel() + self.listen() + finally: + self.lock.release() def checkConsolePort(self, console_port): """Check that a console port is not in use by another console. 
@@ -121,29 +127,41 @@ ctrl.checkConsolePort(console_port) def sxpr(self): - val = ['console', - ['status', self.status ], - ['id', self.id ], - ['domain', self.getDomain() ] ] - val.append(['local_port', self.getLocalPort() ]) - val.append(['remote_port', self.getRemotePort() ]) - val.append(['console_port', self.console_port ]) - val.append(['index', self.getIndex()]) - if self.addr: - val.append(['connected', self.addr[0], self.addr[1]]) + try: + self.lock.acquire() + val = ['console', + ['status', self.status ], + ['id', self.id ], + ['domain', self.getDomain() ] ] + val.append(['local_port', self.getLocalPort() ]) + val.append(['remote_port', self.getRemotePort() ]) + val.append(['console_port', self.console_port ]) + val.append(['index', self.getIndex()]) + if self.addr: + val.append(['connected', self.addr[0], self.addr[1]]) + finally: + self.lock.release() return val def getLocalPort(self): - if self.channel: - return self.channel.getLocalPort() - else: - return 0 + try: + self.lock.acquire() + if self.channel: + return self.channel.getLocalPort() + else: + return 0 + finally: + self.lock.release() def getRemotePort(self): - if self.channel: - return self.channel.getRemotePort() - else: - return 0 + try: + self.lock.acquire() + if self.channel: + return self.channel.getRemotePort() + else: + return 0 + finally: + self.lock.release() def uri(self): """Get the uri to use to connect to the console. @@ -166,23 +184,31 @@ print 'ConsoleDev>destroy>', self, reboot if reboot: return - self.status = self.STATUS_CLOSED - if self.conn: - self.conn.loseConnection() - self.listener.stopListening() + try: + self.lock.acquire() + self.status = self.STATUS_CLOSED + if self.conn: + self.conn.loseConnection() + self.listener.stopListening() + finally: + self.lock.release() def listen(self): """Listen for TCP connections to the console port.. 
""" - if self.closed(): - return - if self.listener: - pass - else: - self.status = self.STATUS_LISTENING - cf = ConsoleFactory(self, self.id) - interface = xroot.get_console_address() - self.listener = reactor.listenTCP(self.console_port, cf, interface=interface) + try: + self.lock.acquire() + if self.closed(): + return + if self.listener: + pass + else: + self.status = self.STATUS_LISTENING + cf = ConsoleFactory(self, self.id) + interface = xroot.get_console_address() + self.listener = reactor.listenTCP(self.console_port, cf, interface=interface) + finally: + self.lock.release() def connect(self, addr, conn): """Connect a TCP connection to the console. @@ -193,27 +219,35 @@ returns 0 if ok, negative otherwise """ - if self.closed(): - return -1 - if self.connected(): - return -1 - self.addr = addr - self.conn = conn - self.status = self.STATUS_CONNECTED - self.writeOutput() + try: + self.lock.acquire() + if self.closed(): + return -1 + if self.connected(): + return -1 + self.addr = addr + self.conn = conn + self.status = self.STATUS_CONNECTED + self.writeOutput() + finally: + self.lock.release() return 0 def disconnect(self, conn=None): """Disconnect the TCP connection to the console. """ print 'ConsoleDev>disconnect>', conn - if conn and conn != self.conn: return - if self.conn: - self.conn.loseConnection() - self.addr = None - self.conn = None - self.status = self.STATUS_LISTENING - self.listen() + try: + self.lock.acquire() + if conn and conn != self.conn: return + if self.conn: + self.conn.loseConnection() + self.addr = None + self.conn = None + self.status = self.STATUS_LISTENING + self.listen() + finally: + self.lock.release() def receiveOutput(self, msg): """Receive output console data from the console channel. @@ -223,30 +257,38 @@ subtype minor message typ """ # Treat the obuf as a ring buffer. 
- data = msg.get_payload() - data_n = len(data) - if self.obuf.space() < data_n: - self.obuf.discard(data_n) - if self.obuf.space() < data_n: - data = data[-self.obuf.space():] - self.obuf.write(data) - self.writeOutput() + try: + self.lock.acquire() + data = msg.get_payload() + data_n = len(data) + if self.obuf.space() < data_n: + self.obuf.discard(data_n) + if self.obuf.space() < data_n: + data = data[-self.obuf.space():] _______________________________________________ Xen-changelog mailing list Xen-changelog@xxxxxxxxxxxxxxxxxxx http://lists.xensource.com/xen-changelog
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |