[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [PATCH 13/17] tools: block-remus: connect to backup asynchronously



Use the block-replication API (td_replication_connect_*) to connect
to the backup asynchronously. Until the connection is established,
all I/O requests are queued; they are handled once the connection
has been established.

Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
Cc: Shriram Rajagopalan <rshriram@xxxxxxxxx>
---
 tools/blktap2/drivers/block-remus.c       | 508 +++++++++++++-----------------
 tools/blktap2/drivers/block-replication.h |   1 +
 2 files changed, 221 insertions(+), 288 deletions(-)

diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c
index e5ad782..a2b9f62 100644
--- a/tools/blktap2/drivers/block-remus.c
+++ b/tools/blktap2/drivers/block-remus.c
@@ -40,6 +40,7 @@
 #include "hashtable.h"
 #include "hashtable_itr.h"
 #include "hashtable_utility.h"
+#include "block-replication.h"
 
 #include <errno.h>
 #include <inttypes.h>
@@ -49,10 +50,7 @@
 #include <string.h>
 #include <sys/time.h>
 #include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
 #include <netinet/in.h>
-#include <arpa/inet.h>
 #include <sys/param.h>
 #include <sys/sysctl.h>
 #include <unistd.h>
@@ -63,10 +61,12 @@
 #define RAMDISK_HASHSIZE 128
 
 /* connect retry timeout (seconds) */
-#define REMUS_CONNRETRY_TIMEOUT 10
+#define REMUS_CONNRETRY_TIMEOUT 1
 
 #define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
 
+#define MAX_REMUS_REQUESTS      TAPDISK_DATA_REQUESTS
+
 enum tdremus_mode {
        mode_invalid = 0,
        mode_unprotected,
@@ -75,16 +75,14 @@ enum tdremus_mode {
 };
 
 struct tdremus_req {
-       uint64_t sector;
-       int nb_sectors;
-       char buf[4096];
+       td_request_t treq;
 };
 
 struct req_ring {
        /* waste one slot to distinguish between empty and full */
-       struct tdremus_req requests[MAX_REQUESTS * 2 + 1];
-       unsigned int head;
-       unsigned int tail;
+       struct tdremus_req pending_requests[MAX_REMUS_REQUESTS + 1];
+       unsigned int prod;
+       unsigned int cons;
 };
 
 /* TODO: This isn't very pretty, but to properly generate our own treqs (needed
@@ -161,13 +159,14 @@ struct tdremus_state {
        char*     msg_path; /* output completion message here */
        poll_fd_t msg_fd;
 
-  /* replication host */
-       struct sockaddr_in sa;
-       poll_fd_t server_fd;    /* server listen port */
+       td_replication_connect_t t;
        poll_fd_t stream_fd;     /* replication channel */
 
-       /* queue write requests, batch-replicate at submit */
-       struct req_ring write_ring;
+       /*
+        * queue I/O requests, batch-replicate when
+        * the connection is established.
+        */
+       struct req_ring queued_io;
 
        /* ramdisk data*/
        struct ramdisk ramdisk;
@@ -206,11 +205,13 @@ static int tdremus_close(td_driver_t *driver);
 
 static int switch_mode(td_driver_t *driver, enum tdremus_mode mode);
 static int ctl_respond(struct tdremus_state *s, const char *response);
+static int ctl_register(struct tdremus_state *s);
+static void ctl_unregister(struct tdremus_state *s);
 
 /* ring functions */
-static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos)
+static inline unsigned int ring_next(unsigned int pos)
 {
-       if (++pos >= MAX_REQUESTS * 2 + 1)
+       if (++pos >= MAX_REMUS_REQUESTS + 1)
                return 0;
 
        return pos;
@@ -218,13 +219,26 @@ static inline unsigned int ring_next(struct req_ring* 
ring, unsigned int pos)
 
 static inline int ring_isempty(struct req_ring* ring)
 {
-       return ring->head == ring->tail;
+       return ring->cons == ring->prod;
 }
 
 static inline int ring_isfull(struct req_ring* ring)
 {
-       return ring_next(ring, ring->tail) == ring->head;
+       return ring_next(ring->prod) == ring->cons;
 }
+
+static void ring_add_request(struct req_ring *ring, const td_request_t *treq)
+{
+       /* If ring is full, it means that tapdisk2 has some bug */
+       if (ring_isfull(ring)) {
+               RPRINTF("OOPS, ring is full\n");
+               exit(1);
+       }
+
+       ring->pending_requests[ring->prod].treq = *treq;
+       ring->prod = ring_next(ring->prod);
+}
+
 /* Prototype declarations */
 static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s);
 
@@ -724,89 +738,113 @@ static int mwrite(int fd, void* buf, size_t len)
        select(fd + 1, NULL, &wfds, NULL, &tv);
 }
 
-
-static void inline close_stream_fd(struct tdremus_state *s)
-{
-       if (s->stream_fd.fd < 0)
-               return;
-
-       /* XXX: -2 is magic. replace with macro perhaps? */
-       tapdisk_server_unregister_event(s->stream_fd.id);
-       close(s->stream_fd.fd);
-       s->stream_fd.fd = -2;
-}
-
-static void close_server_fd(struct tdremus_state *s)
-{
-       if (s->server_fd.fd < 0)
-               return;
-
-       tapdisk_server_unregister_event(s->server_fd.id);
-       s->server_fd.id = -1;
-       close(s->stream_fd.fd);
-       s->stream_fd.fd = -1;
-}
-
 /* primary functions */
 static void remus_client_event(event_id_t, char mode, void *private);
+static int primary_forward_request(struct tdremus_state *s,
+                                  const td_request_t *treq);
 
-static int primary_blocking_connect(struct tdremus_state *state)
+/*
+ * It is called when we cannot connect to backup, or find I/O error when
+ * reading/writing.
+ */
+static void primary_failed(struct tdremus_state *s, int rc)
 {
-       int fd;
-       int id;
+       td_replication_connect_kill(&s->t);
+       if (rc == ERROR_INTERNAL)
+               RPRINTF("switch to unprotected mode due to internal error");
+       UNREGISTER_EVENT(s->stream_fd.id);
+       switch_mode(s->tdremus_driver, mode_unprotected);
+}
+
+static int remus_handle_queued_io(struct tdremus_state *s)
+{
+       struct req_ring *queued_io = &s->queued_io;
+       unsigned int cons;
+       td_request_t *treq;
        int rc;
-       int flags;
 
-       RPRINTF("client connecting to %s:%d...\n", 
inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
+       while (!ring_isempty(queued_io)) {
+               cons = queued_io->cons;
+               treq = &queued_io->pending_requests[cons].treq;
 
-       if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
-               RPRINTF("could not create client socket: %d\n", errno);
-               return -1;
-       }
-
-       do {
-               if ((rc = connect(fd, (struct sockaddr *)&state->sa,
-                   sizeof(state->sa))) < 0)
-               {
-                       if (errno == ECONNREFUSED) {
-                               RPRINTF("connection refused -- retrying in 1 
second\n");
-                               sleep(1);
-                       } else {
-                               RPRINTF("connection failed: %d\n", errno);
-                               close(fd);
-                               return -1;
-                       }
+               if (treq->op == TD_OP_WRITE) {
+                       rc = primary_forward_request(s, treq);
+                       if (rc)
+                               return rc;
                }
-       } while (rc < 0);
 
-       RPRINTF("client connected\n");
-
-       /* make socket nonblocking */
-       if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
-               flags = 0;
-       if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
-       {
-               RPRINTF("error making socket nonblocking\n");
-               close(fd);
-               return -1;
+               td_forward_request(*treq);
+               queued_io->cons = ring_next(cons);
        }
 
-       if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, fd, 0, 
remus_client_event, state)) < 0) {
-               RPRINTF("error registering client event handler: %s\n", 
strerror(id));
-               close(fd);
-               return -1;
-       }
-
-       state->stream_fd.fd = fd;
-       state->stream_fd.id = id;
        return 0;
 }
 
-/* on read, just pass request through */
+static void remus_client_established(td_replication_connect_t *t, int rc)
+{
+       struct tdremus_state *s = CONTAINER_OF(t, *s, t);
+       event_id_t id;
+
+       if (rc) {
+               primary_failed(s, rc);
+               return;
+       }
+
+       /* the connect succeeded */
+       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd,
+                                          0, remus_client_event, s);
+       if(id < 0) {
+               RPRINTF("error registering client event handler: %s\n",
+                       strerror(id));
+               primary_failed(s, ERROR_INTERNAL);
+               return;
+       }
+
+       s->stream_fd.fd = t->fd;
+       s->stream_fd.id = id;
+
+       /* handle the queued requests */
+       rc = remus_handle_queued_io(s);
+       if (rc)
+               primary_failed(s, rc);
+}
+
 static void primary_queue_read(td_driver_t *driver, td_request_t treq)
 {
-       /* just pass read through */
-       td_forward_request(treq);
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+       struct req_ring *ring = &s->queued_io;
+
+       if (ring_isempty(ring)) {
+               /* just pass read through */
+               td_forward_request(treq);
+               return;
+       }
+
+       ring_add_request(ring, &treq);
+}
+
+static int primary_forward_request(struct tdremus_state *s,
+                                  const td_request_t *treq)
+{
+       char header[sizeof(uint32_t) + sizeof(uint64_t)];
+       uint32_t *sectors = (uint32_t *)header;
+       uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
+       td_driver_t *driver = s->tdremus_driver;
+
+       *sectors = treq->secs;
+       *sector = treq->sec;
+
+       if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0)
+               return ERROR_IO;
+
+       if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
+               return ERROR_IO;
+
+       if (mwrite(s->stream_fd.fd, treq->buf,
+           treq->secs * driver->info.sector_size) < 0)
+               return ERROR_IO;
+
+       return 0;
 }
 
 /* TODO:
@@ -819,28 +857,28 @@ static void primary_queue_read(td_driver_t *driver, 
td_request_t treq)
 static void primary_queue_write(td_driver_t *driver, td_request_t treq)
 {
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
-
-       char header[sizeof(uint32_t) + sizeof(uint64_t)];
-       uint32_t *sectors = (uint32_t *)header;
-       uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
+       int rc, ret;
 
        // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
 
-       /* -1 means we haven't connected yet, -2 means the connection was lost 
*/
-       if(s->stream_fd.fd == -1) {
+       ret = td_replication_connect_status(&s->t);
+       if(ret == -1) {
                RPRINTF("connecting to backup...\n");
-               primary_blocking_connect(s);
+               s->t.callback = remus_client_established;
+               rc = td_replication_client_start(&s->t);
+               if (rc)
+                       goto fail;
        }
 
-       *sectors = treq.secs;
-       *sector = treq.sec;
+       /* The connection is not established, just queue the request */
+       if (ret != 1) {
+               ring_add_request(&s->queued_io, &treq);
+               return;
+       }
 
-       if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0)
-               goto fail;
-       if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
-               goto fail;
-
-       if (mwrite(s->stream_fd.fd, treq.buf, treq.secs * 
driver->info.sector_size) < 0)
+       /* The connection is established */
+       rc = primary_forward_request(s, &treq);
+       if (rc)
                goto fail;
 
        td_forward_request(treq);
@@ -850,7 +888,7 @@ static void primary_queue_write(td_driver_t *driver, 
td_request_t treq)
  fail:
        /* switch to unprotected mode and tell tapdisk to retry */
        RPRINTF("write request replication failed, switching to unprotected 
mode");
-       switch_mode(s->tdremus_driver, mode_unprotected);
+       primary_failed(s, rc);
        td_complete_request(treq, -EBUSY);
 }
 
@@ -867,7 +905,7 @@ static int client_flush(td_driver_t *driver)
 
        if (mwrite(s->stream_fd.fd, TDREMUS_COMMIT, strlen(TDREMUS_COMMIT)) < 
0) {
                RPRINTF("error flushing output");
-               close_stream_fd(s);
+               primary_failed(s, ERROR_IO);
                return -1;
        }
 
@@ -886,6 +924,26 @@ static int server_flush(td_driver_t *driver)
        return ramdisk_flush(driver, s);        
 }
 
+/* It is called when switching the mode from primary to unprotected */
+static int primary_flush(td_driver_t *driver)
+{
+       struct tdremus_state *s = driver->data;
+       struct req_ring *ring = &s->queued_io;
+       unsigned int cons;
+
+       if (ring_isempty(ring))
+               return 0;
+
+       while (!ring_isempty(ring)) {
+               cons = ring->cons;
+               ring->cons = ring_next(cons);
+
+               td_forward_request(ring->pending_requests[cons].treq);
+       }
+
+       return client_flush(driver);
+}
+
 static int primary_start(td_driver_t *driver)
 {
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
@@ -894,7 +952,7 @@ static int primary_start(td_driver_t *driver)
 
        tapdisk_remus.td_queue_read = primary_queue_read;
        tapdisk_remus.td_queue_write = primary_queue_write;
-       s->queue_flush = client_flush;
+       s->queue_flush = primary_flush;
 
        s->stream_fd.fd = -1;
        s->stream_fd.id = -1;
@@ -913,7 +971,7 @@ static void remus_client_event(event_id_t id, char mode, 
void *private)
        if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
                /* replication stream closed or otherwise broken (timeout, 
reset, &c) */
                RPRINTF("error reading from backup\n");
-               close_stream_fd(s);
+               primary_failed(s, ERROR_IO);
                return;
        }
 
@@ -924,7 +982,7 @@ static void remus_client_event(event_id_t id, char mode, 
void *private)
                ctl_respond(s, TDREMUS_DONE);
        else {
                RPRINTF("received unknown message: %s\n", req);
-               close_stream_fd(s);
+               primary_failed(s, ERROR_IO);
        }
 
        return;
@@ -933,84 +991,36 @@ static void remus_client_event(event_id_t id, char mode, 
void *private)
 /* backup functions */
 static void remus_server_event(event_id_t id, char mode, void *private);
 
-/* returns the socket that receives write requests */
-static void remus_server_accept(event_id_t id, char mode, void* private)
+/* It is called when we find some I/O error */
+static void backup_failed(struct tdremus_state *s, int rc)
 {
-       struct tdremus_state* s = (struct tdremus_state *) private;
+       UNREGISTER_EVENT(s->stream_fd.id);
+       td_replication_connect_kill(&s->t);
+       /* We will switch to unprotected mode in backup_queue_write() */
+}
 
-       int stream_fd;
-       event_id_t cid;
+/* returns the socket that receives write requests */
+static void remus_server_established(td_replication_connect_t *t, int rc)
+{
+       struct tdremus_state *s = CONTAINER_OF(t, *s, t);
+       event_id_t id;
 
-       /* XXX: add address-based black/white list */
-       if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
-               RPRINTF("error accepting connection: %d\n", errno);
-               return;
-       }
-
-       /* TODO: check to see if we are already replicating. if so just close 
the
-        * connection (or do something smarter) */
-       RPRINTF("server accepted connection\n");
+       /* rc is always 0 */
 
        /* add tapdisk event for replication stream */
-       cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 
0,
-                                           remus_server_event, s);
+       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd, 0,
+                                          remus_server_event, s);
 
-       if(cid < 0) {
-               RPRINTF("error registering connection event handler: %s\n", 
strerror(errno));
-               close(stream_fd);
+       if (id < 0) {
+               RPRINTF("error registering connection event handler: %s\n",
+                       strerror(errno));
+               td_replication_server_restart(t);
                return;
        }
 
        /* store replication file descriptor */
-       s->stream_fd.fd = stream_fd;
-       s->stream_fd.id = cid;
-}
-
-/* returns -2 if EADDRNOTAVAIL */
-static int remus_bind(struct tdremus_state* s)
-{
-//  struct sockaddr_in sa;
-       int opt;
-       int rc = -1;
-
-       if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-               RPRINTF("could not create server socket: %d\n", errno);
-               return rc;
-       }
-       opt = 1;
-       if (setsockopt(s->server_fd.fd, SOL_SOCKET, SO_REUSEADDR, &opt, 
sizeof(opt)) < 0)
-               RPRINTF("Error setting REUSEADDR on %d: %d\n", s->server_fd.fd, 
errno);
-
-       if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa)) < 
0) {
-               RPRINTF("could not bind server socket %d to %s:%d: %d %s\n", 
s->server_fd.fd,
-                       inet_ntoa(s->sa.sin_addr), ntohs(s->sa.sin_port), 
errno, strerror(errno));
-               if (errno != EADDRINUSE)
-                       rc = -2;
-               goto err_sfd;
-       }
-       if (listen(s->server_fd.fd, 10)) {
-               RPRINTF("could not listen on socket: %d\n", errno);
-               goto err_sfd;
-       }
-
-       /* The socket s now bound to the address and listening so we may now 
register
-   * the fd with tapdisk */
-
-       if((s->server_fd.id = 
tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
-                                                           s->server_fd.fd, 0,
-                                                           
remus_server_accept, s)) < 0) {
-               RPRINTF("error registering server connection event handler: %s",
-                       strerror(s->server_fd.id));
-               goto err_sfd;
-       }
-
-       return 0;
-
- err_sfd:
-       close(s->server_fd.fd);
-       s->server_fd.fd = -1;
-
-       return rc;
+       s->stream_fd.fd = t->fd;
+       s->stream_fd.id = id;
 }
 
 /* wait for latest checkpoint to be applied */
@@ -1053,6 +1063,8 @@ void backup_queue_write(td_driver_t *driver, td_request_t 
treq)
         * handle the write
         */
 
+       /* If we have called backup_failed, calling it again is harmless */
+       backup_failed(s, ERROR_INTERNAL);
        switch_mode(driver, mode_unprotected);
        /* TODO: call the appropriate write function rather than return EBUSY */
        td_complete_request(treq, -EBUSY);
@@ -1061,7 +1073,6 @@ void backup_queue_write(td_driver_t *driver, td_request_t 
treq)
 static int backup_start(td_driver_t *driver)
 {
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
-       int fd;
 
        if (ramdisk_start(driver) < 0)
                return -1;
@@ -1073,12 +1084,12 @@ static int backup_start(td_driver_t *driver)
        return 0;
 }
 
-static int server_do_wreq(td_driver_t *driver)
+static void server_do_wreq(td_driver_t *driver)
 {
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
        static tdremus_wire_t twreq;
        char buf[4096];
-       int len, rc;
+       int len, rc = ERROR_IO;
 
        char header[sizeof(uint32_t) + sizeof(uint64_t)];
        uint32_t *sectors = (uint32_t *) header;
@@ -1097,28 +1108,28 @@ static int server_do_wreq(td_driver_t *driver)
        if (len > sizeof(buf)) {
                /* freak out! */
                RPRINTF("write request too large: %d/%u\n", len, 
(unsigned)sizeof(buf));
-               return -1;
+               goto err;
        }
 
        if (mread(s->stream_fd.fd, buf, len) < 0)
                goto err;
 
-       if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0)
+       if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) {
+               rc = ERROR_INTERNAL;
                goto err;
+       }
 
-       return 0;
+       return;
 
  err:
        /* should start failover */
        RPRINTF("backup write request error\n");
-       close_stream_fd(s);
-
-       return -1;
+       backup_failed(s, rc);
 }
 
 /* at this point, the server can start applying the most recent
  * ramdisk. */
-static int server_do_creq(td_driver_t *driver)
+static void server_do_creq(td_driver_t *driver)
 {
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
 
@@ -1128,9 +1139,7 @@ static int server_do_creq(td_driver_t *driver)
 
        /* XXX this message should not be sent until flush completes! */
        if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4)
-               return -1;
-
-       return 0;
+               backup_failed(s, ERROR_IO);
 }
 
 
@@ -1213,11 +1222,6 @@ static int unprotected_start(td_driver_t *driver)
 
        RPRINTF("failure detected, activating passthrough\n");
 
-       /* close the server socket */
-       close_stream_fd(s);
-
-       close_server_fd(s);
-
        /* install the unprotected read/write handlers */
        tapdisk_remus.td_queue_read = unprotected_queue_read;
        tapdisk_remus.td_queue_write = unprotected_queue_write;
@@ -1227,90 +1231,6 @@ static int unprotected_start(td_driver_t *driver)
 
 
 /* control */
-
-static inline int resolve_address(const char* addr, struct in_addr* ia)
-{
-       struct hostent* he;
-       uint32_t ip;
-
-       if (!(he = gethostbyname(addr))) {
-               RPRINTF("error resolving %s: %d\n", addr, h_errno);
-               return -1;
-       }
-
-       if (!he->h_addr_list[0]) {
-               RPRINTF("no address found for %s\n", addr);
-               return -1;
-       }
-
-       /* network byte order */
-       ip = *((uint32_t**)he->h_addr_list)[0];
-       ia->s_addr = ip;
-
-       return 0;
-}
-
-static int get_args(td_driver_t *driver, const char* name)
-{
-       struct tdremus_state *state = (struct tdremus_state *)driver->data;
-       char* host;
-       char* port;
-//  char* driver_str;
-//  char* parent;
-//  int type;
-//  char* path;
-//  unsigned long ulport;
-//  int i;
-//  struct sockaddr_in server_addr_in;
-
-       int gai_status;
-       int valid_addr;
-       struct addrinfo gai_hints;
-       struct addrinfo *servinfo, *servinfo_itr;
-
-       memset(&gai_hints, 0, sizeof gai_hints);
-       gai_hints.ai_family = AF_UNSPEC;
-       gai_hints.ai_socktype = SOCK_STREAM;
-
-       port = strchr(name, ':');
-       if (!port) {
-               RPRINTF("missing host in %s\n", name);
-               return -ENOENT;
-       }
-       if (!(host = strndup(name, port - name))) {
-               RPRINTF("unable to allocate host\n");
-               return -ENOMEM;
-       }
-       port++;
-
-       if ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo)) != 0) 
{
-               RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
-               return -ENOENT;
-       }
-
-       /* TODO: do something smarter here */
-       valid_addr = 0;
-       for(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr = 
servinfo_itr->ai_next) {
-               void *addr;
-               char *ipver;
-
-               if (servinfo_itr->ai_family == AF_INET) {
-                       valid_addr = 1;
-                       memset(&state->sa, 0, sizeof(state->sa));
-                       state->sa = *(struct sockaddr_in 
*)servinfo_itr->ai_addr;
-                       break;
-               }
-       }
-       freeaddrinfo(servinfo);
-
-       if (!valid_addr)
-               return -ENOENT;
-
-       RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr), 
ntohs(state->sa.sin_port));
-
-       return 0;
-}
-
 static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
 {
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
@@ -1343,6 +1263,20 @@ static int switch_mode(td_driver_t *driver, enum 
tdremus_mode mode)
        return rc;
 }
 
+static void ctl_reopen(struct tdremus_state *s)
+{
+       ctl_unregister(s);
+       CLOSE_FD(s->ctl_fd.fd);
+       RPRINTF("FIFO closed\n");
+
+       if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
+               RPRINTF("error reopening FIFO: %d\n", errno);
+               return;
+       }
+
+       ctl_register(s);
+}
+
 static void ctl_request(event_id_t id, char mode, void *private)
 {
        struct tdremus_state *s = (struct tdremus_state *)private;
@@ -1355,11 +1289,7 @@ static void ctl_request(event_id_t id, char mode, void 
*private)
        if (!(rc = read(s->ctl_fd.fd, msg, sizeof(msg) - 1 /* append nul */))) {
                RPRINTF("0-byte read received, reopening FIFO\n");
                /*TODO: we may have to unregister/re-register with 
tapdisk_server */
-               close(s->ctl_fd.fd);
-               RPRINTF("FIFO closed\n");
-               if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
-                       RPRINTF("error reopening FIFO: %d\n", errno);
-               }
+               ctl_reopen(s);
                return;
        }
 
@@ -1372,7 +1302,7 @@ static void ctl_request(event_id_t id, char mode, void 
*private)
        msg[rc] = '\0';
        if (!strncmp(msg, "flush", 5)) {
                if (s->mode == mode_primary) {
-                       if ((rc = s->queue_flush(driver))) {
+                       if ((rc = client_flush(driver))) {
                                RPRINTF("error passing flush request to 
backup");
                                ctl_respond(s, TDREMUS_FAIL);
                        }
@@ -1521,6 +1451,7 @@ static void ctl_unregister(struct tdremus_state *s)
 static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid)
 {
        struct tdremus_state *s = (struct tdremus_state *)driver->data;
+       td_replication_connect_t *t = &s->t;
        int rc;
        const char *name = image->name;
        td_flag_t flags = image->flags;
@@ -1531,7 +1462,6 @@ static int tdremus_open(td_driver_t *driver, td_image_t 
*image, td_uuid_t uuid)
        remus_image = image;
 
        memset(s, 0, sizeof(*s));
-       s->server_fd.fd = -1;
        s->stream_fd.fd = -1;
        s->ctl_fd.fd = -1;
        s->msg_fd.fd = -1;
@@ -1540,8 +1470,11 @@ static int tdremus_open(td_driver_t *driver, td_image_t 
*image, td_uuid_t uuid)
         * the driver stack from the stream_fd event handler */
        s->tdremus_driver = driver;
 
-       /* parse name to get info etc */
-       if ((rc = get_args(driver, name)))
+       t->log_prefix = "remus";
+       t->retry_timeout_s = REMUS_CONNRETRY_TIMEOUT;
+       t->max_connections = 10;
+       t->callback = remus_server_established;
+       if ((rc = td_replication_connect_init(t, name)))
                return rc;
 
        if ((rc = ctl_open(driver, name))) {
@@ -1555,7 +1488,7 @@ static int tdremus_open(td_driver_t *driver, td_image_t 
*image, td_uuid_t uuid)
                return rc;
        }
 
-       if (!(rc = remus_bind(s)))
+       if (!(rc = td_replication_server_start(t)))
                rc = switch_mode(driver, mode_backup);
        else if (rc == -2)
                rc = switch_mode(driver, mode_primary);
@@ -1575,8 +1508,7 @@ static int tdremus_close(td_driver_t *driver)
        if (s->ramdisk.inprogress)
                hashtable_destroy(s->ramdisk.inprogress, 0);
 
-       close_server_fd(s);
-       close_stream_fd(s);
+       td_replication_connect_kill(&s->t);
        ctl_unregister(s);
        ctl_close(s);
 
diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h
index 9e051cc..07fd630 100644
--- a/tools/blktap2/drivers/block-replication.h
+++ b/tools/blktap2/drivers/block-replication.h
@@ -48,6 +48,7 @@
 enum {
        ERROR_INTERNAL = -1,
        ERROR_CONNECTION = -2,
+       ERROR_IO = -3,
 };
 
 typedef struct td_replication_connect td_replication_connect_t;
-- 
1.9.3


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.