
[Xen-changelog] [xen-unstable] blktap2: add remus driver



# HG changeset patch
# User Keir Fraser <keir.fraser@xxxxxxxxxx>
# Date 1257795906 0
# Node ID 9479190566fdd8c50af9f7b9d8aadb99d40f7472
# Parent  e084ee729798877be7966962606a6b50a52238be
blktap2: add remus driver

Blktap2 port of remus disk driver. Backwards compatible with blktap1
implementation.

Signed-off-by: Ryan O'Connor <rjo@xxxxxxxxx>
Signed-off-by: Brendan Cully <brendan@xxxxxxxxx>
---
 tools/blktap2/drivers/Makefile                   |   11 
 tools/blktap2/drivers/block-remus.c              | 1670 +++++++++++++++++++++++
 tools/blktap2/drivers/disktypes.h                |   13 
 tools/blktap2/drivers/hashtable.c                |  274 +++
 tools/blktap2/drivers/hashtable_itr.c            |  188 ++
 tools/blktap2/drivers/hashtable_itr.h            |  112 +
 tools/blktap2/drivers/hashtable_utility.c        |   71 
 tools/blktap2/drivers/hashtable_utility.h        |   55 
 tools/python/xen/xend/server/BlktapController.py |    1 
 9 files changed, 2394 insertions(+), 1 deletion(-)

diff -r e084ee729798 -r 9479190566fd tools/blktap2/drivers/Makefile
--- a/tools/blktap2/drivers/Makefile    Mon Nov 09 19:41:16 2009 +0000
+++ b/tools/blktap2/drivers/Makefile    Mon Nov 09 19:45:06 2009 +0000
@@ -36,13 +36,21 @@ CRYPT_LIB += -lcrypto
 CRYPT_LIB += -lcrypto
 endif
 
-LDFLAGS_img := $(CRYPT_LIB) -lpthread -lz
+LDFLAGS_img := $(CRYPT_LIB) -lpthread -lz -lm
 
 LIBS += -L$(LIBVHDDIR) -lvhd
 
 ifeq ($(CONFIG_Linux),y)
 LIBS += -luuid
 endif
+
+REMUS-OBJS  := block-remus.o
+REMUS-OBJS  += hashtable.o
+REMUS-OBJS  += hashtable_itr.o
+REMUS-OBJS  += hashtable_utility.o
+
+$(REMUS-OBJS): CFLAGS += -fgnu89-inline -I$(XEN_XENSTORE)
+
 
 LIBAIO_DIR = $(XEN_ROOT)/tools/libaio/src
 tapdisk2 tapdisk-stream tapdisk-diff $(QCOW_UTIL): AIOLIBS := $(LIBAIO_DIR)/libaio.a
@@ -81,6 +89,7 @@ BLK-OBJS-y  += block-qcow.o
 BLK-OBJS-y  += block-qcow.o
 BLK-OBJS-y  += aes.o
 BLK-OBJS-y  += $(PORTABLE-OBJS-y)
+BLK-OBJS-y  += $(REMUS-OBJS)
 
 all: $(IBIN) lock-util qcow-util
 
diff -r e084ee729798 -r 9479190566fd tools/blktap2/drivers/block-remus.c
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/blktap2/drivers/block-remus.c       Mon Nov 09 19:45:06 2009 +0000
@@ -0,0 +1,1670 @@
+/* block-remus.c
+ *
+ * This disk sends all writes to a backup via a network interface before
+ * passing them to an underlying device.
+ * The backup is a bit more complicated:
+ *  1. It applies all incoming writes to a ramdisk.
+ *  2. When a checkpoint request arrives, it moves the ramdisk to
+ *     a committing state and uses a new ramdisk for subsequent writes.
+ *     It also acknowledges the request, to let the sender know it can
+ *     release output.
+ *  3. The ramdisk flushes its contents to the underlying driver.
+ *  4. At failover, the backup waits for the in-flight ramdisk (if any) to
+ *     drain before letting the domain be activated.
+ *
+ * The driver determines whether it is the client or server by attempting
+ * to bind to the replication address. If the address is not local,
+ * the driver acts as client.
+ *
+ * The following messages are defined for the replication stream:
+ * 1. write request
+ *    "wreq"      4
+ *    num_sectors 4
+ *    sector      8
+ *    buffer      (num_sectors * sector_size)
+ * 2. submit request (may be used as a barrier)
+ *    "sreq"      4
+ * 3. commit request
+ *    "creq"      4
+ * After a commit request, the client must wait for a completion message:
+ * 4. completion
+ *    "done"      4
+ */
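
For illustration, a minimal sketch of how a primary could frame one write request in the format described above (send_all() is a simple blocking helper defined here for the sketch; the driver itself uses mwrite(), which adds a select() timeout):

#include <stdint.h>
#include <string.h>
#include <unistd.h>

/* loop until the whole buffer has been handed to the kernel */
static int send_all(int fd, const void *buf, size_t len)
{
	size_t done = 0;
	while (done < len) {
		ssize_t n = write(fd, (const char *)buf + done, len - done);
		if (n <= 0)
			return -1;
		done += n;
	}
	return 0;
}

/* frame one write request: 4-byte tag, 4-byte sector count,
 * 8-byte starting sector, then the sector data itself */
static int send_write_request(int fd, uint64_t sector, uint32_t num_sectors,
			      const char *buf, size_t sector_size)
{
	char header[sizeof(uint32_t) + sizeof(uint64_t)];

	memcpy(header, &num_sectors, sizeof(num_sectors));
	memcpy(header + sizeof(uint32_t), &sector, sizeof(sector));

	if (send_all(fd, "wreq", 4) ||
	    send_all(fd, header, sizeof(header)) ||
	    send_all(fd, buf, (size_t)num_sectors * sector_size))
		return -1;
	return 0;
}
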
+
+/* due to architectural choices in tapdisk, block-buffer is forced to
+ * reimplement some code which is meant to be private */
+#define TAPDISK
+#include "tapdisk.h"
+#include "tapdisk-server.h"
+#include "tapdisk-driver.h"
+#include "tapdisk-interface.h"
+#include "hashtable.h"
+#include "hashtable_itr.h"
+#include "hashtable_utility.h"
+
+#include <errno.h>
+#include <inttypes.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/param.h>
+#include <sys/sysctl.h>
+#include <unistd.h>
+
+/* timeout for reads and writes in ms */
+#define NET_TIMEOUT 500
+#define RAMDISK_HASHSIZE 128
+
+/* connect retry timeout (seconds) */
+#define REMUS_CONNRETRY_TIMEOUT 10
+
+#define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
+
+enum tdremus_mode {
+       mode_invalid = 0,
+       mode_unprotected,
+       mode_primary,
+       mode_backup
+};
+
+struct tdremus_req {
+       uint64_t sector;
+       int nb_sectors;
+       char buf[4096];
+};
+
+struct req_ring {
+       /* waste one slot to distinguish between empty and full */
+       struct tdremus_req requests[MAX_REQUESTS * 2 + 1];
+       unsigned int head;
+       unsigned int tail;
+};
+
+/* TODO: This isn't very pretty, but to properly generate our own treqs (needed
+ * by the backup) we need to know our td_vbt_t and td_image_t (blktap2
+ * internals). As a proper fix, we should consider extending the tapdisk
+ * interface with a td_create_request() function, or something similar.
+ *
+ * For now, we just grab the vbd in the td_open() command, and the td_image_t
+ * from the first read request.
+ */
+td_vbd_t *device_vbd = NULL;
+td_image_t *remus_image = NULL;
+
+struct ramdisk {
+       size_t sector_size;
+       struct hashtable* h;
+       /* when a ramdisk is flushed, h is given a new empty hash for writes
+        * while the old ramdisk (prev) is drained asynchronously. To avoid
+        * a race where a read request points to a sector in prev which has
+        * not yet been flushed, check prev on a miss in h */
+       struct hashtable* prev;
+       /* count of outstanding requests to the base driver */
+       size_t inflight;
+};
+
+/* the ramdisk intercepts the original callback for reads and writes.
+ * This holds the original data. */
+/* Might be worth making this a static array in struct ramdisk to avoid
+ * a malloc per request */
+
+struct tdremus_state;
+
+struct ramdisk_cbdata {
+       td_callback_t cb;
+       void* private;
+       char* buf;
+       struct tdremus_state* state;
+};
+
+struct ramdisk_write_cbdata {
+       struct tdremus_state* state;
+       char* buf;
+};
+
+typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
+
+/* poll_fd type for blktap2 fd system. taken from block_log.c */
+typedef struct poll_fd {
+       int        fd;
+       event_id_t id;
+} poll_fd_t;
+
+struct tdremus_state {
+//  struct tap_disk* driver;
+       void* driver_data;
+
+  /* XXX: this is needed so that the server can perform operations on
+   * the driver from the stream_fd event handler. fix this. */
+       td_driver_t *tdremus_driver;
+
+       /* TODO: we may wish to replace these two FIFOs with a unix socket */
+       char*     ctl_path; /* receive flush instruction here */
+       poll_fd_t ctl_fd;     /* io_fd slot for control FIFO */
+       char*     msg_path; /* output completion message here */
+       poll_fd_t msg_fd;
+
+  /* replication host */
+       struct sockaddr_in sa;
+       poll_fd_t server_fd;    /* server listen port */
+       poll_fd_t stream_fd;     /* replication channel */
+
+       /* queue write requests, batch-replicate at submit */
+       struct req_ring write_ring;
+
+       /* ramdisk data*/
+       struct ramdisk ramdisk;
+
+       /* mode methods */
+       enum tdremus_mode mode;
+       int (*queue_flush)(td_driver_t *driver);
+};
+
+typedef struct tdremus_wire {
+       uint32_t op;
+       uint64_t id;
+       uint64_t sec;
+       uint32_t secs;
+} tdremus_wire_t;
+
+#define TDREMUS_READ "rreq"
+#define TDREMUS_WRITE "wreq"
+#define TDREMUS_SUBMIT "sreq"
+#define TDREMUS_COMMIT "creq"
+#define TDREMUS_DONE "done"
+#define TDREMUS_FAIL "fail"
+
+/* primary read/write functions */
+static void primary_queue_read(td_driver_t *driver, td_request_t treq);
+static void primary_queue_write(td_driver_t *driver, td_request_t treq);
+
+/* backup read/write functions */
+static void backup_queue_read(td_driver_t *driver, td_request_t treq);
+static void backup_queue_write(td_driver_t *driver, td_request_t treq);
+
+/* unprotected read/write functions */
+static void unprotected_queue_read(td_driver_t *driver, td_request_t treq);
+static void unprotected_queue_write(td_driver_t *driver, td_request_t treq);
+
+static int tdremus_close(td_driver_t *driver);
+
+static int switch_mode(td_driver_t *driver, enum tdremus_mode mode);
+static int ctl_respond(struct tdremus_state *s, const char *response);
+
+/* ring functions */
+static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos)
+{
+       if (++pos >= MAX_REQUESTS * 2 + 1)
+               return 0;
+
+       return pos;
+}
+
+static inline int ring_isempty(struct req_ring* ring)
+{
+       return ring->head == ring->tail;
+}
+
+static inline int ring_isfull(struct req_ring* ring)
+{
+       return ring_next(ring, ring->tail) == ring->head;
+}
+
+/* functions to create and submit treq's */
+
+static void
+replicated_write_callback(td_request_t treq, int err)
+{
+       struct tdremus_state *s = (struct tdremus_state *) treq.cb_data;
+       td_vbd_request_t *vreq;
+
+       vreq = (td_vbd_request_t *) treq.private;
+
+       /* the write failed for now, lets panic. this is very bad */
+       if (err) {
+               RPRINTF("ramdisk write failed, disk image is not consistent\n");
+               exit(-1);
+       }
+
+       /* The write succeeded. let's pull the vreq off whatever request list
+        * it is on and free() it */
+       list_del(&vreq->next);
+       free(vreq);
+
+       s->ramdisk.inflight--;
+       if (!s->ramdisk.inflight && !s->ramdisk.prev) {
+               /* TODO: the ramdisk has been flushed */
+       }
+}
+
+static inline int
+create_write_request(struct tdremus_state *state, td_sector_t sec, int secs, char *buf)
+{
+       td_request_t treq;
+       td_vbd_request_t *vreq;
+
+       treq.op      = TD_OP_WRITE;
+       treq.buf     = buf;
+       treq.sec     = sec;
+       treq.secs    = secs;
+       treq.image   = remus_image;
+       treq.cb      = replicated_write_callback;
+       treq.cb_data = state;
+       treq.id      = 0;
+       treq.sidx    = 0;
+
+       vreq         = calloc(1, sizeof(td_vbd_request_t));
+       treq.private = vreq;
+
+       if(!vreq)
+               return -1;
+
+       vreq->submitting = 1;
+       INIT_LIST_HEAD(&vreq->next);
+       tapdisk_vbd_move_request(treq.private, &device_vbd->pending_requests);
+
+       /* TODO:
+        * we should probably leave it up to the caller to forward the request */
+       td_forward_request(treq);
+
+       vreq->submitting--;
+
+       return 0;
+}
+
+
+/* ramdisk methods */
+static int ramdisk_flush(td_driver_t *driver, struct tdremus_state *s);
+
+/* http://www.concentric.net/~Ttwang/tech/inthash.htm */
+static unsigned int uint64_hash(void* k)
+{
+       uint64_t key = *(uint64_t*)k;
+
+       key = (~key) + (key << 18);
+       key = key ^ (key >> 31);
+       key = key * 21;
+       key = key ^ (key >> 11);
+       key = key + (key << 6);
+       key = key ^ (key >> 22);
+
+       return (unsigned int)key;
+}
+
+static int rd_hash_equal(void* k1, void* k2)
+{
+       uint64_t key1, key2;
+
+       key1 = *(uint64_t*)k1;
+       key2 = *(uint64_t*)k2;
+
+       return key1 == key2;
+}
+
+static int ramdisk_read(struct ramdisk* ramdisk, uint64_t sector,
+                       int nb_sectors, char* buf)
+{
+       int i;
+       char* v;
+       uint64_t key;
+
+       for (i = 0; i < nb_sectors; i++) {
+               key = sector + i;
+               if (!(v = hashtable_search(ramdisk->h, &key))) {
+                       /* check whether it is queued in a previous flush request */
+                       if (!(ramdisk->prev && (v = hashtable_search(ramdisk->prev, &key))))
+                               return -1;
+               }
+               memcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size);
+       }
+
+       return 0;
+}
+
+static int ramdisk_write_hash(struct hashtable* h, uint64_t sector, char* buf,
+                             size_t len)
+{
+       char* v;
+       uint64_t* key;
+
+       if ((v = hashtable_search(h, &sector))) {
+               memcpy(v, buf, len);
+               return 0;
+       }
+
+       if (!(v = malloc(len))) {
+               DPRINTF("ramdisk_write_hash: malloc failed\n");
+               return -1;
+       }
+       memcpy(v, buf, len);
+       if (!(key = malloc(sizeof(*key)))) {
+               DPRINTF("ramdisk_write_hash: error allocating key\n");
+               free(v);
+               return -1;
+       }
+       *key = sector;
+       if (!hashtable_insert(h, key, v)) {
+               DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", sector);
+               free(key);
+               free(v);
+               return -1;
+       }
+
+       return 0;
+}
+
+static inline int ramdisk_write(struct ramdisk* ramdisk, uint64_t sector,
+                               int nb_sectors, char* buf)
+{
+       int i, rc;
+
+       for (i = 0; i < nb_sectors; i++) {
+               rc = ramdisk_write_hash(ramdisk->h, sector + i,
+                                       buf + i * ramdisk->sector_size,
+                                       ramdisk->sector_size);
+               if (rc)
+                       return rc;
+       }
+
+       return 0;
+}
+
+static int ramdisk_write_cb(td_driver_t *driver, int res, uint64_t sector,
+                           int nb_sectors, int id, void* private)
+{
+       struct ramdisk_write_cbdata *cbdata = (struct ramdisk_write_cbdata*)private;
+       struct tdremus_state *s = cbdata->state;
+       int rc;
+
+       /*
+         RPRINTF("ramdisk write callback: rc %d, %d sectors @ %" PRIu64 "\n", res, nb_sectors,
+         sector);
+       */
+
+       free(cbdata->buf);
+       free(cbdata);
+
+       s->ramdisk.inflight--;
+       if (!s->ramdisk.inflight && !s->ramdisk.prev) {
+               /* when this reaches 0 and prev is empty, the disk is flushed. */
+               /*
+                 RPRINTF("ramdisk flush complete\n");
+               */
+       }
+
+       if (s->ramdisk.prev) {
+               /* resubmit as much as possible in the remaining disk */
+               /*
+                 RPRINTF("calling ramdisk_flush from write callback\n");
+               */
+               return ramdisk_flush(driver, s);
+       }
+
+       return 0;
+}
+
+static int uint64_compare(const void* k1, const void* k2)
+{
+       uint64_t u1 = *(uint64_t*)k1;
+       uint64_t u2 = *(uint64_t*)k2;
+
+       /* u1 - u2 is unsigned */
+       return u1 < u2 ? -1 : u1 > u2 ? 1 : 0;
+}
+
+/* set psectors to an array of the sector numbers in the hash, returning
+ * the number of entries (or -1 on error) */
+static int ramdisk_get_sectors(struct hashtable* h, uint64_t** psectors)
+{
+       struct hashtable_itr* itr;
+       uint64_t* sectors;
+       int count;
+
+       if (!(count = hashtable_count(h)))
+               return 0;
+
+       if (!(*psectors = malloc(count * sizeof(uint64_t)))) {
+               DPRINTF("ramdisk_get_sectors: error allocating sector map\n");
+               return -1;
+       }
+       sectors = *psectors;
+
+       itr = hashtable_iterator(h);
+       count = 0;
+       do {
+               sectors[count++] = *(uint64_t*)hashtable_iterator_key(itr);
+       } while (hashtable_iterator_advance(itr));
+       free(itr);
+
+       return count;
+}
+
+static char* merge_requests(struct ramdisk* ramdisk, uint64_t start,
+                           size_t count)
+{
+       char* buf;
+       char* sector;
+       int i;
+
+       if (!(buf = valloc(count * ramdisk->sector_size))) {
+               DPRINTF("merge_request: allocation failed\n");
+               return NULL;
+       }
+
+       for (i = 0; i < count; i++) {
+               if (!(sector = hashtable_search(ramdisk->prev, &start))) {
+                       DPRINTF("merge_request: lookup failed on %"PRIu64"\n", start);
+                       return NULL;
+               }
+
+               memcpy(buf + i * ramdisk->sector_size, sector, ramdisk->sector_size);
+               free(sector);
+
+               start++;
+       }
+
+       return buf;
+}
+
+/* The underlying driver may not handle having the whole ramdisk queued at
+ * once. We queue what we can and let the callbacks attempt to queue more. */
+/* NOTE: may be called from callback, while dd->private still belongs to
+ * the underlying driver */
+static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s)
+{
+       uint64_t* sectors;
+       char* buf;
+       uint64_t base, batchlen;
+       int i, j, count = 0;
+
+       // RPRINTF("ramdisk flush\n");
+
+       if ((count = ramdisk_get_sectors(s->ramdisk.prev, &sectors)) <= 0)
+               return count;
+
+       /*
+         RPRINTF("ramdisk: flushing %d sectors\n", count);
+       */
+
+       /* sort and merge sectors to improve disk performance */
+       qsort(sectors, count, sizeof(*sectors), uint64_compare);
+
+       for (i = 0; i < count;) {
+               base = sectors[i++];
+               while (i < count && sectors[i] == sectors[i-1] + 1)
+                       i++;
+               batchlen = sectors[i-1] - base + 1;
+
+               if (!(buf = merge_requests(&s->ramdisk, base, batchlen))) {
+                       RPRINTF("ramdisk_flush: merge_requests failed\n");
+                       free(sectors);
+                       return -1;
+               }
+
+               /* NOTE: create_write_request() creates a treq AND forwards it down
+                * the driver chain */
+               // RPRINTF("forwarding write request at %" PRIu64 ", length: %" PRIu64 "\n", base, batchlen);
+               create_write_request(s, base, batchlen, buf);
+               //RPRINTF("write request at %" PRIu64 ", length: %" PRIu64 " forwarded\n", base, batchlen);
+
+               s->ramdisk.inflight++;
+
+               for (j = 0; j < batchlen; j++) {
+                       hashtable_remove(s->ramdisk.prev, &base);
+                       base++;
+               }
+       }
+
+       if (!hashtable_count(s->ramdisk.prev)) {
+               /* everything is in flight */
+               hashtable_destroy(s->ramdisk.prev, 0);
+               s->ramdisk.prev = NULL;
+       }
+
+       free(sectors);
+
+       // RPRINTF("ramdisk flush done\n");
+       return 0;
+}
+
+/* flush ramdisk contents to disk */
+static int ramdisk_start_flush(td_driver_t *driver)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+       uint64_t* key;
+       char* buf;
+       int rc = 0;
+       int i, j, count, batchlen;
+       uint64_t* sectors;
+
+       if (!hashtable_count(s->ramdisk.h)) {
+               /*
+                 RPRINTF("Nothing to flush\n");
+               */
+               return 0;
+       }
+
+       if (s->ramdisk.prev) {
+               /* a flush request issued while a previous flush is still in progress
+                * will merge with the previous request. If you want the previous
+                * request to be consistent, wait for it to complete. */
+               if ((count = ramdisk_get_sectors(s->ramdisk.h, &sectors)) < 0)
+                       return count;
+
+               for (i = 0; i < count; i++) {
+                       buf = hashtable_search(s->ramdisk.h, sectors + i);
+                       ramdisk_write_hash(s->ramdisk.prev, sectors[i], buf,
+                                          s->ramdisk.sector_size);
+               }
+               free(sectors);
+
+               hashtable_destroy (s->ramdisk.h, 0);
+       } else
+               s->ramdisk.prev = s->ramdisk.h;
+
+       /* We create a new hashtable so that new writes can be performed before
+        * the old hashtable is completely drained. */
+       s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
+                                       rd_hash_equal);
+
+       return ramdisk_flush(driver, s);
+}
+
+
+static int ramdisk_start(td_driver_t *driver)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+       if (s->ramdisk.h) {
+               RPRINTF("ramdisk already allocated\n");
+               return 0;
+       }
+
+       s->ramdisk.sector_size = driver->info.sector_size;
+       s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
+                                       rd_hash_equal);
+
+       DPRINTF("Ramdisk started, %zu bytes/sector\n", s->ramdisk.sector_size);
+
+       return 0;
+}
+
+/* common client/server functions */
+/* mayberead: Time out after a certain interval. */
+static int mread(int fd, void* buf, size_t len)
+{
+       fd_set rfds;
+       int rc;
+       size_t cur = 0;
+       struct timeval tv = {
+               .tv_sec = 0,
+               .tv_usec = NET_TIMEOUT * 1000
+       };
+
+       if (!len)
+               return 0;
+
+       /* read first. Only select if read is incomplete. */
+       rc = read(fd, buf, len);
+       while (rc < 0 || cur + rc < len) {
+               if (!rc) {
+                       RPRINTF("end-of-file");
+                       return -1;
+               }
+               if (rc < 0 && errno != EAGAIN) {
+                       RPRINTF("error during read: %s\n", strerror(errno));
+                       return -1;
+               }
+               if (rc > 0)
+                       cur += rc;
+
+               FD_ZERO(&rfds);
+               FD_SET(fd, &rfds);
+               if (!(rc = select(fd + 1, &rfds, NULL, NULL, &tv))) {
+                       RPRINTF("time out during read\n");
+                       return -1;
+               } else if (rc < 0) {
+                       RPRINTF("error during select: %d\n", errno);
+                       return -1;
+               }
+               rc = read(fd, buf + cur, len - cur);
+       }
+       /*
+         RPRINTF("read %d bytes\n", cur + rc);
+       */
+
+       return 0;
+}
+
+static int mwrite(int fd, void* buf, size_t len)
+{
+       fd_set wfds;
+       size_t cur = 0;
+       int rc;
+       struct timeval tv = {
+               .tv_sec = 0,
+               .tv_usec = NET_TIMEOUT * 1000
+       };
+
+       if (!len)
+               return 0;
+
+       /* write first. Only select if write is incomplete. */
+       rc = write(fd, buf, len);
+       while (rc < 0 || cur + rc < len) {
+               if (!rc) {
+                       RPRINTF("end-of-file");
+                       return -1;
+               }
+               if (rc < 0 && errno != EAGAIN) {
+                       RPRINTF("error during write: %s\n", strerror(errno));
+                       return -1;
+               }
+               if (rc > 0)
+                       cur += rc;
+
+               FD_ZERO(&wfds);
+               FD_SET(fd, &wfds);
+               if (!(rc = select(fd + 1, NULL, &wfds, NULL, &tv))) {
+                       RPRINTF("time out during write\n");
+                       return -1;
+               } else if (rc < 0) {
+                       RPRINTF("error during select: %d\n", errno);
+                       return -1;
+               }
+               rc = write(fd, buf + cur, len - cur);
+       }
+       /*
+         RPRINTF("wrote %d bytes\n", cur + rc);
+       */
+
+       return 0;
+       FD_ZERO(&wfds);
+       FD_SET(fd, &wfds);
+       select(fd + 1, NULL, &wfds, NULL, &tv);
+}
+
+
+static void inline close_stream_fd(struct tdremus_state *s)
+{
+       /* XXX: -2 is magic. replace with macro perhaps? */
+       tapdisk_server_unregister_event(s->stream_fd.id);
+       close(s->stream_fd.fd);
+       s->stream_fd.fd = -2;
+}
+
+/* primary functions */
+static void remus_client_event(event_id_t, char mode, void *private);
+static void remus_connect_event(event_id_t id, char mode, void *private);
+static void remus_retry_connect_event(event_id_t id, char mode, void *private);
+
+static int primary_do_connect(struct tdremus_state *state)
+{
+       event_id_t id;
+       int fd;
+       int rc;
+       int flags;
+
+       RPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
+
+       if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+               RPRINTF("could not create client socket: %d\n", errno);
+               return -1;
+       }
+
+       /* make socket nonblocking */
+       if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
+               flags = 0;
+       if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
+               return -1;
+
+       /* once we have created the socket and populated the address, we can now start
+        * our non-blocking connect. rather than duplicating code we trigger a timeout
+        * on the socket fd, which calls our nonblocking connect code
+        */
+       if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0, remus_retry_connect_event, state)) < 0) {
+               RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
+               /* TODO: we leak a fd here */
+               return -1;
+       }
+       state->stream_fd.fd = fd;
+       state->stream_fd.id = id;
+       return 0;
+}
+
+static int primary_blocking_connect(struct tdremus_state *state)
+{
+       int fd;
+       int id;
+       int rc;
+       int flags;
+
+       RPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
+
+       if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+               RPRINTF("could not create client socket: %d\n", errno);
+               return -1;
+       }
+
+       do {
+               if ((rc = connect(fd, &state->sa, sizeof(state->sa))) < 0) {
+                       if (errno == ECONNREFUSED) {
+                               RPRINTF("connection refused -- retrying in 1 second\n");
+                               sleep(1);
+                       } else {
+                               RPRINTF("connection failed: %d\n", errno);
+                               close(fd);
+                               return -1;
+                       }
+               }
+       } while (rc < 0);
+
+       RPRINTF("client connected\n");
+
+       /* make socket nonblocking */
+       if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
+               flags = 0;
+       if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
+       {
+               RPRINTF("error making socket nonblocking\n");
+               close(fd);
+               return -1;
+       }
+
+       if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, fd, 0, remus_client_event, state)) < 0) {
+               RPRINTF("error registering client event handler: %s\n", strerror(id));
+               close(fd);
+               return -1;
+       }
+
+       state->stream_fd.fd = fd;
+       state->stream_fd.id = id;
+       return 0;
+}
+
+/* on read, just pass request through */
+static void primary_queue_read(td_driver_t *driver, td_request_t treq)
+{
+       /* just pass read through */
+       td_forward_request(treq);
+}
+
+/* TODO:
+ * The primary uses mwrite() to write the contents of a write request to the
+ * backup. This effectively blocks until all data has been copied into a system
+ * buffer or a timeout has occurred. We may wish to instead use tapdisk's
+ * nonblocking i/o interface, tapdisk_server_register_event(), to set timeouts
+ * and write data in an asynchronous fashion.
+ */
+static void primary_queue_write(td_driver_t *driver, td_request_t treq)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+       char header[sizeof(uint32_t) + sizeof(uint64_t)];
+       uint32_t *sectors = (uint32_t *)header;
+       uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
+
+       // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
+
+       /* -1 means we haven't connected yet, -2 means the connection was lost */
+       if(s->stream_fd.fd == -1) {
+               RPRINTF("connecting to backup...\n");
+               primary_blocking_connect(s);
+       }
+
+       *sectors = treq.secs;
+       *sector = treq.sec;
+
+       if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0)
+               goto fail;
+       if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
+               goto fail;
+
+       if (mwrite(s->stream_fd.fd, treq.buf, treq.secs * driver->info.sector_size) < 0)
+               goto fail;
+
+       td_forward_request(treq);
+
+       return;
+
+ fail:
+       /* switch to unprotected mode and tell tapdisk to retry */
+       RPRINTF("write request replication failed, switching to unprotected mode");
+       switch_mode(s->tdremus_driver, mode_unprotected);
+       td_complete_request(treq, -EBUSY);
+}
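
A minimal sketch of the asynchronous alternative mentioned in the TODO above primary_queue_write(): register a write-readiness event with tapdisk_server_register_event() (the same call used elsewhere in this file) and drain a pending buffer from its callback. The pending_send structure and its fields are hypothetical; the sketch relies on the headers already included by this file.

struct pending_send {
	char       *buf;   /* hypothetical buffered replication data */
	size_t      len;
	size_t      off;
	int         fd;
	event_id_t  id;
};

/* callback fires when the replication socket is writable */
static void drain_pending_send(event_id_t id, char mode, void *private)
{
	struct pending_send *p = private;
	ssize_t n = write(p->fd, p->buf + p->off, p->len - p->off);

	if (n < 0 && errno != EAGAIN) {
		tapdisk_server_unregister_event(p->id);   /* give up; caller can switch modes */
		return;
	}
	if (n > 0)
		p->off += n;
	if (p->off == p->len)
		tapdisk_server_unregister_event(p->id);   /* buffer fully sent */
}

/* registration, e.g. from primary_queue_write():
 *   p->id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD,
 *                                         p->fd, 0, drain_pending_send, p);
 */
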
+
+
+static int client_flush(td_driver_t *driver)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+       // RPRINTF("committing output\n");
+
+       if (s->stream_fd.fd == -1)
+               /* connection not yet established, nothing to flush */
+               return 0;
+
+       if (mwrite(s->stream_fd.fd, TDREMUS_COMMIT, strlen(TDREMUS_COMMIT)) < 0) {
+               RPRINTF("error flushing output");
+               close_stream_fd(s);
+               return -1;
+       }
+
+       return 0;
+}
+
+static int primary_start(td_driver_t *driver)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+       RPRINTF("activating client mode\n");
+
+       tapdisk_remus.td_queue_read = primary_queue_read;
+       tapdisk_remus.td_queue_write = primary_queue_write;
+       s->queue_flush = client_flush;
+
+       s->stream_fd.fd = -1;
+       s->stream_fd.id = -1;
+
+       return 0;
+}
+
+/* timeout callback */
+static void remus_retry_connect_event(event_id_t id, char mode, void *private)
+{
+       struct tdremus_state *s = (struct tdremus_state *)private;
+
+       /* do a non-blocking connect */
+       if (connect(s->stream_fd.fd, &s->sa, sizeof(s->sa)) && errno != EINPROGRESS) {
+               if(errno == ECONNREFUSED || errno == ENETUNREACH || errno == EAGAIN || errno == ECONNABORTED)
+               {
+                       /* try again in a second */
+                       tapdisk_server_unregister_event(s->stream_fd.id);
+                       if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
+                               RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
+                               return;
+                       }
+                       s->stream_fd.id = id;
+               }
+               else
+               {
+                       /* not recoverable */
+                       RPRINTF("error connecting to server %s\n", strerror(errno));
+                       return;
+               }
+       }
+       else
+       {
+               /* the connect returned EINPROGRESS (nonblocking connect) we must wait for the fd to be writeable to determine if the connect worked */
+
+               tapdisk_server_unregister_event(s->stream_fd.id);
+               if((id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD, s->stream_fd.fd, 0, remus_connect_event, s)) < 0) {
+                       RPRINTF("error registering client connection event handler: %s\n", strerror(id));
+                       return;
+               }
+               s->stream_fd.id = id;
+       }
+}
+
+/* callback when nonblocking connect() is finished */
+/* called only by primary in unprotected state */
+static void remus_connect_event(event_id_t id, char mode, void *private)
+{
+       int socket_errno;
+       socklen_t socket_errno_size;
+       struct tdremus_state *s = (struct tdremus_state *)private;
+
+       /* check to see if the connect succeeded */
+       socket_errno_size = sizeof(socket_errno);
+       if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR, &socket_errno, &socket_errno_size)) {
+               RPRINTF("error getting socket errno\n");
+               return;
+       }
+
+       RPRINTF("socket connect returned %d\n", socket_errno);
+
+       if(socket_errno)
+       {
+               /* the connect did not succeed */
+
+               if(socket_errno == ECONNREFUSED || socket_errno == ENETUNREACH || socket_errno == ETIMEDOUT
+                  || socket_errno == ECONNABORTED || socket_errno == EAGAIN)
+               {
+                       /* we can probably assume that the backup is down. just try again later */
+                       tapdisk_server_unregister_event(s->stream_fd.id);
+                       if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
+                               RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
+                               return;
+                       }
+                       s->stream_fd.id = id;
+               }
+               else
+               {
+                       RPRINTF("socket connect returned %d, giving up\n", socket_errno);
+               }
+       }
+       else
+       {
+               /* the connect succeeded */
+
+               /* unregister this function and register a new event handler */
+               tapdisk_server_unregister_event(s->stream_fd.id);
+               if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd, 0, remus_client_event, s)) < 0) {
+                       RPRINTF("error registering client event handler: %s\n", strerror(id));
+                       return;
+               }
+               s->stream_fd.id = id;
+
+               /* switch from unprotected to protected client */
+               switch_mode(s->tdremus_driver, mode_primary);
+       }
+}
+
+
+/* we install this event handler on the primary once we have connected to the backup */
+/* wait for "done" message to commit checkpoint */
+static void remus_client_event(event_id_t id, char mode, void *private)
+{
+       struct tdremus_state *s = (struct tdremus_state *)private;
+       char req[5];
+       int rc;
+
+       if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
+               /* replication stream closed or otherwise broken (timeout, reset, &c) */
+               RPRINTF("error reading from backup\n");
+               close_stream_fd(s);
+               return;
+       }
+
+       req[4] = '\0';
+
+       if (!strcmp(req, TDREMUS_DONE))
+               /* checkpoint committed, inform msg_fd */
+               ctl_respond(s, TDREMUS_DONE);
+       else {
+               RPRINTF("received unknown message: %s\n", req);
+               close_stream_fd(s);
+       }
+
+       return;
+}
+
+/* backup functions */
+static void remus_server_event(event_id_t id, char mode, void *private);
+
+/* returns the socket that receives write requests */
+static void remus_server_accept(event_id_t id, char mode, void* private)
+{
+       struct tdremus_state* s = (struct tdremus_state *) private;
+
+       int stream_fd;
+       event_id_t cid;
+
+       /* XXX: add address-based black/white list */
+       if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
+               RPRINTF("error accepting connection: %d\n", errno);
+               return;
+       }
+
+       /* TODO: check to see if we are already replicating. if so just close the
+        * connection (or do something smarter) */
+       RPRINTF("server accepted connection\n");
+
+       /* add tapdisk event for replication stream */
+       cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0,
+                                           remus_server_event, s);
+
+       if(cid < 0) {
+               RPRINTF("error registering connection event handler: %s\n", strerror(errno));
+               close(stream_fd);
+               return;
+       }
+
+       /* store replication file descriptor */
+       s->stream_fd.fd = stream_fd;
+       s->stream_fd.id = cid;
+}
+
+/* returns -2 if EADDRNOTAVAIL */
+static int remus_bind(struct tdremus_state* s)
+{
+//  struct sockaddr_in sa;
+       int opt;
+       int rc = -1;
+
+       if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+               RPRINTF("could not create server socket: %d\n", errno);
+               return rc;
+       }
+       opt = 1;
+       if (setsockopt(s->server_fd.fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
+               RPRINTF("Error setting REUSEADDR on %d: %d\n", s->server_fd.fd, errno);
+
+       if (bind(s->server_fd.fd, &s->sa, sizeof(s->sa)) < 0) {
+               RPRINTF("could not bind server socket %d to %s:%d: %d %s\n", s->server_fd.fd,
+                       inet_ntoa(s->sa.sin_addr), ntohs(s->sa.sin_port), errno, strerror(errno));
+               if (errno != EADDRINUSE)
+                       rc = -2;
+               goto err_sfd;
+       }
+       if (listen(s->server_fd.fd, 10)) {
+               RPRINTF("could not listen on socket: %d\n", errno);
+               goto err_sfd;
+       }
+
+       /* The socket is now bound to the address and listening so we may now register
+   * the fd with tapdisk */
+
+       if((s->server_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
+                                                           s->server_fd.fd, 0,
+                                                           remus_server_accept, s)) < 0) {
+               RPRINTF("error registering server connection event handler: %s",
+                       strerror(s->server_fd.id));
+               goto err_sfd;
+       }
+
+       return 0;
+
+ err_sfd:
+       close(s->server_fd.fd);
+       s->server_fd.fd = -1;
+
+       return rc;
+}
+
+/* wait for latest checkpoint to be applied */
+static inline int server_writes_inflight(td_driver_t *driver)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+       if (!s->ramdisk.inflight && !s->ramdisk.prev)
+               return 0;
+
+       return 1;
+}
+
+/* Due to block device prefetching this code may be called on the server side
+ * during normal replication. In this case we must return EBUSY, otherwise the
+ * domain may be started with stale data.
+ */
+void backup_queue_read(td_driver_t *driver, td_request_t treq)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+       if(!remus_image)
+               remus_image = treq.image;
+
+#if 0
+       /* due to prefetching, we must return EBUSY on server reads. This
+        * maintains a consistent disk image */
+       td_complete_request(treq, -EBUSY);
+#else
+       /* what exactly is the race that requires the response above? */
+       td_forward_request(treq);
+#endif
+}
+
+/* see above */
+void backup_queue_write(td_driver_t *driver, td_request_t treq)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+       /* on a server write, we know the domain has failed over. we must change our
+        * state to unprotected and then have the unprotected queue_write function
+        * handle the write
+        */
+
+       switch_mode(driver, mode_unprotected);
+       /* TODO: call the appropriate write function rather than return EBUSY */
+       td_complete_request(treq, -EBUSY);
+}
+
+static int backup_start(td_driver_t *driver)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+       int fd;
+
+       if (ramdisk_start(driver) < 0)
+               return -1;
+
+       tapdisk_remus.td_queue_read = backup_queue_read;
+       tapdisk_remus.td_queue_write = backup_queue_write;
+       /* TODO set flush function */
+       return 0;
+}
+
+static int server_do_wreq(td_driver_t *driver)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+       static tdremus_wire_t twreq;
+       char buf[4096];
+       int len, rc;
+
+       char header[sizeof(uint32_t) + sizeof(uint64_t)];
+       uint32_t *sectors = (uint32_t *) header;
+       uint64_t *sector =  (uint64_t *) &header[sizeof(uint32_t)];
+
+       // RPRINTF("received write request\n");
+
+       if (mread(s->stream_fd.fd, header, sizeof(header)) < 0)
+               goto err;
+
+       len = *sectors * driver->info.sector_size;
+
+       //RPRINTF("writing %d sectors (%d bytes) starting at %" PRIu64 "\n", *sectors, len,
+       // *sector);
+
+       if (len > sizeof(buf)) {
+               /* freak out! */
+               RPRINTF("write request too large: %d/%u\n", len, (unsigned)sizeof(buf));
+               return -1;
+       }
+
+       if (mread(s->stream_fd.fd, buf, len) < 0)
+               goto err;
+
+       if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0)
+               goto err;
+
+       return 0;
+
+ err:
+       /* should start failover */
+       RPRINTF("backup write request error\n");
+       close_stream_fd(s);
+
+       return -1;
+}
+
+static int server_do_sreq(td_driver_t *driver)
+{
+       /*
+         RPRINTF("submit request received\n");
+  */
+
+       return 0;
+}
+
+/* at this point, the server can start applying the most recent
+ * ramdisk. */
+static int server_do_creq(td_driver_t *driver)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+       // RPRINTF("committing buffer\n");
+
+       ramdisk_start_flush(driver);
+
+       /* XXX this message should not be sent until flush completes! */
+       if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4)
+               return -1;
+
+       return 0;
+}
+
+
+/* called when data is pending in s->rfd */
+static void remus_server_event(event_id_t id, char mode, void *private)
+{
+       struct tdremus_state *s = (struct tdremus_state *)private;
+       td_driver_t *driver = s->tdremus_driver;
+       char req[5];
+
+       // RPRINTF("replication data waiting\n");
+
+       /* TODO: add a get_connection_by_event_id() function.
+        * for now we can assume that the fd is s->stream_fd */
+
+       if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
+               RPRINTF("error reading server event, activating backup\n");
+               switch_mode(driver, mode_unprotected);
+               return;
+       }
+
+       req[4] = '\0';
+
+       if (!strcmp(req, TDREMUS_WRITE))
+               server_do_wreq(driver);
+       else if (!strcmp(req, TDREMUS_SUBMIT))
+               server_do_sreq(driver);
+       else if (!strcmp(req, TDREMUS_COMMIT))
+               server_do_creq(driver);
+       else
+               RPRINTF("unknown request received: %s\n", req);
+
+       return;
+
+}
+
+/* unprotected */
+
+void unprotected_queue_read(td_driver_t *driver, td_request_t treq)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+       /* wait for previous ramdisk to flush  before servicing reads */
+       if (server_writes_inflight(driver)) {
+               /* for now lets just return EBUSY. if this becomes an issue we can
+                * do something smarter */
+               td_complete_request(treq, -EBUSY);
+       }
+       else {
+               /* here we just pass reads through */
+               td_forward_request(treq);
+       }
+}
+
+/* For a recoverable remus solution we need to log unprotected writes here */
+void unprotected_queue_write(td_driver_t *driver, td_request_t treq)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+       /* wait for previous ramdisk to flush */
+       if (server_writes_inflight(driver)) {
+               RPRINTF("queue_write: waiting for queue to drain");
+               td_complete_request(treq, -EBUSY);
+       }
+       else {
+               // RPRINTF("servicing write request on backup\n");
+               td_forward_request(treq);
+       }
+}
+
+static int unprotected_start(td_driver_t *driver)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+       RPRINTF("failure detected, activating passthrough\n");
+
+       /* close the replication stream */
+       close_stream_fd(s);
+
+       /* unregister the server listen socket */
+       tapdisk_server_unregister_event(s->server_fd.id);
+
+       /* close the server listen socket */
+       close(s->server_fd.fd);
+       s->server_fd.fd = -1;
+
+       /* install the unprotected read/write handlers */
+       tapdisk_remus.td_queue_read = unprotected_queue_read;
+       tapdisk_remus.td_queue_write = unprotected_queue_write;
+
+       return 0;
+}
+
+
+/* control */
+
+static inline int resolve_address(const char* addr, struct in_addr* ia)
+{
+       struct hostent* he;
+       uint32_t ip;
+
+       if (!(he = gethostbyname(addr))) {
+               RPRINTF("error resolving %s: %d\n", addr, h_errno);
+               return -1;
+       }
+
+       if (!he->h_addr_list[0]) {
+               RPRINTF("no address found for %s\n", addr);
+               return -1;
+       }
+
+       /* network byte order */
+       ip = *((uint32_t**)he->h_addr_list)[0];
+       ia->s_addr = ip;
+
+       return 0;
+}
+
+static int get_args(td_driver_t *driver, const char* name)
+{
+       struct tdremus_state *state = (struct tdremus_state *)driver->data;
+       char* host;
+       char* port;
+//  char* driver_str;
+//  char* parent;
+//  int type;
+//  char* path;
+//  unsigned long ulport;
+//  int i;
+//  struct sockaddr_in server_addr_in;
+
+       int gai_status;
+       int valid_addr;
+       struct addrinfo gai_hints;
+       struct addrinfo *servinfo, *servinfo_itr;
+
+       memset(&gai_hints, 0, sizeof gai_hints);
+       gai_hints.ai_family = AF_UNSPEC;
+       gai_hints.ai_socktype = SOCK_STREAM;
+
+       port = strchr(name, ':');
+       if (!port) {
+               RPRINTF("missing host in %s\n", name);
+               return -ENOENT;
+       }
+       if (!(host = strndup(name, port - name))) {
+               RPRINTF("unable to allocate host\n");
+               return -ENOMEM;
+       }
+       port++;
+
+       if ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo)) != 0) {
+               RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
+               return -ENOENT;
+       }
+
+       /* TODO: do something smarter here */
+       valid_addr = 0;
+       for(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr = servinfo_itr->ai_next) {
+               void *addr;
+               char *ipver;
+
+               if (servinfo_itr->ai_family == AF_INET) {
+                       valid_addr = 1;
+                       memset(&state->sa, 0, sizeof(state->sa));
+                       state->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
+                       break;
+               }
+       }
+       freeaddrinfo(servinfo);
+
+       if (!valid_addr)
+               return -ENOENT;
+
+       RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
+
+       return 0;
+}
+
+static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+       int rc;
+
+       if (mode == s->mode)
+               return 0;
+
+       if (s->queue_flush)
+               if ((rc = s->queue_flush(driver)) < 0) {
+                       // fall back to unprotected mode on error
+                       RPRINTF("switch_mode: error flushing queue (old: %d, new: %d)", s->mode, mode);
+                       mode = mode_unprotected;
+               }
+
+       if (mode == mode_unprotected)
+               rc = unprotected_start(driver);
+       else if (mode == mode_primary)
+               rc = primary_start(driver);
+       else if (mode == mode_backup)
+               rc = backup_start(driver);
+       else {
+               RPRINTF("unknown mode requested: %d\n", mode);
+               rc = -1;
+       }
+
+       if (!rc)
+               s->mode = mode;
+
+       return rc;
+}
+
+static void ctl_request(event_id_t id, char mode, void *private)
+{
+       struct tdremus_state *s = (struct tdremus_state *)private;
+       td_driver_t *driver = s->tdremus_driver;
+       char msg[80];
+       int rc;
+
+       // RPRINTF("data waiting on control fifo\n");
+
+       if (!(rc = read(s->ctl_fd.fd, msg, sizeof(msg) - 1 /* append nul */))) {
+               RPRINTF("0-byte read received, reopening FIFO\n");
+               /*TODO: we may have to unregister/re-register with tapdisk_server */
+               close(s->ctl_fd.fd);
+               RPRINTF("FIFO closed\n");
+               if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
+                       RPRINTF("error reopening FIFO: %d\n", errno);
+               }
+               return;
+       }
+
+       if (rc < 0) {
+               RPRINTF("error reading from FIFO: %d\n", errno);
+               return;
+       }
+
+       /* TODO: need to get driver somehow */
+       msg[rc] = '\0';
+       if (!strncmp(msg, "flush", 5)) {
+               if (s->queue_flush)
+                       if ((rc = s->queue_flush(driver))) {
+                               RPRINTF("error passing flush request to backup");
+                               ctl_respond(s, TDREMUS_FAIL);
+                       }
+       } else {
+               RPRINTF("unknown command: %s\n", msg);
+       }
+}
+
+static int ctl_respond(struct tdremus_state *s, const char *response)
+{
+       int rc;
+
+       if ((rc = write(s->msg_fd.fd, response, strlen(response))) < 0) {
+               RPRINTF("error writing notification: %d\n", errno);
+               close(s->msg_fd.fd);
+               if ((s->msg_fd.fd = open(s->msg_path, O_RDWR)) < 0)
+                       RPRINTF("error reopening FIFO: %d\n", errno);
+       }
+
+       return rc;
+}
+
+/* must be called after the underlying driver has been initialized */
+static int ctl_open(td_driver_t *driver, const char* name)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+       int i, l;
+
+       /* first we must ensure that BLKTAP_CTRL_DIR exists */
+       if (mkdir(BLKTAP_CTRL_DIR, 0755) && errno != EEXIST)
+       {
+               DPRINTF("error creating directory %s: %d\n", BLKTAP_CTRL_DIR, errno);
+               return -1;
+       }
+
+       /* use the device name to create the control fifo path */
+       if (asprintf(&s->ctl_path, BLKTAP_CTRL_DIR "/remus_%s", name) < 0)
+               return -1;
+       /* scrub fifo pathname  */
+       for (i = strlen(BLKTAP_CTRL_DIR) + 1, l = strlen(s->ctl_path); i < l; i++) {
+               if (strchr(":/", s->ctl_path[i]))
+                       s->ctl_path[i] = '_';
+       }
+       if (asprintf(&s->msg_path, "%s.msg", s->ctl_path) < 0)
+               goto err_ctlfifo;
+
+       if (mkfifo(s->ctl_path, S_IRWXU|S_IRWXG|S_IRWXO) && errno != EEXIST) {
+               RPRINTF("error creating control FIFO %s: %d\n", s->ctl_path, errno);
+               goto err_msgfifo;
+       }
+
+       if (mkfifo(s->msg_path, S_IRWXU|S_IRWXG|S_IRWXO) && errno != EEXIST) {
+               RPRINTF("error creating message FIFO %s: %d\n", s->msg_path, errno);
+               goto err_msgfifo;
+       }
+
+       /* RDWR so that fd doesn't block select when no writer is present */
+       if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
+               RPRINTF("error opening control FIFO %s: %d\n", s->ctl_path, errno);
+               goto err_msgfifo;
+       }
+
+       if ((s->msg_fd.fd = open(s->msg_path, O_RDWR)) < 0) {
+               RPRINTF("error opening message FIFO %s: %d\n", s->msg_path, errno);
+               goto err_openctlfifo;
+       }
+
+       RPRINTF("control FIFO %s\n", s->ctl_path);
+       RPRINTF("message FIFO %s\n", s->msg_path);
+
+       return 0;
+
+ err_openctlfifo:
+       close(s->ctl_fd.fd);
+ err_msgfifo:
+       free(s->msg_path);
+       s->msg_path = NULL;
+ err_ctlfifo:
+       free(s->ctl_path);
+       s->ctl_path = NULL;
+       return -1;
+}
+
+static void ctl_close(td_driver_t *driver)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+
+       /* TODO: close *all* connections */
+
+       if(s->ctl_fd.fd)
+               close(s->ctl_fd.fd);
+
+       if (s->ctl_path) {
+               unlink(s->ctl_path);
+               free(s->ctl_path);
+               s->ctl_path = NULL;
+       }
+       if (s->msg_path) {
+               unlink(s->msg_path);
+               free(s->msg_path);
+               s->msg_path = NULL;
+       }
+}
+
+static int ctl_register(struct tdremus_state *s)
+{
+       RPRINTF("registering ctl fifo\n");
+
+       /* register ctl fd */
+       s->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
+
+       if (s->ctl_fd.id < 0) {
+               RPRINTF("error registering ctrl FIFO %s: %d\n", s->ctl_path, s->ctl_fd.id);
+               return -1;
+       }
+
+       return 0;
+}
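
For illustration, a sketch of how an external checkpointing tool could drive the control FIFOs set up above: write "flush" to the control FIFO, then read the four-byte "done" (or "fail") reply from the message FIFO. The paths are passed in by the caller; the real ones are built from BLKTAP_CTRL_DIR and the device name as in ctl_open().

#include <fcntl.h>
#include <string.h>
#include <unistd.h>

static int request_checkpoint_flush(const char *ctl_path, const char *msg_path)
{
	char reply[5] = "";
	int rc = -1;
	int ctl = open(ctl_path, O_WRONLY);
	int msg = open(msg_path, O_RDONLY);

	if (ctl < 0 || msg < 0)
		goto out;
	if (write(ctl, "flush", 5) != 5)     /* handled by ctl_request() */
		goto out;
	if (read(msg, reply, 4) != 4)        /* ctl_respond() writes "done" or "fail" */
		goto out;
	rc = strcmp(reply, "done") == 0 ? 0 : -1;
out:
	if (ctl >= 0) close(ctl);
	if (msg >= 0) close(msg);
	return rc;
}
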
+
+/* interface */
+
+static int tdremus_open(td_driver_t *driver, const char *name,
+                       td_flag_t flags)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+       int rc;
+
+       RPRINTF("opening %s\n", name);
+
+       /* first we need to get the underlying vbd for this driver stack. To do so we
+        * need to know the vbd's id. Fortunately, for tapdisk2 this is hard-coded as
+        * 0 (see tapdisk2.c)
+        */
+       device_vbd = tapdisk_server_get_vbd(0);
+
+       memset(s, 0, sizeof(*s));
+       s->server_fd.fd = -1;
+       s->stream_fd.fd = -1;
+       s->ctl_fd.fd = -1;
+       s->msg_fd.fd = -1;
+
+       /* TODO: this is only needed so that the server can send writes down
+        * the driver stack from the stream_fd event handler */
+       s->tdremus_driver = driver;
+
+       /* parse name to get info etc */
+       if ((rc = get_args(driver, name)))
+               return rc;
+
+       if ((rc = ctl_open(driver, name))) {
+               RPRINTF("error setting up control channel\n");
+               free(s->driver_data);
+               return rc;
+       }
+
+       if ((rc = ctl_register(s))) {
+               RPRINTF("error registering control channel\n");
+               free(s->driver_data);
+               return rc;
+       }
+
+       if (!(rc = remus_bind(s)))
+               rc = switch_mode(driver, mode_backup);
+       else if (rc == -2)
+               rc = switch_mode(driver, mode_primary);
+
+       if (!rc)
+               return 0;
+
+       tdremus_close(driver);
+       return -EIO;
+}
+
+static int tdremus_close(td_driver_t *driver)
+{
+       struct tdremus_state *s = (struct tdremus_state *)driver->data;
+       int rc = 0;
+
+       RPRINTF("closing\n");
+
+       if (s->driver_data) {
+               free(s->driver_data);
+               s->driver_data = NULL;
+       }
+       if (s->server_fd.fd >= 0) {
+               close(s->server_fd.fd);
+               s->server_fd.fd = -1;
+       }
+       if (s->stream_fd.fd >= 0)
+               close_stream_fd(s);
+
+       ctl_close(driver);
+
+       return rc;
+}
+
+static int tdremus_get_parent_id(td_driver_t *driver, td_disk_id_t *id)
+{
+       /* we shouldn't have a parent... for now */
+       return -EINVAL;
+}
+
+static int tdremus_validate_parent(td_driver_t *driver,
+                                  td_driver_t *pdriver, td_flag_t flags)
+{
+       return 0;
+}
+
+struct tap_disk tapdisk_remus = {
+       .disk_type          = "tapdisk_remus",
+       .private_data_size  = sizeof(struct tdremus_state),
+       .td_open            = tdremus_open,
+       .td_queue_read      = unprotected_queue_read,
+       .td_queue_write     = unprotected_queue_write,
+       .td_close           = tdremus_close,
+       .td_get_parent_id   = tdremus_get_parent_id,
+       .td_validate_parent = tdremus_validate_parent,
+       .td_debug           = NULL,
+};
diff -r e084ee729798 -r 9479190566fd tools/blktap2/drivers/disktypes.h
--- a/tools/blktap2/drivers/disktypes.h Mon Nov 09 19:41:16 2009 +0000
+++ b/tools/blktap2/drivers/disktypes.h Mon Nov 09 19:45:06 2009 +0000
@@ -49,6 +49,7 @@ extern struct tap_disk tapdisk_ram;
  extern struct tap_disk tapdisk_qcow; 
 extern struct tap_disk tapdisk_block_cache;
 extern struct tap_disk tapdisk_log;
+extern struct tap_disk tapdisk_remus;
 
 #define MAX_DISK_TYPES        20
 
@@ -61,6 +62,7 @@ extern struct tap_disk tapdisk_log;
 #define DISK_TYPE_QCOW        6
 #define DISK_TYPE_BLOCK_CACHE 7
 #define DISK_TYPE_LOG         9
+#define DISK_TYPE_REMUS       10
 
 /*Define Individual Disk Parameters here */
 static disk_info_t null_disk = {
@@ -167,6 +169,16 @@ static disk_info_t log_disk = {
 #endif
 };
 
+static disk_info_t remus_disk = {
+       DISK_TYPE_REMUS,
+       "remus disk replicator (remus)",
+       "remus",
+       0,
+#ifdef TAPDISK
+       &tapdisk_remus,
+#endif
+};
+
 /*Main disk info array */
 static disk_info_t *dtypes[] = {
        &aio_disk,
@@ -179,6 +191,7 @@ static disk_info_t *dtypes[] = {
        &block_cache_disk,
        &null_disk,
        &log_disk,
+       &remus_disk,
 };
 
 #endif
diff -r e084ee729798 -r 9479190566fd tools/blktap2/drivers/hashtable.c
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/blktap2/drivers/hashtable.c Mon Nov 09 19:45:06 2009 +0000
@@ -0,0 +1,274 @@
+/* Copyright (C) 2004 Christopher Clark <firstname.lastname@xxxxxxxxxxxx> */
+
+#include "hashtable.h"
+#include "hashtable_private.h"
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <math.h>
+
+/*
+Credit for primes table: Aaron Krowne
+ http://br.endernet.org/~akrowne/
+ http://planetmath.org/encyclopedia/GoodHashTablePrimes.html
+*/
+static const unsigned int primes[] = {
+       53, 97, 193, 389,
+       769, 1543, 3079, 6151,
+       12289, 24593, 49157, 98317,
+       196613, 393241, 786433, 1572869,
+       3145739, 6291469, 12582917, 25165843,
+       50331653, 100663319, 201326611, 402653189,
+       805306457, 1610612741
+};
+const unsigned int prime_table_length = sizeof(primes)/sizeof(primes[0]);
+const float max_load_factor = 0.65;
+
+/*****************************************************************************/
+struct hashtable *
+create_hashtable(unsigned int minsize,
+                 unsigned int (*hashf) (void*),
+                 int (*eqf) (void*,void*))
+{
+       struct hashtable *h;
+       unsigned int pindex, size = primes[0];
+       /* Check requested hashtable isn't too large */
+       if (minsize > (1u << 30)) return NULL;
+       /* Enforce size as prime */
+       for (pindex=0; pindex < prime_table_length; pindex++) {
+               if (primes[pindex] > minsize) { size = primes[pindex]; break; }
+       }
+       h = (struct hashtable *)malloc(sizeof(struct hashtable));
+       if (NULL == h) return NULL; /*oom*/
+       h->table = (struct entry **)malloc(sizeof(struct entry*) * size);
+       if (NULL == h->table) { free(h); return NULL; } /*oom*/
+       memset(h->table, 0, size * sizeof(struct entry *));
+       h->tablelength  = size;
+       h->primeindex   = pindex;
+       h->entrycount   = 0;
+       h->hashfn       = hashf;
+       h->eqfn         = eqf;
+       h->loadlimit    = (unsigned int) ceil(size * max_load_factor);
+       return h;
+}
+
+/*****************************************************************************/
+unsigned int
+hash(struct hashtable *h, void *k)
+{
+       /* Aim to protect against poor hash functions by adding logic here
+        * - logic taken from java 1.4 hashtable source */
+       unsigned int i = h->hashfn(k);
+       i += ~(i << 9);
+       i ^=  ((i >> 14) | (i << 18)); /* >>> */
+       i +=  (i << 4);
+       i ^=  ((i >> 10) | (i << 22)); /* >>> */
+       return i;
+}
+
+/*****************************************************************************/
+static int
+hashtable_expand(struct hashtable *h)
+{
+       /* Double the size of the table to accommodate more entries */
+       struct entry **newtable;
+       struct entry *e;
+       struct entry **pE;
+       unsigned int newsize, i, index;
+       /* Check we're not hitting max capacity */
+       if (h->primeindex == (prime_table_length - 1)) return 0;
+       newsize = primes[++(h->primeindex)];
+
+       newtable = (struct entry **)malloc(sizeof(struct entry*) * newsize);
+       if (NULL != newtable)
+       {
+               memset(newtable, 0, newsize * sizeof(struct entry *));
+               /* This algorithm is not 'stable'. ie. it reverses the list
+                * when it transfers entries between the tables */
+               for (i = 0; i < h->tablelength; i++) {
+                       while (NULL != (e = h->table[i])) {
+                               h->table[i] = e->next;
+                               index = indexFor(newsize,e->h);
+                               e->next = newtable[index];
+                               newtable[index] = e;
+                       }
+               }
+               free(h->table);
+               h->table = newtable;
+       }
+       /* Plan B: realloc instead */
+       else
+       {
+               newtable = (struct entry **)
+                       realloc(h->table, newsize * sizeof(struct entry *));
+               if (NULL == newtable) { (h->primeindex)--; return 0; }
+               h->table = newtable;
+               memset(&newtable[h->tablelength], 0,
+                      (newsize - h->tablelength) * sizeof(struct entry *));
+               for (i = 0; i < h->tablelength; i++) {
+                       for (pE = &(newtable[i]), e = *pE; e != NULL; e = *pE) {
+                               index = indexFor(newsize,e->h);
+                               if (index == i)
+                               {
+                                       pE = &(e->next);
+                               }
+                               else
+                               {
+                                       *pE = e->next;
+                                       e->next = newtable[index];
+                                       newtable[index] = e;
+                               }
+                       }
+               }
+       }
+       h->tablelength = newsize;
+       h->loadlimit   = (unsigned int) ceil(newsize * max_load_factor);
+       return -1;
+}
+
+/*****************************************************************************/
+unsigned int
+hashtable_count(struct hashtable *h)
+{
+       return h->entrycount;
+}
+
+/*****************************************************************************/
+int
+hashtable_insert(struct hashtable *h, void *k, void *v)
+{
+       /* This method allows duplicate keys - but they shouldn't be used */
+       unsigned int index;
+       struct entry *e;
+       if (++(h->entrycount) > h->loadlimit)
+       {
+               /* Ignore the return value. If expand fails, we should
+                * still try cramming just this value into the existing table
+                * -- we may not have memory for a larger table, but one more
+                * element may be ok. Next time we insert, we'll try expanding
+                * again. */
+               hashtable_expand(h);
+       }
+       e = (struct entry *)malloc(sizeof(struct entry));
+       if (NULL == e) { --(h->entrycount); return 0; } /*oom*/
+       e->h = hash(h,k);
+       index = indexFor(h->tablelength,e->h);
+       e->k = k;
+       e->v = v;
+       e->next = h->table[index];
+       h->table[index] = e;
+       return -1;
+}
+
+/*****************************************************************************/
+void * /* returns value associated with key */
+hashtable_search(struct hashtable *h, void *k)
+{
+       struct entry *e;
+       unsigned int hashvalue, index;
+       hashvalue = hash(h,k);
+       index = indexFor(h->tablelength,hashvalue);
+       e = h->table[index];
+       while (NULL != e)
+       {
+               /* Check hash value to short circuit heavier comparison */
+               if ((hashvalue == e->h) && (h->eqfn(k, e->k))) return e->v;
+               e = e->next;
+       }
+       return NULL;
+}
+
+/*****************************************************************************/
+void * /* returns value associated with key */
+hashtable_remove(struct hashtable *h, void *k)
+{
+       /* TODO: consider compacting the table when the load factor drops
+        *       enough, or provide a 'compact' method. */
+
+       struct entry *e;
+       struct entry **pE;
+       void *v;
+       unsigned int hashvalue, index;
+
+       hashvalue = hash(h,k);
+       index = indexFor(h->tablelength,hashvalue);
+       pE = &(h->table[index]);
+       e = *pE;
+       while (NULL != e)
+       {
+               /* Check hash value to short circuit heavier comparison */
+               if ((hashvalue == e->h) && (h->eqfn(k, e->k)))
+               {
+                       *pE = e->next;
+                       h->entrycount--;
+                       v = e->v;
+                       freekey(e->k);
+                       free(e);
+                       return v;
+               }
+               pE = &(e->next);
+               e = e->next;
+       }
+       return NULL;
+}
+
+/*****************************************************************************/
+/* destroy */
+void
+hashtable_destroy(struct hashtable *h, int free_values)
+{
+       unsigned int i;
+       struct entry *e, *f;
+       struct entry **table = h->table;
+       if (free_values)
+       {
+               for (i = 0; i < h->tablelength; i++)
+               {
+                       e = table[i];
+                       while (NULL != e)
+                       { f = e; e = e->next; freekey(f->k); free(f->v); free(f); }
+               }
+       }
+       else
+       {
+               for (i = 0; i < h->tablelength; i++)
+               {
+                       e = table[i];
+                       while (NULL != e)
+                       { f = e; e = e->next; freekey(f->k); free(f); }
+               }
+       }
+       free(h->table);
+       free(h);
+}
+
+/*
+ * Copyright (c) 2002, Christopher Clark
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
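For reference, the basic lifecycle of this table looks like the sketch below.
It is a minimal, hypothetical example with string keys; it assumes freekey() in
hashtable_private.h (not included in this patch) ultimately calls free(), so
keys handed to the table must be heap-allocated:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include "hashtable.h"

    static unsigned int str_hash(void *k)
    {
            unsigned int h = 5381;
            const char *s = k;
            while (*s)
                    h = h * 33 + (unsigned char)*s++;
            return h;
    }

    static int str_eq(void *a, void *b)
    {
            return strcmp(a, b) == 0;
    }

    int main(void)
    {
            struct hashtable *h = create_hashtable(16, str_hash, str_eq);
            int *v = malloc(sizeof(*v));

            if (!h || !v)
                    return 1;
            *v = 42;
            /* hashtable_insert() returns non-zero on success, 0 on OOM */
            if (!hashtable_insert(h, strdup("answer"), v))
                    return 1;

            printf("count=%u value=%d\n", hashtable_count(h),
                   *(int *)hashtable_search(h, "answer"));

            /* hashtable_remove() frees the stored key and hands back the value */
            free(hashtable_remove(h, "answer"));

            hashtable_destroy(h, 1);   /* 1 => also free any remaining values */
            return 0;
    }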
diff -r e084ee729798 -r 9479190566fd tools/blktap2/drivers/hashtable_itr.c
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/blktap2/drivers/hashtable_itr.c     Mon Nov 09 19:45:06 2009 +0000
@@ -0,0 +1,188 @@
+/* Copyright (C) 2002, 2004 Christopher Clark <firstname.lastname@xxxxxxxxxxxx> */
+
+#include "hashtable.h"
+#include "hashtable_private.h"
+#include "hashtable_itr.h"
+#include <stdlib.h> /* defines NULL */
+
+/*****************************************************************************/
+/* hashtable_iterator    - iterator constructor */
+
+struct hashtable_itr *
+hashtable_iterator(struct hashtable *h)
+{
+       unsigned int i, tablelength;
+       struct hashtable_itr *itr = (struct hashtable_itr *)
+               malloc(sizeof(struct hashtable_itr));
+       if (NULL == itr) return NULL;
+       itr->h = h;
+       itr->e = NULL;
+       itr->parent = NULL;
+       tablelength = h->tablelength;
+       itr->index = tablelength;
+       if (0 == h->entrycount) return itr;
+
+       for (i = 0; i < tablelength; i++)
+       {
+               if (NULL != h->table[i])
+               {
+                       itr->e = h->table[i];
+                       itr->index = i;
+                       break;
+               }
+       }
+       return itr;
+}
+
+/*****************************************************************************/
+/* key      - return the key of the (key,value) pair at the current position */
+/* value    - return the value of the (key,value) pair at the current position */
+
+void *
+hashtable_iterator_key(struct hashtable_itr *i)
+{ return i->e->k; }
+
+void *
+hashtable_iterator_value(struct hashtable_itr *i)
+{ return i->e->v; }
+
+/*****************************************************************************/
+/* advance - advance the iterator to the next element
+ *           returns zero if advanced to end of table */
+
+int
+hashtable_iterator_advance(struct hashtable_itr *itr)
+{
+       unsigned int j,tablelength;
+       struct entry **table;
+       struct entry *next;
+       if (NULL == itr->e) return 0; /* stupidity check */
+
+       next = itr->e->next;
+       if (NULL != next)
+       {
+               itr->parent = itr->e;
+               itr->e = next;
+               return -1;
+       }
+       tablelength = itr->h->tablelength;
+       itr->parent = NULL;
+       if (tablelength <= (j = ++(itr->index)))
+       {
+               itr->e = NULL;
+               return 0;
+       }
+       table = itr->h->table;
+       while (NULL == (next = table[j]))
+       {
+               if (++j >= tablelength)
+               {
+                       itr->index = tablelength;
+                       itr->e = NULL;
+                       return 0;
+               }
+       }
+       itr->index = j;
+       itr->e = next;
+       return -1;
+}
+
+/*****************************************************************************/
+/* remove - remove the entry at the current iterator position
+ *          and advance the iterator, if there is a successive
+ *          element.
+ *          If you want the value, read it before you remove:
+ *          beware memory leaks if you don't.
+ *          Returns zero if end of iteration. */
+
+int
+hashtable_iterator_remove(struct hashtable_itr *itr)
+{
+       struct entry *remember_e, *remember_parent;
+       int ret;
+
+       /* Do the removal */
+       if (NULL == (itr->parent))
+       {
+               /* element is head of a chain */
+               itr->h->table[itr->index] = itr->e->next;
+       } else {
+               /* element is mid-chain */
+               itr->parent->next = itr->e->next;
+       }
+       /* itr->e is now outside the hashtable */
+       remember_e = itr->e;
+       itr->h->entrycount--;
+       freekey(remember_e->k);
+
+       /* Advance the iterator, correcting the parent */
+       remember_parent = itr->parent;
+       ret = hashtable_iterator_advance(itr);
+       if (itr->parent == remember_e) { itr->parent = remember_parent; }
+       free(remember_e);
+       return ret;
+}
+
+/*****************************************************************************/
+int /* returns zero if not found */
+hashtable_iterator_search(struct hashtable_itr *itr,
+                          struct hashtable *h, void *k)
+{
+       struct entry *e, *parent;
+       unsigned int hashvalue, index;
+
+       hashvalue = hash(h,k);
+       index = indexFor(h->tablelength,hashvalue);
+
+       e = h->table[index];
+       parent = NULL;
+       while (NULL != e)
+       {
+               /* Check hash value to short circuit heavier comparison */
+               if ((hashvalue == e->h) && (h->eqfn(k, e->k)))
+               {
+                       itr->index = index;
+                       itr->e = e;
+                       itr->parent = parent;
+                       itr->h = h;
+                       return -1;
+               }
+               parent = e;
+               e = e->next;
+       }
+       return 0;
+}
+
+
+/*
+ * Copyright (c) 2002, 2004, Christopher Clark
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
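Iteration over a populated table follows the usual pattern for this code: take
an iterator, visit entries with the accessors, and keep going while
hashtable_iterator_advance() returns non-zero. A short sketch (ht_foreach and
its callback are hypothetical helpers; the iterator itself is malloc()ed by
hashtable_iterator() and must be freed by the caller):

    #include <stdlib.h>
    #include "hashtable.h"
    #include "hashtable_itr.h"

    /* Visit every (key, value) pair in ht with a caller-supplied callback. */
    static void ht_foreach(struct hashtable *ht,
                           void (*cb)(void *k, void *v))
    {
            struct hashtable_itr *itr;

            if (hashtable_count(ht) == 0)
                    return;                 /* accessors require a live entry */
            if (!(itr = hashtable_iterator(ht)))
                    return;                 /* OOM */
            do {
                    cb(hashtable_iterator_key(itr),
                       hashtable_iterator_value(itr));
            } while (hashtable_iterator_advance(itr));
            free(itr);                      /* caller frees the iterator */
    }

To delete entries while walking, hashtable_iterator_remove() takes the place of
the advance call for the entry being dropped.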
diff -r e084ee729798 -r 9479190566fd tools/blktap2/drivers/hashtable_itr.h
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/blktap2/drivers/hashtable_itr.h     Mon Nov 09 19:45:06 2009 +0000
@@ -0,0 +1,112 @@
+/* Copyright (C) 2002, 2004 Christopher Clark <firstname.lastname@xxxxxxxxxxxx> */
+
+#ifndef __HASHTABLE_ITR_CWC22__
+#define __HASHTABLE_ITR_CWC22__
+#include "hashtable.h"
+#include "hashtable_private.h" /* needed to enable inlining */
+
+/*****************************************************************************/
+/* This struct is only concrete here to allow the inlining of two of the
+ * accessor functions. */
+struct hashtable_itr
+{
+       struct hashtable *h;
+       struct entry *e;
+       struct entry *parent;
+       unsigned int index;
+};
+
+
+/*****************************************************************************/
+/* hashtable_iterator
+ */
+
+struct hashtable_itr *
+hashtable_iterator(struct hashtable *h);
+
+/*****************************************************************************/
+/* hashtable_iterator_key
+ * - return the key of the (key,value) pair at the current position */
+
+extern inline void *
+hashtable_iterator_key(struct hashtable_itr *i)
+{
+       return i->e->k;
+}
+
+/*****************************************************************************/
+/* value - return the value of the (key,value) pair at the current position */
+
+extern inline void *
+hashtable_iterator_value(struct hashtable_itr *i)
+{
+       return i->e->v;
+}
+
+/*****************************************************************************/
+/* advance - advance the iterator to the next element
+ *           returns zero if advanced to end of table */
+
+int
+hashtable_iterator_advance(struct hashtable_itr *itr);
+
+/*****************************************************************************/
+/* remove - remove current element and advance the iterator to the next element
+ *          NB: if you need the value to free it, read it before
+ *          removing. ie: beware memory leaks!
+ *          returns zero if advanced to end of table */
+
+int
+hashtable_iterator_remove(struct hashtable_itr *itr);
+
+/*****************************************************************************/
+/* search - overwrite the supplied iterator, to point to the entry
+ *          matching the supplied key.
+ *          h points to the hashtable to be searched.
+ *          returns zero if not found. */
+int
+hashtable_iterator_search(struct hashtable_itr *itr,
+                          struct hashtable *h, void *k);
+
+#define DEFINE_HASHTABLE_ITERATOR_SEARCH(fnname, keytype) \
+int fnname (struct hashtable_itr *i, struct hashtable *h, keytype *k) \
+{ \
+    return (hashtable_iterator_search(i,h,k)); \
+}
+
+
+
+#endif /* __HASHTABLE_ITR_CWC22__*/
+
+/*
+ * Copyright (c) 2002, 2004, Christopher Clark
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
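The DEFINE_HASHTABLE_ITERATOR_SEARCH macro above simply wraps
hashtable_iterator_search() behind a typed key pointer. A tiny example (the
function name search_by_sector is hypothetical):

    #include <stdint.h>
    #include "hashtable_itr.h"

    /* Expands to:
     *   int search_by_sector(struct hashtable_itr *i,
     *                        struct hashtable *h, uint64_t *k)
     *   { return hashtable_iterator_search(i, h, k); }
     */
    DEFINE_HASHTABLE_ITERATOR_SEARCH(search_by_sector, uint64_t)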
diff -r e084ee729798 -r 9479190566fd tools/blktap2/drivers/hashtable_utility.c
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/blktap2/drivers/hashtable_utility.c Mon Nov 09 19:45:06 2009 +0000
@@ -0,0 +1,71 @@
+/* Copyright (C) 2002 Christopher Clark <firstname.lastname@xxxxxxxxxxxx> */
+
+#include "hashtable.h"
+#include "hashtable_private.h"
+#include "hashtable_utility.h"
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+/*****************************************************************************/
+/* hashtable_change
+ *
+ * function to change the value associated with a key, where there already
+ * exists a value bound to the key in the hashtable.
+ * Source due to Holger Schemel.
+ *
+ *  */
+int
+hashtable_change(struct hashtable *h, void *k, void *v)
+{
+       struct entry *e;
+       unsigned int hashvalue, index;
+       hashvalue = hash(h,k);
+       index = indexFor(h->tablelength,hashvalue);
+       e = h->table[index];
+       while (NULL != e)
+       {
+               /* Check hash value to short circuit heavier comparison */
+               if ((hashvalue == e->h) && (h->eqfn(k, e->k)))
+               {
+                       free(e->v);
+                       e->v = v;
+                       return -1;
+               }
+               e = e->next;
+       }
+       return 0;
+}
+
+/*
+ * Copyright (c) 2002, Christopher Clark
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
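Note that hashtable_change() free()s the value it replaces, so the replacement
value must itself be heap-allocated. A minimal sketch (update_counter is a
hypothetical helper, not part of this patch):

    #include <stdlib.h>
    #include "hashtable.h"
    #include "hashtable_utility.h"

    /* Rebind key to a new int value; fail if key is absent.
     * The previous value is free()d inside hashtable_change(). */
    static int update_counter(struct hashtable *h, void *key, int newval)
    {
            int *v = malloc(sizeof(*v));

            if (!v)
                    return -1;
            *v = newval;
            if (!hashtable_change(h, key, v)) {   /* 0 => key not present */
                    free(v);
                    return -1;
            }
            return 0;
    }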
diff -r e084ee729798 -r 9479190566fd tools/blktap2/drivers/hashtable_utility.h
--- /dev/null   Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/blktap2/drivers/hashtable_utility.h Mon Nov 09 19:45:06 2009 +0000
@@ -0,0 +1,55 @@
+/* Copyright (C) 2002 Christopher Clark <firstname.lastname@xxxxxxxxxxxx> */
+
+#ifndef __HASHTABLE_CWC22_UTILITY_H__
+#define __HASHTABLE_CWC22_UTILITY_H__
+
+/*****************************************************************************
+ * hashtable_change
+ *
+ * function to change the value associated with a key, where there already
+ * exists a value bound to the key in the hashtable.
+ * Source due to Holger Schemel.
+ *
+ * @name        hashtable_change
+ * @param   h   the hashtable
+ * @param   k   the key whose binding should change
+ * @param   v   the new value (the previous value is freed)
+ *
+ */
+int
+hashtable_change(struct hashtable *h, void *k, void *v);
+
+#endif /* __HASHTABLE_CWC22_UTILITY_H__ */
+
+/*
+ * Copyright (c) 2002, Christopher Clark
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
diff -r e084ee729798 -r 9479190566fd tools/python/xen/xend/server/BlktapController.py
--- a/tools/python/xen/xend/server/BlktapController.py  Mon Nov 09 19:41:16 2009 +0000
+++ b/tools/python/xen/xend/server/BlktapController.py  Mon Nov 09 19:45:06 2009 +0000
@@ -28,6 +28,7 @@ blktap2_disk_types = [
     'ram',
     'qcow',
     'vhd',
+    'remus',
     ]
 
 blktap_disk_types = blktap1_disk_types + blktap2_disk_types

_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxx
http://lists.xensource.com/xen-changelog


 

