[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Xen-devel] [PATCH 13/17] tools: block-remus: connect to backup asynchronously



On 10/20/2014 10:50 AM, Shriram Rajagopalan wrote:
> On Oct 13, 2014 10:13 PM, "Wen Congyang" <wency@xxxxxxxxxxxxxx> wrote:
>>
>> Use the API to connect to the backup asynchronously.
>> Before the connection is established, we queue
>> all I/O requests and handle them once the connection
>> is established.
>>
>> Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
>> Cc: Shriram Rajagopalan <rshriram@xxxxxxxxx>
>> ---
>>  tools/blktap2/drivers/block-remus.c       | 508
> +++++++++++++-----------------
>>  tools/blktap2/drivers/block-replication.h |   1 +
>>  2 files changed, 221 insertions(+), 288 deletions(-)
>>
>> diff --git a/tools/blktap2/drivers/block-remus.c
> b/tools/blktap2/drivers/block-remus.c
>> index e5ad782..a2b9f62 100644
>> --- a/tools/blktap2/drivers/block-remus.c
>> +++ b/tools/blktap2/drivers/block-remus.c
>> @@ -40,6 +40,7 @@
>>  #include "hashtable.h"
>>  #include "hashtable_itr.h"
>>  #include "hashtable_utility.h"
>> +#include "block-replication.h"
>>
>>  #include <errno.h>
>>  #include <inttypes.h>
>> @@ -49,10 +50,7 @@
>>  #include <string.h>
>>  #include <sys/time.h>
>>  #include <sys/types.h>
>> -#include <sys/socket.h>
>> -#include <netdb.h>
>>  #include <netinet/in.h>
>> -#include <arpa/inet.h>
>>  #include <sys/param.h>
>>  #include <sys/sysctl.h>
>>  #include <unistd.h>
>> @@ -63,10 +61,12 @@
>>  #define RAMDISK_HASHSIZE 128
>>
>>  /* connect retry timeout (seconds) */
>> -#define REMUS_CONNRETRY_TIMEOUT 10
>> +#define REMUS_CONNRETRY_TIMEOUT 1
>>
>>  #define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
>>
>> +#define MAX_REMUS_REQUESTS      TAPDISK_DATA_REQUESTS
>> +
>>  enum tdremus_mode {
>>         mode_invalid = 0,
>>         mode_unprotected,
>> @@ -75,16 +75,14 @@ enum tdremus_mode {
>>  };
>>
>>  struct tdremus_req {
>> -       uint64_t sector;
>> -       int nb_sectors;
>> -       char buf[4096];
>> +       td_request_t treq;
>>  };
>>
>>  struct req_ring {
>>         /* waste one slot to distinguish between empty and full */
>> -       struct tdremus_req requests[MAX_REQUESTS * 2 + 1];
>> -       unsigned int head;
>> -       unsigned int tail;
>> +       struct tdremus_req pending_requests[MAX_REMUS_REQUESTS + 1];
>> +       unsigned int prod;
>> +       unsigned int cons;
>>  };
>>
>>  /* TODO: This isn't very pretty, but to properly generate our own treqs
> (needed
>> @@ -161,13 +159,14 @@ struct tdremus_state {
>>         char*     msg_path; /* output completion message here */
>>         poll_fd_t msg_fd;
>>
>> -  /* replication host */
>> -       struct sockaddr_in sa;
>> -       poll_fd_t server_fd;    /* server listen port */
>> +       td_replication_connect_t t;
>>         poll_fd_t stream_fd;     /* replication channel */
>>
>> -       /* queue write requests, batch-replicate at submit */
>> -       struct req_ring write_ring;
>> +       /*
>> +        * queue I/O requests, batch-replicate when
>> +        * the connection is established.
>> +        */
>> +       struct req_ring queued_io;
>>
>>         /* ramdisk data*/
>>         struct ramdisk ramdisk;
>> @@ -206,11 +205,13 @@ static int tdremus_close(td_driver_t *driver);
>>
>>  static int switch_mode(td_driver_t *driver, enum tdremus_mode mode);
>>  static int ctl_respond(struct tdremus_state *s, const char *response);
>> +static int ctl_register(struct tdremus_state *s);
>> +static void ctl_unregister(struct tdremus_state *s);
>>
>>  /* ring functions */
>> -static inline unsigned int ring_next(struct req_ring* ring, unsigned int
> pos)
>> +static inline unsigned int ring_next(unsigned int pos)
>>  {
>> -       if (++pos >= MAX_REQUESTS * 2 + 1)
>> +       if (++pos >= MAX_REMUS_REQUESTS + 1)
>>                 return 0;
>>
>>         return pos;
>> @@ -218,13 +219,26 @@ static inline unsigned int ring_next(struct
> req_ring* ring, unsigned int pos)
>>
>>  static inline int ring_isempty(struct req_ring* ring)
>>  {
>> -       return ring->head == ring->tail;
>> +       return ring->cons == ring->prod;
>>  }
>>
>>  static inline int ring_isfull(struct req_ring* ring)
>>  {
>> -       return ring_next(ring, ring->tail) == ring->head;
>> +       return ring_next(ring->prod) == ring->cons;
>>  }
>> +
>> +static void ring_add_request(struct req_ring *ring, const td_request_t
> *treq)
>> +{
>> +       /* If ring is full, it means that tapdisk2 has some bug */
>> +       if (ring_isfull(ring)) {
>> +               RPRINTF("OOPS, ring is full\n");
>> +               exit(1);
>> +       }
>> +
>> +       ring->pending_requests[ring->prod].treq = *treq;
>> +       ring->prod = ring_next(ring->prod);
>> +}
>> +
>>  /* Prototype declarations */
>>  static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s);
>>
>> @@ -724,89 +738,113 @@ static int mwrite(int fd, void* buf, size_t len)
>>         select(fd + 1, NULL, &wfds, NULL, &tv);
>>  }
>>
>> -
>> -static void inline close_stream_fd(struct tdremus_state *s)
>> -{
>> -       if (s->stream_fd.fd < 0)
>> -               return;
>> -
>> -       /* XXX: -2 is magic. replace with macro perhaps? */
>> -       tapdisk_server_unregister_event(s->stream_fd.id);
>> -       close(s->stream_fd.fd);
>> -       s->stream_fd.fd = -2;
>> -}
>> -
>> -static void close_server_fd(struct tdremus_state *s)
>> -{
>> -       if (s->server_fd.fd < 0)
>> -               return;
>> -
>> -       tapdisk_server_unregister_event(s->server_fd.id);
>> -       s->server_fd.id = -1;
>> -       close(s->stream_fd.fd);
>> -       s->stream_fd.fd = -1;
>> -}
>> -
>>  /* primary functions */
>>  static void remus_client_event(event_id_t, char mode, void *private);
>> +static int primary_forward_request(struct tdremus_state *s,
>> +                                  const td_request_t *treq);
>>
>> -static int primary_blocking_connect(struct tdremus_state *state)
>> +/*
>> + * It is called when we cannot connect to backup, or find I/O error when
>> + * reading/writing.
>> + */
>> +static void primary_failed(struct tdremus_state *s, int rc)
>>  {
>> -       int fd;
>> -       int id;
>> +       td_replication_connect_kill(&s->t);
>> +       if (rc == ERROR_INTERNAL)
>> +               RPRINTF("switch to unprotected mode due to internal
> error");
>> +       UNREGISTER_EVENT(s->stream_fd.id);
>> +       switch_mode(s->tdremus_driver, mode_unprotected);
>> +}
>> +
>> +static int remus_handle_queued_io(struct tdremus_state *s)
>> +{
>> +       struct req_ring *queued_io = &s->queued_io;
>> +       unsigned int cons;
>> +       td_request_t *treq;
>>         int rc;
>> -       int flags;
>>
>> -       RPRINTF("client connecting to %s:%d...\n",
> inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
>> +       while (!ring_isempty(queued_io)) {
>> +               cons = queued_io->cons;
>> +               treq = &queued_io->pending_requests[cons].treq;
>>
>> -       if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
>> -               RPRINTF("could not create client socket: %d\n", errno);
>> -               return -1;
>> -       }
>> -
>> -       do {
>> -               if ((rc = connect(fd, (struct sockaddr *)&state->sa,
>> -                   sizeof(state->sa))) < 0)
>> -               {
>> -                       if (errno == ECONNREFUSED) {
>> -                               RPRINTF("connection refused -- retrying
> in 1 second\n");
>> -                               sleep(1);
>> -                       } else {
>> -                               RPRINTF("connection failed: %d\n", errno);
>> -                               close(fd);
>> -                               return -1;
>> -                       }
>> +               if (treq->op == TD_OP_WRITE) {
>> +                       rc = primary_forward_request(s, treq);
>> +                       if (rc)
>> +                               return rc;
>>                 }
>> -       } while (rc < 0);
>>
>> -       RPRINTF("client connected\n");
>> -
>> -       /* make socket nonblocking */
>> -       if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
>> -               flags = 0;
>> -       if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
>> -       {
>> -               RPRINTF("error making socket nonblocking\n");
>> -               close(fd);
>> -               return -1;
>> +               td_forward_request(*treq);
>> +               queued_io->cons = ring_next(cons);
>>         }
>>
>> -       if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
> fd, 0, remus_client_event, state)) < 0) {
>> -               RPRINTF("error registering client event handler: %s\n",
> strerror(id));
>> -               close(fd);
>> -               return -1;
>> -       }
>> -
>> -       state->stream_fd.fd = fd;
>> -       state->stream_fd.id = id;
>>         return 0;
>>  }
>>
>> -/* on read, just pass request through */
>> +static void remus_client_established(td_replication_connect_t *t, int rc)
>> +{
>> +       struct tdremus_state *s = CONTAINER_OF(t, *s, t);
>> +       event_id_t id;
>> +
>> +       if (rc) {
>> +               primary_failed(s, rc);
>> +               return;
>> +       }
>> +
>> +       /* the connect succeeded */
>> +       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd,
>> +                                          0, remus_client_event, s);
>> +       if(id < 0) {
>> +               RPRINTF("error registering client event handler: %s\n",
>> +                       strerror(id));
>> +               primary_failed(s, ERROR_INTERNAL);
>> +               return;
>> +       }
>> +
>> +       s->stream_fd.fd = t->fd;
>> +       s->stream_fd.id = id;
>> +
>> +       /* handle the queued requests */
>> +       rc = remus_handle_queued_io(s);
>> +       if (rc)
>> +               primary_failed(s, rc);
>> +}
>> +
>>  static void primary_queue_read(td_driver_t *driver, td_request_t treq)
>>  {
>> -       /* just pass read through */
>> -       td_forward_request(treq);
>> +       struct tdremus_state *s = (struct tdremus_state *)driver->data;
>> +       struct req_ring *ring = &s->queued_io;
>> +
>> +       if (ring_isempty(ring)) {
>> +               /* just pass read through */
>> +               td_forward_request(treq);
>> +               return;
>> +       }
>> +
>> +       ring_add_request(ring, &treq);
>> +}
>> +
>> +static int primary_forward_request(struct tdremus_state *s,
>> +                                  const td_request_t *treq)
>> +{
>> +       char header[sizeof(uint32_t) + sizeof(uint64_t)];
>> +       uint32_t *sectors = (uint32_t *)header;
>> +       uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
>> +       td_driver_t *driver = s->tdremus_driver;
>> +
>> +       *sectors = treq->secs;
>> +       *sector = treq->sec;
>> +
>> +       if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE))
> < 0)
>> +               return ERROR_IO;
>> +
>> +       if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
>> +               return ERROR_IO;
>> +
>> +       if (mwrite(s->stream_fd.fd, treq->buf,
>> +           treq->secs * driver->info.sector_size) < 0)
>> +               return ERROR_IO;
>> +
>> +       return 0;
>>  }
>>
>>  /* TODO:
>> @@ -819,28 +857,28 @@ static void primary_queue_read(td_driver_t *driver,
> td_request_t treq)
>>  static void primary_queue_write(td_driver_t *driver, td_request_t treq)
>>  {
>>         struct tdremus_state *s = (struct tdremus_state *)driver->data;
>> -
>> -       char header[sizeof(uint32_t) + sizeof(uint64_t)];
>> -       uint32_t *sectors = (uint32_t *)header;
>> -       uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
>> +       int rc, ret;
>>
>>         // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
>>
>> -       /* -1 means we haven't connected yet, -2 means the connection was
> lost */
>> -       if(s->stream_fd.fd == -1) {
>> +       ret = td_replication_connect_status(&s->t);
>> +       if(ret == -1) {
>>                 RPRINTF("connecting to backup...\n");
>> -               primary_blocking_connect(s);
>> +               s->t.callback = remus_client_established;
>> +               rc = td_replication_client_start(&s->t);
>> +               if (rc)
>> +                       goto fail;
>>         }
>>
>> -       *sectors = treq.secs;
>> -       *sector = treq.sec;
>> +       /* The connection is not established, just queue the request */
>> +       if (ret != 1) {
>> +               ring_add_request(&s->queued_io, &treq);
>> +               return;
>> +       }
>>
>> -       if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE))
> < 0)
>> -               goto fail;
>> -       if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
>> -               goto fail;
>> -
>> -       if (mwrite(s->stream_fd.fd, treq.buf, treq.secs *
> driver->info.sector_size) < 0)
>> +       /* The connection is established */
>> +       rc = primary_forward_request(s, &treq);
>> +       if (rc)
>>                 goto fail;
>>
>>         td_forward_request(treq);
>> @@ -850,7 +888,7 @@ static void primary_queue_write(td_driver_t *driver,
> td_request_t treq)
>>   fail:
>>         /* switch to unprotected mode and tell tapdisk to retry */
>>         RPRINTF("write request replication failed, switching to
> unprotected mode");
>> -       switch_mode(s->tdremus_driver, mode_unprotected);
>> +       primary_failed(s, rc);
>>         td_complete_request(treq, -EBUSY);
>>  }
>>
>> @@ -867,7 +905,7 @@ static int client_flush(td_driver_t *driver)
>>
>>         if (mwrite(s->stream_fd.fd, TDREMUS_COMMIT,
> strlen(TDREMUS_COMMIT)) < 0) {
>>                 RPRINTF("error flushing output");
>> -               close_stream_fd(s);
>> +               primary_failed(s, ERROR_IO);
>>                 return -1;
>>         }
>>
>> @@ -886,6 +924,26 @@ static int server_flush(td_driver_t *driver)
>>         return ramdisk_flush(driver, s);
>>  }
>>
>> +/* It is called when switching the mode from primary to unprotected */
>> +static int primary_flush(td_driver_t *driver)
>> +{
>> +       struct tdremus_state *s = driver->data;
>> +       struct req_ring *ring = &s->queued_io;
>> +       unsigned int cons;
>> +
>> +       if (ring_isempty(ring))
>> +               return 0;
>> +
>> +       while (!ring_isempty(ring)) {
>> +               cons = ring->cons;
>> +               ring->cons = ring_next(cons);
>> +
>> +               td_forward_request(ring->pending_requests[cons].treq);
>> +       }
>> +
>> +       return client_flush(driver);
>> +}
>> +
>>  static int primary_start(td_driver_t *driver)
>>  {
>>         struct tdremus_state *s = (struct tdremus_state *)driver->data;
>> @@ -894,7 +952,7 @@ static int primary_start(td_driver_t *driver)
>>
>>         tapdisk_remus.td_queue_read = primary_queue_read;
>>         tapdisk_remus.td_queue_write = primary_queue_write;
>> -       s->queue_flush = client_flush;
>> +       s->queue_flush = primary_flush;
>>
>>         s->stream_fd.fd = -1;
>>         s->stream_fd.id = -1;
>> @@ -913,7 +971,7 @@ static void remus_client_event(event_id_t id, char
> mode, void *private)
>>         if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
>>                 /* replication stream closed or otherwise broken
> (timeout, reset, &c) */
>>                 RPRINTF("error reading from backup\n");
>> -               close_stream_fd(s);
>> +               primary_failed(s, ERROR_IO);
>>                 return;
>>         }
>>
>> @@ -924,7 +982,7 @@ static void remus_client_event(event_id_t id, char
> mode, void *private)
>>                 ctl_respond(s, TDREMUS_DONE);
>>         else {
>>                 RPRINTF("received unknown message: %s\n", req);
>> -               close_stream_fd(s);
>> +               primary_failed(s, ERROR_IO);
>>         }
>>
>>         return;
>> @@ -933,84 +991,36 @@ static void remus_client_event(event_id_t id, char
> mode, void *private)
>>  /* backup functions */
>>  static void remus_server_event(event_id_t id, char mode, void *private);
>>
>> -/* returns the socket that receives write requests */
>> -static void remus_server_accept(event_id_t id, char mode, void* private)
>> +/* It is called when we find some I/O error */
>> +static void backup_failed(struct tdremus_state *s, int rc)
>>  {
>> -       struct tdremus_state* s = (struct tdremus_state *) private;
>> +       UNREGISTER_EVENT(s->stream_fd.id);
>> +       td_replication_connect_kill(&s->t);
>> +       /* We will switch to unprotected mode in backup_queue_write() */
>> +}
>>
>> -       int stream_fd;
>> -       event_id_t cid;
>> +/* returns the socket that receives write requests */
>> +static void remus_server_established(td_replication_connect_t *t, int rc)
>> +{
>> +       struct tdremus_state *s = CONTAINER_OF(t, *s, t);
>> +       event_id_t id;
>>
>> -       /* XXX: add address-based black/white list */
>> -       if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
>> -               RPRINTF("error accepting connection: %d\n", errno);
>> -               return;
>> -       }
>> -
>> -       /* TODO: check to see if we are already replicating. if so just
> close the
>> -        * connection (or do something smarter) */
>> -       RPRINTF("server accepted connection\n");
>> +       /* rc is always 0 */
>>
>>         /* add tapdisk event for replication stream */
>> -       cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
> stream_fd, 0,
>> -                                           remus_server_event, s);
>> +       id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd,
> 0,
>> +                                          remus_server_event, s);
>>
>> -       if(cid < 0) {
>> -               RPRINTF("error registering connection event handler:
> %s\n", strerror(errno));
>> -               close(stream_fd);
>> +       if (id < 0) {
>> +               RPRINTF("error registering connection event handler:
> %s\n",
>> +                       strerror(errno));
>> +               td_replication_server_restart(t);
>>                 return;
>>         }
>>
>>         /* store replication file descriptor */
>> -       s->stream_fd.fd = stream_fd;
>> -       s->stream_fd.id = cid;
>> -}
>> -
>> -/* returns -2 if EADDRNOTAVAIL */
>> -static int remus_bind(struct tdremus_state* s)
>> -{
>> -//  struct sockaddr_in sa;
>> -       int opt;
>> -       int rc = -1;
>> -
>> -       if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
>> -               RPRINTF("could not create server socket: %d\n", errno);
>> -               return rc;
>> -       }
>> -       opt = 1;
>> -       if (setsockopt(s->server_fd.fd, SOL_SOCKET, SO_REUSEADDR, &opt,
> sizeof(opt)) < 0)
>> -               RPRINTF("Error setting REUSEADDR on %d: %d\n",
> s->server_fd.fd, errno);
>> -
>> -       if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa,
> sizeof(s->sa)) < 0) {
>> -               RPRINTF("could not bind server socket %d to %s:%d: %d
> %s\n", s->server_fd.fd,
>> -                       inet_ntoa(s->sa.sin_addr), ntohs(s->sa.sin_port),
> errno, strerror(errno));
>> -               if (errno != EADDRINUSE)
>> -                       rc = -2;
>> -               goto err_sfd;
>> -       }
>> -       if (listen(s->server_fd.fd, 10)) {
>> -               RPRINTF("could not listen on socket: %d\n", errno);
>> -               goto err_sfd;
>> -       }
>> -
>> -       /* The socket s now bound to the address and listening so we may
> now register
>> -   * the fd with tapdisk */
>> -
>> -       if((s->server_fd.id =
> tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
>> -
>  s->server_fd.fd, 0,
>> -
>  remus_server_accept, s)) < 0) {
>> -               RPRINTF("error registering server connection event
> handler: %s",
>> -                       strerror(s->server_fd.id));
>> -               goto err_sfd;
>> -       }
>> -
>> -       return 0;
>> -
>> - err_sfd:
>> -       close(s->server_fd.fd);
>> -       s->server_fd.fd = -1;
>> -
>> -       return rc;
>> +       s->stream_fd.fd = t->fd;
>> +       s->stream_fd.id = id;
>>  }
>>
>>  /* wait for latest checkpoint to be applied */
>> @@ -1053,6 +1063,8 @@ void backup_queue_write(td_driver_t *driver,
> td_request_t treq)
>>          * handle the write
>>          */
>>
>> +       /* If we have called backup_failed, calling it again is harmless
> */
>> +       backup_failed(s, ERROR_INTERNAL);
>>         switch_mode(driver, mode_unprotected);
>>         /* TODO: call the appropriate write function rather than return
> EBUSY */
>>         td_complete_request(treq, -EBUSY);
>> @@ -1061,7 +1073,6 @@ void backup_queue_write(td_driver_t *driver,
> td_request_t treq)
>>  static int backup_start(td_driver_t *driver)
>>  {
>>         struct tdremus_state *s = (struct tdremus_state *)driver->data;
>> -       int fd;
>>
>>         if (ramdisk_start(driver) < 0)
>>                 return -1;
>> @@ -1073,12 +1084,12 @@ static int backup_start(td_driver_t *driver)
>>         return 0;
>>  }
>>
>> -static int server_do_wreq(td_driver_t *driver)
>> +static void server_do_wreq(td_driver_t *driver)
>>  {
>>         struct tdremus_state *s = (struct tdremus_state *)driver->data;
>>         static tdremus_wire_t twreq;
>>         char buf[4096];
>> -       int len, rc;
>> +       int len, rc = ERROR_IO;
>>
>>         char header[sizeof(uint32_t) + sizeof(uint64_t)];
>>         uint32_t *sectors = (uint32_t *) header;
>> @@ -1097,28 +1108,28 @@ static int server_do_wreq(td_driver_t *driver)
>>         if (len > sizeof(buf)) {
>>                 /* freak out! */
>>                 RPRINTF("write request too large: %d/%u\n", len,
> (unsigned)sizeof(buf));
>> -               return -1;
>> +               goto err;
>>         }
>>
>>         if (mread(s->stream_fd.fd, buf, len) < 0)
>>                 goto err;
>>
>> -       if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0)
>> +       if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) {
>> +               rc = ERROR_INTERNAL;
>>                 goto err;
>> +       }
>>
>> -       return 0;
>> +       return;
>>
>>   err:
>>         /* should start failover */
>>         RPRINTF("backup write request error\n");
>> -       close_stream_fd(s);
>> -
>> -       return -1;
>> +       backup_failed(s, rc);
>>  }
>>
>>  /* at this point, the server can start applying the most recent
>>   * ramdisk. */
>> -static int server_do_creq(td_driver_t *driver)
>> +static void server_do_creq(td_driver_t *driver)
>>  {
>>         struct tdremus_state *s = (struct tdremus_state *)driver->data;
>>
>> @@ -1128,9 +1139,7 @@ static int server_do_creq(td_driver_t *driver)
>>
>>         /* XXX this message should not be sent until flush completes! */
>>         if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) !=
> 4)
>> -               return -1;
>> -
>> -       return 0;
>> +               backup_failed(s, ERROR_IO);
>>  }
>>
>>
>> @@ -1213,11 +1222,6 @@ static int unprotected_start(td_driver_t *driver)
>>
>>         RPRINTF("failure detected, activating passthrough\n");
>>
>> -       /* close the server socket */
>> -       close_stream_fd(s);
>> -
>> -       close_server_fd(s);
>> -
>>         /* install the unprotected read/write handlers */
>>         tapdisk_remus.td_queue_read = unprotected_queue_read;
>>         tapdisk_remus.td_queue_write = unprotected_queue_write;
>> @@ -1227,90 +1231,6 @@ static int unprotected_start(td_driver_t *driver)
>>
>>
>>  /* control */
>> -
>> -static inline int resolve_address(const char* addr, struct in_addr* ia)
>> -{
>> -       struct hostent* he;
>> -       uint32_t ip;
>> -
>> -       if (!(he = gethostbyname(addr))) {
>> -               RPRINTF("error resolving %s: %d\n", addr, h_errno);
>> -               return -1;
>> -       }
>> -
>> -       if (!he->h_addr_list[0]) {
>> -               RPRINTF("no address found for %s\n", addr);
>> -               return -1;
>> -       }
>> -
>> -       /* network byte order */
>> -       ip = *((uint32_t**)he->h_addr_list)[0];
>> -       ia->s_addr = ip;
>> -
>> -       return 0;
>> -}
>> -
>> -static int get_args(td_driver_t *driver, const char* name)
>> -{
>> -       struct tdremus_state *state = (struct tdremus_state
> *)driver->data;
>> -       char* host;
>> -       char* port;
>> -//  char* driver_str;
>> -//  char* parent;
>> -//  int type;
>> -//  char* path;
>> -//  unsigned long ulport;
>> -//  int i;
>> -//  struct sockaddr_in server_addr_in;
>> -
>> -       int gai_status;
>> -       int valid_addr;
>> -       struct addrinfo gai_hints;
>> -       struct addrinfo *servinfo, *servinfo_itr;
>> -
>> -       memset(&gai_hints, 0, sizeof gai_hints);
>> -       gai_hints.ai_family = AF_UNSPEC;
>> -       gai_hints.ai_socktype = SOCK_STREAM;
>> -
>> -       port = strchr(name, ':');
>> -       if (!port) {
>> -               RPRINTF("missing host in %s\n", name);
>> -               return -ENOENT;
>> -       }
>> -       if (!(host = strndup(name, port - name))) {
>> -               RPRINTF("unable to allocate host\n");
>> -               return -ENOMEM;
>> -       }
>> -       port++;
>> -
>> -       if ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo))
> != 0) {
>> -               RPRINTF("getaddrinfo error: %s\n",
> gai_strerror(gai_status));
>> -               return -ENOENT;
>> -       }
>> -
>> -       /* TODO: do something smarter here */
>> -       valid_addr = 0;
>> -       for(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr =
> servinfo_itr->ai_next) {
>> -               void *addr;
>> -               char *ipver;
>> -
>> -               if (servinfo_itr->ai_family == AF_INET) {
>> -                       valid_addr = 1;
>> -                       memset(&state->sa, 0, sizeof(state->sa));
>> -                       state->sa = *(struct sockaddr_in
> *)servinfo_itr->ai_addr;
>> -                       break;
>> -               }
>> -       }
>> -       freeaddrinfo(servinfo);
>> -
>> -       if (!valid_addr)
>> -               return -ENOENT;
>> -
>> -       RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr),
> ntohs(state->sa.sin_port));
>> -
>> -       return 0;
>> -}
>> -
>>  static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
>>  {
>>         struct tdremus_state *s = (struct tdremus_state *)driver->data;
>> @@ -1343,6 +1263,20 @@ static int switch_mode(td_driver_t *driver, enum
> tdremus_mode mode)
>>         return rc;
>>  }
>>
>> +static void ctl_reopen(struct tdremus_state *s)
>> +{
>> +       ctl_unregister(s);
>> +       CLOSE_FD(s->ctl_fd.fd);
>> +       RPRINTF("FIFO closed\n");
>> +
>> +       if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
>> +               RPRINTF("error reopening FIFO: %d\n", errno);
>> +               return;
>> +       }
>> +
>> +       ctl_register(s);
>> +}
>> +
>>  static void ctl_request(event_id_t id, char mode, void *private)
>>  {
>>         struct tdremus_state *s = (struct tdremus_state *)private;
>> @@ -1355,11 +1289,7 @@ static void ctl_request(event_id_t id, char mode,
> void *private)
>>         if (!(rc = read(s->ctl_fd.fd, msg, sizeof(msg) - 1 /* append nul
> */))) {
>>                 RPRINTF("0-byte read received, reopening FIFO\n");
>>                 /*TODO: we may have to unregister/re-register with
> tapdisk_server */
>> -               close(s->ctl_fd.fd);
>> -               RPRINTF("FIFO closed\n");
>> -               if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
>> -                       RPRINTF("error reopening FIFO: %d\n", errno);
>> -               }
>> +               ctl_reopen(s);
>>                 return;
>>         }
>>
>> @@ -1372,7 +1302,7 @@ static void ctl_request(event_id_t id, char mode,
> void *private)
>>         msg[rc] = '\0';
>>         if (!strncmp(msg, "flush", 5)) {
>>                 if (s->mode == mode_primary) {
>> -                       if ((rc = s->queue_flush(driver))) {
>> +                       if ((rc = client_flush(driver))) {
>>                                 RPRINTF("error passing flush request to
> backup");
>>                                 ctl_respond(s, TDREMUS_FAIL);
>>                         }
>> @@ -1521,6 +1451,7 @@ static void ctl_unregister(struct tdremus_state *s)
>>  static int tdremus_open(td_driver_t *driver, td_image_t *image,
> td_uuid_t uuid)
>>  {
>>         struct tdremus_state *s = (struct tdremus_state *)driver->data;
>> +       td_replication_connect_t *t = &s->t;
>>         int rc;
>>         const char *name = image->name;
>>         td_flag_t flags = image->flags;
>> @@ -1531,7 +1462,6 @@ static int tdremus_open(td_driver_t *driver,
> td_image_t *image, td_uuid_t uuid)
>>         remus_image = image;
>>
>>         memset(s, 0, sizeof(*s));
>> -       s->server_fd.fd = -1;
>>         s->stream_fd.fd = -1;
>>         s->ctl_fd.fd = -1;
>>         s->msg_fd.fd = -1;
>> @@ -1540,8 +1470,11 @@ static int tdremus_open(td_driver_t *driver,
> td_image_t *image, td_uuid_t uuid)
>>          * the driver stack from the stream_fd event handler */
>>         s->tdremus_driver = driver;
>>
>> -       /* parse name to get info etc */
>> -       if ((rc = get_args(driver, name)))
>> +       t->log_prefix = "remus";
>> +       t->retry_timeout_s = REMUS_CONNRETRY_TIMEOUT;
>> +       t->max_connections = 10;
>> +       t->callback = remus_server_established;
>> +       if ((rc = td_replication_connect_init(t, name)))
>>                 return rc;
>>
>>         if ((rc = ctl_open(driver, name))) {
>> @@ -1555,7 +1488,7 @@ static int tdremus_open(td_driver_t *driver,
> td_image_t *image, td_uuid_t uuid)
>>                 return rc;
>>         }
>>
>> -       if (!(rc = remus_bind(s)))
>> +       if (!(rc = td_replication_server_start(t)))
>>                 rc = switch_mode(driver, mode_backup);
>>         else if (rc == -2)
>>                 rc = switch_mode(driver, mode_primary);
>> @@ -1575,8 +1508,7 @@ static int tdremus_close(td_driver_t *driver)
>>         if (s->ramdisk.inprogress)
>>                 hashtable_destroy(s->ramdisk.inprogress, 0);
>>
>> -       close_server_fd(s);
>> -       close_stream_fd(s);
>> +       td_replication_connect_kill(&s->t);
>>         ctl_unregister(s);
>>         ctl_close(s);
>>
>> diff --git a/tools/blktap2/drivers/block-replication.h
> b/tools/blktap2/drivers/block-replication.h
>> index 9e051cc..07fd630 100644
>> --- a/tools/blktap2/drivers/block-replication.h
>> +++ b/tools/blktap2/drivers/block-replication.h
>> @@ -48,6 +48,7 @@
>>  enum {
>>         ERROR_INTERNAL = -1,
>>         ERROR_CONNECTION = -2,
>> +       ERROR_IO = -3,
>>  };
>>
>>  typedef struct td_replication_connect td_replication_connect_t;
>> --
>> 1.9.3
>>
> 
> The code looks OK. Have you tested this with a read/write workload
> inside the guest? In particular, have you run read-after-write sanity checks
> to ensure that there is no data corruption (caused by stale ramdisk data
> being flushed to disk or served to the guest) before a connection to the
> backup has been established?

Which existing test tool can check this?
Before the connection to the backup has been established, the guest is blocked
as soon as it issues its first write operation, so you cannot log in and run a
test program.

> I am acking this piece in good faith that you have tested all these
> cases.

Yes. After applying the hack in patch 17, you can run Remus with blktap2.

I have tested it with pgbench. IIRC, I found only one problem in that test:
select() times out in xc_domain_restore.c.

Thanks
Wen Congyang

> 
> Acked-by: Shriram Rajagopalan <rshriram@xxxxxxxxx>
> 


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.