[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [RFC Patch v2 35/45] blktap2: connect to backup asynchronously
tapdisk2 is a single-threaded process. If we use remus, we will block in primary_blocking_connect(), and the user will not have any chance to talk to tapdisk2 in the meantime. So we should connect to the backup asynchronously. Before the connection is established, we queue all I/O requests and handle them once the connection is established. Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx> --- tools/blktap2/drivers/block-remus.c | 760 +++++++++++++++++++++++------------- 1 file changed, 479 insertions(+), 281 deletions(-) diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c index d358b44..c21f851 100644 --- a/tools/blktap2/drivers/block-remus.c +++ b/tools/blktap2/drivers/block-remus.c @@ -63,10 +63,28 @@ #define RAMDISK_HASHSIZE 128 /* connect retry timeout (seconds) */ -#define REMUS_CONNRETRY_TIMEOUT 10 +#define REMUS_CONNRETRY_TIMEOUT 1 #define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a) +#define UNREGISTER_EVENT(id) \ + do { \ + if (id >= 0) { \ + tapdisk_server_unregister_event(id); \ + id = -1; \ + } \ + } while (0) + +#define CLOSE_FD(fd) \ + do { \ + if (fd >= 0) { \ + close(fd); \ + fd = -1; \ + } \ + } while (0) + +#define MAX_REMUS_REQUEST TAPDISK_DATA_REQUESTS + enum tdremus_mode { mode_invalid = 0, mode_unprotected, @@ -74,17 +92,21 @@ enum tdremus_mode { mode_backup }; +enum { + ERROR_INTERNAL = -1, + ERROR_IO = -2, + ERROR_CONNECTION = -3, +}; + struct tdremus_req { - uint64_t sector; - int nb_sectors; - char buf[4096]; + td_request_t treq; }; struct req_ring { /* waste one slot to distinguish between empty and full */ - struct tdremus_req requests[MAX_REQUESTS * 2 + 1]; - unsigned int head; - unsigned int tail; + struct tdremus_req pending_requests[MAX_REMUS_REQUEST + 1]; + unsigned int prod; + unsigned int cons; }; /* TODO: This isn't very pretty, but to properly generate our own treqs (needed @@ -144,10 +166,21 @@ struct ramdisk_write_cbdata { typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq); -/* poll_fd 
type for blktap2 fd system. taken from block_log.c */ +/* + * If cid, rid and wid are -1, fd must be -1. It means that + * we are in unprotected mode or we have not yet started to connect + * to backup. + * If fd is a valid fd: + * cid is valid, rid and wid must be invalid. It means that + * the connection is in progress. + * cid is invalid. rid or wid must be valid. It means that + * the connection is established. + */ typedef struct poll_fd { int fd; - event_id_t id; + event_id_t cid; + event_id_t rid; + event_id_t wid; } poll_fd_t; struct tdremus_state { @@ -166,8 +199,11 @@ struct tdremus_state { poll_fd_t server_fd; /* server listen port */ poll_fd_t stream_fd; /* replication channel */ - /* queue write requests, batch-replicate at submit */ - struct req_ring write_ring; + /* + * queue I/O requests, batch-replicate when + * the connection is established. + */ + struct req_ring queued_io; /* ramdisk data*/ struct ramdisk ramdisk; @@ -207,11 +243,13 @@ static int tdremus_close(td_driver_t *driver); static int switch_mode(td_driver_t *driver, enum tdremus_mode mode); static int ctl_respond(struct tdremus_state *s, const char *response); +static int ctl_register(struct tdremus_state *s); +static void ctl_unregister(struct tdremus_state *s); /* ring functions */ -static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos) +static inline unsigned int ring_next(unsigned int pos) { - if (++pos >= MAX_REQUESTS * 2 + 1) + if (++pos >= MAX_REMUS_REQUEST + 1) return 0; return pos; @@ -219,13 +257,26 @@ static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos) static inline int ring_isempty(struct req_ring* ring) { - return ring->head == ring->tail; + return ring->cons == ring->prod; } static inline int ring_isfull(struct req_ring* ring) { - return ring_next(ring, ring->tail) == ring->head; + return ring_next(ring->prod) == ring->cons; +} + +static void ring_add_request(struct req_ring *ring, const td_request_t *treq) +{ + /* If ring is full, 
it means that tapdisk2 has some bug */ + if (ring_isfull(ring)) { + RPRINTF("OOPS, ring is full\n"); + exit(1); + } + + ring->pending_requests[ring->prod].treq = *treq; + ring->prod = ring_next(ring->prod); } + /* Prototype declarations */ static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s); @@ -728,30 +779,39 @@ static int mwrite(int fd, void* buf, size_t len) static void inline close_stream_fd(struct tdremus_state *s) { - if (s->stream_fd.fd < 0) - return; - /* XXX: -2 is magic. replace with macro perhaps? */ - tapdisk_server_unregister_event(s->stream_fd.id); - close(s->stream_fd.fd); - s->stream_fd.fd = -2; + UNREGISTER_EVENT(s->stream_fd.cid); + UNREGISTER_EVENT(s->stream_fd.rid); + UNREGISTER_EVENT(s->stream_fd.wid); + + /* close the connection */ + CLOSE_FD(s->stream_fd.fd); } static void close_server_fd(struct tdremus_state *s) { - if (s->server_fd.fd < 0) - return; - - tapdisk_server_unregister_event(s->server_fd.id); - s->server_fd.id = -1; - close(s->stream_fd.fd); - s->stream_fd.fd = -1; + UNREGISTER_EVENT(s->server_fd.cid); + CLOSE_FD(s->server_fd.fd); } /* primary functions */ static void remus_client_event(event_id_t, char mode, void *private); static void remus_connect_event(event_id_t id, char mode, void *private); static void remus_retry_connect_event(event_id_t id, char mode, void *private); +static int primary_forward_request(struct tdremus_state *s, + const td_request_t *treq); + +/* + * It is called when we cannot connect to backup, or find I/O error when + * reading/writing. 
+ */ +static void primary_failed(struct tdremus_state *s, int rc) +{ + close_stream_fd(s); + if (rc == ERROR_INTERNAL) + RPRINTF("switch to unprotected mode due to internal error"); + switch_mode(s->tdremus_driver, mode_unprotected); +} static int primary_do_connect(struct tdremus_state *state) { @@ -760,281 +820,247 @@ static int primary_do_connect(struct tdremus_state *state) int rc; int flags; - RPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port)); + RPRINTF("client connecting to %s:%d...\n", + inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port)); if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) { RPRINTF("could not create client socket: %d\n", errno); - return -1; + return ERROR_INTERNAL; } + state->stream_fd.fd = fd; /* make socket nonblocking */ if ((flags = fcntl(fd, F_GETFL, 0)) == -1) flags = 0; - if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) - return -1; + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { + RPRINTF("error setting fd %d to non block mode\n", fd); + return ERROR_INTERNAL; + } - /* once we have created the socket and populated the address, we can now start - * our non-blocking connect. rather than duplicating code we trigger a timeout - * on the socket fd, which calls out nonblocking connect code + /* + * once we have created the socket and populated the address, + * we can now start our non-blocking connect. 
rather than + * duplicating code we trigger a timeout on the socket fd, + * which calls out nonblocking connect code */ - if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0, remus_retry_connect_event, state)) < 0) { - RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id)); - /* TODO: we leak a fd here */ - return -1; + if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0, + remus_retry_connect_event, + state)) < 0) { + RPRINTF("error registering timeout client connection event handler: %s\n", + strerror(id)); + return ERROR_INTERNAL; } - state->stream_fd.fd = fd; - state->stream_fd.id = id; + + state->stream_fd.cid = id; return 0; } -static int primary_blocking_connect(struct tdremus_state *state) +static int remus_handle_queued_io(struct tdremus_state *s) { - int fd; - int id; + struct req_ring *queued_io = &s->queued_io; + unsigned int cons; + td_request_t *treq; int rc; - int flags; - - RPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port)); - if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) { - RPRINTF("could not create client socket: %d\n", errno); - return -1; - } + while (!ring_isempty(queued_io)) { + cons = queued_io->cons; + treq = &queued_io->pending_requests[cons].treq; - do { - if ((rc = connect(fd, (struct sockaddr *)&state->sa, - sizeof(state->sa))) < 0) - { - if (errno == ECONNREFUSED) { - RPRINTF("connection refused -- retrying in 1 second\n"); - sleep(1); - } else { - RPRINTF("connection failed: %d\n", errno); - close(fd); - return -1; - } + if (treq->op == TD_OP_WRITE) { + rc = primary_forward_request(s, treq); + if (rc) + return rc; } - } while (rc < 0); - - RPRINTF("client connected\n"); - - /* make socket nonblocking */ - if ((flags = fcntl(fd, F_GETFL, 0)) == -1) - flags = 0; - if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) - { - RPRINTF("error making socket nonblocking\n"); - close(fd); - return -1; - } - if((id = 
tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, fd, 0, remus_client_event, state)) < 0) { - RPRINTF("error registering client event handler: %s\n", strerror(id)); - close(fd); - return -1; + td_forward_request(*treq); + queued_io->cons = ring_next(cons); } - state->stream_fd.fd = fd; - state->stream_fd.id = id; return 0; } -/* on read, just pass request through */ -static void primary_queue_read(td_driver_t *driver, td_request_t treq) -{ - /* just pass read through */ - td_forward_request(treq); -} - -/* TODO: - * The primary uses mwrite() to write the contents of a write request to the - * backup. This effectively blocks until all data has been copied into a system - * buffer or a timeout has occured. We may wish to instead use tapdisk's - * nonblocking i/o interface, tapdisk_server_register_event(), to set timeouts - * and write data in an asynchronous fashion. - */ -static void primary_queue_write(td_driver_t *driver, td_request_t treq) +static int remus_connection_done(struct tdremus_state *s) { - struct tdremus_state *s = (struct tdremus_state *)driver->data; - - char header[sizeof(uint32_t) + sizeof(uint64_t)]; - uint32_t *sectors = (uint32_t *)header; - uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t)); + event_id_t id; - // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd); + /* the connect succeeded */ + /* unregister this function and register a new event handler */ + tapdisk_server_unregister_event(s->stream_fd.cid); + s->stream_fd.cid = -1; - /* -1 means we haven't connected yet, -2 means the connection was lost */ - if(s->stream_fd.fd == -1) { - RPRINTF("connecting to backup...\n"); - primary_blocking_connect(s); + id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd, + 0, remus_client_event, s); + if(id < 0) { + RPRINTF("error registering client event handler: %s\n", + strerror(id)); + return ERROR_INTERNAL; } + s->stream_fd.rid = id; - *sectors = treq.secs; - *sector = treq.sec; - - if (mwrite(s->stream_fd.fd, 
TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0) - goto fail; - if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0) - goto fail; + /* handle the queued requests */ + return remus_handle_queued_io(s); +} - if (mwrite(s->stream_fd.fd, treq.buf, treq.secs * driver->info.sector_size) < 0) - goto fail; +static int remus_retry_connect(struct tdremus_state *s) +{ + event_id_t id; - td_forward_request(treq); + tapdisk_server_unregister_event(s->stream_fd.cid); + s->stream_fd.cid = -1; - return; + RPRINTF("connect to backup 1 second later"); + id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, + s->stream_fd.fd, + REMUS_CONNRETRY_TIMEOUT, + remus_retry_connect_event, s); + if (id < 0) { + RPRINTF("error registering timeout client connection event handler: %s\n", + strerror(id)); + return ERROR_INTERNAL; + } - fail: - /* switch to unprotected mode and tell tapdisk to retry */ - RPRINTF("write request replication failed, switching to unprotected mode"); - switch_mode(s->tdremus_driver, mode_unprotected); - td_complete_request(treq, -EBUSY); + s->stream_fd.cid = id; + return 0; } - -/* It is called when the user writes "flush" to control file */ -static int client_flush(td_driver_t *driver) +static int remus_wait_connect_done(struct tdremus_state *s) { - struct tdremus_state *s = (struct tdremus_state *)driver->data; - - // RPRINTF("committing output\n"); + event_id_t id; - if (s->stream_fd.fd == -1) - /* connection not yet established, nothing to flush */ - return 0; + tapdisk_server_unregister_event(s->stream_fd.cid); + s->stream_fd.cid = -1; - if (mwrite(s->stream_fd.fd, TDREMUS_COMMIT, - strlen(TDREMUS_COMMIT)) < 0) { - RPRINTF("error flushing output"); - close_stream_fd(s); - return -1; + id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD, + s->stream_fd.fd, 0, + remus_connect_event, s); + if (id < 0) { + RPRINTF("error registering client connection event handler: %s\n", + strerror(id)); + return ERROR_INTERNAL; } + s->stream_fd.cid = id; return 0; } 
-static int server_flush(td_driver_t *driver) +/* return 1 if we need to reconnect to backup */ +static int check_connect_errno(int err) { - struct tdremus_state *s = (struct tdremus_state *)driver->data; - /* - * Nothing to flush in beginning. + /* + * The fd is non-block, so we will not get ETIMEDOUT + * after calling connect(). We only can get this errno + * by getsockopt(). */ - if (!s->ramdisk.prev) - return 0; - /* Try to flush any remaining requests */ - return ramdisk_flush(driver, s); -} - -static int primary_start(td_driver_t *driver) -{ - struct tdremus_state *s = (struct tdremus_state *)driver->data; - - RPRINTF("activating client mode\n"); - - tapdisk_remus.td_queue_read = primary_queue_read; - tapdisk_remus.td_queue_write = primary_queue_write; - - s->stream_fd.fd = -1; - s->stream_fd.id = -1; + if (err == ECONNREFUSED || err == ENETUNREACH || + err == EAGAIN || err == ECONNABORTED || + err == ETIMEDOUT) + return 1; return 0; } -/* timeout callback */ static void remus_retry_connect_event(event_id_t id, char mode, void *private) { struct tdremus_state *s = (struct tdremus_state *)private; + int rc, ret; /* do a non-blocking connect */ - if (connect(s->stream_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa)) - && errno != EINPROGRESS) - { - if(errno == ECONNREFUSED || errno == ENETUNREACH || errno == EAGAIN || errno == ECONNABORTED) - { - /* try again in a second */ - tapdisk_server_unregister_event(s->stream_fd.id); - if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) { - RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id)); - return; - } - s->stream_fd.id = id; - } - else - { - /* not recoverable */ - RPRINTF("error connection to server %s\n", strerror(errno)); + ret = connect(s->stream_fd.fd, + (struct sockaddr *)&s->sa, + sizeof(s->sa)); + if (ret) { + if (errno == EINPROGRESS) { + /* + * the connect returned EINPROGRESS 
(nonblocking + * connect) we must wait for the fd to be writeable + * to determine if the connect worked + */ + rc = remus_wait_connect_done(s); + if (rc) + goto fail; return; } - } - else - { - /* the connect returned EINPROGRESS (nonblocking connect) we must wait for the fd to be writeable to determine if the connect worked */ - tapdisk_server_unregister_event(s->stream_fd.id); - if((id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD, s->stream_fd.fd, 0, remus_connect_event, s)) < 0) { - RPRINTF("error registering client connection event handler: %s\n", strerror(id)); + if (check_connect_errno(errno)) { + rc = remus_retry_connect(s); + if (rc) + goto fail; return; } - s->stream_fd.id = id; + + /* not recoverable */ + RPRINTF("error connection to server %s\n", strerror(errno)); + rc = ERROR_CONNECTION; + goto fail; } + + /* The connection is established unexpectedly */ + rc = remus_connection_done(s); + if (rc) + goto fail; + + return; + +fail: + primary_failed(s, rc); + return; } /* callback when nonblocking connect() is finished */ -/* called only by primary in unprotected state */ static void remus_connect_event(event_id_t id, char mode, void *private) { int socket_errno; socklen_t socket_errno_size; struct tdremus_state *s = (struct tdremus_state *)private; + int rc; - /* check to se if the connect succeeded */ + /* check to see if the connect succeeded */ socket_errno_size = sizeof(socket_errno); - if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR, &socket_errno, &socket_errno_size)) { + if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR, + &socket_errno, &socket_errno_size)) { RPRINTF("error getting socket errno\n"); return; } RPRINTF("socket connect returned %d\n", socket_errno); - if(socket_errno) - { + if (socket_errno) { /* the connect did not succeed */ + if (check_connect_errno(socket_errno)) { + /* + * we can probably assume that the backup is down. 
+ * just try again later + */ + rc = remus_retry_connect(s); + if (rc) + goto fail; - if(socket_errno == ECONNREFUSED || socket_errno == ENETUNREACH || socket_errno == ETIMEDOUT - || socket_errno == ECONNABORTED || socket_errno == EAGAIN) - { - /* we can probably assume that the backup is down. just try again later */ - tapdisk_server_unregister_event(s->stream_fd.id); - if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) { - RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id)); - return; - } - s->stream_fd.id = id; - } - else - { - RPRINTF("socket connect returned %d, giving up\n", socket_errno); - } - } - else - { - /* the connect succeeded */ - - /* unregister this function and register a new event handler */ - tapdisk_server_unregister_event(s->stream_fd.id); - if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd, 0, remus_client_event, s)) < 0) { - RPRINTF("error registering client event handler: %s\n", strerror(id)); return; + } else { + RPRINTF("socket connect returned %d, giving up\n", + socket_errno); + rc = ERROR_CONNECTION; + goto fail; } - s->stream_fd.id = id; - /* switch from unprotected to protected client */ - switch_mode(s->tdremus_driver, mode_primary); + return; } + + rc = remus_connection_done(s); + if (rc) + goto fail; + + return; + +fail: + primary_failed(s, rc); } -/* we install this event handler on the primary once we have connected to the backup */ +/* + * we install this event handler on the primary once we have + * connected to the backup. 
+ */ /* wait for "done" message to commit checkpoint */ static void remus_client_event(event_id_t id, char mode, void *private) { @@ -1043,9 +1069,12 @@ static void remus_client_event(event_id_t id, char mode, void *private) int rc; if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) { - /* replication stream closed or otherwise broken (timeout, reset, &c) */ + /* + * replication stream closed or otherwise broken + * (timeout, reset, &c) + */ RPRINTF("error reading from backup\n"); - close_stream_fd(s); + primary_failed(s, ERROR_IO); return; } @@ -1056,22 +1085,169 @@ static void remus_client_event(event_id_t id, char mode, void *private) ctl_respond(s, TDREMUS_DONE); else { RPRINTF("received unknown message: %s\n", req); - close_stream_fd(s); + primary_failed(s, ERROR_IO); + } + + return; +} + +static void primary_queue_read(td_driver_t *driver, td_request_t treq) +{ + struct tdremus_state *s = (struct tdremus_state *)driver->data; + struct req_ring *ring = &s->queued_io; + + if (ring_isempty(ring)) { + /* just pass read through */ + td_forward_request(treq); + return; + } + + ring_add_request(ring, &treq); +} + +static int primary_forward_request(struct tdremus_state *s, + const td_request_t *treq) +{ + char header[sizeof(uint32_t) + sizeof(uint64_t)]; + uint32_t *sectors = (uint32_t *)header; + uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t)); + td_driver_t *driver = s->tdremus_driver; + + *sectors = treq->secs; + *sector = treq->sec; + + if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0) + return ERROR_IO; + + if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0) + return ERROR_IO; + + if (mwrite(s->stream_fd.fd, treq->buf, + treq->secs * driver->info.sector_size) < 0) + return ERROR_IO; + + return 0; +} + +/* TODO: + * The primary uses mwrite() to write the contents of a write request to the + * backup. This effectively blocks until all data has been copied into a system + * buffer or a timeout has occured. 
We may wish to instead use tapdisk's + * nonblocking i/o interface, tapdisk_server_register_event(), to set timeouts + * and write data in an asynchronous fashion. + */ +static void primary_queue_write(td_driver_t *driver, td_request_t treq) +{ + struct tdremus_state *s = (struct tdremus_state *)driver->data; + int rc; + + // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd); + + if(s->stream_fd.fd < 0) { + RPRINTF("connecting to backup...\n"); + rc = primary_do_connect(s); + if (rc) + goto fail; + } + + /* The connection is not established, just queue the request */ + if (s->stream_fd.cid >= 0) { + ring_add_request(&s->queued_io, &treq); + return; } + /* The connection is established */ + rc = primary_forward_request(s, &treq); + if (rc) + goto fail; + + td_forward_request(treq); + return; + +fail: + /* switch to unprotected mode and forward the request */ + RPRINTF("write request replication failed, switching to unprotected mode"); + primary_failed(s, rc); + td_forward_request(treq); +} + +/* It is called when the user write "flush" to control file. 
*/ +static int client_flush(td_driver_t *driver) +{ + struct tdremus_state *s = (struct tdremus_state *)driver->data; + + // RPRINTF("committing output\n"); + + if (s->stream_fd.fd == -1) + /* connection not yet established, nothing to flush */ + return 0; + + if (mwrite(s->stream_fd.fd, TDREMUS_COMMIT, + strlen(TDREMUS_COMMIT)) < 0) { + RPRINTF("error flushing output"); + primary_failed(s, ERROR_IO); + return -1; + } + + return 0; +} + +/* It is called when switching the mode from primary to unprotected */ +static int primary_flush(td_driver_t *driver) +{ + struct tdremus_state *s = driver->data; + struct req_ring *ring = &s->queued_io; + unsigned int cons; + + if (ring_isempty(ring)) + return 0; + + while (!ring_isempty(ring)) { + cons = ring->cons; + ring->cons = ring_next(cons); + + td_forward_request(ring->pending_requests[cons].treq); + } + + return 0; +} + +static int primary_start(td_driver_t *driver) +{ + struct tdremus_state *s = (struct tdremus_state *)driver->data; + + RPRINTF("activating client mode\n"); + + tapdisk_remus.td_queue_read = primary_queue_read; + tapdisk_remus.td_queue_write = primary_queue_write; + s->queue_flush = primary_flush; + + s->stream_fd.fd = -1; + s->stream_fd.cid = -1; + s->stream_fd.rid = -1; + s->stream_fd.wid = -1; + + return 0; } /* backup functions */ static void remus_server_event(event_id_t id, char mode, void *private); +/* It is called when we find some I/O error */ +static void backup_failed(struct tdremus_state *s, int rc) +{ + close_stream_fd(s); + close_server_fd(s); + /* We will switch to unprotected mode in backup_queue_write() */ +} + /* returns the socket that receives write requests */ static void remus_server_accept(event_id_t id, char mode, void* private) { struct tdremus_state* s = (struct tdremus_state *) private; int stream_fd; - event_id_t cid; /* XXX: add address-based black/white list */ if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) { @@ -1079,68 +1255,80 @@ static void 
remus_server_accept(event_id_t id, char mode, void* private) return; } - /* TODO: check to see if we are already replicating. if so just close the - * connection (or do something smarter) */ + /* + * TODO: check to see if we are already replicating. + * if so just close the connection (or do something + * smarter) + */ RPRINTF("server accepted connection\n"); /* add tapdisk event for replication stream */ - cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0, - remus_server_event, s); + id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0, + remus_server_event, s); - if(cid < 0) { - RPRINTF("error registering connection event handler: %s\n", strerror(errno)); + if (id < 0) { + RPRINTF("error registering connection event handler: %s\n", + strerror(errno)); close(stream_fd); return; } /* store replication file descriptor */ s->stream_fd.fd = stream_fd; - s->stream_fd.id = cid; + s->stream_fd.rid = id; } /* returns -2 if EADDRNOTAVAIL */ static int remus_bind(struct tdremus_state* s) { -// struct sockaddr_in sa; int opt; int rc = -1; + event_id_t id; if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { RPRINTF("could not create server socket: %d\n", errno); return rc; } - opt = 1; - if (setsockopt(s->server_fd.fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) - RPRINTF("Error setting REUSEADDR on %d: %d\n", s->server_fd.fd, errno); - if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa)) < 0) { - RPRINTF("could not bind server socket %d to %s:%d: %d %s\n", s->server_fd.fd, - inet_ntoa(s->sa.sin_addr), ntohs(s->sa.sin_port), errno, strerror(errno)); - if (errno != EADDRINUSE) + opt = 1; + if (setsockopt(s->server_fd.fd, SOL_SOCKET, + SO_REUSEADDR, &opt, sizeof(opt)) < 0) + RPRINTF("Error setting REUSEADDR on %d: %d\n", + s->server_fd.fd, errno); + + if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa, + sizeof(s->sa)) < 0) { + RPRINTF("could not bind server socket %d to %s:%d: %d %s\n", + 
s->server_fd.fd, inet_ntoa(s->sa.sin_addr), + ntohs(s->sa.sin_port), errno, strerror(errno)); + if (errno == EADDRNOTAVAIL) rc = -2; goto err_sfd; } + if (listen(s->server_fd.fd, 10)) { RPRINTF("could not listen on socket: %d\n", errno); goto err_sfd; } - /* The socket s now bound to the address and listening so we may now register - * the fd with tapdisk */ - - if((s->server_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, - s->server_fd.fd, 0, - remus_server_accept, s)) < 0) { + /* + * The socket s now bound to the address and listening so we + * may now register the fd with tapdisk + */ + id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, + s->server_fd.fd, 0, + remus_server_accept, s); + if (id < 0) { RPRINTF("error registering server connection event handler: %s", - strerror(s->server_fd.id)); + strerror(id)); goto err_sfd; } + s->server_fd.cid = id; return 0; - err_sfd: - close(s->server_fd.fd); - s->server_fd.fd = -1; +err_sfd: + CLOSE_FD(s->server_fd.fd); return rc; } @@ -1190,10 +1378,21 @@ void backup_queue_write(td_driver_t *driver, td_request_t treq) td_complete_request(treq, -EBUSY); } +static int server_flush(td_driver_t *driver) +{ + struct tdremus_state *s = (struct tdremus_state *)driver->data; + /* + * Nothing to flush in beginning. 
+ */ + if (!s->ramdisk.prev) + return 0; + /* Try to flush any remaining requests */ + return ramdisk_flush(driver, s); +} + static int backup_start(td_driver_t *driver) { struct tdremus_state *s = (struct tdremus_state *)driver->data; - int fd; if (ramdisk_start(driver) < 0) return -1; @@ -1201,16 +1400,15 @@ static int backup_start(td_driver_t *driver) tapdisk_remus.td_queue_read = backup_queue_read; tapdisk_remus.td_queue_write = backup_queue_write; s->queue_flush = server_flush; - /* TODO set flush function */ return 0; } -static int server_do_wreq(td_driver_t *driver) +static void server_do_wreq(td_driver_t *driver) { struct tdremus_state *s = (struct tdremus_state *)driver->data; static tdremus_wire_t twreq; char buf[4096]; - int len, rc; + int len, rc = ERROR_IO; char header[sizeof(uint32_t) + sizeof(uint64_t)]; uint32_t *sectors = (uint32_t *) header; @@ -1227,39 +1425,40 @@ static int server_do_wreq(td_driver_t *driver) // *sector); if (len > sizeof(buf)) { - /* freak out! */ - RPRINTF("write request too large: %d/%u\n", len, (unsigned)sizeof(buf)); - return -1; + /* freak out! How to handle the remaining data from primary */ + RPRINTF("write request too large: %d/%u\n", + len, (unsigned)sizeof(buf)); + goto err; } if (mread(s->stream_fd.fd, buf, len) < 0) goto err; - if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) + if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) { + rc = ERROR_INTERNAL; goto err; + } - return 0; + return; err: /* should start failover */ RPRINTF("backup write request error\n"); - close_stream_fd(s); - - return -1; + backup_failed(s, rc); } -static int server_do_sreq(td_driver_t *driver) +static void server_do_sreq(td_driver_t *driver) { /* RPRINTF("submit request received\n"); */ - return 0; + return; } /* at this point, the server can start applying the most recent * ramdisk. 
*/ -static int server_do_creq(td_driver_t *driver) +static void server_do_creq(td_driver_t *driver) { struct tdremus_state *s = (struct tdremus_state *)driver->data; @@ -1269,9 +1468,7 @@ static int server_do_creq(td_driver_t *driver) /* XXX this message should not be sent until flush completes! */ if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4) - return -1; - - return 0; + backup_failed(s, ERROR_IO); } @@ -1356,10 +1553,6 @@ static int unprotected_start(td_driver_t *driver) RPRINTF("failure detected, activating passthrough\n"); - /* close the server socket */ - close_stream_fd(s); - - close_server_fd(s); /* install the unprotected read/write handlers */ tapdisk_remus.td_queue_read = unprotected_queue_read; @@ -1486,6 +1679,19 @@ static int switch_mode(td_driver_t *driver, enum tdremus_mode mode) return rc; } +static void ctl_reopen(struct tdremus_state *s) +{ + ctl_unregister(s); + CLOSE_FD(s->ctl_fd.fd); + RPRINTF("FIFO closed\n"); + + if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) { + RPRINTF("error reopening FIFO: %d\n", errno); + return; + } + ctl_register(s); +} + static void ctl_request(event_id_t id, char mode, void *private) { struct tdremus_state *s = (struct tdremus_state *)private; @@ -1497,12 +1703,6 @@ static void ctl_request(event_id_t id, char mode, void *private) if (!(rc = read(s->ctl_fd.fd, msg, sizeof(msg) - 1 /* append nul */))) { RPRINTF("0-byte read received, reopening FIFO\n"); - /*TODO: we may have to unregister/re-register with tapdisk_server */ - close(s->ctl_fd.fd); - RPRINTF("FIFO closed\n"); - if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) { - RPRINTF("error reopening FIFO: %d\n", errno); - } return; } @@ -1641,10 +1841,11 @@ static int ctl_register(struct tdremus_state *s) RPRINTF("registering ctl fifo\n"); /* register ctl fd */ - s->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s); + s->ctl_fd.cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, 
s->ctl_fd.fd, 0, ctl_request, s); - if (s->ctl_fd.id < 0) { - RPRINTF("error registering ctrl FIFO %s: %d\n", s->ctl_path, s->ctl_fd.id); + if (s->ctl_fd.cid < 0) { + RPRINTF("error registering ctrl FIFO %s: %d\n", + s->ctl_path, s->ctl_fd.cid); return -1; } @@ -1655,10 +1856,7 @@ static void ctl_unregister(struct tdremus_state *s) { RPRINTF("unregistering ctl fifo\n"); - if (s->ctl_fd.id >= 0) { - tapdisk_server_unregister_event(s->ctl_fd.id); - s->ctl_fd.id = -1; - } + UNREGISTER_EVENT(s->ctl_fd.cid); } /* interface */ -- 1.9.3 _______________________________________________ Xen-devel mailing list Xen-devel@xxxxxxxxxxxxx http://lists.xen.org/xen-devel
|
Lists.xenproject.org is hosted with RackSpace, monitoring our |