[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Xen-devel] [PATCH 13/17] tools: block-remus: connect to backup asynchronously
On Oct 19, 2014 10:59 PM, "Wen Congyang" <wency@xxxxxxxxxxxxxx> wrote:
>
> On 10/20/2014 10:50 AM, Shriram Rajagopalan wrote:
> > On Oct 13, 2014 10:13 PM, "Wen Congyang" <wency@xxxxxxxxxxxxxx> wrote:
> >>
> >> Use the API to connect to backup asynchronously.
> >> Before the connection is established, we queue
> >> all I/O requests, and handle them when the connection
> >> is established.
> >>
> >> Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
> >> Cc: Shriram Rajagopalan <rshriram@xxxxxxxxx>
> >> ---
> >> tools/blktap2/drivers/block-remus.c       | 508 +++++++++++++-----------------
> >> tools/blktap2/drivers/block-replication.h |   1 +
> >> 2 files changed, 221 insertions(+), 288 deletions(-)
> >>
> >> diff --git a/tools/blktap2/drivers/block-remus.c
> > b/tools/blktap2/drivers/block-remus.c
> >> index e5ad782..a2b9f62 100644
> >> --- a/tools/blktap2/drivers/block-remus.c
> >> +++ b/tools/blktap2/drivers/block-remus.c
> >> @@ -40,6 +40,7 @@
> >>Â #include "hashtable.h"
> >>Â #include "hashtable_itr.h"
> >>Â #include "hashtable_utility.h"
> >> +#include "block-replication.h"
> >>
> >>Â #include <errno.h>
> >>Â #include <inttypes.h>
> >> @@ -49,10 +50,7 @@
> >>Â #include <string.h>
> >>Â #include <sys/time.h>
> >>Â #include <sys/types.h>
> >> -#include <sys/socket.h>
> >> -#include <netdb.h>
> >>Â #include <netinet/in.h>
> >> -#include <arpa/inet.h>
> >>Â #include <sys/param.h>
> >>Â #include <sys/sysctl.h>
> >>Â #include <unistd.h>
> >> @@ -63,10 +61,12 @@
> >>Â #define RAMDISK_HASHSIZE 128
> >>
> >>Â /* connect retry timeout (seconds) */
> >> -#define REMUS_CONNRETRY_TIMEOUT 10
> >> +#define REMUS_CONNRETRY_TIMEOUT 1
> >>
> >>Â #define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
> >>
> >> +#define MAX_REMUS_REQUESTSÂ Â Â TAPDISK_DATA_REQUESTS
> >> +
> >>Â enum tdremus_mode {
> >>Â Â Â Â Âmode_invalid = 0,
> >>Â Â Â Â Âmode_unprotected,
> >> @@ -75,16 +75,14 @@ enum tdremus_mode {
> >>Â };
> >>
> >>Â struct tdremus_req {
> >> -Â Â Â Âuint64_t sector;
> >> -Â Â Â Âint nb_sectors;
> >> -Â Â Â Âchar buf[4096];
> >> +Â Â Â Âtd_request_t treq;
> >>Â };
> >>
> >>Â struct req_ring {
> >>Â Â Â Â Â/* waste one slot to distinguish between empty and full */
> >> -Â Â Â Âstruct tdremus_req requests[MAX_REQUESTS * 2 + 1];
> >> -Â Â Â Âunsigned int head;
> >> -Â Â Â Âunsigned int tail;
> >> +Â Â Â Âstruct tdremus_req pending_requests[MAX_REMUS_REQUESTS + 1];
> >> +Â Â Â Âunsigned int prod;
> >> +Â Â Â Âunsigned int cons;
> >>Â };
> >>
> >>Â /* TODO: This isn't very pretty, but to properly generate our own treqs
> > (needed
> >> @@ -161,13 +159,14 @@ struct tdremus_state {
> >>Â Â Â Â Âchar*Â Â Âmsg_path; /* output completion message here */
> >>Â Â Â Â Âpoll_fd_t msg_fd;
> >>
> >> -Â /* replication host */
> >> -Â Â Â Âstruct sockaddr_in sa;
> >> -Â Â Â Âpoll_fd_t server_fd;Â Â /* server listen port */
> >> +Â Â Â Âtd_replication_connect_t t;
> >>Â Â Â Â Âpoll_fd_t stream_fd;Â Â Â/* replication channel */
> >>
> >> -Â Â Â Â/* queue write requests, batch-replicate at submit */
> >> -Â Â Â Âstruct req_ring write_ring;
> >> +Â Â Â Â/*
> >> +Â Â Â Â * queue I/O requests, batch-replicate when
> >> +Â Â Â Â * the connection is established.
> >> +Â Â Â Â */
> >> +Â Â Â Âstruct req_ring queued_io;
> >>
> >>Â Â Â Â Â/* ramdisk data*/
> >>Â Â Â Â Âstruct ramdisk ramdisk;
> >> @@ -206,11 +205,13 @@ static int tdremus_close(td_driver_t *driver);
> >>
> >>Â static int switch_mode(td_driver_t *driver, enum tdremus_mode mode);
> >>Â static int ctl_respond(struct tdremus_state *s, const char *response);
> >> +static int ctl_register(struct tdremus_state *s);
> >> +static void ctl_unregister(struct tdremus_state *s);
> >>
> >>Â /* ring functions */
> >> -static inline unsigned int ring_next(struct req_ring* ring, unsigned int
> > pos)
> >> +static inline unsigned int ring_next(unsigned int pos)
> >>Â {
> >> -Â Â Â Âif (++pos >= MAX_REQUESTS * 2 + 1)
> >> +Â Â Â Âif (++pos >= MAX_REMUS_REQUESTS + 1)
> >>Â Â Â Â Â Â Â Â Âreturn 0;
> >>
> >>Â Â Â Â Âreturn pos;
> >> @@ -218,13 +219,26 @@ static inline unsigned int ring_next(struct
> > req_ring* ring, unsigned int pos)
> >>
> >>Â static inline int ring_isempty(struct req_ring* ring)
> >>Â {
> >> -Â Â Â Âreturn ring->head == ring->tail;
> >> +Â Â Â Âreturn ring->cons == ring->prod;
> >>Â }
> >>
> >>Â static inline int ring_isfull(struct req_ring* ring)
> >>Â {
> >> -Â Â Â Âreturn ring_next(ring, ring->tail) == ring->head;
> >> +Â Â Â Âreturn ring_next(ring->prod) == ring->cons;
> >>Â }
> >> +
> >> +static void ring_add_request(struct req_ring *ring, const td_request_t
> > *treq)
> >> +{
> >> +Â Â Â Â/* If ring is full, it means that tapdisk2 has some bug */
> >> +Â Â Â Âif (ring_isfull(ring)) {
> >> +Â Â Â Â Â Â Â ÂRPRINTF("OOPS, ring is full\n");
> >> +Â Â Â Â Â Â Â Âexit(1);
> >> +Â Â Â Â}
> >> +
> >> +Â Â Â Âring->pending_requests[ring->prod].treq = *treq;
> >> +Â Â Â Âring->prod = ring_next(ring->prod);
> >> +}
> >> +
> >>Â /* Prototype declarations */
> >>Â static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s);
> >>
> >> @@ -724,89 +738,113 @@ static int mwrite(int fd, void* buf, size_t len)
> >>Â Â Â Â Âselect(fd + 1, NULL, &wfds, NULL, &tv);
> >>Â }
> >>
> >> -
> >> -static void inline close_stream_fd(struct tdremus_state *s)
> >> -{
> >> -Â Â Â Âif (s->stream_fd.fd < 0)
> >> -Â Â Â Â Â Â Â Âreturn;
> >> -
> >> -Â Â Â Â/* XXX: -2 is magic. replace with macro perhaps? */
> >> -Â Â Â Âtapdisk_server_unregister_event(s->stream_fd.id);
> >> -Â Â Â Âclose(s->stream_fd.fd);
> >> -Â Â Â Âs->stream_fd.fd = -2;
> >> -}
> >> -
> >> -static void close_server_fd(struct tdremus_state *s)
> >> -{
> >> -Â Â Â Âif (s->server_fd.fd < 0)
> >> -Â Â Â Â Â Â Â Âreturn;
> >> -
> >> -Â Â Â Âtapdisk_server_unregister_event(s->server_fd.id);
> >> -Â Â Â Âs->server_fd.id = -1;
> >> -Â Â Â Âclose(s->stream_fd.fd);
> >> -Â Â Â Âs->stream_fd.fd = -1;
> >> -}
> >> -
> >>Â /* primary functions */
> >>Â static void remus_client_event(event_id_t, char mode, void *private);
> >> +static int primary_forward_request(struct tdremus_state *s,
> >> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â const td_request_t *treq);
> >>
> >> -static int primary_blocking_connect(struct tdremus_state *state)
> >> +/*
> >> + * It is called when we cannot connect to backup, or find I/O error when
> >> + * reading/writing.
> >> + */
> >> +static void primary_failed(struct tdremus_state *s, int rc)
> >>Â {
> >> -Â Â Â Âint fd;
> >> -Â Â Â Âint id;
> >> +Â Â Â Âtd_replication_connect_kill(&s->t);
> >> +Â Â Â Âif (rc == ERROR_INTERNAL)
> >> +Â Â Â Â Â Â Â ÂRPRINTF("switch to unprotected mode due to internal
> > error");
> >> +Â Â Â ÂUNREGISTER_EVENT(s->stream_fd.id);
> >> +Â Â Â Âswitch_mode(s->tdremus_driver, mode_unprotected);
> >> +}
> >> +
> >> +static int remus_handle_queued_io(struct tdremus_state *s)
> >> +{
> >> +Â Â Â Âstruct req_ring *queued_io = &s->queued_io;
> >> +Â Â Â Âunsigned int cons;
> >> +Â Â Â Âtd_request_t *treq;
> >>Â Â Â Â Âint rc;
> >> -Â Â Â Âint flags;
> >>
> >> -Â Â Â ÂRPRINTF("client connecting to %s:%d...\n",
> > inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
> >> +Â Â Â Âwhile (!ring_isempty(queued_io)) {
> >> +Â Â Â Â Â Â Â Âcons = queued_io->cons;
> >> +Â Â Â Â Â Â Â Âtreq = &queued_io->pending_requests[cons].treq;
> >>
> >> -Â Â Â Âif ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
> >> -Â Â Â Â Â Â Â ÂRPRINTF("could not create client socket: %d\n", errno);
> >> -Â Â Â Â Â Â Â Âreturn -1;
> >> -Â Â Â Â}
> >> -
> >> -Â Â Â Âdo {
> >> -Â Â Â Â Â Â Â Âif ((rc = connect(fd, (struct sockaddr *)&state->sa,
> >> -Â Â Â Â Â Â Â Â Â Âsizeof(state->sa))) < 0)
> >> -Â Â Â Â Â Â Â Â{
> >> -Â Â Â Â Â Â Â Â Â Â Â Âif (errno == ECONNREFUSED) {
> >> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â ÂRPRINTF("connection refused -- retrying
> > in 1 second\n");
> >> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âsleep(1);
> >> -Â Â Â Â Â Â Â Â Â Â Â Â} else {
> >> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â ÂRPRINTF("connection failed: %d\n", errno);
> >> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âclose(fd);
> >> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âreturn -1;
> >> -Â Â Â Â Â Â Â Â Â Â Â Â}
> >> +Â Â Â Â Â Â Â Âif (treq->op == TD_OP_WRITE) {
> >> +Â Â Â Â Â Â Â Â Â Â Â Ârc = primary_forward_request(s, treq);
> >> +Â Â Â Â Â Â Â Â Â Â Â Âif (rc)
> >> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âreturn rc;
> >>Â Â Â Â Â Â Â Â Â}
> >> -Â Â Â Â} while (rc < 0);
> >>
> >> -Â Â Â ÂRPRINTF("client connected\n");
> >> -
> >> -Â Â Â Â/* make socket nonblocking */
> >> -Â Â Â Âif ((flags = fcntl(fd, F_GETFL, 0)) == -1)
> >> -Â Â Â Â Â Â Â Âflags = 0;
> >> -Â Â Â Âif (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
> >> -Â Â Â Â{
> >> -Â Â Â Â Â Â Â ÂRPRINTF("error making socket nonblocking\n");
> >> -Â Â Â Â Â Â Â Âclose(fd);
> >> -Â Â Â Â Â Â Â Âreturn -1;
> >> +Â Â Â Â Â Â Â Âtd_forward_request(*treq);
> >> +Â Â Â Â Â Â Â Âqueued_io->cons = ring_next(cons);
> >>Â Â Â Â Â}
> >>
> >> -Â Â Â Âif((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
> > fd, 0, remus_client_event, state)) < 0) {
> >> -Â Â Â Â Â Â Â ÂRPRINTF("error registering client event handler: %s\n",
> > strerror(id));
> >> -Â Â Â Â Â Â Â Âclose(fd);
> >> -Â Â Â Â Â Â Â Âreturn -1;
> >> -Â Â Â Â}
> >> -
> >> -Â Â Â Âstate->stream_fd.fd = fd;
> >> -Â Â Â Âstate->stream_fd.id = id;
> >>Â Â Â Â Âreturn 0;
> >>Â }
> >>
> >> -/* on read, just pass request through */
> >> +static void remus_client_established(td_replication_connect_t *t, int rc)
> >> +{
> >> +Â Â Â Âstruct tdremus_state *s = CONTAINER_OF(t, *s, t);
> >> +Â Â Â Âevent_id_t id;
> >> +
> >> +Â Â Â Âif (rc) {
> >> +Â Â Â Â Â Â Â Âprimary_failed(s, rc);
> >> +Â Â Â Â Â Â Â Âreturn;
> >> +Â Â Â Â}
> >> +
> >> +Â Â Â Â/* the connect succeeded */
> >> +Â Â Â Âid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd,
> >> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â 0, remus_client_event, s);
> >> +Â Â Â Âif(id < 0) {
> >> +Â Â Â Â Â Â Â ÂRPRINTF("error registering client event handler: %s\n",
> >> +Â Â Â Â Â Â Â Â Â Â Â Âstrerror(id));
> >> +Â Â Â Â Â Â Â Âprimary_failed(s, ERROR_INTERNAL);
> >> +Â Â Â Â Â Â Â Âreturn;
> >> +Â Â Â Â}
> >> +
> >> +Â Â Â Âs->stream_fd.fd = t->fd;
> >> +Â Â Â Âs->stream_fd.id = id;
> >> +
> >> +Â Â Â Â/* handle the queued requests */
> >> +Â Â Â Ârc = remus_handle_queued_io(s);
> >> +Â Â Â Âif (rc)
> >> +Â Â Â Â Â Â Â Âprimary_failed(s, rc);
> >> +}
> >> +
> >>Â static void primary_queue_read(td_driver_t *driver, td_request_t treq)
> >>Â {
> >> -Â Â Â Â/* just pass read through */
> >> -Â Â Â Âtd_forward_request(treq);
> >> +Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)driver->data;
> >> +Â Â Â Âstruct req_ring *ring = &s->queued_io;
> >> +
> >> +Â Â Â Âif (ring_isempty(ring)) {
> >> +Â Â Â Â Â Â Â Â/* just pass read through */
> >> +Â Â Â Â Â Â Â Âtd_forward_request(treq);
> >> +Â Â Â Â Â Â Â Âreturn;
> >> +Â Â Â Â}
> >> +
> >> +Â Â Â Âring_add_request(ring, &treq);
> >> +}
> >> +
> >> +static int primary_forward_request(struct tdremus_state *s,
> >> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â const td_request_t *treq)
> >> +{
> >> +Â Â Â Âchar header[sizeof(uint32_t) + sizeof(uint64_t)];
> >> +Â Â Â Âuint32_t *sectors = (uint32_t *)header;
> >> +Â Â Â Âuint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
> >> +Â Â Â Âtd_driver_t *driver = s->tdremus_driver;
> >> +
> >> +Â Â Â Â*sectors = treq->secs;
> >> +Â Â Â Â*sector = treq->sec;
> >> +
> >> +Â Â Â Âif (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE))
> > < 0)
> >> +Â Â Â Â Â Â Â Âreturn ERROR_IO;
> >> +
> >> +Â Â Â Âif (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
> >> +Â Â Â Â Â Â Â Âreturn ERROR_IO;
> >> +
> >> +Â Â Â Âif (mwrite(s->stream_fd.fd, treq->buf,
> >> +Â Â Â Â Â Âtreq->secs * driver->info.sector_size) < 0)
> >> +Â Â Â Â Â Â Â Âreturn ERROR_IO;
> >> +
> >> +Â Â Â Âreturn 0;
> >>Â }
> >>
> >>Â /* TODO:
> >> @@ -819,28 +857,28 @@ static void primary_queue_read(td_driver_t *driver,
> > td_request_t treq)
> >>Â static void primary_queue_write(td_driver_t *driver, td_request_t treq)
> >>Â {
> >>Â Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)driver->data;
> >> -
> >> -Â Â Â Âchar header[sizeof(uint32_t) + sizeof(uint64_t)];
> >> -Â Â Â Âuint32_t *sectors = (uint32_t *)header;
> >> -Â Â Â Âuint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
> >> +Â Â Â Âint rc, ret;
> >>
> >>Â Â Â Â Â// RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
> >>
> >> -Â Â Â Â/* -1 means we haven't connected yet, -2 means the connection was
> > lost */
> >> -Â Â Â Âif(s->stream_fd.fd == -1) {
> >> +Â Â Â Âret = td_replication_connect_status(&s->t);
> >> +Â Â Â Âif(ret == -1) {
> >>Â Â Â Â Â Â Â Â ÂRPRINTF("connecting to backup...\n");
> >> -Â Â Â Â Â Â Â Âprimary_blocking_connect(s);
> >> +Â Â Â Â Â Â Â Âs->t.callback = remus_client_established;
> >> +Â Â Â Â Â Â Â Ârc = td_replication_client_start(&s->t);
> >> +Â Â Â Â Â Â Â Âif (rc)
> >> +Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> >>Â Â Â Â Â}
> >>
> >> -Â Â Â Â*sectors = treq.secs;
> >> -Â Â Â Â*sector = treq.sec;
> >> +Â Â Â Â/* The connection is not established, just queue the request */
> >> +Â Â Â Âif (ret != 1) {
> >> +Â Â Â Â Â Â Â Âring_add_request(&s->queued_io, &treq);
> >> +Â Â Â Â Â Â Â Âreturn;
> >> +Â Â Â Â}
> >>
> >> -Â Â Â Âif (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE))
> > < 0)
> >> -Â Â Â Â Â Â Â Âgoto fail;
> >> -Â Â Â Âif (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
> >> -Â Â Â Â Â Â Â Âgoto fail;
> >> -
> >> -Â Â Â Âif (mwrite(s->stream_fd.fd, treq.buf, treq.secs *
> > driver->info.sector_size) < 0)
> >> +Â Â Â Â/* The connection is established */
> >> +Â Â Â Ârc = primary_forward_request(s, &treq);
> >> +Â Â Â Âif (rc)
> >>Â Â Â Â Â Â Â Â Âgoto fail;
> >>
> >>Â Â Â Â Âtd_forward_request(treq);
> >> @@ -850,7 +888,7 @@ static void primary_queue_write(td_driver_t *driver,
> > td_request_t treq)
> >>Â Âfail:
> >>Â Â Â Â Â/* switch to unprotected mode and tell tapdisk to retry */
> >>Â Â Â Â ÂRPRINTF("write request replication failed, switching to
> > unprotected mode");
> >> -Â Â Â Âswitch_mode(s->tdremus_driver, mode_unprotected);
> >> +Â Â Â Âprimary_failed(s, rc);
> >>Â Â Â Â Âtd_complete_request(treq, -EBUSY);
> >>Â }
> >>
> >> @@ -867,7 +905,7 @@ static int client_flush(td_driver_t *driver)
> >>
> >>Â Â Â Â Âif (mwrite(s->stream_fd.fd, TDREMUS_COMMIT,
> > strlen(TDREMUS_COMMIT)) < 0) {
> >>Â Â Â Â Â Â Â Â ÂRPRINTF("error flushing output");
> >> -Â Â Â Â Â Â Â Âclose_stream_fd(s);
> >> +Â Â Â Â Â Â Â Âprimary_failed(s, ERROR_IO);
> >>Â Â Â Â Â Â Â Â Âreturn -1;
> >>Â Â Â Â Â}
> >>
> >> @@ -886,6 +924,26 @@ static int server_flush(td_driver_t *driver)
> >>Â Â Â Â Âreturn ramdisk_flush(driver, s);
> >>Â }
> >>
> >> +/* It is called when switching the mode from primary to unprotected */
> >> +static int primary_flush(td_driver_t *driver)
> >> +{
> >> +Â Â Â Âstruct tdremus_state *s = driver->data;
> >> +Â Â Â Âstruct req_ring *ring = &s->queued_io;
> >> +Â Â Â Âunsigned int cons;
> >> +
> >> +Â Â Â Âif (ring_isempty(ring))
> >> +Â Â Â Â Â Â Â Âreturn 0;
> >> +
> >> +Â Â Â Âwhile (!ring_isempty(ring)) {
> >> +Â Â Â Â Â Â Â Âcons = ring->cons;
> >> +Â Â Â Â Â Â Â Âring->cons = ring_next(cons);
> >> +
> >> +Â Â Â Â Â Â Â Âtd_forward_request(ring->pending_requests[cons].treq);
> >> +Â Â Â Â}
> >> +
> >> +Â Â Â Âreturn client_flush(driver);
> >> +}
> >> +
> >>Â static int primary_start(td_driver_t *driver)
> >>Â {
> >>Â Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)driver->data;
> >> @@ -894,7 +952,7 @@ static int primary_start(td_driver_t *driver)
> >>
> >>Â Â Â Â Âtapdisk_remus.td_queue_read = primary_queue_read;
> >>Â Â Â Â Âtapdisk_remus.td_queue_write = primary_queue_write;
> >> -Â Â Â Âs->queue_flush = client_flush;
> >> +Â Â Â Âs->queue_flush = primary_flush;
> >>
> >>Â Â Â Â Âs->stream_fd.fd = -1;
> >>Â Â Â Â Âs->stream_fd.id = -1;
> >> @@ -913,7 +971,7 @@ static void remus_client_event(event_id_t id, char
> > mode, void *private)
> >>Â Â Â Â Âif (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
> >>Â Â Â Â Â Â Â Â Â/* replication stream closed or otherwise broken
> > (timeout, reset, &c) */
> >>Â Â Â Â Â Â Â Â ÂRPRINTF("error reading from backup\n");
> >> -Â Â Â Â Â Â Â Âclose_stream_fd(s);
> >> +Â Â Â Â Â Â Â Âprimary_failed(s, ERROR_IO);
> >>Â Â Â Â Â Â Â Â Âreturn;
> >>Â Â Â Â Â}
> >>
> >> @@ -924,7 +982,7 @@ static void remus_client_event(event_id_t id, char
> > mode, void *private)
> >>Â Â Â Â Â Â Â Â Âctl_respond(s, TDREMUS_DONE);
> >>Â Â Â Â Âelse {
> >>Â Â Â Â Â Â Â Â ÂRPRINTF("received unknown message: %s\n", req);
> >> -Â Â Â Â Â Â Â Âclose_stream_fd(s);
> >> +Â Â Â Â Â Â Â Âprimary_failed(s, ERROR_IO);
> >>Â Â Â Â Â}
> >>
> >>Â Â Â Â Âreturn;
> >> @@ -933,84 +991,36 @@ static void remus_client_event(event_id_t id, char
> > mode, void *private)
> >>Â /* backup functions */
> >>Â static void remus_server_event(event_id_t id, char mode, void *private);
> >>
> >> -/* returns the socket that receives write requests */
> >> -static void remus_server_accept(event_id_t id, char mode, void* private)
> >> +/* It is called when we find some I/O error */
> >> +static void backup_failed(struct tdremus_state *s, int rc)
> >>Â {
> >> -Â Â Â Âstruct tdremus_state* s = (struct tdremus_state *) private;
> >> +Â Â Â ÂUNREGISTER_EVENT(s->stream_fd.id);
> >> +Â Â Â Âtd_replication_connect_kill(&s->t);
> >> +Â Â Â Â/* We will switch to unprotected mode in backup_queue_write() */
> >> +}
> >>
> >> -Â Â Â Âint stream_fd;
> >> -Â Â Â Âevent_id_t cid;
> >> +/* returns the socket that receives write requests */
> >> +static void remus_server_established(td_replication_connect_t *t, int rc)
> >> +{
> >> +Â Â Â Âstruct tdremus_state *s = CONTAINER_OF(t, *s, t);
> >> +Â Â Â Âevent_id_t id;
> >>
> >> -Â Â Â Â/* XXX: add address-based black/white list */
> >> -Â Â Â Âif ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
> >> -Â Â Â Â Â Â Â ÂRPRINTF("error accepting connection: %d\n", errno);
> >> -Â Â Â Â Â Â Â Âreturn;
> >> -Â Â Â Â}
> >> -
> >> -Â Â Â Â/* TODO: check to see if we are already replicating. if so just
> > close the
> >> -Â Â Â Â * connection (or do something smarter) */
> >> -Â Â Â ÂRPRINTF("server accepted connection\n");
> >> +Â Â Â Â/* rc is always 0 */
> >>
> >>Â Â Â Â Â/* add tapdisk event for replication stream */
> >> -Â Â Â Âcid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
> > stream_fd, 0,
> >> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âremus_server_event, s);
> >> +Â Â Â Âid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd,
> > 0,
> >> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â remus_server_event, s);
> >>
> >> -Â Â Â Âif(cid < 0) {
> >> -Â Â Â Â Â Â Â ÂRPRINTF("error registering connection event handler:
> > %s\n", strerror(errno));
> >> -Â Â Â Â Â Â Â Âclose(stream_fd);
> >> +Â Â Â Âif (id < 0) {
> >> +Â Â Â Â Â Â Â ÂRPRINTF("error registering connection event handler:
> > %s\n",
> >> +Â Â Â Â Â Â Â Â Â Â Â Âstrerror(errno));
> >> +Â Â Â Â Â Â Â Âtd_replication_server_restart(t);
> >>Â Â Â Â Â Â Â Â Âreturn;
> >>Â Â Â Â Â}
> >>
> >>Â Â Â Â Â/* store replication file descriptor */
> >> -Â Â Â Âs->stream_fd.fd = stream_fd;
> >> -Â Â Â Âs->stream_fd.id = cid;
> >> -}
> >> -
> >> -/* returns -2 if EADDRNOTAVAIL */
> >> -static int remus_bind(struct tdremus_state* s)
> >> -{
> >> -//Â struct sockaddr_in sa;
> >> -Â Â Â Âint opt;
> >> -Â Â Â Âint rc = -1;
> >> -
> >> -Â Â Â Âif ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
> >> -Â Â Â Â Â Â Â ÂRPRINTF("could not create server socket: %d\n", errno);
> >> -Â Â Â Â Â Â Â Âreturn rc;
> >> -Â Â Â Â}
> >> -Â Â Â Âopt = 1;
> >> -Â Â Â Âif (setsockopt(s->server_fd.fd, SOL_SOCKET, SO_REUSEADDR, &opt,
> > sizeof(opt)) < 0)
> >> -Â Â Â Â Â Â Â ÂRPRINTF("Error setting REUSEADDR on %d: %d\n",
> > s->server_fd.fd, errno);
> >> -
> >> -Â Â Â Âif (bind(s->server_fd.fd, (struct sockaddr *)&s->sa,
> > sizeof(s->sa)) < 0) {
> >> -Â Â Â Â Â Â Â ÂRPRINTF("could not bind server socket %d to %s:%d: %d
> > %s\n", s->server_fd.fd,
> >> -Â Â Â Â Â Â Â Â Â Â Â Âinet_ntoa(s->sa.sin_addr), ntohs(s->sa.sin_port),
> > errno, strerror(errno));
> >> -Â Â Â Â Â Â Â Âif (errno != EADDRINUSE)
> >> -Â Â Â Â Â Â Â Â Â Â Â Ârc = -2;
> >> -Â Â Â Â Â Â Â Âgoto err_sfd;
> >> -Â Â Â Â}
> >> -Â Â Â Âif (listen(s->server_fd.fd, 10)) {
> >> -Â Â Â Â Â Â Â ÂRPRINTF("could not listen on socket: %d\n", errno);
> >> -Â Â Â Â Â Â Â Âgoto err_sfd;
> >> -Â Â Â Â}
> >> -
> >> -Â Â Â Â/* The socket s now bound to the address and listening so we may
> > now register
> >> -Â Â* the fd with tapdisk */
> >> -
> >> -Â Â Â Âif((s->server_fd.id =
> > tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
> >> -
> >Â s->server_fd.fd, 0,
> >> -
> >Â remus_server_accept, s)) < 0) {
> >> -Â Â Â Â Â Â Â ÂRPRINTF("error registering server connection event
> > handler: %s",
> >> -Â Â Â Â Â Â Â Â Â Â Â Âstrerror(s->server_fd.id));
> >> -Â Â Â Â Â Â Â Âgoto err_sfd;
> >> -Â Â Â Â}
> >> -
> >> -Â Â Â Âreturn 0;
> >> -
> >> - err_sfd:
> >> -Â Â Â Âclose(s->server_fd.fd);
> >> -Â Â Â Âs->server_fd.fd = -1;
> >> -
> >> -Â Â Â Âreturn rc;
> >> +Â Â Â Âs->stream_fd.fd = t->fd;
> >> +Â Â Â Âs->stream_fd.id = id;
> >>Â }
> >>
> >>Â /* wait for latest checkpoint to be applied */
> >> @@ -1053,6 +1063,8 @@ void backup_queue_write(td_driver_t *driver,
> > td_request_t treq)
> >>Â Â Â Â Â * handle the write
> >>Â Â Â Â Â */
> >>
> >> +Â Â Â Â/* If we have called backup_failed, calling it again is harmless
> > */
> >> +Â Â Â Âbackup_failed(s, ERROR_INTERNAL);
> >>Â Â Â Â Âswitch_mode(driver, mode_unprotected);
> >>Â Â Â Â Â/* TODO: call the appropriate write function rather than return
> > EBUSY */
> >>Â Â Â Â Âtd_complete_request(treq, -EBUSY);
> >> @@ -1061,7 +1073,6 @@ void backup_queue_write(td_driver_t *driver,
> > td_request_t treq)
> >>Â static int backup_start(td_driver_t *driver)
> >>Â {
> >>Â Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)driver->data;
> >> -Â Â Â Âint fd;
> >>
> >>Â Â Â Â Âif (ramdisk_start(driver) < 0)
> >>Â Â Â Â Â Â Â Â Âreturn -1;
> >> @@ -1073,12 +1084,12 @@ static int backup_start(td_driver_t *driver)
> >>Â Â Â Â Âreturn 0;
> >>Â }
> >>
> >> -static int server_do_wreq(td_driver_t *driver)
> >> +static void server_do_wreq(td_driver_t *driver)
> >>Â {
> >>Â Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)driver->data;
> >>Â Â Â Â Âstatic tdremus_wire_t twreq;
> >>Â Â Â Â Âchar buf[4096];
> >> -Â Â Â Âint len, rc;
> >> +Â Â Â Âint len, rc = ERROR_IO;
> >>
> >>Â Â Â Â Âchar header[sizeof(uint32_t) + sizeof(uint64_t)];
> >>Â Â Â Â Âuint32_t *sectors = (uint32_t *) header;
> >> @@ -1097,28 +1108,28 @@ static int server_do_wreq(td_driver_t *driver)
> >>Â Â Â Â Âif (len > sizeof(buf)) {
> >>Â Â Â Â Â Â Â Â Â/* freak out! */
> >>Â Â Â Â Â Â Â Â ÂRPRINTF("write request too large: %d/%u\n", len,
> > (unsigned)sizeof(buf));
> >> -Â Â Â Â Â Â Â Âreturn -1;
> >> +Â Â Â Â Â Â Â Âgoto err;
> >>Â Â Â Â Â}
> >>
> >>Â Â Â Â Âif (mread(s->stream_fd.fd, buf, len) < 0)
> >>Â Â Â Â Â Â Â Â Âgoto err;
> >>
> >> -Â Â Â Âif (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0)
> >> +Â Â Â Âif (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) {
> >> +Â Â Â Â Â Â Â Ârc = ERROR_INTERNAL;
> >>Â Â Â Â Â Â Â Â Âgoto err;
> >> +Â Â Â Â}
> >>
> >> -Â Â Â Âreturn 0;
> >> +Â Â Â Âreturn;
> >>
> >>Â Âerr:
> >>Â Â Â Â Â/* should start failover */
> >>Â Â Â Â ÂRPRINTF("backup write request error\n");
> >> -Â Â Â Âclose_stream_fd(s);
> >> -
> >> -Â Â Â Âreturn -1;
> >> +Â Â Â Âbackup_failed(s, rc);
> >>Â }
> >>
> >>Â /* at this point, the server can start applying the most recent
> >>Â Â* ramdisk. */
> >> -static int server_do_creq(td_driver_t *driver)
> >> +static void server_do_creq(td_driver_t *driver)
> >>Â {
> >>Â Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)driver->data;
> >>
> >> @@ -1128,9 +1139,7 @@ static int server_do_creq(td_driver_t *driver)
> >>
> >>Â Â Â Â Â/* XXX this message should not be sent until flush completes! */
> >>Â Â Â Â Âif (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) !=
> > 4)
> >> -Â Â Â Â Â Â Â Âreturn -1;
> >> -
> >> -Â Â Â Âreturn 0;
> >> +Â Â Â Â Â Â Â Âbackup_failed(s, ERROR_IO);
> >>Â }
> >>
> >>
> >> @@ -1213,11 +1222,6 @@ static int unprotected_start(td_driver_t *driver)
> >>
> >>Â Â Â Â ÂRPRINTF("failure detected, activating passthrough\n");
> >>
> >> -Â Â Â Â/* close the server socket */
> >> -Â Â Â Âclose_stream_fd(s);
> >> -
> >> -Â Â Â Âclose_server_fd(s);
> >> -
> >>Â Â Â Â Â/* install the unprotected read/write handlers */
> >>Â Â Â Â Âtapdisk_remus.td_queue_read = unprotected_queue_read;
> >>Â Â Â Â Âtapdisk_remus.td_queue_write = unprotected_queue_write;
> >> @@ -1227,90 +1231,6 @@ static int unprotected_start(td_driver_t *driver)
> >>
> >>
> >>Â /* control */
> >> -
> >> -static inline int resolve_address(const char* addr, struct in_addr* ia)
> >> -{
> >> -Â Â Â Âstruct hostent* he;
> >> -Â Â Â Âuint32_t ip;
> >> -
> >> -Â Â Â Âif (!(he = gethostbyname(addr))) {
> >> -Â Â Â Â Â Â Â ÂRPRINTF("error resolving %s: %d\n", addr, h_errno);
> >> -Â Â Â Â Â Â Â Âreturn -1;
> >> -Â Â Â Â}
> >> -
> >> -Â Â Â Âif (!he->h_addr_list[0]) {
> >> -Â Â Â Â Â Â Â ÂRPRINTF("no address found for %s\n", addr);
> >> -Â Â Â Â Â Â Â Âreturn -1;
> >> -Â Â Â Â}
> >> -
> >> -Â Â Â Â/* network byte order */
> >> -Â Â Â Âip = *((uint32_t**)he->h_addr_list)[0];
> >> -Â Â Â Âia->s_addr = ip;
> >> -
> >> -Â Â Â Âreturn 0;
> >> -}
> >> -
> >> -static int get_args(td_driver_t *driver, const char* name)
> >> -{
> >> -Â Â Â Âstruct tdremus_state *state = (struct tdremus_state
> > *)driver->data;
> >> -Â Â Â Âchar* host;
> >> -Â Â Â Âchar* port;
> >> -//Â char* driver_str;
> >> -//Â char* parent;
> >> -//Â int type;
> >> -//Â char* path;
> >> -//Â unsigned long ulport;
> >> -//Â int i;
> >> -//Â struct sockaddr_in server_addr_in;
> >> -
> >> -Â Â Â Âint gai_status;
> >> -Â Â Â Âint valid_addr;
> >> -Â Â Â Âstruct addrinfo gai_hints;
> >> -Â Â Â Âstruct addrinfo *servinfo, *servinfo_itr;
> >> -
> >> -Â Â Â Âmemset(&gai_hints, 0, sizeof gai_hints);
> >> -Â Â Â Âgai_hints.ai_family = AF_UNSPEC;
> >> -Â Â Â Âgai_hints.ai_socktype = SOCK_STREAM;
> >> -
> >> -Â Â Â Âport = strchr(name, ':');
> >> -Â Â Â Âif (!port) {
> >> -Â Â Â Â Â Â Â ÂRPRINTF("missing host in %s\n", name);
> >> -Â Â Â Â Â Â Â Âreturn -ENOENT;
> >> -Â Â Â Â}
> >> -Â Â Â Âif (!(host = strndup(name, port - name))) {
> >> -Â Â Â Â Â Â Â ÂRPRINTF("unable to allocate host\n");
> >> -Â Â Â Â Â Â Â Âreturn -ENOMEM;
> >> -Â Â Â Â}
> >> -Â Â Â Âport++;
> >> -
> >> -Â Â Â Âif ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo))
> > != 0) {
> >> -Â Â Â Â Â Â Â ÂRPRINTF("getaddrinfo error: %s\n",
> > gai_strerror(gai_status));
> >> -Â Â Â Â Â Â Â Âreturn -ENOENT;
> >> -Â Â Â Â}
> >> -
> >> -Â Â Â Â/* TODO: do something smarter here */
> >> -Â Â Â Âvalid_addr = 0;
> >> -Â Â Â Âfor(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr =
> > servinfo_itr->ai_next) {
> >> -Â Â Â Â Â Â Â Âvoid *addr;
> >> -Â Â Â Â Â Â Â Âchar *ipver;
> >> -
> >> -Â Â Â Â Â Â Â Âif (servinfo_itr->ai_family == AF_INET) {
> >> -Â Â Â Â Â Â Â Â Â Â Â Âvalid_addr = 1;
> >> -Â Â Â Â Â Â Â Â Â Â Â Âmemset(&state->sa, 0, sizeof(state->sa));
> >> -Â Â Â Â Â Â Â Â Â Â Â Âstate->sa = *(struct sockaddr_in
> > *)servinfo_itr->ai_addr;
> >> -Â Â Â Â Â Â Â Â Â Â Â Âbreak;
> >> -Â Â Â Â Â Â Â Â}
> >> -Â Â Â Â}
> >> -Â Â Â Âfreeaddrinfo(servinfo);
> >> -
> >> -Â Â Â Âif (!valid_addr)
> >> -Â Â Â Â Â Â Â Âreturn -ENOENT;
> >> -
> >> -Â Â Â ÂRPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr),
> > ntohs(state->sa.sin_port));
> >> -
> >> -Â Â Â Âreturn 0;
> >> -}
> >> -
> >>Â static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
> >>Â {
> >>Â Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)driver->data;
> >> @@ -1343,6 +1263,20 @@ static int switch_mode(td_driver_t *driver, enum
> > tdremus_mode mode)
> >>Â Â Â Â Âreturn rc;
> >>Â }
> >>
> >> +static void ctl_reopen(struct tdremus_state *s)
> >> +{
> >> +Â Â Â Âctl_unregister(s);
> >> +Â Â Â ÂCLOSE_FD(s->ctl_fd.fd);
> >> +Â Â Â ÂRPRINTF("FIFO closed\n");
> >> +
> >> +Â Â Â Âif ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
> >> +Â Â Â Â Â Â Â ÂRPRINTF("error reopening FIFO: %d\n", errno);
> >> +Â Â Â Â Â Â Â Âreturn;
> >> +Â Â Â Â}
> >> +
> >> +Â Â Â Âctl_register(s);
> >> +}
> >> +
> >>Â static void ctl_request(event_id_t id, char mode, void *private)
> >>Â {
> >>Â Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)private;
> >> @@ -1355,11 +1289,7 @@ static void ctl_request(event_id_t id, char mode,
> > void *private)
> >>Â Â Â Â Âif (!(rc = read(s->ctl_fd.fd, msg, sizeof(msg) - 1 /* append nul
> > */))) {
> >>Â Â Â Â Â Â Â Â ÂRPRINTF("0-byte read received, reopening FIFO\n");
> >>Â Â Â Â Â Â Â Â Â/*TODO: we may have to unregister/re-register with
> > tapdisk_server */
> >> -Â Â Â Â Â Â Â Âclose(s->ctl_fd.fd);
> >> -Â Â Â Â Â Â Â ÂRPRINTF("FIFO closed\n");
> >> -Â Â Â Â Â Â Â Âif ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
> >> -Â Â Â Â Â Â Â Â Â Â Â ÂRPRINTF("error reopening FIFO: %d\n", errno);
> >> -Â Â Â Â Â Â Â Â}
> >> +Â Â Â Â Â Â Â Âctl_reopen(s);
> >>Â Â Â Â Â Â Â Â Âreturn;
> >>Â Â Â Â Â}
> >>
> >> @@ -1372,7 +1302,7 @@ static void ctl_request(event_id_t id, char mode,
> > void *private)
> >>Â Â Â Â Âmsg[rc] = '\0';
> >>Â Â Â Â Âif (!strncmp(msg, "flush", 5)) {
> >>Â Â Â Â Â Â Â Â Âif (s->mode == mode_primary) {
> >> -Â Â Â Â Â Â Â Â Â Â Â Âif ((rc = s->queue_flush(driver))) {
> >> +Â Â Â Â Â Â Â Â Â Â Â Âif ((rc = client_flush(driver))) {
> >>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â ÂRPRINTF("error passing flush request to
> > backup");
> >>Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âctl_respond(s, TDREMUS_FAIL);
> >>Â Â Â Â Â Â Â Â Â Â Â Â Â}
> >> @@ -1521,6 +1451,7 @@ static void ctl_unregister(struct tdremus_state *s)
> >>Â static int tdremus_open(td_driver_t *driver, td_image_t *image,
> > td_uuid_t uuid)
> >>Â {
> >>Â Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)driver->data;
> >> +Â Â Â Âtd_replication_connect_t *t = &s->t;
> >>Â Â Â Â Âint rc;
> >>Â Â Â Â Âconst char *name = image->name;
> >>Â Â Â Â Âtd_flag_t flags = image->flags;
> >> @@ -1531,7 +1462,6 @@ static int tdremus_open(td_driver_t *driver,
> > td_image_t *image, td_uuid_t uuid)
> >>Â Â Â Â Âremus_image = image;
> >>
> >>Â Â Â Â Âmemset(s, 0, sizeof(*s));
> >> -Â Â Â Âs->server_fd.fd = -1;
> >>Â Â Â Â Âs->stream_fd.fd = -1;
> >>Â Â Â Â Âs->ctl_fd.fd = -1;
> >>Â Â Â Â Âs->msg_fd.fd = -1;
> >> @@ -1540,8 +1470,11 @@ static int tdremus_open(td_driver_t *driver,
> > td_image_t *image, td_uuid_t uuid)
> >>Â Â Â Â Â * the driver stack from the stream_fd event handler */
> >>Â Â Â Â Âs->tdremus_driver = driver;
> >>
> >> -Â Â Â Â/* parse name to get info etc */
> >> -Â Â Â Âif ((rc = get_args(driver, name)))
> >> +Â Â Â Ât->log_prefix = "remus";
> >> +Â Â Â Ât->retry_timeout_s = REMUS_CONNRETRY_TIMEOUT;
> >> +Â Â Â Ât->max_connections = 10;
> >> +Â Â Â Ât->callback = remus_server_established;
> >> +Â Â Â Âif ((rc = td_replication_connect_init(t, name)))
> >>Â Â Â Â Â Â Â Â Âreturn rc;
> >>
> >>Â Â Â Â Âif ((rc = ctl_open(driver, name))) {
> >> @@ -1555,7 +1488,7 @@ static int tdremus_open(td_driver_t *driver,
> > td_image_t *image, td_uuid_t uuid)
> >>Â Â Â Â Â Â Â Â Âreturn rc;
> >>Â Â Â Â Â}
> >>
> >> -Â Â Â Âif (!(rc = remus_bind(s)))
> >> +Â Â Â Âif (!(rc = td_replication_server_start(t)))
> >>Â Â Â Â Â Â Â Â Ârc = switch_mode(driver, mode_backup);
> >>Â Â Â Â Âelse if (rc == -2)
> >>Â Â Â Â Â Â Â Â Ârc = switch_mode(driver, mode_primary);
> >> @@ -1575,8 +1508,7 @@ static int tdremus_close(td_driver_t *driver)
> >>Â Â Â Â Âif (s->ramdisk.inprogress)
> >>Â Â Â Â Â Â Â Â Âhashtable_destroy(s->ramdisk.inprogress, 0);
> >>
> >> -Â Â Â Âclose_server_fd(s);
> >> -Â Â Â Âclose_stream_fd(s);
> >> +Â Â Â Âtd_replication_connect_kill(&s->t);
> >>Â Â Â Â Âctl_unregister(s);
> >>Â Â Â Â Âctl_close(s);
> >>
> >> diff --git a/tools/blktap2/drivers/block-replication.h
> > b/tools/blktap2/drivers/block-replication.h
> >> index 9e051cc..07fd630 100644
> >> --- a/tools/blktap2/drivers/block-replication.h
> >> +++ b/tools/blktap2/drivers/block-replication.h
> >> @@ -48,6 +48,7 @@
> >>Â enum {
> >>Â Â Â Â ÂERROR_INTERNAL = -1,
> >>Â Â Â Â ÂERROR_CONNECTION = -2,
> >> +Â Â Â ÂERROR_IO = -3,
> >>Â };
> >>
> >>Â typedef struct td_replication_connect td_replication_connect_t;
> >> --
> >> 1.9.3
> >>
> >
> > The code looks ok. Have you tested this, with some read/write workload
> > inside the guest? Especially read after write style sanity checks to ensure
> > that there is no data corruption (caused by stale ramdisk data flushed to
> > disk or served to guest), before a connection to backup has been
> > established.
>
> Which existing test tool can check this?
> Before the connection to the backup has been established, the guest is blocked
> as soon as the first write operation happens, so you cannot log in and run a test program.
>
That is how Remus behaves with current blktap2. I thought this patch was trying to allow the guest to run normally before starting Remus while buffering writes in a ramdisk.
> > I am acking this piece in good faith that you have tested all these
> > cases.
>
> Yes. If you apply the hack patch (patch 17), you can run Remus with blktap2.
>
> I have tested it with pgbench. IIRC, I found only one problem during the test:
> select() times out in xc_domain_restore.c.
>
Pgbench is too heavy for this test. You are better off running your own simple C code that does these basic sanity checks.
> Thanks
> Wen Congyang
>
> >
> > Acked-by: Shriram Rajagopalan <rshriram@xxxxxxxxx>
> >
>
_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel
|