[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-changelog] Strip huge piles of cruft from the connection infrastructure. We now actually
# HG changeset patch # User emellor@xxxxxxxxxxxxxxxxxxxxxx # Node ID 76bff6c996b0250739229181e40bc9c349f80c15 # Parent 23168659679678dee6eda3385581c2d825ca2ad6 Strip huge piles of cruft from the connection infrastructure. We now actually block inside accept rather than using select to poll and then calling accept regardless of the outcome of the select call, and then failing because the socket is non-blocking. SocketClientConnection, SocketConnector, TCPClientConnection, TCPConnector, connectTCP, UnixClientConnection, UnixConnector, connectUnix have gone. loseConnection and stopListening and closeSocket (where they are needed) are now called close. startListening is now called listen. Closes bug #379. Relieves a weight from the shoulders of the universe. Signed-off-by: Ewan Mellor <ewan@xxxxxxxxxxxxx> diff -r 231686596796 -r 76bff6c996b0 tools/python/xen/web/connection.py --- a/tools/python/xen/web/connection.py Thu Dec 8 14:30:15 2005 +++ b/tools/python/xen/web/connection.py Thu Dec 8 15:04:31 2005 @@ -30,11 +30,8 @@ for TCP and unix-domain sockets (see tcp.py and unix.py). """ -"""We make sockets non-blocking so that operations like accept() -don't block. We also select on a timeout. Otherwise we have no way -of getting the threads to shutdown. -""" -SELECT_TIMEOUT = 2.0 +BUFFER_SIZE = 1024 + class SocketServerConnection: """An accepted connection to a server. @@ -45,73 +42,35 @@ self.protocol = protocol self.addr = addr self.server = server - self.buffer_n = 1024 - self.thread = None self.protocol.setTransport(self) + def run(self): - self.thread = threading.Thread(target=self.main) - self.thread.start() + threading.Thread(target=self.main).start() + def main(self): - while True: - if not self.thread: break - if self.select(): break - if not self.thread: break - data = self.read() - if data is None: continue - if data is True: break - if self.dataReceived(data): break + try: + while True: + try: + data = self.sock.recv(BUFFER_SIZE) + if data == '': + break + if self.protocol.dataReceived(data): + break + except socket.error, ex: + if ex.args[0] not in (EWOULDBLOCK, EAGAIN, EINTR): + break + finally: + try: + self.sock.close() + except: + pass - def select(self): - try: - select.select([self.sock], [], [], SELECT_TIMEOUT) - return False - except socket.error, ex: - if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): - return False - else: - self.loseConnection(ex) - return True - - def read(self): - try: - data = self.sock.recv(self.buffer_n) - if data == '': - self.loseConnection() - return True - return data - except socket.error, ex: - if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): - return None - else: - self.loseConnection(ex) - return True - - def dataReceived(self, data): - try: - self.protocol.dataReceived(data) - except SystemExit: - raise - except Exception, ex: - self.loseConnection(ex) - return True - return False def write(self, data): self.sock.send(data) - def loseConnection(self, reason=None): - self.thread = None - self.closeSocket(reason) - - def closeSocket(self, reason): - try: - self.sock.close() - except SystemExit: - raise - except: - pass class SocketListener: """A server socket, running listen in a thread. @@ -126,192 +85,44 @@ self.backlog = backlog self.thread = None + def createSocket(self): raise NotImplementedError() + def setCloExec(self): fcntl.fcntl(self.sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC) + def acceptConnection(self, sock, protocol, addr): return SocketServerConnection(sock, protocol, addr, self) - def startListening(self): + + def listen(self): if self.sock or self.thread: raise IOError("already listening") self.sock = self.createSocket() - self.sock.setblocking(0) self.sock.listen(self.backlog) self.run() - def stopListening(self, reason=None): - self.loseConnection(reason) def run(self): self.thread = threading.Thread(target=self.main) self.thread.start() - def main(self): - while True: - if not self.thread: break - if self.select(): break - if self.accept(): break - - def select(self): - try: - select.select([self.sock], [], [], SELECT_TIMEOUT) - return False - except socket.error, ex: - if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): - return False - else: - self.loseConnection(ex) - return True - - def accept(self): - try: - (sock, addr) = self.sock.accept() - sock.setblocking(0) - return self.accepted(sock, addr) - except socket.error, ex: - if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): - return False - else: - self.loseConnection(ex) - return True - - def accepted(self, sock, addr): - self.acceptConnection(sock, self.protocol_class(), addr).run() - return False - - def loseConnection(self, reason=None): - self.thread = None - self.closeSocket(reason) - - def closeSocket(self, reason): - try: - self.sock.close() - except SystemExit: - raise - except Exception, ex: - pass - - -class SocketClientConnection: - """A connection to a server from a client. - - Call connectionMade() on the protocol in a thread when connected. - It is completely up to the protocol what to do. - """ - - def __init__(self, connector): - self.addr = None - self.connector = connector - self.buffer_n = 1024 - - def createSocket (self): - raise NotImplementedError() - - def write(self, data): - if self.sock: - return self.sock.send(data) - else: - return 0 - - def connect(self, timeout): - #todo: run a timer to cancel on timeout? - try: - sock = self.createSocket() - sock.connect(self.addr) - self.sock = sock - self.protocol = self.connector.protocol_class() - self.protocol.setTransport(self) - except SystemExit: - raise - except Exception, ex: - self.connector.connectionFailed(ex) - return False - - self.thread = threading.Thread(target=self.main) - #self.thread.setDaemon(True) - self.thread.start() - return True def main(self): try: - # Call the protocol in a thread. - # Up to it what to do. - self.protocol.connectionMade(self.addr) - except SystemExit: - raise - except Exception, ex: - self.loseConnection(ex) - - def mainLoop(self): - # Something a protocol could call. - while True: - if not self.thread: break - if self.select(): break - if not self.thread: break - data = self.read() - if data is None: continue - if data is True: break - if self.dataReceived(data): break - - def select(self): - try: - select.select([self.sock], [], [], SELECT_TIMEOUT) - return False - except socket.error, ex: - if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): - return False - else: - self.loseConnection(ex) - return True - - def read(self): - try: - data = self.sock.recv(self.buffer_n) - return data - except socket.error, ex: - if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR): - return None - else: - self.loseConnection(ex) - return True - - def dataReceived(self, data): - if not self.protocol: - return True - try: - self.protocol.dataReceived(data) - except SystemExit: - raise - except Exception, ex: - self.loseConnection(ex) - return True - return False - - def loseConnection(self, reason=None): - self.thread = None - self.closeSocket(reason) - - def closeSocket(self, reason): - try: - if self.sock: + while True: + try: + (sock, addr) = self.sock.accept() + self.acceptConnection(sock, self.protocol_class(), + addr).run() + except socket.error, ex: + if ex.args[0] not in (EWOULDBLOCK, EAGAIN, EINTR): + break + finally: + try: self.sock.close() - except SystemExit: - raise - except: - pass - -class SocketConnector: - """A client socket. Connects to a server and runs the client protocol - in a thread. - """ - - def __init__(self, protocol_class): - self.protocol_class = protocol_class - self.transport = None - - def connect(self): - pass + except: + pass diff -r 231686596796 -r 76bff6c996b0 tools/python/xen/web/protocol.py --- a/tools/python/xen/web/protocol.py Thu Dec 8 14:30:15 2005 +++ b/tools/python/xen/web/protocol.py Thu Dec 8 15:04:31 2005 @@ -25,7 +25,7 @@ self.transport = transport def dataReceived(self, data): - print 'Protocol>dataReceived>' + raise NotImplementedError() def write(self, data): if self.transport: diff -r 231686596796 -r 76bff6c996b0 tools/python/xen/web/tcp.py --- a/tools/python/xen/web/tcp.py Thu Dec 8 14:30:15 2005 +++ b/tools/python/xen/web/tcp.py Thu Dec 8 15:04:31 2005 @@ -13,16 +13,16 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA #============================================================================ # Copyright (C) 2005 Mike Wray <mike.wray@xxxxxx> +# Copyright (C) 2005 XenSource Ltd. #============================================================================ -import sys + import socket -import types import time import errno from connection import * -from protocol import * + class TCPListener(SocketListener): @@ -52,48 +52,8 @@ def acceptConnection(self, sock, protocol, addr): return SocketServerConnection(sock, protocol, addr, self) -class TCPClientConnection(SocketClientConnection): - - def __init__(self, host, port, bindAddress, connector): - SocketClientConnection.__init__(self, connector) - self.addr = (host, port) - self.bindAddress = bindAddress - - def createSocket(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - if self.bindAddress is not None: - sock.bind(self.bindAddress) - return sock - -class TCPConnector(SocketConnector): - - def __init__(self, host, port, protocol, timeout=None, bindAddress=None): - SocketConnector.__init__(self, protocol) - self.host = host - self.port = self.servicePort(port) - self.bindAddress = bindAddress - self.timeout = timeout - - def servicePort(self, port): - if isinstance(port, types.StringTypes): - try: - port = socket.getservbyname(port, 'tcp') - except socket.error, ex: - raise IOError("unknown service: " + ex) - return port - - def connect(self): - self.transport = TCPClientConnection( - self.host, self.port, self.bindAddress, self) - self.transport.connect(self.timeout) def listenTCP(port, protocol, interface='', backlog=None): l = TCPListener(port, protocol, interface=interface, backlog=backlog) - l.startListening() - return l - -def connectTCP(host, port, protocol, timeout=None, bindAddress=None): - c = TCPConnector(host, port, protocol, timeout=timeout, - bindAddress=bindAddress) - c.connect() - return c + l.listen() + l.setCloExec() diff -r 231686596796 -r 76bff6c996b0 tools/python/xen/web/unix.py --- a/tools/python/xen/web/unix.py Thu Dec 8 14:30:15 2005 +++ b/tools/python/xen/web/unix.py Thu Dec 8 15:04:31 2005 @@ -16,13 +16,13 @@ # Copyright (C) 2005 XenSource Ltd. #============================================================================ -import sys + import socket import os import os.path from connection import * -from protocol import * + class UnixListener(SocketListener): @@ -48,33 +48,6 @@ def acceptConnection(self, sock, protocol, addr): return SocketServerConnection(sock, protocol, self.path, self) -class UnixClientConnection(SocketClientConnection): - - def __init__(self, addr, connector): - SocketClientConnection.__init__(self, connector) - self.addr = addr - - def createSocket(self): - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - return sock - -class UnixConnector(SocketConnector): - - def __init__(self, path, protocol, timeout=None): - SocketConnector.__init__(self, protocol) - self.addr = path - self.timeout = timeout - - def connect(self): - self.transport = UnixClientConnection(self.addr, self) - self.transport.connect(self.timeout) def listenUNIX(path, protocol, backlog=None): - l = UnixListener(path, protocol, backlog=backlog) - l.startListening() - return l - -def connectUNIX(path, protocol, timeout=None): - c = UnixConnector(path, protocol, timeout=timeout) - c.connect() - return c + UnixListener(path, protocol, backlog=backlog).listen() diff -r 231686596796 -r 76bff6c996b0 tools/python/xen/xend/server/relocate.py --- a/tools/python/xen/xend/server/relocate.py Thu Dec 8 14:30:15 2005 +++ b/tools/python/xen/xend/server/relocate.py Thu Dec 8 15:04:31 2005 @@ -44,15 +44,15 @@ res = self.dispatch(val) self.send_result(res) if self.parser.at_eof(): - self.loseConnection() + self.close() except SystemExit: raise except: self.send_error() - def loseConnection(self): + def close(self): if self.transport: - self.transport.loseConnection() + self.transport.close() def send_reply(self, sxpr): io = StringIO.StringIO() @@ -100,15 +100,13 @@ return l def op_quit(self, _1, _2): - self.loseConnection() + self.close() def op_receive(self, name, _): if self.transport: self.send_reply(["ready", name]) - self.transport.sock.setblocking(1) XendDomain.instance().domain_restore_fd( self.transport.sock.fileno()) - self.transport.sock.setblocking(0) else: log.error(name + ": no transport") raise XendError(name + ": no transport") @@ -122,5 +120,4 @@ if xroot.get_xend_relocation_server(): port = xroot.get_xend_relocation_port() interface = xroot.get_xend_relocation_address() - l = tcp.listenTCP(port, RelocationProtocol, interface=interface) - l.setCloExec() + tcp.listenTCP(port, RelocationProtocol, interface=interface) _______________________________________________ Xen-changelog mailing list Xen-changelog@xxxxxxxxxxxxxxxxxxx http://lists.xensource.com/xen-changelog
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |