[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [PATCH 13/17] tools: block-remus: connect to backup asynchronously
Use the block-replication API (td_replication_connect_*) to connect to the backup asynchronously. Until the connection is established, all I/O requests are queued; they are handled once the connection is established. Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx> Cc: Shriram Rajagopalan <rshriram@xxxxxxxxx> --- tools/blktap2/drivers/block-remus.c | 508 +++++++++++++----------------- tools/blktap2/drivers/block-replication.h | 1 + 2 files changed, 221 insertions(+), 288 deletions(-) diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c index e5ad782..a2b9f62 100644 --- a/tools/blktap2/drivers/block-remus.c +++ b/tools/blktap2/drivers/block-remus.c @@ -40,6 +40,7 @@ #include "hashtable.h" #include "hashtable_itr.h" #include "hashtable_utility.h" +#include "block-replication.h" #include <errno.h> #include <inttypes.h> @@ -49,10 +50,7 @@ #include <string.h> #include <sys/time.h> #include <sys/types.h> -#include <sys/socket.h> -#include <netdb.h> #include <netinet/in.h> -#include <arpa/inet.h> #include <sys/param.h> #include <sys/sysctl.h> #include <unistd.h> @@ -63,10 +61,12 @@ #define RAMDISK_HASHSIZE 128 /* connect retry timeout (seconds) */ -#define REMUS_CONNRETRY_TIMEOUT 10 +#define REMUS_CONNRETRY_TIMEOUT 1 #define RPRINTF(_f, _a...) 
syslog (LOG_DEBUG, "remus: " _f, ## _a) +#define MAX_REMUS_REQUESTS TAPDISK_DATA_REQUESTS + enum tdremus_mode { mode_invalid = 0, mode_unprotected, @@ -75,16 +75,14 @@ enum tdremus_mode { }; struct tdremus_req { - uint64_t sector; - int nb_sectors; - char buf[4096]; + td_request_t treq; }; struct req_ring { /* waste one slot to distinguish between empty and full */ - struct tdremus_req requests[MAX_REQUESTS * 2 + 1]; - unsigned int head; - unsigned int tail; + struct tdremus_req pending_requests[MAX_REMUS_REQUESTS + 1]; + unsigned int prod; + unsigned int cons; }; /* TODO: This isn't very pretty, but to properly generate our own treqs (needed @@ -161,13 +159,14 @@ struct tdremus_state { char* msg_path; /* output completion message here */ poll_fd_t msg_fd; - /* replication host */ - struct sockaddr_in sa; - poll_fd_t server_fd; /* server listen port */ + td_replication_connect_t t; poll_fd_t stream_fd; /* replication channel */ - /* queue write requests, batch-replicate at submit */ - struct req_ring write_ring; + /* + * queue I/O requests, batch-replicate when + * the connection is established. 
+ */ + struct req_ring queued_io; /* ramdisk data*/ struct ramdisk ramdisk; @@ -206,11 +205,13 @@ static int tdremus_close(td_driver_t *driver); static int switch_mode(td_driver_t *driver, enum tdremus_mode mode); static int ctl_respond(struct tdremus_state *s, const char *response); +static int ctl_register(struct tdremus_state *s); +static void ctl_unregister(struct tdremus_state *s); /* ring functions */ -static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos) +static inline unsigned int ring_next(unsigned int pos) { - if (++pos >= MAX_REQUESTS * 2 + 1) + if (++pos >= MAX_REMUS_REQUESTS + 1) return 0; return pos; @@ -218,13 +219,26 @@ static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos) static inline int ring_isempty(struct req_ring* ring) { - return ring->head == ring->tail; + return ring->cons == ring->prod; } static inline int ring_isfull(struct req_ring* ring) { - return ring_next(ring, ring->tail) == ring->head; + return ring_next(ring->prod) == ring->cons; } + +static void ring_add_request(struct req_ring *ring, const td_request_t *treq) +{ + /* If ring is full, it means that tapdisk2 has some bug */ + if (ring_isfull(ring)) { + RPRINTF("OOPS, ring is full\n"); + exit(1); + } + + ring->pending_requests[ring->prod].treq = *treq; + ring->prod = ring_next(ring->prod); +} + /* Prototype declarations */ static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s); @@ -724,89 +738,113 @@ static int mwrite(int fd, void* buf, size_t len) select(fd + 1, NULL, &wfds, NULL, &tv); } - -static void inline close_stream_fd(struct tdremus_state *s) -{ - if (s->stream_fd.fd < 0) - return; - - /* XXX: -2 is magic. replace with macro perhaps? 
*/ - tapdisk_server_unregister_event(s->stream_fd.id); - close(s->stream_fd.fd); - s->stream_fd.fd = -2; -} - -static void close_server_fd(struct tdremus_state *s) -{ - if (s->server_fd.fd < 0) - return; - - tapdisk_server_unregister_event(s->server_fd.id); - s->server_fd.id = -1; - close(s->stream_fd.fd); - s->stream_fd.fd = -1; -} - /* primary functions */ static void remus_client_event(event_id_t, char mode, void *private); +static int primary_forward_request(struct tdremus_state *s, + const td_request_t *treq); -static int primary_blocking_connect(struct tdremus_state *state) +/* + * It is called when we cannot connect to backup, or find I/O error when + * reading/writing. + */ +static void primary_failed(struct tdremus_state *s, int rc) { - int fd; - int id; + td_replication_connect_kill(&s->t); + if (rc == ERROR_INTERNAL) + RPRINTF("switch to unprotected mode due to internal error"); + UNREGISTER_EVENT(s->stream_fd.id); + switch_mode(s->tdremus_driver, mode_unprotected); +} + +static int remus_handle_queued_io(struct tdremus_state *s) +{ + struct req_ring *queued_io = &s->queued_io; + unsigned int cons; + td_request_t *treq; int rc; - int flags; - RPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port)); + while (!ring_isempty(queued_io)) { + cons = queued_io->cons; + treq = &queued_io->pending_requests[cons].treq; - if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) { - RPRINTF("could not create client socket: %d\n", errno); - return -1; - } - - do { - if ((rc = connect(fd, (struct sockaddr *)&state->sa, - sizeof(state->sa))) < 0) - { - if (errno == ECONNREFUSED) { - RPRINTF("connection refused -- retrying in 1 second\n"); - sleep(1); - } else { - RPRINTF("connection failed: %d\n", errno); - close(fd); - return -1; - } + if (treq->op == TD_OP_WRITE) { + rc = primary_forward_request(s, treq); + if (rc) + return rc; } - } while (rc < 0); - RPRINTF("client connected\n"); - - /* make socket nonblocking */ - if ((flags = 
fcntl(fd, F_GETFL, 0)) == -1) - flags = 0; - if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) - { - RPRINTF("error making socket nonblocking\n"); - close(fd); - return -1; + td_forward_request(*treq); + queued_io->cons = ring_next(cons); } - if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, fd, 0, remus_client_event, state)) < 0) { - RPRINTF("error registering client event handler: %s\n", strerror(id)); - close(fd); - return -1; - } - - state->stream_fd.fd = fd; - state->stream_fd.id = id; return 0; } -/* on read, just pass request through */ +static void remus_client_established(td_replication_connect_t *t, int rc) +{ + struct tdremus_state *s = CONTAINER_OF(t, *s, t); + event_id_t id; + + if (rc) { + primary_failed(s, rc); + return; + } + + /* the connect succeeded */ + id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd, + 0, remus_client_event, s); + if(id < 0) { + RPRINTF("error registering client event handler: %s\n", + strerror(id)); + primary_failed(s, ERROR_INTERNAL); + return; + } + + s->stream_fd.fd = t->fd; + s->stream_fd.id = id; + + /* handle the queued requests */ + rc = remus_handle_queued_io(s); + if (rc) + primary_failed(s, rc); +} + static void primary_queue_read(td_driver_t *driver, td_request_t treq) { - /* just pass read through */ - td_forward_request(treq); + struct tdremus_state *s = (struct tdremus_state *)driver->data; + struct req_ring *ring = &s->queued_io; + + if (ring_isempty(ring)) { + /* just pass read through */ + td_forward_request(treq); + return; + } + + ring_add_request(ring, &treq); +} + +static int primary_forward_request(struct tdremus_state *s, + const td_request_t *treq) +{ + char header[sizeof(uint32_t) + sizeof(uint64_t)]; + uint32_t *sectors = (uint32_t *)header; + uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t)); + td_driver_t *driver = s->tdremus_driver; + + *sectors = treq->secs; + *sector = treq->sec; + + if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0) + 
return ERROR_IO; + + if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0) + return ERROR_IO; + + if (mwrite(s->stream_fd.fd, treq->buf, + treq->secs * driver->info.sector_size) < 0) + return ERROR_IO; + + return 0; } /* TODO: @@ -819,28 +857,28 @@ static void primary_queue_read(td_driver_t *driver, td_request_t treq) static void primary_queue_write(td_driver_t *driver, td_request_t treq) { struct tdremus_state *s = (struct tdremus_state *)driver->data; - - char header[sizeof(uint32_t) + sizeof(uint64_t)]; - uint32_t *sectors = (uint32_t *)header; - uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t)); + int rc, ret; // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd); - /* -1 means we haven't connected yet, -2 means the connection was lost */ - if(s->stream_fd.fd == -1) { + ret = td_replication_connect_status(&s->t); + if(ret == -1) { RPRINTF("connecting to backup...\n"); - primary_blocking_connect(s); + s->t.callback = remus_client_established; + rc = td_replication_client_start(&s->t); + if (rc) + goto fail; } - *sectors = treq.secs; - *sector = treq.sec; + /* The connection is not established, just queue the request */ + if (ret != 1) { + ring_add_request(&s->queued_io, &treq); + return; + } - if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0) - goto fail; - if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0) - goto fail; - - if (mwrite(s->stream_fd.fd, treq.buf, treq.secs * driver->info.sector_size) < 0) + /* The connection is established */ + rc = primary_forward_request(s, &treq); + if (rc) goto fail; td_forward_request(treq); @@ -850,7 +888,7 @@ static void primary_queue_write(td_driver_t *driver, td_request_t treq) fail: /* switch to unprotected mode and tell tapdisk to retry */ RPRINTF("write request replication failed, switching to unprotected mode"); - switch_mode(s->tdremus_driver, mode_unprotected); + primary_failed(s, rc); td_complete_request(treq, -EBUSY); } @@ -867,7 +905,7 @@ static int 
client_flush(td_driver_t *driver) if (mwrite(s->stream_fd.fd, TDREMUS_COMMIT, strlen(TDREMUS_COMMIT)) < 0) { RPRINTF("error flushing output"); - close_stream_fd(s); + primary_failed(s, ERROR_IO); return -1; } @@ -886,6 +924,26 @@ static int server_flush(td_driver_t *driver) return ramdisk_flush(driver, s); } +/* It is called when switching the mode from primary to unprotected */ +static int primary_flush(td_driver_t *driver) +{ + struct tdremus_state *s = driver->data; + struct req_ring *ring = &s->queued_io; + unsigned int cons; + + if (ring_isempty(ring)) + return 0; + + while (!ring_isempty(ring)) { + cons = ring->cons; + ring->cons = ring_next(cons); + + td_forward_request(ring->pending_requests[cons].treq); + } + + return client_flush(driver); +} + static int primary_start(td_driver_t *driver) { struct tdremus_state *s = (struct tdremus_state *)driver->data; @@ -894,7 +952,7 @@ static int primary_start(td_driver_t *driver) tapdisk_remus.td_queue_read = primary_queue_read; tapdisk_remus.td_queue_write = primary_queue_write; - s->queue_flush = client_flush; + s->queue_flush = primary_flush; s->stream_fd.fd = -1; s->stream_fd.id = -1; @@ -913,7 +971,7 @@ static void remus_client_event(event_id_t id, char mode, void *private) if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) { /* replication stream closed or otherwise broken (timeout, reset, &c) */ RPRINTF("error reading from backup\n"); - close_stream_fd(s); + primary_failed(s, ERROR_IO); return; } @@ -924,7 +982,7 @@ static void remus_client_event(event_id_t id, char mode, void *private) ctl_respond(s, TDREMUS_DONE); else { RPRINTF("received unknown message: %s\n", req); - close_stream_fd(s); + primary_failed(s, ERROR_IO); } return; @@ -933,84 +991,36 @@ static void remus_client_event(event_id_t id, char mode, void *private) /* backup functions */ static void remus_server_event(event_id_t id, char mode, void *private); -/* returns the socket that receives write requests */ -static void 
remus_server_accept(event_id_t id, char mode, void* private) +/* It is called when we find some I/O error */ +static void backup_failed(struct tdremus_state *s, int rc) { - struct tdremus_state* s = (struct tdremus_state *) private; + UNREGISTER_EVENT(s->stream_fd.id); + td_replication_connect_kill(&s->t); + /* We will switch to unprotected mode in backup_queue_write() */ +} - int stream_fd; - event_id_t cid; +/* returns the socket that receives write requests */ +static void remus_server_established(td_replication_connect_t *t, int rc) +{ + struct tdremus_state *s = CONTAINER_OF(t, *s, t); + event_id_t id; - /* XXX: add address-based black/white list */ - if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) { - RPRINTF("error accepting connection: %d\n", errno); - return; - } - - /* TODO: check to see if we are already replicating. if so just close the - * connection (or do something smarter) */ - RPRINTF("server accepted connection\n"); + /* rc is always 0 */ /* add tapdisk event for replication stream */ - cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0, - remus_server_event, s); + id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, t->fd, 0, + remus_server_event, s); - if(cid < 0) { - RPRINTF("error registering connection event handler: %s\n", strerror(errno)); - close(stream_fd); + if (id < 0) { + RPRINTF("error registering connection event handler: %s\n", + strerror(errno)); + td_replication_server_restart(t); return; } /* store replication file descriptor */ - s->stream_fd.fd = stream_fd; - s->stream_fd.id = cid; -} - -/* returns -2 if EADDRNOTAVAIL */ -static int remus_bind(struct tdremus_state* s) -{ -// struct sockaddr_in sa; - int opt; - int rc = -1; - - if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - RPRINTF("could not create server socket: %d\n", errno); - return rc; - } - opt = 1; - if (setsockopt(s->server_fd.fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) - RPRINTF("Error setting REUSEADDR 
on %d: %d\n", s->server_fd.fd, errno); - - if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa)) < 0) { - RPRINTF("could not bind server socket %d to %s:%d: %d %s\n", s->server_fd.fd, - inet_ntoa(s->sa.sin_addr), ntohs(s->sa.sin_port), errno, strerror(errno)); - if (errno != EADDRINUSE) - rc = -2; - goto err_sfd; - } - if (listen(s->server_fd.fd, 10)) { - RPRINTF("could not listen on socket: %d\n", errno); - goto err_sfd; - } - - /* The socket s now bound to the address and listening so we may now register - * the fd with tapdisk */ - - if((s->server_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, - s->server_fd.fd, 0, - remus_server_accept, s)) < 0) { - RPRINTF("error registering server connection event handler: %s", - strerror(s->server_fd.id)); - goto err_sfd; - } - - return 0; - - err_sfd: - close(s->server_fd.fd); - s->server_fd.fd = -1; - - return rc; + s->stream_fd.fd = t->fd; + s->stream_fd.id = id; } /* wait for latest checkpoint to be applied */ @@ -1053,6 +1063,8 @@ void backup_queue_write(td_driver_t *driver, td_request_t treq) * handle the write */ + /* If we have called backup_failed, calling it again is harmless */ + backup_failed(s, ERROR_INTERNAL); switch_mode(driver, mode_unprotected); /* TODO: call the appropriate write function rather than return EBUSY */ td_complete_request(treq, -EBUSY); @@ -1061,7 +1073,6 @@ void backup_queue_write(td_driver_t *driver, td_request_t treq) static int backup_start(td_driver_t *driver) { struct tdremus_state *s = (struct tdremus_state *)driver->data; - int fd; if (ramdisk_start(driver) < 0) return -1; @@ -1073,12 +1084,12 @@ static int backup_start(td_driver_t *driver) return 0; } -static int server_do_wreq(td_driver_t *driver) +static void server_do_wreq(td_driver_t *driver) { struct tdremus_state *s = (struct tdremus_state *)driver->data; static tdremus_wire_t twreq; char buf[4096]; - int len, rc; + int len, rc = ERROR_IO; char header[sizeof(uint32_t) + sizeof(uint64_t)]; uint32_t 
*sectors = (uint32_t *) header; @@ -1097,28 +1108,28 @@ static int server_do_wreq(td_driver_t *driver) if (len > sizeof(buf)) { /* freak out! */ RPRINTF("write request too large: %d/%u\n", len, (unsigned)sizeof(buf)); - return -1; + goto err; } if (mread(s->stream_fd.fd, buf, len) < 0) goto err; - if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) + if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) { + rc = ERROR_INTERNAL; goto err; + } - return 0; + return; err: /* should start failover */ RPRINTF("backup write request error\n"); - close_stream_fd(s); - - return -1; + backup_failed(s, rc); } /* at this point, the server can start applying the most recent * ramdisk. */ -static int server_do_creq(td_driver_t *driver) +static void server_do_creq(td_driver_t *driver) { struct tdremus_state *s = (struct tdremus_state *)driver->data; @@ -1128,9 +1139,7 @@ static int server_do_creq(td_driver_t *driver) /* XXX this message should not be sent until flush completes! */ if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4) - return -1; - - return 0; + backup_failed(s, ERROR_IO); } @@ -1213,11 +1222,6 @@ static int unprotected_start(td_driver_t *driver) RPRINTF("failure detected, activating passthrough\n"); - /* close the server socket */ - close_stream_fd(s); - - close_server_fd(s); - /* install the unprotected read/write handlers */ tapdisk_remus.td_queue_read = unprotected_queue_read; tapdisk_remus.td_queue_write = unprotected_queue_write; @@ -1227,90 +1231,6 @@ static int unprotected_start(td_driver_t *driver) /* control */ - -static inline int resolve_address(const char* addr, struct in_addr* ia) -{ - struct hostent* he; - uint32_t ip; - - if (!(he = gethostbyname(addr))) { - RPRINTF("error resolving %s: %d\n", addr, h_errno); - return -1; - } - - if (!he->h_addr_list[0]) { - RPRINTF("no address found for %s\n", addr); - return -1; - } - - /* network byte order */ - ip = *((uint32_t**)he->h_addr_list)[0]; - ia->s_addr = ip; - - return 0; 
-} - -static int get_args(td_driver_t *driver, const char* name) -{ - struct tdremus_state *state = (struct tdremus_state *)driver->data; - char* host; - char* port; -// char* driver_str; -// char* parent; -// int type; -// char* path; -// unsigned long ulport; -// int i; -// struct sockaddr_in server_addr_in; - - int gai_status; - int valid_addr; - struct addrinfo gai_hints; - struct addrinfo *servinfo, *servinfo_itr; - - memset(&gai_hints, 0, sizeof gai_hints); - gai_hints.ai_family = AF_UNSPEC; - gai_hints.ai_socktype = SOCK_STREAM; - - port = strchr(name, ':'); - if (!port) { - RPRINTF("missing host in %s\n", name); - return -ENOENT; - } - if (!(host = strndup(name, port - name))) { - RPRINTF("unable to allocate host\n"); - return -ENOMEM; - } - port++; - - if ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo)) != 0) { - RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status)); - return -ENOENT; - } - - /* TODO: do something smarter here */ - valid_addr = 0; - for(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr = servinfo_itr->ai_next) { - void *addr; - char *ipver; - - if (servinfo_itr->ai_family == AF_INET) { - valid_addr = 1; - memset(&state->sa, 0, sizeof(state->sa)); - state->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr; - break; - } - } - freeaddrinfo(servinfo); - - if (!valid_addr) - return -ENOENT; - - RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port)); - - return 0; -} - static int switch_mode(td_driver_t *driver, enum tdremus_mode mode) { struct tdremus_state *s = (struct tdremus_state *)driver->data; @@ -1343,6 +1263,20 @@ static int switch_mode(td_driver_t *driver, enum tdremus_mode mode) return rc; } +static void ctl_reopen(struct tdremus_state *s) +{ + ctl_unregister(s); + CLOSE_FD(s->ctl_fd.fd); + RPRINTF("FIFO closed\n"); + + if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) { + RPRINTF("error reopening FIFO: %d\n", errno); + return; + } + + ctl_register(s); +} + static 
void ctl_request(event_id_t id, char mode, void *private) { struct tdremus_state *s = (struct tdremus_state *)private; @@ -1355,11 +1289,7 @@ static void ctl_request(event_id_t id, char mode, void *private) if (!(rc = read(s->ctl_fd.fd, msg, sizeof(msg) - 1 /* append nul */))) { RPRINTF("0-byte read received, reopening FIFO\n"); /*TODO: we may have to unregister/re-register with tapdisk_server */ - close(s->ctl_fd.fd); - RPRINTF("FIFO closed\n"); - if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) { - RPRINTF("error reopening FIFO: %d\n", errno); - } + ctl_reopen(s); return; } @@ -1372,7 +1302,7 @@ static void ctl_request(event_id_t id, char mode, void *private) msg[rc] = '\0'; if (!strncmp(msg, "flush", 5)) { if (s->mode == mode_primary) { - if ((rc = s->queue_flush(driver))) { + if ((rc = client_flush(driver))) { RPRINTF("error passing flush request to backup"); ctl_respond(s, TDREMUS_FAIL); } @@ -1521,6 +1451,7 @@ static void ctl_unregister(struct tdremus_state *s) static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid) { struct tdremus_state *s = (struct tdremus_state *)driver->data; + td_replication_connect_t *t = &s->t; int rc; const char *name = image->name; td_flag_t flags = image->flags; @@ -1531,7 +1462,6 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid) remus_image = image; memset(s, 0, sizeof(*s)); - s->server_fd.fd = -1; s->stream_fd.fd = -1; s->ctl_fd.fd = -1; s->msg_fd.fd = -1; @@ -1540,8 +1470,11 @@ static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid) * the driver stack from the stream_fd event handler */ s->tdremus_driver = driver; - /* parse name to get info etc */ - if ((rc = get_args(driver, name))) + t->log_prefix = "remus"; + t->retry_timeout_s = REMUS_CONNRETRY_TIMEOUT; + t->max_connections = 10; + t->callback = remus_server_established; + if ((rc = td_replication_connect_init(t, name))) return rc; if ((rc = ctl_open(driver, name))) { @@ -1555,7 +1488,7 @@ 
static int tdremus_open(td_driver_t *driver, td_image_t *image, td_uuid_t uuid) return rc; } - if (!(rc = remus_bind(s))) + if (!(rc = td_replication_server_start(t))) rc = switch_mode(driver, mode_backup); else if (rc == -2) rc = switch_mode(driver, mode_primary); @@ -1575,8 +1508,7 @@ static int tdremus_close(td_driver_t *driver) if (s->ramdisk.inprogress) hashtable_destroy(s->ramdisk.inprogress, 0); - close_server_fd(s); - close_stream_fd(s); + td_replication_connect_kill(&s->t); ctl_unregister(s); ctl_close(s); diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h index 9e051cc..07fd630 100644 --- a/tools/blktap2/drivers/block-replication.h +++ b/tools/blktap2/drivers/block-replication.h @@ -48,6 +48,7 @@ enum { ERROR_INTERNAL = -1, ERROR_CONNECTION = -2, + ERROR_IO = -3, }; typedef struct td_replication_connect td_replication_connect_t; -- 1.9.3 _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxx http://lists.xen.org/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |