[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-changelog] Strip huge piles of cruft from the connection infrastructure. We now actually block inside accept.



# HG changeset patch
# User emellor@xxxxxxxxxxxxxxxxxxxxxx
# Node ID 76bff6c996b0250739229181e40bc9c349f80c15
# Parent  23168659679678dee6eda3385581c2d825ca2ad6
Strip huge piles of cruft from the connection infrastructure.  We now actually
block inside accept rather than using select to poll and then calling accept
regardless of the outcome of the select call, and then failing because the
socket is non-blocking.

SocketClientConnection, SocketConnector, TCPClientConnection, TCPConnector,
connectTCP, UnixClientConnection, UnixConnector, connectUnix have gone.

loseConnection, stopListening, and closeSocket (where they are needed) are
now called close.  startListening is now called listen.

Closes bug #379.

Relieves a weight from the shoulders of the universe.

Signed-off-by: Ewan Mellor <ewan@xxxxxxxxxxxxx>

diff -r 231686596796 -r 76bff6c996b0 tools/python/xen/web/connection.py
--- a/tools/python/xen/web/connection.py        Thu Dec  8 14:30:15 2005
+++ b/tools/python/xen/web/connection.py        Thu Dec  8 15:04:31 2005
@@ -30,11 +30,8 @@
 for TCP and unix-domain sockets (see tcp.py and unix.py).
 """
 
-"""We make sockets non-blocking so that operations like accept()
-don't block. We also select on a timeout. Otherwise we have no way
-of getting the threads to shutdown.
-"""
-SELECT_TIMEOUT = 2.0
+BUFFER_SIZE = 1024
+
 
 class SocketServerConnection:
     """An accepted connection to a server.
@@ -45,73 +42,35 @@
         self.protocol = protocol
         self.addr = addr
         self.server = server
-        self.buffer_n = 1024
-        self.thread = None
         self.protocol.setTransport(self)
 
+
     def run(self):
-        self.thread = threading.Thread(target=self.main)
-        self.thread.start()
+        threading.Thread(target=self.main).start()
+
 
     def main(self):
-        while True:
-            if not self.thread: break
-            if self.select(): break
-            if not self.thread: break
-            data = self.read()
-            if data is None: continue
-            if data is True: break
-            if self.dataReceived(data): break
+        try:
+            while True:
+                try:
+                    data = self.sock.recv(BUFFER_SIZE)
+                    if data == '':
+                        break
+                    if self.protocol.dataReceived(data):
+                        break
+                except socket.error, ex:
+                    if ex.args[0] not in (EWOULDBLOCK, EAGAIN, EINTR):
+                        break
+        finally:
+            try:
+                self.sock.close()
+            except:
+                pass
 
-    def select(self):
-        try:
-            select.select([self.sock], [], [], SELECT_TIMEOUT)
-            return False
-        except socket.error, ex:
-            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
-                return False
-            else:
-                self.loseConnection(ex)
-                return True
-
-    def read(self):
-        try:
-            data = self.sock.recv(self.buffer_n)
-            if data == '':
-                self.loseConnection()
-                return True
-            return data
-        except socket.error, ex:
-            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
-                return None
-            else:
-                self.loseConnection(ex)
-                return True
-
-    def dataReceived(self, data):
-        try:
-            self.protocol.dataReceived(data)
-        except SystemExit:
-            raise
-        except Exception, ex:
-            self.loseConnection(ex)
-            return True
-        return False
 
     def write(self, data):
         self.sock.send(data)
 
-    def loseConnection(self, reason=None):
-        self.thread = None
-        self.closeSocket(reason)
-
-    def closeSocket(self, reason):
-        try:
-            self.sock.close()
-        except SystemExit:
-            raise
-        except:
-            pass
 
 class SocketListener:
     """A server socket, running listen in a thread.
@@ -126,192 +85,44 @@
         self.backlog = backlog
         self.thread = None
 
+
     def createSocket(self):
         raise NotImplementedError()
+
 
     def setCloExec(self):
         fcntl.fcntl(self.sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
 
+
     def acceptConnection(self, sock, protocol, addr):
         return SocketServerConnection(sock, protocol, addr, self)
 
-    def startListening(self):
+
+    def listen(self):
         if self.sock or self.thread:
             raise IOError("already listening")
         self.sock = self.createSocket()
-        self.sock.setblocking(0)
         self.sock.listen(self.backlog)
         self.run()
 
-    def stopListening(self, reason=None):
-        self.loseConnection(reason)
 
     def run(self):
         self.thread = threading.Thread(target=self.main)
         self.thread.start()
 
-    def main(self):
-        while True:
-            if not self.thread: break
-            if self.select(): break
-            if self.accept(): break
-
-    def select(self):
-        try:
-            select.select([self.sock], [], [], SELECT_TIMEOUT)
-            return False
-        except socket.error, ex:
-            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
-                return False
-            else:
-                self.loseConnection(ex)
-                return True
-
-    def accept(self):
-        try:
-            (sock, addr) = self.sock.accept()
-            sock.setblocking(0)
-            return self.accepted(sock, addr)
-        except socket.error, ex:
-            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
-                return False
-            else:
-                self.loseConnection(ex)
-                return True
-
-    def accepted(self, sock, addr):
-        self.acceptConnection(sock, self.protocol_class(), addr).run()
-        return False
-
-    def loseConnection(self, reason=None):
-        self.thread = None
-        self.closeSocket(reason)
-
-    def closeSocket(self, reason):
-        try:
-            self.sock.close()
-        except SystemExit:
-            raise
-        except Exception, ex:
-            pass
-
-
-class SocketClientConnection:
-    """A connection to a server from a client.
-
-    Call connectionMade() on the protocol in a thread when connected.
-    It is completely up to the protocol what to do.
-    """
-
-    def __init__(self, connector):
-        self.addr = None
-        self.connector = connector
-        self.buffer_n = 1024
-
-    def createSocket (self):
-        raise NotImplementedError()
-
-    def write(self, data):
-        if self.sock:
-            return self.sock.send(data)
-        else:
-            return 0
-
-    def connect(self, timeout):
-        #todo: run a timer to cancel on timeout?
-        try:
-            sock = self.createSocket()
-            sock.connect(self.addr)
-            self.sock = sock
-            self.protocol = self.connector.protocol_class()
-            self.protocol.setTransport(self)
-        except SystemExit:
-            raise
-        except Exception, ex:
-            self.connector.connectionFailed(ex)
-            return False
-
-        self.thread = threading.Thread(target=self.main)
-        #self.thread.setDaemon(True)
-        self.thread.start()
-        return True
 
     def main(self):
         try:
-            # Call the protocol in a thread.
-            # Up to it what to do.
-            self.protocol.connectionMade(self.addr)
-        except SystemExit:
-            raise
-        except Exception, ex:
-            self.loseConnection(ex)
-
-    def mainLoop(self):
-        # Something a protocol could call.
-        while True:
-            if not self.thread: break
-            if self.select(): break
-            if not self.thread: break
-            data = self.read()
-            if data is None: continue
-            if data is True: break
-            if self.dataReceived(data): break
-
-    def select(self):
-        try:
-            select.select([self.sock], [], [], SELECT_TIMEOUT)
-            return False
-        except socket.error, ex:
-            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
-                return False
-            else:
-                self.loseConnection(ex)
-                return True
-
-    def read(self):
-        try:
-            data = self.sock.recv(self.buffer_n)
-            return data
-        except socket.error, ex:
-            if ex.args[0] in (EWOULDBLOCK, EAGAIN, EINTR):
-                return None
-            else:
-                self.loseConnection(ex)
-                return True
-        
-    def dataReceived(self, data):
-        if not self.protocol:
-            return True
-        try:
-            self.protocol.dataReceived(data)
-        except SystemExit:
-            raise
-        except Exception, ex:
-            self.loseConnection(ex)
-            return True
-        return False
-
-    def loseConnection(self, reason=None):
-        self.thread = None
-        self.closeSocket(reason)
-
-    def closeSocket(self, reason):
-        try:
-            if self.sock:
+            while True:
+                try:
+                    (sock, addr) = self.sock.accept()
+                    self.acceptConnection(sock, self.protocol_class(),
+                                          addr).run()
+                except socket.error, ex:
+                    if ex.args[0] not in (EWOULDBLOCK, EAGAIN, EINTR):
+                        break
+        finally:
+            try:
                 self.sock.close()
-        except SystemExit:
-            raise
-        except:
-            pass
-
-class SocketConnector:
-    """A client socket. Connects to a server and runs the client protocol
-    in a thread.
-    """
-
-    def __init__(self, protocol_class):
-        self.protocol_class = protocol_class
-        self.transport = None
-
-    def connect(self):
-        pass
+            except:
+                pass
diff -r 231686596796 -r 76bff6c996b0 tools/python/xen/web/protocol.py
--- a/tools/python/xen/web/protocol.py  Thu Dec  8 14:30:15 2005
+++ b/tools/python/xen/web/protocol.py  Thu Dec  8 15:04:31 2005
@@ -25,7 +25,7 @@
         self.transport = transport
 
     def dataReceived(self, data):
-        print 'Protocol>dataReceived>'
+        raise NotImplementedError()
 
     def write(self, data):
         if self.transport:
diff -r 231686596796 -r 76bff6c996b0 tools/python/xen/web/tcp.py
--- a/tools/python/xen/web/tcp.py       Thu Dec  8 14:30:15 2005
+++ b/tools/python/xen/web/tcp.py       Thu Dec  8 15:04:31 2005
@@ -13,16 +13,16 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 #============================================================================
 # Copyright (C) 2005 Mike Wray <mike.wray@xxxxxx>
+# Copyright (C) 2005 XenSource Ltd.
 #============================================================================
 
-import sys
+
 import socket
-import types
 import time
 import errno
 
 from connection import *
-from protocol import *
+
 
 class TCPListener(SocketListener):
 
@@ -52,48 +52,8 @@
     def acceptConnection(self, sock, protocol, addr):
         return SocketServerConnection(sock, protocol, addr, self)
 
-class TCPClientConnection(SocketClientConnection):
-
-    def __init__(self, host, port, bindAddress, connector):
-        SocketClientConnection.__init__(self, connector)
-        self.addr = (host, port)
-        self.bindAddress = bindAddress
-
-    def createSocket(self):
-        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        if self.bindAddress is not None:
-            sock.bind(self.bindAddress)
-        return sock
-    
-class TCPConnector(SocketConnector):
-
-    def __init__(self, host, port, protocol, timeout=None, bindAddress=None):
-        SocketConnector.__init__(self, protocol)
-        self.host = host
-        self.port = self.servicePort(port)
-        self.bindAddress = bindAddress
-        self.timeout = timeout
-
-    def servicePort(self, port):
-        if isinstance(port, types.StringTypes):
-            try:
-                port = socket.getservbyname(port, 'tcp')
-            except socket.error, ex:
-                raise IOError("unknown service: " + ex)
-        return port
-
-    def connect(self):
-        self.transport = TCPClientConnection(
-            self.host, self.port, self.bindAddress, self)
-        self.transport.connect(self.timeout)
 
 def listenTCP(port, protocol, interface='', backlog=None):
     l = TCPListener(port, protocol, interface=interface, backlog=backlog)
-    l.startListening()
-    return l
-
-def connectTCP(host, port, protocol, timeout=None, bindAddress=None):
-    c = TCPConnector(host, port, protocol, timeout=timeout,
-                     bindAddress=bindAddress)
-    c.connect()
-    return c
+    l.listen()
+    l.setCloExec()
diff -r 231686596796 -r 76bff6c996b0 tools/python/xen/web/unix.py
--- a/tools/python/xen/web/unix.py      Thu Dec  8 14:30:15 2005
+++ b/tools/python/xen/web/unix.py      Thu Dec  8 15:04:31 2005
@@ -16,13 +16,13 @@
 # Copyright (C) 2005 XenSource Ltd.
 #============================================================================
 
-import sys
+
 import socket
 import os
 import os.path
 
 from connection import *
-from protocol import *
+
 
 class UnixListener(SocketListener):
 
@@ -48,33 +48,6 @@
     def acceptConnection(self, sock, protocol, addr):
         return SocketServerConnection(sock, protocol, self.path, self)
 
-class UnixClientConnection(SocketClientConnection):
-
-    def __init__(self, addr, connector):
-        SocketClientConnection.__init__(self, connector)
-        self.addr = addr
-        
-    def createSocket(self):
-        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-        return sock
-    
-class UnixConnector(SocketConnector):
-
-    def __init__(self, path, protocol, timeout=None):
-        SocketConnector.__init__(self, protocol)
-        self.addr = path
-        self.timeout = timeout
-
-    def connect(self):
-        self.transport = UnixClientConnection(self.addr, self)
-        self.transport.connect(self.timeout)
 
 def listenUNIX(path, protocol, backlog=None):
-    l = UnixListener(path, protocol, backlog=backlog)
-    l.startListening()
-    return l
-
-def connectUNIX(path, protocol, timeout=None):
-    c = UnixConnector(path, protocol, timeout=timeout)
-    c.connect()
-    return c
+    UnixListener(path, protocol, backlog=backlog).listen()
diff -r 231686596796 -r 76bff6c996b0 tools/python/xen/xend/server/relocate.py
--- a/tools/python/xen/xend/server/relocate.py  Thu Dec  8 14:30:15 2005
+++ b/tools/python/xen/xend/server/relocate.py  Thu Dec  8 15:04:31 2005
@@ -44,15 +44,15 @@
                 res = self.dispatch(val)
                 self.send_result(res)
             if self.parser.at_eof():
-                self.loseConnection()
+                self.close()
         except SystemExit:
             raise
         except:
             self.send_error()
 
-    def loseConnection(self):
+    def close(self):
         if self.transport:
-            self.transport.loseConnection()
+            self.transport.close()
 
     def send_reply(self, sxpr):
         io = StringIO.StringIO()
@@ -100,15 +100,13 @@
         return l
 
     def op_quit(self, _1, _2):
-        self.loseConnection()
+        self.close()
 
     def op_receive(self, name, _):
         if self.transport:
             self.send_reply(["ready", name])
-            self.transport.sock.setblocking(1)
             XendDomain.instance().domain_restore_fd(
                 self.transport.sock.fileno())
-            self.transport.sock.setblocking(0)
         else:
             log.error(name + ": no transport")
             raise XendError(name + ": no transport")
@@ -122,5 +120,4 @@
     if xroot.get_xend_relocation_server():
         port = xroot.get_xend_relocation_port()
         interface = xroot.get_xend_relocation_address()
-        l = tcp.listenTCP(port, RelocationProtocol, interface=interface)
-        l.setCloExec()
+        tcp.listenTCP(port, RelocationProtocol, interface=interface)

_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog


 


Rackspace

Lists.xenproject.org is hosted with Rackspace, which monitors our
servers 24x7x365 and backs them with Rackspace's Fanatical Support®.