|
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [Xen-devel] [RFC Patch v2 35/45] blktap2: connect to backup asynchronously
tapdisk2 is a single-threaded process. When Remus is used,
it blocks in primary_blocking_connect() until the backup
accepts the connection, and during that time the user has no
chance to talk to tapdisk2. So connect to the backup asynchronously.
Until the connection is established, queue all I/O requests,
and handle them once the connection is established.
Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
---
tools/blktap2/drivers/block-remus.c | 760 +++++++++++++++++++++++-------------
1 file changed, 479 insertions(+), 281 deletions(-)
diff --git a/tools/blktap2/drivers/block-remus.c
b/tools/blktap2/drivers/block-remus.c
index d358b44..c21f851 100644
--- a/tools/blktap2/drivers/block-remus.c
+++ b/tools/blktap2/drivers/block-remus.c
@@ -63,10 +63,28 @@
#define RAMDISK_HASHSIZE 128
/* connect retry timeout (seconds) */
-#define REMUS_CONNRETRY_TIMEOUT 10
+#define REMUS_CONNRETRY_TIMEOUT 1
#define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
+#define UNREGISTER_EVENT(id) \
+ do { \
+ if (id >= 0) { \
+ tapdisk_server_unregister_event(id); \
+ id = -1; \
+ } \
+ } while (0)
+
+#define CLOSE_FD(fd) \
+ do { \
+ if (fd >= 0) { \
+ close(fd); \
+ fd = -1; \
+ } \
+ } while (0)
+
+#define MAX_REMUS_REQUEST TAPDISK_DATA_REQUESTS
+
enum tdremus_mode {
mode_invalid = 0,
mode_unprotected,
@@ -74,17 +92,21 @@ enum tdremus_mode {
mode_backup
};
+enum {
+ ERROR_INTERNAL = -1,
+ ERROR_IO = -2,
+ ERROR_CONNECTION = -3,
+};
+
struct tdremus_req {
- uint64_t sector;
- int nb_sectors;
- char buf[4096];
+ td_request_t treq;
};
struct req_ring {
/* waste one slot to distinguish between empty and full */
- struct tdremus_req requests[MAX_REQUESTS * 2 + 1];
- unsigned int head;
- unsigned int tail;
+ struct tdremus_req pending_requests[MAX_REMUS_REQUEST + 1];
+ unsigned int prod;
+ unsigned int cons;
};
/* TODO: This isn't very pretty, but to properly generate our own treqs (needed
@@ -144,10 +166,21 @@ struct ramdisk_write_cbdata {
typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
-/* poll_fd type for blktap2 fd system. taken from block_log.c */
+/*
+ * If cid, rid and wid are all -1, fd must be -1: we are
+ * in unprotected mode, or have not yet started connecting
+ * to the backup.
+ * If fd is a valid fd, then either:
+ *   cid is valid and rid and wid are invalid, meaning the
+ *   connection is in progress; or
+ *   cid is invalid and rid or wid is valid, meaning the
+ *   connection is established.
+ */
typedef struct poll_fd {
int fd;
- event_id_t id;
+ event_id_t cid;
+ event_id_t rid;
+ event_id_t wid;
} poll_fd_t;
struct tdremus_state {
@@ -166,8 +199,11 @@ struct tdremus_state {
poll_fd_t server_fd; /* server listen port */
poll_fd_t stream_fd; /* replication channel */
- /* queue write requests, batch-replicate at submit */
- struct req_ring write_ring;
+ /*
+ * queue I/O requests, batch-replicate when
+ * the connection is established.
+ */
+ struct req_ring queued_io;
/* ramdisk data*/
struct ramdisk ramdisk;
@@ -207,11 +243,13 @@ static int tdremus_close(td_driver_t *driver);
static int switch_mode(td_driver_t *driver, enum tdremus_mode mode);
static int ctl_respond(struct tdremus_state *s, const char *response);
+static int ctl_register(struct tdremus_state *s);
+static void ctl_unregister(struct tdremus_state *s);
/* ring functions */
-static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos)
+static inline unsigned int ring_next(unsigned int pos)
{
- if (++pos >= MAX_REQUESTS * 2 + 1)
+ if (++pos >= MAX_REMUS_REQUEST + 1)
return 0;
return pos;
@@ -219,13 +257,26 @@ static inline unsigned int ring_next(struct req_ring*
ring, unsigned int pos)
static inline int ring_isempty(struct req_ring* ring)
{
- return ring->head == ring->tail;
+ return ring->cons == ring->prod;
}
static inline int ring_isfull(struct req_ring* ring)
{
- return ring_next(ring, ring->tail) == ring->head;
+ return ring_next(ring->prod) == ring->cons;
+}
+
+static void ring_add_request(struct req_ring *ring, const td_request_t *treq)
+{
+ /* A full ring indicates a bug in tapdisk2 */
+ if (ring_isfull(ring)) {
+ RPRINTF("OOPS, ring is full\n");
+ exit(1);
+ }
+
+ ring->pending_requests[ring->prod].treq = *treq;
+ ring->prod = ring_next(ring->prod);
}
+
/* Prototype declarations */
static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s);
@@ -728,30 +779,39 @@ static int mwrite(int fd, void* buf, size_t len)
static void inline close_stream_fd(struct tdremus_state *s)
{
- if (s->stream_fd.fd < 0)
- return;
- /* XXX: -2 is magic. replace with macro perhaps? */
- tapdisk_server_unregister_event(s->stream_fd.id);
- close(s->stream_fd.fd);
- s->stream_fd.fd = -2;
+ UNREGISTER_EVENT(s->stream_fd.cid);
+ UNREGISTER_EVENT(s->stream_fd.rid);
+ UNREGISTER_EVENT(s->stream_fd.wid);
+
+ /* close the connection */
+ CLOSE_FD(s->stream_fd.fd);
}
static void close_server_fd(struct tdremus_state *s)
{
- if (s->server_fd.fd < 0)
- return;
-
- tapdisk_server_unregister_event(s->server_fd.id);
- s->server_fd.id = -1;
- close(s->stream_fd.fd);
- s->stream_fd.fd = -1;
+ UNREGISTER_EVENT(s->server_fd.cid);
+ CLOSE_FD(s->server_fd.fd);
}
/* primary functions */
static void remus_client_event(event_id_t, char mode, void *private);
static void remus_connect_event(event_id_t id, char mode, void *private);
static void remus_retry_connect_event(event_id_t id, char mode, void *private);
+static int primary_forward_request(struct tdremus_state *s,
+ const td_request_t *treq);
+
+/*
+ * Called when we cannot connect to the backup, or when an I/O
+ * error occurs while reading from or writing to it.
+ */
+static void primary_failed(struct tdremus_state *s, int rc)
+{
+ close_stream_fd(s);
+ if (rc == ERROR_INTERNAL)
+ RPRINTF("switch to unprotected mode due to internal error");
+ switch_mode(s->tdremus_driver, mode_unprotected);
+}
static int primary_do_connect(struct tdremus_state *state)
{
@@ -760,281 +820,247 @@ static int primary_do_connect(struct tdremus_state
*state)
int rc;
int flags;
- RPRINTF("client connecting to %s:%d...\n",
inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
+ RPRINTF("client connecting to %s:%d...\n",
+ inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
RPRINTF("could not create client socket: %d\n", errno);
- return -1;
+ return ERROR_INTERNAL;
}
+ state->stream_fd.fd = fd;
/* make socket nonblocking */
if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
flags = 0;
- if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
- return -1;
+ if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
+ RPRINTF("error setting fd %d to non block mode\n", fd);
+ return ERROR_INTERNAL;
+ }
- /* once we have created the socket and populated the address, we can
now start
- * our non-blocking connect. rather than duplicating code we trigger a
timeout
- * on the socket fd, which calls out nonblocking connect code
+ /*
+ * once we have created the socket and populated the address,
+ * we can now start our non-blocking connect. rather than
+ * duplicating code we trigger a timeout on the socket fd,
+ * which calls out nonblocking connect code
*/
- if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
remus_retry_connect_event, state)) < 0) {
- RPRINTF("error registering timeout client connection event
handler: %s\n", strerror(id));
- /* TODO: we leak a fd here */
- return -1;
+ if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0,
+ remus_retry_connect_event,
+ state)) < 0) {
+ RPRINTF("error registering timeout client connection event
handler: %s\n",
+ strerror(id));
+ return ERROR_INTERNAL;
}
- state->stream_fd.fd = fd;
- state->stream_fd.id = id;
+
+ state->stream_fd.cid = id;
return 0;
}
-static int primary_blocking_connect(struct tdremus_state *state)
+static int remus_handle_queued_io(struct tdremus_state *s)
{
- int fd;
- int id;
+ struct req_ring *queued_io = &s->queued_io;
+ unsigned int cons;
+ td_request_t *treq;
int rc;
- int flags;
-
- RPRINTF("client connecting to %s:%d...\n",
inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
- if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
- RPRINTF("could not create client socket: %d\n", errno);
- return -1;
- }
+ while (!ring_isempty(queued_io)) {
+ cons = queued_io->cons;
+ treq = &queued_io->pending_requests[cons].treq;
- do {
- if ((rc = connect(fd, (struct sockaddr *)&state->sa,
- sizeof(state->sa))) < 0)
- {
- if (errno == ECONNREFUSED) {
- RPRINTF("connection refused -- retrying in 1
second\n");
- sleep(1);
- } else {
- RPRINTF("connection failed: %d\n", errno);
- close(fd);
- return -1;
- }
+ if (treq->op == TD_OP_WRITE) {
+ rc = primary_forward_request(s, treq);
+ if (rc)
+ return rc;
}
- } while (rc < 0);
-
- RPRINTF("client connected\n");
-
- /* make socket nonblocking */
- if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
- flags = 0;
- if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
- {
- RPRINTF("error making socket nonblocking\n");
- close(fd);
- return -1;
- }
- if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, fd, 0,
remus_client_event, state)) < 0) {
- RPRINTF("error registering client event handler: %s\n",
strerror(id));
- close(fd);
- return -1;
+ td_forward_request(*treq);
+ queued_io->cons = ring_next(cons);
}
- state->stream_fd.fd = fd;
- state->stream_fd.id = id;
return 0;
}
-/* on read, just pass request through */
-static void primary_queue_read(td_driver_t *driver, td_request_t treq)
-{
- /* just pass read through */
- td_forward_request(treq);
-}
-
-/* TODO:
- * The primary uses mwrite() to write the contents of a write request to the
- * backup. This effectively blocks until all data has been copied into a system
- * buffer or a timeout has occured. We may wish to instead use tapdisk's
- * nonblocking i/o interface, tapdisk_server_register_event(), to set timeouts
- * and write data in an asynchronous fashion.
- */
-static void primary_queue_write(td_driver_t *driver, td_request_t treq)
+static int remus_connection_done(struct tdremus_state *s)
{
- struct tdremus_state *s = (struct tdremus_state *)driver->data;
-
- char header[sizeof(uint32_t) + sizeof(uint64_t)];
- uint32_t *sectors = (uint32_t *)header;
- uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
+ event_id_t id;
- // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
+ /* the connect succeeded */
+ /* unregister this function and register a new event handler */
+ tapdisk_server_unregister_event(s->stream_fd.cid);
+ s->stream_fd.cid = -1;
- /* -1 means we haven't connected yet, -2 means the connection was lost
*/
- if(s->stream_fd.fd == -1) {
- RPRINTF("connecting to backup...\n");
- primary_blocking_connect(s);
+ id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
s->stream_fd.fd,
+ 0, remus_client_event, s);
+ if(id < 0) {
+ RPRINTF("error registering client event handler: %s\n",
+ strerror(id));
+ return ERROR_INTERNAL;
}
+ s->stream_fd.rid = id;
- *sectors = treq.secs;
- *sector = treq.sec;
-
- if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0)
- goto fail;
- if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
- goto fail;
+ /* handle the queued requests */
+ return remus_handle_queued_io(s);
+}
- if (mwrite(s->stream_fd.fd, treq.buf, treq.secs *
driver->info.sector_size) < 0)
- goto fail;
+static int remus_retry_connect(struct tdremus_state *s)
+{
+ event_id_t id;
- td_forward_request(treq);
+ tapdisk_server_unregister_event(s->stream_fd.cid);
+ s->stream_fd.cid = -1;
- return;
+ RPRINTF("connect to backup 1 second later");
+ id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT,
+ s->stream_fd.fd,
+ REMUS_CONNRETRY_TIMEOUT,
+ remus_retry_connect_event, s);
+ if (id < 0) {
+ RPRINTF("error registering timeout client connection event
handler: %s\n",
+ strerror(id));
+ return ERROR_INTERNAL;
+ }
- fail:
- /* switch to unprotected mode and tell tapdisk to retry */
- RPRINTF("write request replication failed, switching to unprotected
mode");
- switch_mode(s->tdremus_driver, mode_unprotected);
- td_complete_request(treq, -EBUSY);
+ s->stream_fd.cid = id;
+ return 0;
}
-
-/* It is called when the user writes "flush" to control file */
-static int client_flush(td_driver_t *driver)
+static int remus_wait_connect_done(struct tdremus_state *s)
{
- struct tdremus_state *s = (struct tdremus_state *)driver->data;
-
- // RPRINTF("committing output\n");
+ event_id_t id;
- if (s->stream_fd.fd == -1)
- /* connection not yet established, nothing to flush */
- return 0;
+ tapdisk_server_unregister_event(s->stream_fd.cid);
+ s->stream_fd.cid = -1;
- if (mwrite(s->stream_fd.fd, TDREMUS_COMMIT,
- strlen(TDREMUS_COMMIT)) < 0) {
- RPRINTF("error flushing output");
- close_stream_fd(s);
- return -1;
+ id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
+ s->stream_fd.fd, 0,
+ remus_connect_event, s);
+ if (id < 0) {
+ RPRINTF("error registering client connection event handler:
%s\n",
+ strerror(id));
+ return ERROR_INTERNAL;
}
+ s->stream_fd.cid = id;
return 0;
}
-static int server_flush(td_driver_t *driver)
+/* Return 1 if err is transient and we should retry connecting to the backup */
+static int check_connect_errno(int err)
{
- struct tdremus_state *s = (struct tdremus_state *)driver->data;
- /*
- * Nothing to flush in beginning.
+ /*
+ * The fd is non-blocking, so connect() itself never fails
+ * with ETIMEDOUT; that errno can only be reported later
+ * through getsockopt(SO_ERROR).
*/
- if (!s->ramdisk.prev)
- return 0;
- /* Try to flush any remaining requests */
- return ramdisk_flush(driver, s);
-}
-
-static int primary_start(td_driver_t *driver)
-{
- struct tdremus_state *s = (struct tdremus_state *)driver->data;
-
- RPRINTF("activating client mode\n");
-
- tapdisk_remus.td_queue_read = primary_queue_read;
- tapdisk_remus.td_queue_write = primary_queue_write;
-
- s->stream_fd.fd = -1;
- s->stream_fd.id = -1;
+ if (err == ECONNREFUSED || err == ENETUNREACH ||
+ err == EAGAIN || err == ECONNABORTED ||
+ err == ETIMEDOUT)
+ return 1;
return 0;
}
-/* timeout callback */
static void remus_retry_connect_event(event_id_t id, char mode, void *private)
{
struct tdremus_state *s = (struct tdremus_state *)private;
+ int rc, ret;
/* do a non-blocking connect */
- if (connect(s->stream_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa))
- && errno != EINPROGRESS)
- {
- if(errno == ECONNREFUSED || errno == ENETUNREACH || errno ==
EAGAIN || errno == ECONNABORTED)
- {
- /* try again in a second */
- tapdisk_server_unregister_event(s->stream_fd.id);
- if((id =
tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd,
REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
- RPRINTF("error registering timeout client
connection event handler: %s\n", strerror(id));
- return;
- }
- s->stream_fd.id = id;
- }
- else
- {
- /* not recoverable */
- RPRINTF("error connection to server %s\n",
strerror(errno));
+ ret = connect(s->stream_fd.fd,
+ (struct sockaddr *)&s->sa,
+ sizeof(s->sa));
+ if (ret) {
+ if (errno == EINPROGRESS) {
+ /*
+ * the connect returned EINPROGRESS (nonblocking
+ * connect) we must wait for the fd to be writeable
+ * to determine if the connect worked
+ */
+ rc = remus_wait_connect_done(s);
+ if (rc)
+ goto fail;
return;
}
- }
- else
- {
- /* the connect returned EINPROGRESS (nonblocking connect) we
must wait for the fd to be writeable to determine if the connect worked */
- tapdisk_server_unregister_event(s->stream_fd.id);
- if((id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
s->stream_fd.fd, 0, remus_connect_event, s)) < 0) {
- RPRINTF("error registering client connection event
handler: %s\n", strerror(id));
+ if (check_connect_errno(errno)) {
+ rc = remus_retry_connect(s);
+ if (rc)
+ goto fail;
return;
}
- s->stream_fd.id = id;
+
+ /* not recoverable */
+ RPRINTF("error connection to server %s\n", strerror(errno));
+ rc = ERROR_CONNECTION;
+ goto fail;
}
+
+ /* connect() succeeded immediately, without EINPROGRESS */
+ rc = remus_connection_done(s);
+ if (rc)
+ goto fail;
+
+ return;
+
+fail:
+ primary_failed(s, rc);
+ return;
}
/* callback when nonblocking connect() is finished */
-/* called only by primary in unprotected state */
static void remus_connect_event(event_id_t id, char mode, void *private)
{
int socket_errno;
socklen_t socket_errno_size;
struct tdremus_state *s = (struct tdremus_state *)private;
+ int rc;
- /* check to se if the connect succeeded */
+ /* check to see if the connect succeeded */
socket_errno_size = sizeof(socket_errno);
- if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR, &socket_errno,
&socket_errno_size)) {
+ if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR,
+ &socket_errno, &socket_errno_size)) {
RPRINTF("error getting socket errno\n");
return;
}
RPRINTF("socket connect returned %d\n", socket_errno);
- if(socket_errno)
- {
+ if (socket_errno) {
/* the connect did not succeed */
+ if (check_connect_errno(socket_errno)) {
+ /*
+ * we can probably assume that the backup is down.
+ * just try again later
+ */
+ rc = remus_retry_connect(s);
+ if (rc)
+ goto fail;
- if(socket_errno == ECONNREFUSED || socket_errno == ENETUNREACH
|| socket_errno == ETIMEDOUT
- || socket_errno == ECONNABORTED || socket_errno == EAGAIN)
- {
- /* we can probably assume that the backup is down. just
try again later */
- tapdisk_server_unregister_event(s->stream_fd.id);
- if((id =
tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd,
REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
- RPRINTF("error registering timeout client
connection event handler: %s\n", strerror(id));
- return;
- }
- s->stream_fd.id = id;
- }
- else
- {
- RPRINTF("socket connect returned %d, giving up\n",
socket_errno);
- }
- }
- else
- {
- /* the connect succeeded */
-
- /* unregister this function and register a new event handler */
- tapdisk_server_unregister_event(s->stream_fd.id);
- if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
s->stream_fd.fd, 0, remus_client_event, s)) < 0) {
- RPRINTF("error registering client event handler: %s\n",
strerror(id));
return;
+ } else {
+ RPRINTF("socket connect returned %d, giving up\n",
+ socket_errno);
+ rc = ERROR_CONNECTION;
+ goto fail;
}
- s->stream_fd.id = id;
- /* switch from unprotected to protected client */
- switch_mode(s->tdremus_driver, mode_primary);
+ return;
}
+
+ rc = remus_connection_done(s);
+ if (rc)
+ goto fail;
+
+ return;
+
+fail:
+ primary_failed(s, rc);
}
-/* we install this event handler on the primary once we have connected to the
backup */
+/*
+ * we install this event handler on the primary once we have
+ * connected to the backup.
+ */
/* wait for "done" message to commit checkpoint */
static void remus_client_event(event_id_t id, char mode, void *private)
{
@@ -1043,9 +1069,12 @@ static void remus_client_event(event_id_t id, char mode,
void *private)
int rc;
if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
- /* replication stream closed or otherwise broken (timeout,
reset, &c) */
+ /*
+ * replication stream closed or otherwise broken
+ * (timeout, reset, &c)
+ */
RPRINTF("error reading from backup\n");
- close_stream_fd(s);
+ primary_failed(s, ERROR_IO);
return;
}
@@ -1056,22 +1085,169 @@ static void remus_client_event(event_id_t id, char
mode, void *private)
ctl_respond(s, TDREMUS_DONE);
else {
RPRINTF("received unknown message: %s\n", req);
- close_stream_fd(s);
+ primary_failed(s, ERROR_IO);
+ }
+
+ return;
+}
+
+static void primary_queue_read(td_driver_t *driver, td_request_t treq)
+{
+ struct tdremus_state *s = (struct tdremus_state *)driver->data;
+ struct req_ring *ring = &s->queued_io;
+
+ if (ring_isempty(ring)) {
+ /* just pass read through */
+ td_forward_request(treq);
+ return;
+ }
+
+ ring_add_request(ring, &treq);
+}
+
+static int primary_forward_request(struct tdremus_state *s,
+ const td_request_t *treq)
+{
+ char header[sizeof(uint32_t) + sizeof(uint64_t)];
+ uint32_t *sectors = (uint32_t *)header;
+ uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
+ td_driver_t *driver = s->tdremus_driver;
+
+ *sectors = treq->secs;
+ *sector = treq->sec;
+
+ if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0)
+ return ERROR_IO;
+
+ if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
+ return ERROR_IO;
+
+ if (mwrite(s->stream_fd.fd, treq->buf,
+ treq->secs * driver->info.sector_size) < 0)
+ return ERROR_IO;
+
+ return 0;
+}
+
+/* TODO:
+ * The primary uses mwrite() to write the contents of a write request to the
+ * backup. This effectively blocks until all data has been copied into a system
+ * buffer or a timeout has occurred. We may wish to instead use tapdisk's
+ * nonblocking i/o interface, tapdisk_server_register_event(), to set timeouts
+ * and write data in an asynchronous fashion.
+ */
+static void primary_queue_write(td_driver_t *driver, td_request_t treq)
+{
+ struct tdremus_state *s = (struct tdremus_state *)driver->data;
+ int rc;
+
+ // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
+
+ if(s->stream_fd.fd < 0) {
+ RPRINTF("connecting to backup...\n");
+ rc = primary_do_connect(s);
+ if (rc)
+ goto fail;
+ }
+
+ /* The connection is not established, just queue the request */
+ if (s->stream_fd.cid >= 0) {
+ ring_add_request(&s->queued_io, &treq);
+ return;
}
+ /* The connection is established */
+ rc = primary_forward_request(s, &treq);
+ if (rc)
+ goto fail;
+
+ td_forward_request(treq);
+
return;
+
+fail:
+ /* switch to unprotected mode and forward the request */
+ RPRINTF("write request replication failed, switching to unprotected
mode");
+ primary_failed(s, rc);
+ td_forward_request(treq);
+}
+
+/* Called when the user writes "flush" to the control file. */
+static int client_flush(td_driver_t *driver)
+{
+ struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+ // RPRINTF("committing output\n");
+
+ if (s->stream_fd.fd == -1)
+ /* connection not yet established, nothing to flush */
+ return 0;
+
+ if (mwrite(s->stream_fd.fd, TDREMUS_COMMIT,
+ strlen(TDREMUS_COMMIT)) < 0) {
+ RPRINTF("error flushing output");
+ primary_failed(s, ERROR_IO);
+ return -1;
+ }
+
+ return 0;
+}
+
+/* Called when switching from primary to unprotected mode: forward queued requests */
+static int primary_flush(td_driver_t *driver)
+{
+ struct tdremus_state *s = driver->data;
+ struct req_ring *ring = &s->queued_io;
+ unsigned int cons;
+
+ if (ring_isempty(ring))
+ return 0;
+
+ while (!ring_isempty(ring)) {
+ cons = ring->cons;
+ ring->cons = ring_next(cons);
+
+ td_forward_request(ring->pending_requests[cons].treq);
+ }
+
+ return 0;
+}
+
+static int primary_start(td_driver_t *driver)
+{
+ struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+ RPRINTF("activating client mode\n");
+
+ tapdisk_remus.td_queue_read = primary_queue_read;
+ tapdisk_remus.td_queue_write = primary_queue_write;
+ s->queue_flush = primary_flush;
+
+ s->stream_fd.fd = -1;
+ s->stream_fd.cid = -1;
+ s->stream_fd.rid = -1;
+ s->stream_fd.wid = -1;
+
+ return 0;
}
/* backup functions */
static void remus_server_event(event_id_t id, char mode, void *private);
+/* Called when handling data from the primary fails */
+static void backup_failed(struct tdremus_state *s, int rc)
+{
+ close_stream_fd(s);
+ close_server_fd(s);
+ /* We will switch to unprotected mode in backup_queue_write() */
+}
+
/* returns the socket that receives write requests */
static void remus_server_accept(event_id_t id, char mode, void* private)
{
struct tdremus_state* s = (struct tdremus_state *) private;
int stream_fd;
- event_id_t cid;
/* XXX: add address-based black/white list */
if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
@@ -1079,68 +1255,80 @@ static void remus_server_accept(event_id_t id, char
mode, void* private)
return;
}
- /* TODO: check to see if we are already replicating. if so just close
the
- * connection (or do something smarter) */
+ /*
+ * TODO: check to see if we are already replicating.
+ * if so just close the connection (or do something
+ * smarter)
+ */
RPRINTF("server accepted connection\n");
/* add tapdisk event for replication stream */
- cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd,
0,
- remus_server_event, s);
+ id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0,
+ remus_server_event, s);
- if(cid < 0) {
- RPRINTF("error registering connection event handler: %s\n",
strerror(errno));
+ if (id < 0) {
+ RPRINTF("error registering connection event handler: %s\n",
+ strerror(errno));
close(stream_fd);
return;
}
/* store replication file descriptor */
s->stream_fd.fd = stream_fd;
- s->stream_fd.id = cid;
+ s->stream_fd.rid = id;
}
/* returns -2 if EADDRNOTAVAIL */
static int remus_bind(struct tdremus_state* s)
{
-// struct sockaddr_in sa;
int opt;
int rc = -1;
+ event_id_t id;
if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
RPRINTF("could not create server socket: %d\n", errno);
return rc;
}
- opt = 1;
- if (setsockopt(s->server_fd.fd, SOL_SOCKET, SO_REUSEADDR, &opt,
sizeof(opt)) < 0)
- RPRINTF("Error setting REUSEADDR on %d: %d\n", s->server_fd.fd,
errno);
- if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa)) <
0) {
- RPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
s->server_fd.fd,
- inet_ntoa(s->sa.sin_addr), ntohs(s->sa.sin_port),
errno, strerror(errno));
- if (errno != EADDRINUSE)
+ opt = 1;
+ if (setsockopt(s->server_fd.fd, SOL_SOCKET,
+ SO_REUSEADDR, &opt, sizeof(opt)) < 0)
+ RPRINTF("Error setting REUSEADDR on %d: %d\n",
+ s->server_fd.fd, errno);
+
+ if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa,
+ sizeof(s->sa)) < 0) {
+ RPRINTF("could not bind server socket %d to %s:%d: %d %s\n",
+ s->server_fd.fd, inet_ntoa(s->sa.sin_addr),
+ ntohs(s->sa.sin_port), errno, strerror(errno));
+ if (errno == EADDRNOTAVAIL)
rc = -2;
goto err_sfd;
}
+
if (listen(s->server_fd.fd, 10)) {
RPRINTF("could not listen on socket: %d\n", errno);
goto err_sfd;
}
- /* The socket s now bound to the address and listening so we may now
register
- * the fd with tapdisk */
-
- if((s->server_fd.id =
tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
- s->server_fd.fd, 0,
-
remus_server_accept, s)) < 0) {
+ /*
+ * The socket s now bound to the address and listening so we
+ * may now register the fd with tapdisk
+ */
+ id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+ s->server_fd.fd, 0,
+ remus_server_accept, s);
+ if (id < 0) {
RPRINTF("error registering server connection event handler: %s",
- strerror(s->server_fd.id));
+ strerror(id));
goto err_sfd;
}
+ s->server_fd.cid = id;
return 0;
- err_sfd:
- close(s->server_fd.fd);
- s->server_fd.fd = -1;
+err_sfd:
+ CLOSE_FD(s->server_fd.fd);
return rc;
}
@@ -1190,10 +1378,21 @@ void backup_queue_write(td_driver_t *driver,
td_request_t treq)
td_complete_request(treq, -EBUSY);
}
+static int server_flush(td_driver_t *driver)
+{
+ struct tdremus_state *s = (struct tdremus_state *)driver->data;
+ /*
+ * Nothing to flush in beginning.
+ */
+ if (!s->ramdisk.prev)
+ return 0;
+ /* Try to flush any remaining requests */
+ return ramdisk_flush(driver, s);
+}
+
static int backup_start(td_driver_t *driver)
{
struct tdremus_state *s = (struct tdremus_state *)driver->data;
- int fd;
if (ramdisk_start(driver) < 0)
return -1;
@@ -1201,16 +1400,15 @@ static int backup_start(td_driver_t *driver)
tapdisk_remus.td_queue_read = backup_queue_read;
tapdisk_remus.td_queue_write = backup_queue_write;
s->queue_flush = server_flush;
- /* TODO set flush function */
return 0;
}
-static int server_do_wreq(td_driver_t *driver)
+static void server_do_wreq(td_driver_t *driver)
{
struct tdremus_state *s = (struct tdremus_state *)driver->data;
static tdremus_wire_t twreq;
char buf[4096];
- int len, rc;
+ int len, rc = ERROR_IO;
char header[sizeof(uint32_t) + sizeof(uint64_t)];
uint32_t *sectors = (uint32_t *) header;
@@ -1227,39 +1425,40 @@ static int server_do_wreq(td_driver_t *driver)
// *sector);
if (len > sizeof(buf)) {
- /* freak out! */
- RPRINTF("write request too large: %d/%u\n", len,
(unsigned)sizeof(buf));
- return -1;
+ /* freak out! TODO: how should the rest of this request's data from the primary be handled? */
+ RPRINTF("write request too large: %d/%u\n",
+ len, (unsigned)sizeof(buf));
+ goto err;
}
if (mread(s->stream_fd.fd, buf, len) < 0)
goto err;
- if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0)
+ if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) {
+ rc = ERROR_INTERNAL;
goto err;
+ }
- return 0;
+ return;
err:
/* should start failover */
RPRINTF("backup write request error\n");
- close_stream_fd(s);
-
- return -1;
+ backup_failed(s, rc);
}
-static int server_do_sreq(td_driver_t *driver)
+static void server_do_sreq(td_driver_t *driver)
{
/*
RPRINTF("submit request received\n");
*/
- return 0;
+ return;
}
/* at this point, the server can start applying the most recent
* ramdisk. */
-static int server_do_creq(td_driver_t *driver)
+static void server_do_creq(td_driver_t *driver)
{
struct tdremus_state *s = (struct tdremus_state *)driver->data;
@@ -1269,9 +1468,7 @@ static int server_do_creq(td_driver_t *driver)
/* XXX this message should not be sent until flush completes! */
if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4)
- return -1;
-
- return 0;
+ backup_failed(s, ERROR_IO);
}
@@ -1356,10 +1553,6 @@ static int unprotected_start(td_driver_t *driver)
RPRINTF("failure detected, activating passthrough\n");
- /* close the server socket */
- close_stream_fd(s);
-
- close_server_fd(s);
/* install the unprotected read/write handlers */
tapdisk_remus.td_queue_read = unprotected_queue_read;
@@ -1486,6 +1679,19 @@ static int switch_mode(td_driver_t *driver, enum
tdremus_mode mode)
return rc;
}
+static void ctl_reopen(struct tdremus_state *s)
+{
+ ctl_unregister(s);
+ CLOSE_FD(s->ctl_fd.fd);
+ RPRINTF("FIFO closed\n");
+
+ if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
+ RPRINTF("error reopening FIFO: %d\n", errno);
+ return;
+ }
+ ctl_register(s);
+}
+
static void ctl_request(event_id_t id, char mode, void *private)
{
struct tdremus_state *s = (struct tdremus_state *)private;
@@ -1497,12 +1703,6 @@ static void ctl_request(event_id_t id, char mode, void
*private)
if (!(rc = read(s->ctl_fd.fd, msg, sizeof(msg) - 1 /* append nul */))) {
RPRINTF("0-byte read received, reopening FIFO\n");
- /*TODO: we may have to unregister/re-register with
tapdisk_server */
- close(s->ctl_fd.fd);
- RPRINTF("FIFO closed\n");
- if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
- RPRINTF("error reopening FIFO: %d\n", errno);
- }
return;
}
@@ -1641,10 +1841,11 @@ static int ctl_register(struct tdremus_state *s)
RPRINTF("registering ctl fifo\n");
/* register ctl fd */
- s->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
s->ctl_fd.fd, 0, ctl_request, s);
+ s->ctl_fd.cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
s->ctl_fd.fd, 0, ctl_request, s);
- if (s->ctl_fd.id < 0) {
- RPRINTF("error registering ctrl FIFO %s: %d\n", s->ctl_path,
s->ctl_fd.id);
+ if (s->ctl_fd.cid < 0) {
+ RPRINTF("error registering ctrl FIFO %s: %d\n",
+ s->ctl_path, s->ctl_fd.cid);
return -1;
}
@@ -1655,10 +1856,7 @@ static void ctl_unregister(struct tdremus_state *s)
{
RPRINTF("unregistering ctl fifo\n");
- if (s->ctl_fd.id >= 0) {
- tapdisk_server_unregister_event(s->ctl_fd.id);
- s->ctl_fd.id = -1;
- }
+ UNREGISTER_EVENT(s->ctl_fd.cid);
}
/* interface */
--
1.9.3
_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel
|
![]() |
Lists.xenproject.org is hosted with RackSpace, monitoring our |