[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [RFC Patch v2 35/45] blktap2: connect to backup asynchronously



tapdisk2 is a single-threaded process. When remus is in use,
the primary blocks in primary_blocking_connect() until the
backup accepts the connection, so the user has no chance to
talk to tapdisk2 in the meantime. Connect to the backup
asynchronously instead: until the connection is established,
queue all I/O requests, and handle them once the connection
is established.

Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
---
 tools/blktap2/drivers/block-remus.c | 760 +++++++++++++++++++++++-------------
 1 file changed, 479 insertions(+), 281 deletions(-)

diff --git a/tools/blktap2/drivers/block-remus.c 
b/tools/blktap2/drivers/block-remus.c
index d358b44..c21f851 100644
--- a/tools/blktap2/drivers/block-remus.c
+++ b/tools/blktap2/drivers/block-remus.c
@@ -63,10 +63,28 @@
 #define RAMDISK_HASHSIZE 128
 
 /* connect retry timeout (seconds) */
-#define REMUS_CONNRETRY_TIMEOUT 10
+#define REMUS_CONNRETRY_TIMEOUT 1
 
 #define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
 
+#define UNREGISTER_EVENT(id)                                   \
+       do {                                                    \
+               if (id >= 0) {                                  \
+                       tapdisk_server_unregister_event(id);    \
+                       id = -1;                                \
+               }                                               \
+       } while (0)
+
+#define CLOSE_FD(fd)                   \
+       do {                            \
+               if (fd >= 0) {          \
+                       close(fd);      \
+                       fd = -1;        \
+               }                       \
+       } while (0)
+
+#define MAX_REMUS_REQUEST       TAPDISK_DATA_REQUESTS
+
 enum tdremus_mode {
        mode_invalid = 0,
        mode_unprotected,
@@ -74,17 +92,21 @@ enum tdremus_mode {
        mode_backup
 };
 
+enum {
+       ERROR_INTERNAL = -1,
+       ERROR_IO = -2,
+       ERROR_CONNECTION = -3,
+};
+
 struct tdremus_req {
-       uint64_t sector;
-       int nb_sectors;
-       char buf[4096];
+       td_request_t treq;
 };
 
 struct req_ring {
        /* waste one slot to distinguish between empty and full */
-       struct tdremus_req requests[MAX_REQUESTS * 2 + 1];
-       unsigned int head;
-       unsigned int tail;
+       struct tdremus_req pending_requests[MAX_REMUS_REQUEST + 1];
+       unsigned int prod;
+       unsigned int cons;
 };
 
 /* TODO: This isn't very pretty, but to properly generate our own treqs (needed
@@ -144,10 +166,21 @@ struct ramdisk_write_cbdata {
 
 typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
 
-/* poll_fd type for blktap2 fd system. taken from block_log.c */
+/*
+ * If cid, rid and wid are all -1, fd must be -1: we are in
+ * unprotected mode, or we have not started connecting to
+ * the backup yet.
+ * If fd is a valid fd, then either:
+ *  cid is valid and rid/wid are invalid: the connection is
+ *      still in progress, or
+ *  cid is invalid and rid or wid is valid: the connection
+ *      is established.
+ */
 typedef struct poll_fd {
        int        fd;
-       event_id_t id;
+       event_id_t cid;
+       event_id_t rid;
+       event_id_t wid;
 } poll_fd_t;
 
 struct tdremus_state {
@@ -166,8 +199,11 @@ struct tdremus_state {
        poll_fd_t server_fd;    /* server listen port */
        poll_fd_t stream_fd;     /* replication channel */
 
-       /* queue write requests, batch-replicate at submit */
-       struct req_ring write_ring;
+       /*
+        * queue I/O requests, batch-replicate when
+        * the connection is established.
+        */
+       struct req_ring queued_io;
 
        /* ramdisk data*/
        struct ramdisk ramdisk;
@@ -207,11 +243,13 @@ static int tdremus_close(td_driver_t *driver);
 
 static int switch_mode(td_driver_t *driver, enum tdremus_mode mode);
 static int ctl_respond(struct tdremus_state *s, const char *response);
+static int ctl_register(struct tdremus_state *s);
+static void ctl_unregister(struct tdremus_state *s);
 
 /* ring functions */
-static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos)
+static inline unsigned int ring_next(unsigned int pos)
 {
-       if (++pos >= MAX_REQUESTS * 2 + 1)
+       if (++pos >= MAX_REMUS_REQUEST + 1)
                return 0;
 
        return pos;
@@ -219,13 +257,26 @@ static inline unsigned int ring_next(struct req_ring* 
ring, unsigned int pos)
 
 static inline int ring_isempty(struct req_ring* ring)
 {
-       return ring->head == ring->tail;
+       return ring->cons == ring->prod;
 }
 
 static inline int ring_isfull(struct req_ring* ring)
 {
-       return ring_next(ring, ring->tail) == ring->head;
+       return ring_next(ring->prod) == ring->cons;
+}
+
+static void ring_add_request(struct req_ring *ring, const td_request_t *treq)
+{
+       /* If ring is full, it means that tapdisk2 has some bug */
+       if (ring_isfull(ring)) {
+               RPRINTF("OOPS, ring is full\n");
+               exit(1);
+       }
+
+       ring->pending_requests[ring->prod].treq = *treq;
+       ring->prod = ring_next(ring->prod);
 }
+
 /* Prototype declarations */
 static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s);
 
@@ -728,30 +779,39 @@ static int mwrite(int fd, void* buf, size_t len)
 
 static void inline close_stream_fd(struct tdremus_state *s)
 {
-       if (s->stream_fd.fd < 0)
-               return;
 
-       /* XXX: -2 is magic. replace with macro perhaps? */
-       tapdisk_server_unregister_event(s->stream_fd.id);
-       close(s->stream_fd.fd);
-       s->stream_fd.fd = -2;
+       UNREGISTER_EVENT(s->stream_fd.cid);
+       UNREGISTER_EVENT(s->stream_fd.rid);
+       UNREGISTER_EVENT(s->stream_fd.wid);
+
+       /* close the connection */
+       CLOSE_FD(s->stream_fd.fd);
 }
 
 static void close_server_fd(struct tdremus_state *s)
 {
-       if (s->server_fd.fd < 0)
-               return;
-
-       tapdisk_server_unregister_event(s->server_fd.id);
-       s->server_fd.id = -1;
-       close(s->stream_fd.fd);
-       s->stream_fd.fd = -1;
+       UNREGISTER_EVENT(s->server_fd.cid);
+       CLOSE_FD(s->server_fd.fd);
 }
 
 /* primary functions */
 static void remus_client_event(event_id_t, char mode, void *private);
 static void remus_connect_event(event_id_t id, char mode, void *private);
 static void remus_retry_connect_event(event_id_t id, char mode, void *private);
+static int primary_forward_request(struct tdremus_state *s,
+                                  const td_request_t *treq);
+
+/*
+ * It is called when we cannot connect to backup, or find I/O error when
+ * reading/writing.
+ */
+static void primary_failed(struct tdremus_state *s, int rc)
+{
+       close_stream_fd(s);
+       if (rc == ERROR_INTERNAL)
+               RPRINTF("switch to unprotected mode due to internal error");
+       switch_mode(s->tdremus_driver, mode_unprotected);
+}
 
 static int primary_do_connect(struct tdremus_state *state)
 {
@@ -760,281 +820,247 @@ static int primary_do_connect(struct tdremus_state 
*state)
        int rc;
        int flags;
 
-       RPRINTF("client connecting to %s:%d...\n", 
inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
+       RPRINTF("client connecting to %s:%d...\n",
+               inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
 
        if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
                RPRINTF("could not create client socket: %d\n", errno);
-               return -1;
+               return ERROR_INTERNAL;
        }
+       state->stream_fd.fd = fd;
 
        /* make socket nonblocking */
        if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
                flags = 0;
-       if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
-               return -1;
+       if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
+               RPRINTF("error setting fd %d to non block mode\n", fd);
+               return ERROR_INTERNAL;
+       }
 
-       /* once we have created the socket and populated the address, we can 
now start
-        * our non-blocking connect. rather than duplicating code we trigger a 
timeout
-        * on the socket fd, which calls out nonblocking connect code
+       /*
+        * once we have created the socket and populated the address,
+        * we can now start our non-blocking connect. rather than
+        * duplicating code we trigger a timeout on the socket fd,
+        * which calls out nonblocking connect code
         */
-       if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0, 
remus_retry_connect_event, state)) < 0) {
-               RPRINTF("error registering timeout client connection event 
handler: %s\n", strerror(id));
-               /* TODO: we leak a fd here */
-               return -1;
+       if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
+                                              remus_retry_connect_event,
+                                              state)) < 0) {
+               RPRINTF("error registering timeout client connection event 
handler: %s\n",
+                       strerror(id));
+               return ERROR_INTERNAL;
        }
-       state->stream_fd.fd = fd;
-       state->stream_fd.id = id;
+
+       state->stream_fd.cid = id;
        return 0;
 }
 
-static int primary_blocking_connect(struct tdremus_state *state)
+static int remus_handle_queued_io(struct tdremus_state *s)
 {
-       int fd;
-       int id;
+       struct req_ring *queued_io = &s->queued_io;
+       unsigned int cons;
+       td_request_t *treq;
        int rc;
-       int flags;
-
-       RPRINTF("client connecting to %s:%d...\n", 
inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
 
-       if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
-               RPRINTF("could not create client socket: %d\n", errno);
-               return -1;
-       }
+       while (!ring_isempty(queued_io)) {
+               cons = queued_io->cons;
+               treq = &queued_io->pending_requests[cons].treq;
 
-       do {
-               if ((rc = connect(fd, (struct sockaddr *)&state->sa,
-                   sizeof(state->sa))) < 0)
-               {
-                       if (errno == ECONNREFUSED) {
-                               RPRINTF("connection refused -- retrying in 1 
second\n");
-                               sleep(1);
-                       } else {
-                               RPRINTF("connection failed: %d\n", errno);
-                               close(fd);
-                               return -1;
-                       }
+               if (treq->op == TD_OP_WRITE) {
+                       rc = primary_forward_request(s, treq);
+                       if (rc)
+                               return rc;
                }
-       } while (rc < 0);
-
-       RPRINTF("client connected\n");
-
-       /* make socket nonblocking */
-       if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
-               flags = 0;
-       if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
-       {
-               RPRINTF("error making socket nonblocking\n");
-               close(fd);
-               return -1;
-       }
 
-       if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, fd, 0, 
remus_client_event, state)) < 0) {
-               RPRINTF("error registering client event handler: %s\n", 
strerror(id));
-               close(fd);
-               return -1;
+               td_forward_request(*treq);
+               queued_io->cons = ring_next(cons);
        }
 
-       state->stream_fd.fd = fd;
-       state->stream_fd.id = id;
        return 0;
 }
 
-/* on read, just pass request through */
-static void primary_queue_read(td_driver_t *driver, td_request_t treq)
-{
-       /* just pass read through */
-       td_forward_request(treq);
-}
-
-/* TODO:
- * The primary uses mwrite() to write the contents of a write request to the
- * backup. This effectively blocks until all data has been copied into a system
- * buffer or a timeout has occured. We may wish to instead use tapdisk's
- * nonblocking i/o interface, tapdisk_server_register_event(), to set timeouts
- * and write data in an asynchronous fashion.
- */
-static void primary_queue_write(td_driver_t *driver, td_request_t treq)
+static int remus_connection_done(struct tdremus_state *s)
 {
-       struct tdremus_state *s = (struct tdremus_state *)driver->data;
-
-       char header[sizeof(uint32_t) + sizeof(uint64_t)];
-       uint32_t *sectors = (uint32_t *)header;
-       uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
+       event_id_t id;
 
-       // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
+       /* the connect succeeded */
+       /* unregister this function and register a new event handler */
+       tapdisk_server_unregister_event(s->stream_fd.cid);
+       s->stream_fd.cid = -1;
 
-       /* -1 means we haven't connected yet, -2 means the connection was lost 
*/
-       if(s->stream_fd.fd == -1) {
-               RPRINTF("connecting to backup...\n");
-               primary_blocking_connect(s);
+       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, 
s->stream_fd.fd,
+                                          0, remus_client_event, s);
+       if(id < 0) {
+               RPRINTF("error registering client event handler: %s\n",
+                       strerror(id));
+               return ERROR_INTERNAL;
        }
+       s->stream_fd.rid = id;
 
-       *sectors = treq.secs;
-       *sector = treq.sec;
-
-       if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0)
-               goto fail;
-       if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
-               goto fail;
+       /* handle the queued requests */
+       return remus_handle_queued_io(s);
+}
 
-       if (mwrite(s->stream_fd.fd, treq.buf, treq.secs * 
driver->info.sector_size) < 0)
-               goto fail;
+static int remus_retry_connect(struct tdremus_state *s)
+{
+       event_id_t id;

-       td_forward_request(treq);
+       tapdisk_server_unregister_event(s->stream_fd.cid);
+       s->stream_fd.cid = -1;

-       return;
+       RPRINTF("connect to backup %ds later\n", REMUS_CONNRETRY_TIMEOUT);
+       id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
+                                          s->stream_fd.fd,
+                                          REMUS_CONNRETRY_TIMEOUT,
+                                          remus_retry_connect_event, s);
+       if (id < 0) {
+               RPRINTF("error registering timeout client connection event 
handler: %s\n",
+                       strerror(id));
+               return ERROR_INTERNAL;
+       }

- fail:
-       /* switch to unprotected mode and tell tapdisk to retry */
-       RPRINTF("write request replication failed, switching to unprotected 
mode");
-       switch_mode(s->tdremus_driver, mode_unprotected);
-       td_complete_request(treq, -EBUSY);
+       s->stream_fd.cid = id;
+       return 0;
 }
 
-
-/* It is called when the user writes "flush" to control file */
-static int client_flush(td_driver_t *driver)
+static int remus_wait_connect_done(struct tdremus_state *s)
 {
-       struct tdremus_state *s = (struct tdremus_state *)driver->data;
-
-       // RPRINTF("committing output\n");
+       event_id_t id;
 
-       if (s->stream_fd.fd == -1)
-               /* connection not yet established, nothing to flush */
-               return 0;
+       tapdisk_server_unregister_event(s->stream_fd.cid);
+       s->stream_fd.cid = -1;
 
-       if (mwrite(s->stream_fd.fd, TDREMUS_COMMIT,
-           strlen(TDREMUS_COMMIT)) < 0) {
-               RPRINTF("error flushing output");
-               close_stream_fd(s);
-               return -1;
+       id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
+                                          s->stream_fd.fd, 0,
+                                          remus_connect_event, s);
+       if (id < 0) {
+               RPRINTF("error registering client connection event handler: 
%s\n",
+                       strerror(id));
+               return ERROR_INTERNAL;
        }
+       s->stream_fd.cid = id;
 
        return 0;
 }
 
-static int server_flush(td_driver_t *driver)
+/* return 1 if we need to reconnect to backup */
+static int check_connect_errno(int err)
 {
-       struct tdremus_state *s = (struct tdremus_state *)driver->data;
-       /* 
-        * Nothing to flush in beginning.
+       /*
+        * The fd is non-block, so we will not get ETIMEDOUT
+        * after calling connect(). We only can get this errno
+        * by getsockopt().
         */
-       if (!s->ramdisk.prev)
-               return 0;
-       /* Try to flush any remaining requests */
-       return ramdisk_flush(driver, s);        
-}
-
-static int primary_start(td_driver_t *driver)
-{
-       struct tdremus_state *s = (struct tdremus_state *)driver->data;
-
-       RPRINTF("activating client mode\n");
-
-       tapdisk_remus.td_queue_read = primary_queue_read;
-       tapdisk_remus.td_queue_write = primary_queue_write;
-
-       s->stream_fd.fd = -1;
-       s->stream_fd.id = -1;
+       if (err == ECONNREFUSED || err == ENETUNREACH ||
+           err == EAGAIN || err == ECONNABORTED ||
+           err == ETIMEDOUT)
+           return 1;
 
        return 0;
 }
 
-/* timeout callback */
 static void remus_retry_connect_event(event_id_t id, char mode, void *private)
 {
        struct tdremus_state *s = (struct tdremus_state *)private;
+       int rc, ret;
 
        /* do a non-blocking connect */
-       if (connect(s->stream_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa))
-           && errno != EINPROGRESS)
-       {
-               if(errno == ECONNREFUSED || errno == ENETUNREACH || errno == 
EAGAIN || errno == ECONNABORTED)
-               {
-                       /* try again in a second */
-                       tapdisk_server_unregister_event(s->stream_fd.id);
-                       if((id = 
tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, 
REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
-                               RPRINTF("error registering timeout client 
connection event handler: %s\n", strerror(id));
-                               return;
-                       }
-                       s->stream_fd.id = id;
-               }
-               else
-               {
-                       /* not recoverable */
-                       RPRINTF("error connection to server %s\n", 
strerror(errno));
+       ret = connect(s->stream_fd.fd,
+                     (struct sockaddr *)&s->sa,
+                     sizeof(s->sa));
+       if (ret) {
+               if (errno == EINPROGRESS) {
+                       /*
+                        * the connect returned EINPROGRESS (nonblocking
+                        * connect) we must wait for the fd to be writeable
+                        * to determine if the connect worked
+                        */
+                       rc = remus_wait_connect_done(s);
+                       if (rc)
+                               goto fail;
                        return;
                }
-       }
-       else
-       {
-               /* the connect returned EINPROGRESS (nonblocking connect) we 
must wait for the fd to be writeable to determine if the connect worked */
 
-               tapdisk_server_unregister_event(s->stream_fd.id);
-               if((id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD, 
s->stream_fd.fd, 0, remus_connect_event, s)) < 0) {
-                       RPRINTF("error registering client connection event 
handler: %s\n", strerror(id));
+               if (check_connect_errno(errno)) {
+                       rc = remus_retry_connect(s);
+                       if (rc)
+                               goto fail;
                        return;
                }
-               s->stream_fd.id = id;
+
+               /* not recoverable */
+               RPRINTF("error connection to server %s\n", strerror(errno));
+               rc = ERROR_CONNECTION;
+               goto fail;
        }
+
+       /* The connection is established unexpectedly */
+       rc = remus_connection_done(s);
+       if (rc)
+               goto fail;
+
+       return;
+
+fail:
+       primary_failed(s, rc);
+       return;
 }
 
 /* callback when nonblocking connect() is finished */
-/* called only by primary in unprotected state */
 static void remus_connect_event(event_id_t id, char mode, void *private)
 {
        int socket_errno;
        socklen_t socket_errno_size;
        struct tdremus_state *s = (struct tdremus_state *)private;
+       int rc;
 
-       /* check to se if the connect succeeded */
+       /* check to see if the connect succeeded */
        socket_errno_size = sizeof(socket_errno);
-       if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR, &socket_errno, 
&socket_errno_size)) {
+       if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR,
+                      &socket_errno, &socket_errno_size)) {
                RPRINTF("error getting socket errno\n");
                return;
        }
 
        RPRINTF("socket connect returned %d\n", socket_errno);
 
-       if(socket_errno)
-       {
+       if (socket_errno) {
                /* the connect did not succeed */
+               if (check_connect_errno(socket_errno)) {
+                       /*
+                        * we can probably assume that the backup is down.
+                        * just try again later
+                        */
+                       rc = remus_retry_connect(s);
+                       if (rc)
+                               goto fail;
 
-               if(socket_errno == ECONNREFUSED || socket_errno == ENETUNREACH 
|| socket_errno == ETIMEDOUT
-                  || socket_errno == ECONNABORTED || socket_errno == EAGAIN)
-               {
-                       /* we can probably assume that the backup is down. just 
try again later */
-                       tapdisk_server_unregister_event(s->stream_fd.id);
-                       if((id = 
tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, 
REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
-                               RPRINTF("error registering timeout client 
connection event handler: %s\n", strerror(id));
-                               return;
-                       }
-                       s->stream_fd.id = id;
-               }
-               else
-               {
-                       RPRINTF("socket connect returned %d, giving up\n", 
socket_errno);
-               }
-       }
-       else
-       {
-               /* the connect succeeded */
-
-               /* unregister this function and register a new event handler */
-               tapdisk_server_unregister_event(s->stream_fd.id);
-               if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, 
s->stream_fd.fd, 0, remus_client_event, s)) < 0) {
-                       RPRINTF("error registering client event handler: %s\n", 
strerror(id));
                        return;
+               } else {
+                       RPRINTF("socket connect returned %d, giving up\n",
+                               socket_errno);
+                       rc = ERROR_CONNECTION;
+                       goto fail;
                }
-               s->stream_fd.id = id;
 
-               /* switch from unprotected to protected client */
-               switch_mode(s->tdremus_driver, mode_primary);
+               return;
        }
+
+       rc = remus_connection_done(s);
+       if (rc)
+               goto fail;
+
+       return;
+
+fail:
+       primary_failed(s, rc);
 }
 
 
-/* we install this event handler on the primary once we have connected to the 
backup */
+/*
+ * we install this event handler on the primary once we have
+ * connected to the backup.
+ */
 /* wait for "done" message to commit checkpoint */
 static void remus_client_event(event_id_t id, char mode, void *private)
 {
@@ -1043,9 +1069,12 @@ static void remus_client_event(event_id_t id, char mode, 
void *private)
        int rc;
 
        if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
-               /* replication stream closed or otherwise broken (timeout, 
reset, &c) */
+               /*
+                * replication stream closed or otherwise broken
+                * (timeout, reset, &c)
+                */
                RPRINTF("error reading from backup\n");
-               close_stream_fd(s);
+               primary_failed(s, ERROR_IO);
                return;
        }
 
@@ -1056,22 +1085,169 @@ static void remus_client_event(event_id_t id, char 
mode, void *private)
                ctl_respond(s, TDREMUS_DONE);
        else {
                RPRINTF("received unknown message: %s\n", req);
-               close_stream_fd(s);
+               primary_failed(s, ERROR_IO);
+       }
+
+       return;
+}
+
+static void primary_queue_read(td_driver_t *driver, td_request_t treq)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+       struct req_ring *ring = &s->queued_io;
+
+       if (ring_isempty(ring)) {
+               /* just pass read through */
+               td_forward_request(treq);
+               return;
+       }
+
+       ring_add_request(ring, &treq);
+}
+
+static int primary_forward_request(struct tdremus_state *s,
+                                  const td_request_t *treq)
+{
+       char header[sizeof(uint32_t) + sizeof(uint64_t)];
+       uint32_t *sectors = (uint32_t *)header;
+       uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
+       td_driver_t *driver = s->tdremus_driver;
+
+       *sectors = treq->secs;
+       *sector = treq->sec;
+
+       if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0)
+               return ERROR_IO;
+
+       if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
+               return ERROR_IO;
+
+       if (mwrite(s->stream_fd.fd, treq->buf,
+           treq->secs * driver->info.sector_size) < 0)
+               return ERROR_IO;
+
+       return 0;
+}
+
+/* TODO:
+ * The primary uses mwrite() to write the contents of a write request to the
+ * backup. This effectively blocks until all data has been copied into a system
+ * buffer or a timeout has occured. We may wish to instead use tapdisk's
+ * nonblocking i/o interface, tapdisk_server_register_event(), to set timeouts
+ * and write data in an asynchronous fashion.
+ */
+static void primary_queue_write(td_driver_t *driver, td_request_t treq)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+       int rc;
+
+       // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
+
+       if(s->stream_fd.fd < 0) {
+               RPRINTF("connecting to backup...\n");
+               rc = primary_do_connect(s);
+               if (rc)
+                       goto fail;
+       }
+
+       /* The connection is not established, just queue the request */
+       if (s->stream_fd.cid >= 0) {
+               ring_add_request(&s->queued_io, &treq);
+               return;
        }
 
+       /* The connection is established */
+       rc = primary_forward_request(s, &treq);
+       if (rc)
+               goto fail;
+
+       td_forward_request(treq);
+
        return;
+
+fail:
+       /* switch to unprotected mode and forward the request */
+       RPRINTF("write request replication failed, switching to unprotected 
mode");
+       primary_failed(s, rc);
+       td_forward_request(treq);
+}
+
+/* Called when the user writes "flush" to the control file: send COMMIT. */
+static int client_flush(td_driver_t *driver)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+       /* no socket yet, or connect still in progress (cid armed): */
+       /* writes are only queued locally, so there is nothing to commit */
+       if (s->stream_fd.fd < 0 || s->stream_fd.cid >= 0)
+               return 0;
+
+       /* on failure, primary_failed() drops to unprotected mode */
+       if (mwrite(s->stream_fd.fd, TDREMUS_COMMIT,
+           strlen(TDREMUS_COMMIT)) < 0) {
+               RPRINTF("error flushing output");
+               primary_failed(s, ERROR_IO);
+               return -1;
+       }
+
+       return 0;
+}
+
+/* It is called when switching the mode from primary to unprotected */
+static int primary_flush(td_driver_t *driver)
+{
+       struct tdremus_state *s = driver->data;
+       struct req_ring *ring = &s->queued_io;
+       unsigned int cons;
+
+       if (ring_isempty(ring))
+               return 0;
+
+       while (!ring_isempty(ring)) {
+               cons = ring->cons;
+               ring->cons = ring_next(cons);
+
+               td_forward_request(ring->pending_requests[cons].treq);
+       }
+
+       return 0;
+}
+
+static int primary_start(td_driver_t *driver)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+       RPRINTF("activating client mode\n");
+
+       tapdisk_remus.td_queue_read = primary_queue_read;
+       tapdisk_remus.td_queue_write = primary_queue_write;
+       s->queue_flush = primary_flush;
+
+       s->stream_fd.fd = -1;
+       s->stream_fd.cid = -1;
+       s->stream_fd.rid = -1;
+       s->stream_fd.wid = -1;
+
+       return 0;
 }
 
 /* backup functions */
 static void remus_server_event(event_id_t id, char mode, void *private);
 
+/* It is called when we find some I/O error */
+static void backup_failed(struct tdremus_state *s, int rc)
+{
+       close_stream_fd(s);
+       close_server_fd(s);
+       /* We will switch to unprotected mode in backup_queue_write() */
+}
+
 /* returns the socket that receives write requests */
 static void remus_server_accept(event_id_t id, char mode, void* private)
 {
        struct tdremus_state* s = (struct tdremus_state *) private;
 
        int stream_fd;
-       event_id_t cid;
 
        /* XXX: add address-based black/white list */
        if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
@@ -1079,68 +1255,80 @@ static void remus_server_accept(event_id_t id, char mode, void* private)
                return;
        }
 
-       /* TODO: check to see if we are already replicating. if so just close 
the
-        * connection (or do something smarter) */
+       /*
+        * TODO: check to see if we are already replicating.
+        * if so just close the connection (or do something
+        * smarter)
+        */
        RPRINTF("server accepted connection\n");
 
        /* add tapdisk event for replication stream */
-       cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 
0,
-                                           remus_server_event, s);
+       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0,
+                                          remus_server_event, s);
 
-       if(cid < 0) {
-               RPRINTF("error registering connection event handler: %s\n", 
strerror(errno));
+       if (id < 0) {
+               RPRINTF("error registering connection event handler: %s\n",
+                       strerror(errno));
                close(stream_fd);
                return;
        }
 
        /* store replication file descriptor */
        s->stream_fd.fd = stream_fd;
-       s->stream_fd.id = cid;
+       s->stream_fd.rid = id;
 }
 
 /* returns -2 if EADDRNOTAVAIL */
 static int remus_bind(struct tdremus_state* s)
 {
-//  struct sockaddr_in sa;
        int opt;
        int rc = -1;
+       event_id_t id;
 
        if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
                RPRINTF("could not create server socket: %d\n", errno);
                return rc;
        }
-       opt = 1;
-       if (setsockopt(s->server_fd.fd, SOL_SOCKET, SO_REUSEADDR, &opt, 
sizeof(opt)) < 0)
-               RPRINTF("Error setting REUSEADDR on %d: %d\n", s->server_fd.fd, 
errno);
 
-       if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa)) < 
0) {
-               RPRINTF("could not bind server socket %d to %s:%d: %d %s\n", 
s->server_fd.fd,
-                       inet_ntoa(s->sa.sin_addr), ntohs(s->sa.sin_port), 
errno, strerror(errno));
-               if (errno != EADDRINUSE)
+       opt = 1;
+       if (setsockopt(s->server_fd.fd, SOL_SOCKET,
+                      SO_REUSEADDR, &opt, sizeof(opt)) < 0)
+               RPRINTF("Error setting REUSEADDR on %d: %d\n",
+                       s->server_fd.fd, errno);
+
+       if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa,
+                sizeof(s->sa)) < 0) {
+               RPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
+                       s->server_fd.fd, inet_ntoa(s->sa.sin_addr),
+                       ntohs(s->sa.sin_port), errno, strerror(errno));
+               if (errno == EADDRNOTAVAIL)
                        rc = -2;
                goto err_sfd;
        }
+
        if (listen(s->server_fd.fd, 10)) {
                RPRINTF("could not listen on socket: %d\n", errno);
                goto err_sfd;
        }
 
-       /* The socket s now bound to the address and listening so we may now 
register
-   * the fd with tapdisk */
-
-       if((s->server_fd.id = 
tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
-                                                           s->server_fd.fd, 0,
-                                                           
remus_server_accept, s)) < 0) {
+       /*
+        * The socket s now bound to the address and listening so we
+        * may now register the fd with tapdisk
+        */
+       id =  tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+                                           s->server_fd.fd, 0,
+                                           remus_server_accept, s);
+       if (id < 0) {
                RPRINTF("error registering server connection event handler: %s",
-                       strerror(s->server_fd.id));
+                       strerror(id));
                goto err_sfd;
        }
+       s->server_fd.cid = id;
 
        return 0;
 
- err_sfd:
-       close(s->server_fd.fd);
-       s->server_fd.fd = -1;
+err_sfd:
+       CLOSE_FD(s->server_fd.fd);
 
        return rc;
 }
@@ -1190,10 +1378,21 @@ void backup_queue_write(td_driver_t *driver, td_request_t treq)
        td_complete_request(treq, -EBUSY);
 }
 
+/*
+ * Backup-mode queue-flush hook (backup_start() installs it as
+ * s->queue_flush).  While s->ramdisk.prev is unset there is nothing to
+ * flush yet and 0 is returned; otherwise the remaining requests are handed
+ * to ramdisk_flush() and its result is returned.
+ */
+static int server_flush(td_driver_t *driver)
+{
+       struct tdremus_state *s = driver->data;
+
+       return s->ramdisk.prev ? ramdisk_flush(driver, s) : 0;
+}
+
 static int backup_start(td_driver_t *driver)
 {
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
-       int fd;
 
        if (ramdisk_start(driver) < 0)
                return -1;
@@ -1201,16 +1400,15 @@ static int backup_start(td_driver_t *driver)
        tapdisk_remus.td_queue_read = backup_queue_read;
        tapdisk_remus.td_queue_write = backup_queue_write;
        s->queue_flush = server_flush;
-       /* TODO set flush function */
        return 0;
 }
 
-static int server_do_wreq(td_driver_t *driver)
+static void server_do_wreq(td_driver_t *driver)
 {
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
        static tdremus_wire_t twreq;
        char buf[4096];
-       int len, rc;
+       int len, rc = ERROR_IO;
 
        char header[sizeof(uint32_t) + sizeof(uint64_t)];
        uint32_t *sectors = (uint32_t *) header;
@@ -1227,39 +1425,40 @@ static int server_do_wreq(td_driver_t *driver)
        // *sector);
 
        if (len > sizeof(buf)) {
-               /* freak out! */
-               RPRINTF("write request too large: %d/%u\n", len, 
(unsigned)sizeof(buf));
-               return -1;
+               /* freak out! How to handle the remaining data from primary */
+               RPRINTF("write request too large: %d/%u\n",
+                       len, (unsigned)sizeof(buf));
+               goto err;
        }
 
        if (mread(s->stream_fd.fd, buf, len) < 0)
                goto err;
 
-       if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0)
+       if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) {
+               rc = ERROR_INTERNAL;
                goto err;
+       }
 
-       return 0;
+       return;
 
  err:
        /* should start failover */
        RPRINTF("backup write request error\n");
-       close_stream_fd(s);
-
-       return -1;
+       backup_failed(s, rc);
 }
 
-static int server_do_sreq(td_driver_t *driver)
+/*
+ * Called for a submit request (see the commented-out log line below).
+ * It is currently a no-op.
+ */
+static void server_do_sreq(td_driver_t *driver)
 {
        /*
          RPRINTF("submit request received\n");
   */
 
-       return 0;
 }
 
 /* at this point, the server can start applying the most recent
  * ramdisk. */
-static int server_do_creq(td_driver_t *driver)
+static void server_do_creq(td_driver_t *driver)
 {
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
 
@@ -1269,9 +1468,7 @@ static int server_do_creq(td_driver_t *driver)
 
        /* XXX this message should not be sent until flush completes! */
        if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4)
-               return -1;
-
-       return 0;
+               backup_failed(s, ERROR_IO);
 }
 
 
@@ -1356,10 +1553,6 @@ static int unprotected_start(td_driver_t *driver)
 
        RPRINTF("failure detected, activating passthrough\n");
 
-       /* close the server socket */
-       close_stream_fd(s);
-
-       close_server_fd(s);
 
        /* install the unprotected read/write handlers */
        tapdisk_remus.td_queue_read = unprotected_queue_read;
@@ -1486,6 +1679,19 @@ static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
        return rc;
 }
 
+/*
+ * Re-create the control FIFO channel: unregister its event, close the
+ * descriptor, reopen s->ctl_path read/write and register it again.
+ * On any failure the control fd is left closed and no event is
+ * registered, so the fd and its event never get out of step.
+ *
+ * NOTE(review): no caller is visible in this patch excerpt; confirm
+ * that ctl_request() invokes this on a 0-byte read.
+ */
+static void ctl_reopen(struct tdremus_state *s)
+{
+       ctl_unregister(s);
+       CLOSE_FD(s->ctl_fd.fd);
+       RPRINTF("FIFO closed\n");
+
+       if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
+               RPRINTF("error reopening FIFO: %d\n", errno);
+               return;
+       }
+
+       /* ctl_register() logs its own failure; don't keep an fd nobody polls */
+       if (ctl_register(s) < 0)
+               CLOSE_FD(s->ctl_fd.fd);
+}
+
 static void ctl_request(event_id_t id, char mode, void *private)
 {
        struct tdremus_state *s = (struct tdremus_state *)private;
@@ -1497,12 +1703,6 @@ static void ctl_request(event_id_t id, char mode, void *private)
 
        if (!(rc = read(s->ctl_fd.fd, msg, sizeof(msg) - 1 /* append nul */))) {
                RPRINTF("0-byte read received, reopening FIFO\n");
-               /*TODO: we may have to unregister/re-register with 
tapdisk_server */
-               close(s->ctl_fd.fd);
-               RPRINTF("FIFO closed\n");
-               if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
-                       RPRINTF("error reopening FIFO: %d\n", errno);
-               }
                return;
        }
 
@@ -1641,10 +1841,11 @@ static int ctl_register(struct tdremus_state *s)
        RPRINTF("registering ctl fifo\n");
 
        /* register ctl fd */
-       s->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, 
s->ctl_fd.fd, 0, ctl_request, s);
+       s->ctl_fd.cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, 
s->ctl_fd.fd, 0, ctl_request, s);
 
-       if (s->ctl_fd.id < 0) {
-               RPRINTF("error registering ctrl FIFO %s: %d\n", s->ctl_path, 
s->ctl_fd.id);
+       if (s->ctl_fd.cid < 0) {
+               RPRINTF("error registering ctrl FIFO %s: %d\n",
+                       s->ctl_path, s->ctl_fd.cid);
                return -1;
        }
 
@@ -1655,10 +1856,7 @@ static void ctl_unregister(struct tdremus_state *s)
 {
        RPRINTF("unregistering ctl fifo\n");
 
-       if (s->ctl_fd.id >= 0) {
-               tapdisk_server_unregister_event(s->ctl_fd.id);
-               s->ctl_fd.id = -1;
-       }
+       UNREGISTER_EVENT(s->ctl_fd.cid);
 }
 
 /* interface */
-- 
1.9.3


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.