[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Xen-devel] [PATCH 15/17] tools: blktap2: move ramdisk related codes to block-replication.c
On Oct 13, 2014 10:13 PM, "Wen Congyang" <wency@xxxxxxxxxxxxxx> wrote:
>
> COLO will reuse them
>
> Signed-off-by: Wen Congyang <wency@xxxxxxxxxxxxxx>
> Cc: Shriram Rajagopalan <rshriram@xxxxxxxxx>
> ---
>  tools/blktap2/drivers/block-remus.c       | 480 +-----------------------------
>  tools/blktap2/drivers/block-replication.c | 460 ++++++++++++++++++++++++++++
>  tools/blktap2/drivers/block-replication.h |  65 ++++
>  3 files changed, 539 insertions(+), 466 deletions(-)
>
> diff --git a/tools/blktap2/drivers/block-remus.c b/tools/blktap2/drivers/block-remus.c
> index 09dc46f..c7b429c 100644
> --- a/tools/blktap2/drivers/block-remus.c
> +++ b/tools/blktap2/drivers/block-remus.c
> @@ -37,9 +37,6 @@
> Â#include "tapdisk-server.h"
> Â#include "tapdisk-driver.h"
> Â#include "tapdisk-interface.h"
> -#include "hashtable.h"
> -#include "hashtable_itr.h"
> -#include "hashtable_utility.h"
> Â#include "block-replication.h"
>
> Â#include <errno.h>
> @@ -58,7 +55,6 @@
>
> Â/* timeout for reads and writes in ms */
> Â#define HEARTBEAT_MS 1000
> -#define RAMDISK_HASHSIZE 128
>
> Â/* connect retry timeout (seconds) */
> Â#define REMUS_CONNRETRY_TIMEOUT 1
> @@ -97,51 +93,6 @@ td_vbd_t *device_vbd = NULL;
> Âtd_image_t *remus_image = NULL;
> Âstruct tap_disk tapdisk_remus;
>
> -struct ramdisk {
> -Â Â Â Âsize_t sector_size;
> -Â Â Â Âstruct hashtable* h;
> -Â Â Â Â/* when a ramdisk is flushed, h is given a new empty hash for writes
> -Â Â Â Â * while the old ramdisk (prev) is drained asynchronously.
> -Â Â Â Â */
> -Â Â Â Âstruct hashtable* prev;
> -Â Â Â Â/* count of outstanding requests to the base driver */
> -Â Â Â Âsize_t inflight;
> -Â Â Â Â/* prev holds the requests to be flushed, while inprogress holds
> -Â Â Â Â * requests being flushed. When requests complete, they are removed
> -Â Â Â Â * from inprogress.
> -Â Â Â Â * Whenever a new flush is merged with ongoing flush (i.e, prev),
> -Â Â Â Â * we have to make sure that none of the new requests overlap with
> -Â Â Â Â * ones in "inprogress". If it does, keep it back in prev and dont issue
> -Â Â Â Â * IO until the current one finishes. If we allow this IO to proceed,
> -Â Â Â Â * we might end up with two "overlapping" requests in the disk's queue and
> -Â Â Â Â * the disk may not offer any guarantee on which one is written first.
> -Â Â Â Â * IOW, make sure we dont create a write-after-write time ordering constraint.
> -Â Â Â Â *
> -Â Â Â Â */
> -Â Â Â Âstruct hashtable* inprogress;
> -};
> -
> -/* the ramdisk intercepts the original callback for reads and writes.
> - * This holds the original data. */
> -/* Might be worth making this a static array in struct ramdisk to avoid
> - * a malloc per request */
> -
> -struct tdremus_state;
> -
> -struct ramdisk_cbdata {
> -Â Â Â Âtd_callback_t cb;
> -Â Â Â Âvoid* private;
> -Â Â Â Âchar* buf;
> -Â Â Â Âstruct tdremus_state* state;
> -};
> -
> -struct ramdisk_write_cbdata {
> -Â Â Â Âstruct tdremus_state* state;
> -Â Â Â Âchar* buf;
> -};
> -
> -typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
> -
> Â/* poll_fd type for blktap2 fd system. taken from block_log.c */
> Âtypedef struct poll_fd {
>     int    fd;
> @@ -168,7 +119,7 @@ struct tdremus_state {
> Â Â Â Â Â*/
> Â Â Â Â struct req_ring queued_io;
>
> -Â Â Â Â/* ramdisk data*/
> +Â Â Â Â/* ramdisk data */
> Â Â Â Â struct ramdisk ramdisk;
>
> Â Â Â Â /* mode methods */
> @@ -239,404 +190,14 @@ static void ring_add_request(struct req_ring *ring, const td_request_t *treq)
> Â Â Â Â ring->prod = ring_next(ring->prod);
> Â}
>
> -/* Prototype declarations */
> -static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s);
> -
> -/* functions to create and sumbit treq's */
> -
> -static void
> -replicated_write_callback(td_request_t treq, int err)
> -{
> -Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *) treq.cb_data;
> -Â Â Â Âtd_vbd_request_t *vreq;
> -Â Â Â Âint i;
> -Â Â Â Âuint64_t start;
> -Â Â Â Âvreq = (td_vbd_request_t *) treq.private;
> -
> -Â Â Â Â/* the write failed for now, lets panic. this is very bad */
> -Â Â Â Âif (err) {
> -Â Â Â Â Â Â Â ÂRPRINTF("ramdisk write failed, disk image is not consistent\n");
> -Â Â Â Â Â Â Â Âexit(-1);
> -Â Â Â Â}
> -
> -Â Â Â Â/* The write succeeded. let's pull the vreq off whatever request list
> -Â Â Â Â * it is on and free() it */
> -Â Â Â Âlist_del(&vreq->next);
> -Â Â Â Âfree(vreq);
> -
> -Â Â Â Âs->ramdisk.inflight--;
> -Â Â Â Âstart = treq.sec;
> -Â Â Â Âfor (i = 0; i < treq.secs; i++) {
> -Â Â Â Â Â Â Â Âhashtable_remove(s->ramdisk.inprogress, &start);
> -Â Â Â Â Â Â Â Âstart++;
> -Â Â Â Â}
> -Â Â Â Âfree(treq.buf);
> -
> -Â Â Â Âif (!s->ramdisk.inflight && !s->ramdisk.prev) {
> -Â Â Â Â Â Â Â Â/* TODO: the ramdisk has been flushed */
> -Â Â Â Â}
> -}
> -
> -static inline int
> -create_write_request(struct tdremus_state *state, td_sector_t sec, int secs, char *buf)
> -{
> -Â Â Â Âtd_request_t treq;
> -Â Â Â Âtd_vbd_request_t *vreq;
> -
> -   Âtreq.op   = TD_OP_WRITE;
> -   Âtreq.buf  Â= buf;
> -   Âtreq.sec  Â= sec;
> -   Âtreq.secs  = secs;
> -   Âtreq.image Â= remus_image;
> -   Âtreq.cb   = replicated_write_callback;
> -Â Â Â Âtreq.cb_data = state;
> -   Âtreq.id   = 0;
> -   Âtreq.sidx  = 0;
> -
> -   Âvreq    Â= calloc(1, sizeof(td_vbd_request_t));
> -Â Â Â Âtreq.private = vreq;
> -
> -Â Â Â Âif(!vreq)
> -Â Â Â Â Â Â Â Âreturn -1;
> -
> -Â Â Â Âvreq->submitting = 1;
> -Â Â Â ÂINIT_LIST_HEAD(&vreq->next);
> -Â Â Â Âtapdisk_vbd_move_request(treq.private, &device_vbd->pending_requests);
> -
> -Â Â Â Â/* TODO:
> -Â Â Â Â * we should probably leave it up to the caller to forward the request */
> -Â Â Â Âtd_forward_request(treq);
> -
> -Â Â Â Âvreq->submitting--;
> -
> -Â Â Â Âreturn 0;
> -}
> -
> -
> -/* http://www.concentric.net/~Ttwang/tech/inthash.htm */
> -static unsigned int uint64_hash(void* k)
> -{
> -Â Â Â Âuint64_t key = *(uint64_t*)k;
> -
> -Â Â Â Âkey = (~key) + (key << 18);
> -Â Â Â Âkey = key ^ (key >> 31);
> -Â Â Â Âkey = key * 21;
> -Â Â Â Âkey = key ^ (key >> 11);
> -Â Â Â Âkey = key + (key << 6);
> -Â Â Â Âkey = key ^ (key >> 22);
> -
> -Â Â Â Âreturn (unsigned int)key;
> -}
> -
> -static int rd_hash_equal(void* k1, void* k2)
> -{
> -Â Â Â Âuint64_t key1, key2;
> -
> -Â Â Â Âkey1 = *(uint64_t*)k1;
> -Â Â Â Âkey2 = *(uint64_t*)k2;
> -
> -Â Â Â Âreturn key1 == key2;
> -}
> -
> -static int ramdisk_read(struct ramdisk* ramdisk, uint64_t sector,
> -Â Â Â Â Â Â Â Â Â Â Â Âint nb_sectors, char* buf)
> -{
> -Â Â Â Âint i;
> -Â Â Â Âchar* v;
> -Â Â Â Âuint64_t key;
> -
> -Â Â Â Âfor (i = 0; i < nb_sectors; i++) {
> -Â Â Â Â Â Â Â Âkey = sector + i;
> -Â Â Â Â Â Â Â Â/* check whether it is queued in a previous flush request */
> -Â Â Â Â Â Â Â Âif (!(ramdisk->prev && (v = hashtable_search(ramdisk->prev, &key)))) {
> -Â Â Â Â Â Â Â Â Â Â Â Â/* check whether it is an ongoing flush */
> -Â Â Â Â Â Â Â Â Â Â Â Âif (!(ramdisk->inprogress && (v = hashtable_search(ramdisk->inprogress, &key))))
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âreturn -1;
> -Â Â Â Â Â Â Â Â}
> -Â Â Â Â Â Â Â Âmemcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size);
> -Â Â Â Â}
> -
> -Â Â Â Âreturn 0;
> -}
> -
> -static int ramdisk_write_hash(struct hashtable* h, uint64_t sector, char* buf,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âsize_t len)
> -{
> -Â Â Â Âchar* v;
> -Â Â Â Âuint64_t* key;
> -
> -        if ((v = hashtable_search(h, &sector))) {
> -Â Â Â Â Â Â Â Âmemcpy(v, buf, len);
> -Â Â Â Â Â Â Â Âreturn 0;
> -Â Â Â Â}
> -
> -Â Â Â Âif (!(v = malloc(len))) {
> -Â Â Â Â Â Â Â ÂDPRINTF("ramdisk_write_hash: malloc failed\n");
> -Â Â Â Â Â Â Â Âreturn -1;
> -Â Â Â Â}
> -Â Â Â Âmemcpy(v, buf, len);
> -Â Â Â Âif (!(key = malloc(sizeof(*key)))) {
> -Â Â Â Â Â Â Â ÂDPRINTF("ramdisk_write_hash: error allocating key\n");
> -Â Â Â Â Â Â Â Âfree(v);
> -Â Â Â Â Â Â Â Âreturn -1;
> -Â Â Â Â}
> -Â Â Â Â*key = sector;
> -Â Â Â Âif (!hashtable_insert(h, key, v)) {
> -Â Â Â Â Â Â Â ÂDPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", sector);
> -Â Â Â Â Â Â Â Âfree(key);
> -Â Â Â Â Â Â Â Âfree(v);
> -Â Â Â Â Â Â Â Âreturn -1;
> -Â Â Â Â}
> -
> -Â Â Â Âreturn 0;
> -}
> -
> -static inline int ramdisk_write(struct ramdisk* ramdisk, uint64_t sector,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âint nb_sectors, char* buf)
> -{
> -Â Â Â Âint i, rc;
> -
> -Â Â Â Âfor (i = 0; i < nb_sectors; i++) {
> -Â Â Â Â Â Â Â Ârc = ramdisk_write_hash(ramdisk->h, sector + i,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âbuf + i * ramdisk->sector_size,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âramdisk->sector_size);
> -Â Â Â Â Â Â Â Âif (rc)
> -Â Â Â Â Â Â Â Â Â Â Â Âreturn rc;
> -Â Â Â Â}
> -
> -Â Â Â Âreturn 0;
> -}
> -
> -static int uint64_compare(const void* k1, const void* k2)
> -{
> -Â Â Â Âuint64_t u1 = *(uint64_t*)k1;
> -Â Â Â Âuint64_t u2 = *(uint64_t*)k2;
> -
> -Â Â Â Â/* u1 - u2 is unsigned */
> -Â Â Â Âreturn u1 < u2 ? -1 : u1 > u2 ? 1 : 0;
> -}
> -
> -/* set psectors to an array of the sector numbers in the hash, returning
> - * the number of entries (or -1 on error) */
> -static int ramdisk_get_sectors(struct hashtable* h, uint64_t** psectors)
> -{
> -Â Â Â Âstruct hashtable_itr* itr;
> -Â Â Â Âuint64_t* sectors;
> -Â Â Â Âint count;
> -
> -Â Â Â Âif (!(count = hashtable_count(h)))
> -Â Â Â Â Â Â Â Âreturn 0;
> -
> -Â Â Â Âif (!(*psectors = malloc(count * sizeof(uint64_t)))) {
> -Â Â Â Â Â Â Â ÂDPRINTF("ramdisk_get_sectors: error allocating sector map\n");
> -Â Â Â Â Â Â Â Âreturn -1;
> -Â Â Â Â}
> -Â Â Â Âsectors = *psectors;
> -
> -Â Â Â Âitr = hashtable_iterator(h);
> -Â Â Â Âcount = 0;
> -Â Â Â Âdo {
> -Â Â Â Â Â Â Â Âsectors[count++] = *(uint64_t*)hashtable_iterator_key(itr);
> -Â Â Â Â} while (hashtable_iterator_advance(itr));
> -Â Â Â Âfree(itr);
> -
> -Â Â Â Âreturn count;
> -}
> -
> -/*
> -Â return -1 for OOM
> -Â return -2 for merge lookup failure
> -Â return -3 for WAW race
> -Â return 0 on success.
> -*/
> -static int merge_requests(struct ramdisk* ramdisk, uint64_t start,
> -Â Â Â Â Â Â Â Â Â Â Â Âsize_t count, char **mergedbuf)
> -{
> -Â Â Â Âchar* buf;
> -Â Â Â Âchar* sector;
> -Â Â Â Âint i;
> -Â Â Â Âuint64_t *key;
> -Â Â Â Âint rc = 0;
> -
> -Â Â Â Âif (!(buf = valloc(count * ramdisk->sector_size))) {
> -Â Â Â Â Â Â Â ÂDPRINTF("merge_request: allocation failed\n");
> -Â Â Â Â Â Â Â Âreturn -1;
> -Â Â Â Â}
> -
> -Â Â Â Âfor (i = 0; i < count; i++) {
> -Â Â Â Â Â Â Â Âif (!(sector = hashtable_search(ramdisk->prev, &start))) {
> -Â Â Â Â Â Â Â Â Â Â Â ÂDPRINTF("merge_request: lookup failed on %"PRIu64"\n", start);
> -Â Â Â Â Â Â Â Â Â Â Â Âfree(buf);
> -Â Â Â Â Â Â Â Â Â Â Â Ârc = -2;
> -Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> -Â Â Â Â Â Â Â Â}
> -
> -Â Â Â Â Â Â Â Â/* Check inprogress requests to avoid waw non-determinism */
> -Â Â Â Â Â Â Â Âif (hashtable_search(ramdisk->inprogress, &start)) {
> -Â Â Â Â Â Â Â Â Â Â Â ÂDPRINTF("merge_request: WAR RACE on %"PRIu64"\n", start);
> -Â Â Â Â Â Â Â Â Â Â Â Âfree(buf);
> -Â Â Â Â Â Â Â Â Â Â Â Ârc = -3;
> -Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> -Â Â Â Â Â Â Â Â}
> -Â Â Â Â Â Â Â Â/* Insert req into inprogress (brief period of duplication of hash entries until
> -Â Â Â Â Â Â Â Â * they are removed from prev. Read tracking would not be reading wrong entries)
> -Â Â Â Â Â Â Â Â */
> -Â Â Â Â Â Â Â Âif (!(key = malloc(sizeof(*key)))) {
> -Â Â Â Â Â Â Â Â Â Â Â ÂDPRINTF("%s: error allocating key\n", __FUNCTION__);
> -Â Â Â Â Â Â Â Â Â Â Â Âfree(buf);
> -Â Â Â Â Â Â Â Â Â Â Â Ârc = -1;
> -Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> -Â Â Â Â Â Â Â Â}
> -Â Â Â Â Â Â Â Â*key = start;
> -Â Â Â Â Â Â Â Âif (!hashtable_insert(ramdisk->inprogress, key, NULL)) {
> -Â Â Â Â Â Â Â Â Â Â Â ÂDPRINTF("%s failed to insert sector %" PRIu64 " into inprogress hash\n",
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â__FUNCTION__, start);
> -Â Â Â Â Â Â Â Â Â Â Â Âfree(key);
> -Â Â Â Â Â Â Â Â Â Â Â Âfree(buf);
> -Â Â Â Â Â Â Â Â Â Â Â Ârc = -1;
> -Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> -Â Â Â Â Â Â Â Â}
> -Â Â Â Â Â Â Â Âmemcpy(buf + i * ramdisk->sector_size, sector, ramdisk->sector_size);
> -Â Â Â Â Â Â Â Âstart++;
> -Â Â Â Â}
> -
> -Â Â Â Â*mergedbuf = buf;
> -Â Â Â Âreturn 0;
> -fail:
> -Â Â Â Âfor (start--; i >0; i--, start--)
> -Â Â Â Â Â Â Â Âhashtable_remove(ramdisk->inprogress, &start);
> -Â Â Â Âreturn rc;
> -}
> -
> -/* The underlying driver may not handle having the whole ramdisk queued at
> - * once. We queue what we can and let the callbacks attempt to queue more. */
> -/* NOTE: may be called from callback, while dd->private still belongs to
> - * the underlying driver */
> -static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s)
> -{
> -Â Â Â Âuint64_t* sectors;
> -Â Â Â Âchar* buf = NULL;
> -Â Â Â Âuint64_t base, batchlen;
> -Â Â Â Âint i, j, count = 0;
> -
> -Â Â Â Â// RPRINTF("ramdisk flush\n");
> -
> -        if ((count = ramdisk_get_sectors(s->ramdisk.prev, &sectors)) <= 0)
> -Â Â Â Â Â Â Â Âreturn count;
> -
> -Â Â Â Â/* Create the inprogress table if empty */
> -Â Â Â Âif (!s->ramdisk.inprogress)
> -Â Â Â Â Â Â Â Âs->ramdisk.inprogress = create_hashtable(RAMDISK_HASHSIZE,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âuint64_hash,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Ârd_hash_equal);
> -
> -Â Â Â Â/*
> -Â Â Â Â ÂRPRINTF("ramdisk: flushing %d sectors\n", count);
> -Â Â Â Â*/
> -
> -Â Â Â Â/* sort and merge sectors to improve disk performance */
> -Â Â Â Âqsort(sectors, count, sizeof(*sectors), uint64_compare);
> -
> -Â Â Â Âfor (i = 0; i < count;) {
> -Â Â Â Â Â Â Â Âbase = sectors[i++];
> -Â Â Â Â Â Â Â Âwhile (i < count && sectors[i] == sectors[i-1] + 1)
> -Â Â Â Â Â Â Â Â Â Â Â Âi++;
> -Â Â Â Â Â Â Â Âbatchlen = sectors[i-1] - base + 1;
> -
> -Â Â Â Â Â Â Â Âj = merge_requests(&s->ramdisk, base, batchlen, &buf);
> -
> -Â Â Â Â Â Â Â Âif (j) {
> -Â Â Â Â Â Â Â Â Â Â Â ÂRPRINTF("ramdisk_flush: merge_requests failed:%s\n",
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âj == -1? "OOM": (j==-2? "missing sector" : "WAW race"));
> -Â Â Â Â Â Â Â Â Â Â Â Âif (j == -3) continue;
> -Â Â Â Â Â Â Â Â Â Â Â Âfree(sectors);
> -Â Â Â Â Â Â Â Â Â Â Â Âreturn -1;
> -Â Â Â Â Â Â Â Â}
> -
> -Â Â Â Â Â Â Â Â/* NOTE: create_write_request() creates a treq AND forwards it down
> -Â Â Â Â Â Â Â Â * the driver chain */
> -Â Â Â Â Â Â Â Â// RPRINTF("forwarding write request at %" PRIu64 ", length: %" PRIu64 "\n", base, batchlen);
> -Â Â Â Â Â Â Â Âcreate_write_request(s, base, batchlen, buf);
> -Â Â Â Â Â Â Â Â//RPRINTF("write request at %" PRIu64 ", length: %" PRIu64 " forwarded\n", base, batchlen);
> -
> -Â Â Â Â Â Â Â Âs->ramdisk.inflight++;
> -
> -Â Â Â Â Â Â Â Âfor (j = 0; j < batchlen; j++) {
> -Â Â Â Â Â Â Â Â Â Â Â Âbuf = hashtable_search(s->ramdisk.prev, &base);
> -Â Â Â Â Â Â Â Â Â Â Â Âfree(buf);
> -Â Â Â Â Â Â Â Â Â Â Â Âhashtable_remove(s->ramdisk.prev, &base);
> -Â Â Â Â Â Â Â Â Â Â Â Âbase++;
> -Â Â Â Â Â Â Â Â}
> -Â Â Â Â}
> -
> -Â Â Â Âif (!hashtable_count(s->ramdisk.prev)) {
> -Â Â Â Â Â Â Â Â/* everything is in flight */
> -Â Â Â Â Â Â Â Âhashtable_destroy(s->ramdisk.prev, 0);
> -Â Â Â Â Â Â Â Âs->ramdisk.prev = NULL;
> -Â Â Â Â}
> -
> -Â Â Â Âfree(sectors);
> -
> -Â Â Â Â// RPRINTF("ramdisk flush done\n");
> -Â Â Â Âreturn 0;
> -}
> -
> -/* flush ramdisk contents to disk */
> -static int ramdisk_start_flush(td_driver_t *driver)
> -{
> -Â Â Â Âstruct tdremus_state *s = (struct tdremus_state *)driver->data;
> -Â Â Â Âuint64_t* key;
> -Â Â Â Âchar* buf;
> -Â Â Â Âint rc = 0;
> -Â Â Â Âint i, j, count, batchlen;
> -Â Â Â Âuint64_t* sectors;
> -
> -Â Â Â Âif (!hashtable_count(s->ramdisk.h)) {
> -Â Â Â Â Â Â Â Â/*
> -Â Â Â Â Â Â Â Â ÂRPRINTF("Nothing to flush\n");
> -Â Â Â Â Â Â Â Â*/
> -Â Â Â Â Â Â Â Âreturn 0;
> -Â Â Â Â}
> -
> -Â Â Â Âif (s->ramdisk.prev) {
> -Â Â Â Â Â Â Â Â/* a flush request issued while a previous flush is still in progress
> -Â Â Â Â Â Â Â Â * will merge with the previous request. If you want the previous
> -Â Â Â Â Â Â Â Â * request to be consistent, wait for it to complete. */
> -                if ((count = ramdisk_get_sectors(s->ramdisk.h, &sectors)) < 0)
> -Â Â Â Â Â Â Â Â Â Â Â Âreturn count;
> -
> -Â Â Â Â Â Â Â Âfor (i = 0; i < count; i++) {
> -Â Â Â Â Â Â Â Â Â Â Â Âbuf = hashtable_search(s->ramdisk.h, sectors + i);
> -Â Â Â Â Â Â Â Â Â Â Â Âramdisk_write_hash(s->ramdisk.prev, sectors[i], buf,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â s->ramdisk.sector_size);
> -Â Â Â Â Â Â Â Â}
> -Â Â Â Â Â Â Â Âfree(sectors);
> -
> -Â Â Â Â Â Â Â Âhashtable_destroy (s->ramdisk.h, 1);
> -Â Â Â Â} else
> -Â Â Â Â Â Â Â Âs->ramdisk.prev = s->ramdisk.h;
> -
> -Â Â Â Â/* We create a new hashtable so that new writes can be performed before
> -Â Â Â Â * the old hashtable is completely drained. */
> -Â Â Â Âs->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Ârd_hash_equal);
> -
> -Â Â Â Âreturn ramdisk_flush(driver, s);
> -}
> -
> -
> Âstatic int ramdisk_start(td_driver_t *driver)
> Â{
> Â Â Â Â struct tdremus_state *s = (struct tdremus_state *)driver->data;
>
> -Â Â Â Âif (s->ramdisk.h) {
> -Â Â Â Â Â Â Â ÂRPRINTF("ramdisk already allocated\n");
> -Â Â Â Â Â Â Â Âreturn 0;
> -Â Â Â Â}
> -
> Â Â Â Â s->ramdisk.sector_size = driver->info.sector_size;
> -Â Â Â Âs->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
> -Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Ârd_hash_equal);
> +Â Â Â Âs->ramdisk.log_prefix = "remus";
> +Â Â Â Âs->ramdisk.image = remus_image;
> +Â Â Â Âramdisk_init(&s->ramdisk);
>
> Â Â Â Â DPRINTF("Ramdisk started, %zu bytes/sector\n", s->ramdisk.sector_size);
>
> @@ -917,13 +478,9 @@ static int client_flush(td_driver_t *driver)
> Âstatic int server_flush(td_driver_t *driver)
> Â{
> Â Â Â Â struct tdremus_state *s = (struct tdremus_state *)driver->data;
> -Â Â Â Â/*
> -Â Â Â Â * Nothing to flush in beginning.
> -Â Â Â Â */
> -Â Â Â Âif (!s->ramdisk.prev)
> -Â Â Â Â Â Â Â Âreturn 0;
> +
> Â Â Â Â /* Try to flush any remaining requests */
> -Â Â Â Âreturn ramdisk_flush(driver, s);
> +Â Â Â Âreturn ramdisk_flush_pended_requests(&s->ramdisk);
> Â}
>
> Â/* It is called when switching the mode from primary to unprotected */
> @@ -1030,10 +587,7 @@ static inline int server_writes_inflight(td_driver_t *driver)
> Â{
> Â Â Â Â struct tdremus_state *s = (struct tdremus_state *)driver->data;
>
> -Â Â Â Âif (!s->ramdisk.inflight && !s->ramdisk.prev)
> -Â Â Â Â Â Â Â Âreturn 0;
> -
> -Â Â Â Âreturn 1;
> +Â Â Â Âreturn ramdisk_writes_inflight(&s->ramdisk);
> Â}
>
> Â/* Due to block device prefetching this code may be called on the server side
> @@ -1116,7 +670,9 @@ static void server_do_wreq(td_driver_t *driver)
> Â Â Â Â if (mread(s->stream_fd.fd, buf, len) < 0)
> Â Â Â Â Â Â Â Â goto err;
>
> -Â Â Â Âif (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0) {
> +Â Â Â Âif (ramdisk_cache_write_request(&s->ramdisk, *sector, *sectors,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âdriver->info.sector_size, buf,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â"remus") < 0) {
> Â Â Â Â Â Â Â Â rc = ERROR_INTERNAL;
> Â Â Â Â Â Â Â Â goto err;
> Â Â Â Â }
> @@ -1137,7 +693,7 @@ static void server_do_creq(td_driver_t *driver)
>
> Â Â Â Â // RPRINTF("committing buffer\n");
>
> -Â Â Â Âramdisk_start_flush(driver);
> +Â Â Â Âramdisk_start_flush(&s->ramdisk);
>
> Â Â Â Â /* XXX this message should not be sent until flush completes! */
> Â Â Â Â if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4)
> @@ -1184,12 +740,7 @@ void unprotected_queue_read(td_driver_t *driver, td_request_t treq)
>
>     /* wait for previous ramdisk to flush before servicing reads */
> Â Â Â Â if (server_writes_inflight(driver)) {
> -Â Â Â Â Â Â Â Â/* for now lets just return EBUSY.
> -Â Â Â Â Â Â Â Â * if there are any left-over requests in prev,
> -Â Â Â Â Â Â Â Â * kick em again.
> -Â Â Â Â Â Â Â Â */
> -Â Â Â Â Â Â Â Âif(!s->ramdisk.inflight) /* nothing in inprogress */
> -Â Â Â Â Â Â Â Â Â Â Â Âramdisk_flush(driver, s);
> +Â Â Â Â Â Â Â Âramdisk_flush_pended_requests(&s->ramdisk);
>
> Â Â Â Â Â Â Â Â td_complete_request(treq, -EBUSY);
> Â Â Â Â }
> @@ -1207,8 +758,7 @@ void unprotected_queue_write(td_driver_t *driver, td_request_t treq)
> Â Â Â Â /* wait for previous ramdisk to flush */
> Â Â Â Â if (server_writes_inflight(driver)) {
> Â Â Â Â Â Â Â Â RPRINTF("queue_write: waiting for queue to drain");
> -Â Â Â Â Â Â Â Âif(!s->ramdisk.inflight) /* nothing in inprogress. Kick prev */
> -Â Â Â Â Â Â Â Â Â Â Â Âramdisk_flush(driver, s);
> +Â Â Â Â Â Â Â Âramdisk_flush_pended_requests(&s->ramdisk);
> Â Â Â Â Â Â Â Â td_complete_request(treq, -EBUSY);
> Â Â Â Â }
> Â Â Â Â else {
> @@ -1518,9 +1068,7 @@ static int tdremus_close(td_driver_t *driver)
> Â Â Â Â struct tdremus_state *s = (struct tdremus_state *)driver->data;
>
> Â Â Â Â RPRINTF("closing\n");
> -Â Â Â Âif (s->ramdisk.inprogress)
> -Â Â Â Â Â Â Â Âhashtable_destroy(s->ramdisk.inprogress, 0);
> -
> +Â Â Â Âramdisk_destroy(&s->ramdisk);
> Â Â Â Â td_replication_connect_kill(&s->t);
> Â Â Â Â ctl_unregister(s);
> Â Â Â Â ctl_close(s);
> diff --git a/tools/blktap2/drivers/block-replication.c b/tools/blktap2/drivers/block-replication.c
> index e4b2679..82d7609 100644
> --- a/tools/blktap2/drivers/block-replication.c
> +++ b/tools/blktap2/drivers/block-replication.c
> @@ -15,6 +15,10 @@
>
> Â#include "tapdisk-server.h"
> Â#include "block-replication.h"
> +#include "tapdisk-interface.h"
> +#include "hashtable.h"
> +#include "hashtable_itr.h"
> +#include "hashtable_utility.h"
>
> Â#include <string.h>
> Â#include <errno.h>
> @@ -30,6 +34,8 @@
> Â#define DPRINTF(_f, _a...) syslog (LOG_DEBUG, "%s: " _f, log_prefix, ## _a)
> Â#define EPRINTF(_f, _a...) syslog (LOG_ERR, "%s: " _f, log_prefix, ## _a)
>
> +#define RAMDISK_HASHSIZE 128
> +
> Â/* connection status */
> Âenum {
> Â Â Â Â connection_none,
> @@ -466,3 +472,457 @@ static void td_replication_connect_event(event_id_t id, char mode,
> Âfail:
> Â Â Â Â td_replication_client_failed(t, rc);
> Â}
> +
> +
> +/* I/O replication */
> +static void replicated_write_callback(td_request_t treq, int err)
> +{
> +Â Â Â Âramdisk_t *ramdisk = treq.cb_data;
> +Â Â Â Âtd_vbd_request_t *vreq = treq.private;
> +Â Â Â Âint i;
> +Â Â Â Âuint64_t start;
> +Â Â Â Âconst char *log_prefix = ramdisk->log_prefix;
> +
> +Â Â Â Â/* the write failed for now, lets panic. this is very bad */
> +Â Â Â Âif (err) {
> +Â Â Â Â Â Â Â ÂEPRINTF("ramdisk write failed, disk image is not consistent\n");
> +Â Â Â Â Â Â Â Âexit(-1);
> +Â Â Â Â}
> +
> +Â Â Â Â/*
> +Â Â Â Â * The write succeeded. let's pull the vreq off whatever request list
> +Â Â Â Â * it is on and free() it
> +Â Â Â Â */
> +Â Â Â Âlist_del(&vreq->next);
> +Â Â Â Âfree(vreq);
> +
> +Â Â Â Âramdisk->inflight--;
> +Â Â Â Âstart = treq.sec;
> +Â Â Â Âfor (i = 0; i < treq.secs; i++) {
> +Â Â Â Â Â Â Â Âhashtable_remove(ramdisk->inprogress, &start);
> +Â Â Â Â Â Â Â Âstart++;
> +Â Â Â Â}
> +Â Â Â Âfree(treq.buf);
> +
> +Â Â Â Âif (!ramdisk->inflight && ramdisk->prev)
> +Â Â Â Â Â Â Â Âramdisk_flush_pended_requests(ramdisk);
> +}
> +
> +static int
> +create_write_request(ramdisk_t *ramdisk, td_sector_t sec, int secs, char *buf)
> +{
> +Â Â Â Âtd_request_t treq;
> +Â Â Â Âtd_vbd_request_t *vreq;
> +Â Â Â Âtd_vbd_t *vbd = ramdisk->image->private;
> +
> +   Âtreq.op   = TD_OP_WRITE;
> +   Âtreq.buf  Â= buf;
> +   Âtreq.sec  Â= sec;
> +   Âtreq.secs  = secs;
> +   Âtreq.image Â= ramdisk->image;
> +   Âtreq.cb   = replicated_write_callback;
> +Â Â Â Âtreq.cb_data = ramdisk;
> +   Âtreq.id   = 0;
> +   Âtreq.sidx  = 0;
> +
> +   Âvreq    Â= calloc(1, sizeof(td_vbd_request_t));
> +Â Â Â Âtreq.private = vreq;
> +
> +Â Â Â Âif(!vreq)
> +Â Â Â Â Â Â Â Âreturn -1;
> +
> +Â Â Â Âvreq->submitting = 1;
> +Â Â Â ÂINIT_LIST_HEAD(&vreq->next);
> +Â Â Â Âtapdisk_vbd_move_request(treq.private, &vbd->pending_requests);
> +
> +Â Â Â Âtd_forward_request(treq);
> +
> +Â Â Â Âvreq->submitting--;
> +
> +Â Â Â Âreturn 0;
> +}
> +
> +/* http://www.concentric.net/~Ttwang/tech/inthash.htm */
> +static unsigned int uint64_hash(void *k)
> +{
> +Â Â Â Âuint64_t key = *(uint64_t*)k;
> +
> +Â Â Â Âkey = (~key) + (key << 18);
> +Â Â Â Âkey = key ^ (key >> 31);
> +Â Â Â Âkey = key * 21;
> +Â Â Â Âkey = key ^ (key >> 11);
> +Â Â Â Âkey = key + (key << 6);
> +Â Â Â Âkey = key ^ (key >> 22);
> +
> +Â Â Â Âreturn (unsigned int)key;
> +}
> +
> +static int rd_hash_equal(void *k1, void *k2)
> +{
> +Â Â Â Âuint64_t key1, key2;
> +
> +Â Â Â Âkey1 = *(uint64_t*)k1;
> +Â Â Â Âkey2 = *(uint64_t*)k2;
> +
> +Â Â Â Âreturn key1 == key2;
> +}
> +
> +static int uint64_compare(const void *k1, const void *k2)
> +{
> +Â Â Â Âuint64_t u1 = *(uint64_t*)k1;
> +Â Â Â Âuint64_t u2 = *(uint64_t*)k2;
> +
> +Â Â Â Â/* u1 - u2 is unsigned */
> +Â Â Â Âreturn u1 < u2 ? -1 : u1 > u2 ? 1 : 0;
> +}
> +
> +static struct hashtable *ramdisk_new_hashtable(void)
> +{
> +Â Â Â Âreturn create_hashtable(RAMDISK_HASHSIZE, uint64_hash, rd_hash_equal);
> +}
> +
> +/*
> + * set psectors to an array of the sector numbers in the hash, returning
> + * the number of entries (or -1 on error)
> + */
> +static int ramdisk_get_sectors(struct hashtable *h, uint64_t **psectors,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â const char *log_prefix)
> +{
> +Â Â Â Âstruct hashtable_itr* itr;
> +Â Â Â Âuint64_t* sectors;
> +Â Â Â Âint count;
> +
> +Â Â Â Âif (!(count = hashtable_count(h)))
> +Â Â Â Â Â Â Â Âreturn 0;
> +
> +Â Â Â Âif (!(*psectors = malloc(count * sizeof(uint64_t)))) {
> +Â Â Â Â Â Â Â ÂDPRINTF("ramdisk_get_sectors: error allocating sector map\n");
> +Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Â}
> +Â Â Â Âsectors = *psectors;
> +
> +Â Â Â Âitr = hashtable_iterator(h);
> +Â Â Â Âcount = 0;
> +Â Â Â Âdo {
> +Â Â Â Â Â Â Â Âsectors[count++] = *(uint64_t*)hashtable_iterator_key(itr);
> +Â Â Â Â} while (hashtable_iterator_advance(itr));
> +Â Â Â Âfree(itr);
> +
> +Â Â Â Âreturn count;
> +}
> +
> +static int ramdisk_write_hash(struct hashtable *h, uint64_t sector, char *buf,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âsize_t len, const char *log_prefix)
> +{
> +Â Â Â Âchar *v;
> +Â Â Â Âuint64_t *key;
> +
> +        if ((v = hashtable_search(h, &sector))) {
> +Â Â Â Â Â Â Â Âmemcpy(v, buf, len);
> +Â Â Â Â Â Â Â Âreturn 0;
> +Â Â Â Â}
> +
> +Â Â Â Âif (!(v = malloc(len))) {
> +Â Â Â Â Â Â Â ÂDPRINTF("ramdisk_write_hash: malloc failed\n");
> +Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Â}
> +Â Â Â Âmemcpy(v, buf, len);
> +Â Â Â Âif (!(key = malloc(sizeof(*key)))) {
> +Â Â Â Â Â Â Â ÂDPRINTF("ramdisk_write_hash: error allocating key\n");
> +Â Â Â Â Â Â Â Âfree(v);
> +Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Â}
> +Â Â Â Â*key = sector;
> +Â Â Â Âif (!hashtable_insert(h, key, v)) {
> +Â Â Â Â Â Â Â ÂDPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", sector);
> +Â Â Â Â Â Â Â Âfree(key);
> +Â Â Â Â Â Â Â Âfree(v);
> +Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Â}
> +
> +Â Â Â Âreturn 0;
> +}
> +
> +/*
> + * return -1 for OOM
> + * return -2 for merge lookup failure(should not happen)
> + * return -3 for WAW race
> + * return 0 on success.
> + */
> +static int merge_requests(ramdisk_t *ramdisk, uint64_t start,
> +Â Â Â Â Â Â Â Â Â Â Â Â Âsize_t count, char **mergedbuf)
> +{
> +Â Â Â Âchar* buf;
> +Â Â Â Âchar* sector;
> +Â Â Â Âint i;
> +Â Â Â Âuint64_t *key;
> +Â Â Â Âint rc = 0;
> +Â Â Â Âconst char *log_prefix = ramdisk->log_prefix;
> +
> +Â Â Â Âif (!(buf = valloc(count * ramdisk->sector_size))) {
> +Â Â Â Â Â Â Â ÂDPRINTF("merge_request: allocation failed\n");
> +Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Â}
> +
> +Â Â Â Âfor (i = 0; i < count; i++) {
> +Â Â Â Â Â Â Â Âif (!(sector = hashtable_search(ramdisk->prev, &start))) {
> +Â Â Â Â Â Â Â Â Â Â Â ÂEPRINTF("merge_request: lookup failed on %"PRIu64"\n",
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âstart);
> +Â Â Â Â Â Â Â Â Â Â Â Âfree(buf);
> +Â Â Â Â Â Â Â Â Â Â Â Ârc = -2;
> +Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> +Â Â Â Â Â Â Â Â}
> +
> +Â Â Â Â Â Â Â Â/* Check inprogress requests to avoid waw non-determinism */
> +Â Â Â Â Â Â Â Âif (hashtable_search(ramdisk->inprogress, &start)) {
> +Â Â Â Â Â Â Â Â Â Â Â ÂDPRINTF("merge_request: WAR RACE on %"PRIu64"\n",
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âstart);
> +Â Â Â Â Â Â Â Â Â Â Â Âfree(buf);
> +Â Â Â Â Â Â Â Â Â Â Â Ârc = -3;
> +Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> +Â Â Â Â Â Â Â Â}
> +
> +Â Â Â Â Â Â Â Â/*
> +Â Â Â Â Â Â Â Â * Insert req into inprogress (brief period of duplication of
> +Â Â Â Â Â Â Â Â * hash entries until they are removed from prev. Read tracking
> +Â Â Â Â Â Â Â Â * would not be reading wrong entries)
> +Â Â Â Â Â Â Â Â */
> +Â Â Â Â Â Â Â Âif (!(key = malloc(sizeof(*key)))) {
> +Â Â Â Â Â Â Â Â Â Â Â ÂEPRINTF("%s: error allocating key\n", __FUNCTION__);
> +Â Â Â Â Â Â Â Â Â Â Â Âfree(buf);
> +Â Â Â Â Â Â Â Â Â Â Â Ârc = -1;
> +Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> +Â Â Â Â Â Â Â Â}
> +Â Â Â Â Â Â Â Â*key = start;
> +Â Â Â Â Â Â Â Âif (!hashtable_insert(ramdisk->inprogress, key, NULL)) {
> +Â Â Â Â Â Â Â Â Â Â Â ÂEPRINTF("%s failed to insert sector %" PRIu64 " into inprogress hash\n",
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â__FUNCTION__, start);
> +Â Â Â Â Â Â Â Â Â Â Â Âfree(key);
> +Â Â Â Â Â Â Â Â Â Â Â Âfree(buf);
> +Â Â Â Â Â Â Â Â Â Â Â Ârc = -1;
> +Â Â Â Â Â Â Â Â Â Â Â Âgoto fail;
> +Â Â Â Â Â Â Â Â}
> +
> +Â Â Â Â Â Â Â Âmemcpy(buf + i * ramdisk->sector_size, sector, ramdisk->sector_size);
> +Â Â Â Â Â Â Â Âstart++;
> +Â Â Â Â}
> +
> +Â Â Â Â*mergedbuf = buf;
> +Â Â Â Âreturn 0;
> +fail:
> +Â Â Â Âfor (start--; i > 0; i--, start--)
> +Â Â Â Â Â Â Â Âhashtable_remove(ramdisk->inprogress, &start);
> +Â Â Â Âreturn rc;
> +}
> +
> +#define HASHTABLE_DESTROY(hashtable, free)Â Â Â Â Â Â Â Â Â Â Â\
> +Â Â Â Âdo {Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â \
> +Â Â Â Â Â Â Â Âif (hashtable) {Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â \
> +Â Â Â Â Â Â Â Â Â Â Â Âhashtable_destroy(hashtable, free);Â Â Â\
> +Â Â Â Â Â Â Â Â Â Â Â Âhashtable = NULL;Â Â Â Â Â Â Â Â Â Â Â Â\
> +Â Â Â Â Â Â Â Â}Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â\
> +Â Â Â Â} while (0)
> +
> +int ramdisk_init(ramdisk_t *ramdisk)
> +{
> +Â Â Â Âramdisk->inflight = 0;
> +Â Â Â Âramdisk->prev = NULL;
> +Â Â Â Âramdisk->inprogress = NULL;
> +Â Â Â Âramdisk->primary_cache = ramdisk_new_hashtable();
> +Â Â Â Âif (!ramdisk->primary_cache)
> +Â Â Â Â Â Â Â Âreturn -1;
> +
> +Â Â Â Âreturn 0;
> +}
> +
> +void ramdisk_destroy(ramdisk_t *ramdisk)
> +{
> +Â Â Â Âconst char *log_prefix = ramdisk->log_prefix;
> +
> +Â Â Â Â/*
> +Â Â Â Â * ramdisk_destroy() is called only when we will close the tapdisk image.
> +Â Â Â Â * In this case, there are no pending requests in vbd.
> +Â Â Â Â *
> +Â Â Â Â * If ramdisk->inflight is not 0, it means that the requests created by
> +Â Â Â Â * us are still in vbd->pending_requests.
> +Â Â Â Â */
> +Â Â Â Âif (ramdisk->inflight) {
> +Â Â Â Â Â Â Â Â/* should not happen */
> +Â Â Â Â Â Â Â ÂEPRINTF("cannot destroy ramdisk\n");
> +Â Â Â Â Â Â Â Âreturn;
> +Â Â Â Â}
> +
> +Â Â Â ÂHASHTABLE_DESTROY(ramdisk->inprogress, 0);
> +Â Â Â ÂHASHTABLE_DESTROY(ramdisk->prev, 1);
> +Â Â Â ÂHASHTABLE_DESTROY(ramdisk->primary_cache, 1);
> +}
> +
> +int ramdisk_read(ramdisk_t *ramdisk, uint64_t sector,
> +Â Â Â Â Â Â Â Â int nb_sectors, char *buf)
> +{
> +Â Â Â Âint i;
> +Â Â Â Âchar *v;
> +Â Â Â Âuint64_t key;
> +
> +Â Â Â Âfor (i = 0; i < nb_sectors; i++) {
> +Â Â Â Â Â Â Â Âkey = sector + i;
> +Â Â Â Â Â Â Â Â/* check whether it is queued in a previous flush request */
> +Â Â Â Â Â Â Â Âif (!(ramdisk->prev &&
> +Â Â Â Â Â Â Â Â Â Â(v = hashtable_search(ramdisk->prev, &key)))) {
> +Â Â Â Â Â Â Â Â Â Â Â Â/* check whether it is an ongoing flush */
> +Â Â Â Â Â Â Â Â Â Â Â Âif (!(ramdisk->inprogress &&
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â(v = hashtable_search(ramdisk->inprogress, &key))))
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Â Â Â Â Â}
> +Â Â Â Â Â Â Â Âmemcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size);
> +Â Â Â Â}
> +
> +Â Â Â Âreturn 0;
> +}
> +
> +int ramdisk_cache_write_request(ramdisk_t *ramdisk, uint64_t sector,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âint nb_sectors, size_t sector_size,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âchar *buf, const char *log_prefix)
> +{
> +Â Â Â Âint i, rc;
> +
> +Â Â Â Âfor (i = 0; i < nb_sectors; i++) {
> +Â Â Â Â Â Â Â Ârc = ramdisk_write_hash(ramdisk->primary_cache, sector + i,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âbuf + i * sector_size,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âsector_size, log_prefix);
> +Â Â Â Â Â Â Â Âif (rc)
> +Â Â Â Â Â Â Â Â Â Â Â Âreturn rc;
> +Â Â Â Â}
> +
> +Â Â Â Âreturn 0;
> +}
> +
> +int ramdisk_flush_pended_requests(ramdisk_t *ramdisk)
> +{
> +Â Â Â Âuint64_t *sectors;
> +Â Â Â Âchar *buf = NULL;
> +Â Â Â Âuint64_t base, batchlen;
> +Â Â Â Âint i, j, count = 0;
> +Â Â Â Âconst char *log_prefix = ramdisk->log_prefix;
> +
> +Â Â Â Â/* everything is in flight */
> +Â Â Â Âif (!ramdisk->prev)
> +Â Â Â Â Â Â Â Âreturn 0;
> +
> +Â Â Â Âcount = ramdisk_get_sectors(ramdisk->prev, &sectors, log_prefix);
> +Â Â Â Âif (count <= 0)
> +Â Â Â Â Â Â Â Â/* should not happen */
> +Â Â Â Â Â Â Â Âreturn count;
> +
> +Â Â Â Â/* Create the inprogress table if empty */
> +Â Â Â Âif (!ramdisk->inprogress) {
> +Â Â Â Â Â Â Â Âramdisk->inprogress = ramdisk_new_hashtable();
> +Â Â Â Â Â Â Â Âif (!ramdisk->inprogress) {
> +Â Â Â Â Â Â Â Â Â Â Â ÂEPRINTF("ramdisk_flush: creating the inprogress table failed:OOM\n");
> +Â Â Â Â Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Â Â Â Â Â}
> +Â Â Â Â}
> +
> +Â Â Â Â/* sort and merge sectors to improve disk performance */
> +Â Â Â Âqsort(sectors, count, sizeof(*sectors), uint64_compare);
> +
> +Â Â Â Âfor (i = 0; i < count;) {
> +Â Â Â Â Â Â Â Âbase = sectors[i++];
> +Â Â Â Â Â Â Â Âwhile (i < count && sectors[i] == sectors[i-1] + 1)
> +Â Â Â Â Â Â Â Â Â Â Â Âi++;
> +Â Â Â Â Â Â Â Âbatchlen = sectors[i-1] - base + 1;
> +
> +Â Â Â Â Â Â Â Âj = merge_requests(ramdisk, base, batchlen, &buf);
> +Â Â Â Â Â Â Â Âif (j) {
> +Â Â Â Â Â Â Â Â Â Â Â ÂEPRINTF("ramdisk_flush: merge_requests failed:%s\n",
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âj == -1 ? "OOM" :
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â(j == -2 ? "missing sector" :
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â "WAW race"));
> +Â Â Â Â Â Â Â Â Â Â Â Âif (j == -3)
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âcontinue;
> +Â Â Â Â Â Â Â Â Â Â Â Âfree(sectors);
> +Â Â Â Â Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Â Â Â Â Â}
> +
> +Â Â Â Â Â Â Â Â/*
> +Â Â Â Â Â Â Â Â * NOTE: create_write_request() creates a treq AND forwards
> +Â Â Â Â Â Â Â Â * it down the driver chain
> +Â Â Â Â Â Â Â Â *
> +Â Â Â Â Â Â Â Â * TODO: handle create_write_request()'s error.
> +Â Â Â Â Â Â Â Â */
> +Â Â Â Â Â Â Â Âcreate_write_request(ramdisk, base, batchlen, buf);
> +
> +Â Â Â Â Â Â Â Âramdisk->inflight++;
> +
> +Â Â Â Â Â Â Â Âfor (j = 0; j < batchlen; j++) {
> +Â Â Â Â Â Â Â Â Â Â Â Âbuf = hashtable_search(ramdisk->prev, &base);
> +Â Â Â Â Â Â Â Â Â Â Â Âfree(buf);
> +Â Â Â Â Â Â Â Â Â Â Â Âhashtable_remove(ramdisk->prev, &base);
> +Â Â Â Â Â Â Â Â Â Â Â Âbase++;
> +Â Â Â Â Â Â Â Â}
> +Â Â Â Â}
> +
> +Â Â Â Âif (!hashtable_count(ramdisk->prev))
> +Â Â Â Â Â Â Â Â/* everything is in flight */
> +Â Â Â Â Â Â Â ÂHASHTABLE_DESTROY(ramdisk->prev, 0);
> +
> +Â Â Â Âfree(sectors);
> +Â Â Â Âreturn 0;
> +}
> +
> +int ramdisk_start_flush(ramdisk_t *ramdisk)
> +{
> +Â Â Â Âuint64_t *key;
> +Â Â Â Âchar *buf;
> +Â Â Â Âint rc = 0;
> +Â Â Â Âint i, j, count, batchlen;
> +Â Â Â Âuint64_t *sectors;
> +Â Â Â Âconst char *log_prefix = ramdisk->log_prefix;
> +Â Â Â Âstruct hashtable *cache;
> +
> +Â Â Â Âcache = ramdisk->primary_cache;
> +Â Â Â Âif (!hashtable_count(cache))
> +Â Â Â Â Â Â Â Âreturn 0;
> +
> +Â Â Â Âif (ramdisk->prev) {
> +Â Â Â Â Â Â Â Â/*
> +Â Â Â Â Â Â Â Â * a flush request issued while a previous flush is still in
> +Â Â Â Â Â Â Â Â * progress will merge with the previous request. If you want
> +Â Â Â Â Â Â Â Â * the previous request to be consistent, wait for it to
> +Â Â Â Â Â Â Â Â * complete.
> +Â Â Â Â Â Â Â Â */
> +Â Â Â Â Â Â Â Âcount = ramdisk_get_sectors(cache, &sectors, log_prefix);
> +Â Â Â Â Â Â Â Âif (count < 0 )
> +Â Â Â Â Â Â Â Â Â Â Â Âreturn count;
> +
> +Â Â Â Â Â Â Â Âfor (i = 0; i < count; i++) {
> +Â Â Â Â Â Â Â Â Â Â Â Âbuf = hashtable_search(cache, sectors + i);
> +Â Â Â Â Â Â Â Â Â Â Â Âramdisk_write_hash(ramdisk->prev, sectors[i], buf,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â ramdisk->sector_size, log_prefix);
> +Â Â Â Â Â Â Â Â}
> +Â Â Â Â Â Â Â Âfree(sectors);
> +
> +Â Â Â Â Â Â Â Âhashtable_destroy(cache, 1);
> +Â Â Â Â} else
> +Â Â Â Â Â Â Â Âramdisk->prev = cache;
> +
> +Â Â Â Â/*
> +Â Â Â Â * We create a new hashtable so that new writes can be performed before
> +Â Â Â Â * the old hashtable is completely drained.
> +Â Â Â Â */
> +Â Â Â Âramdisk->primary_cache = ramdisk_new_hashtable();
> +Â Â Â Âif (!ramdisk->primary_cache) {
> +Â Â Â Â Â Â Â ÂEPRINTF("ramdisk_start_flush: creating cache table failed: OOM\n");
> +Â Â Â Â Â Â Â Âreturn -1;
> +Â Â Â Â}
> +
> +Â Â Â Âreturn ramdisk_flush_pended_requests(ramdisk);
> +}
> +
> +int ramdisk_writes_inflight(ramdisk_t *ramdisk)
> +{
> +Â Â Â Âif (!ramdisk->inflight && !ramdisk->prev)
> +Â Â Â Â Â Â Â Âreturn 0;
> +
> +Â Â Â Âreturn 1;
> +}
> diff --git a/tools/blktap2/drivers/block-replication.h b/tools/blktap2/drivers/block-replication.h
> index 358c08b..cbdac3c 100644
> --- a/tools/blktap2/drivers/block-replication.h
> +++ b/tools/blktap2/drivers/block-replication.h
> @@ -110,4 +110,69 @@ int td_replication_server_restart(td_replication_connect_t *t);
> Â */
> Âint td_replication_client_start(td_replication_connect_t *t);
>
> +/* I/O replication */
> +typedef struct ramdisk ramdisk_t;
> +struct ramdisk {
> +Â Â Â Âsize_t sector_size;
> +Â Â Â Âconst char *log_prefix;
> +Â Â Â Âtd_image_t *image;
> +
> +Â Â Â Â/* private */
> +Â Â Â Â/* count of outstanding requests to the base driver */
> +Â Â Â Âsize_t inflight;
> +Â Â Â Â/* prev holds the requests to be flushed, while inprogress holds
> +Â Â Â Â * requests being flushed. When requests complete, they are removed
> +Â Â Â Â * from inprogress.
> +Â Â Â Â * Whenever a new flush is merged with ongoing flush (i.e, prev),
> +Â Â Â Â * we have to make sure that none of the new requests overlap with
> +Â Â Â Â * ones in "inprogress". If it does, keep it back in prev and dont issue
> +Â Â Â Â * IO until the current one finishes. If we allow this IO to proceed,
> +Â Â Â Â * we might end up with two "overlapping" requests in the disk's queue and
> +Â Â Â Â * the disk may not offer any guarantee on which one is written first.
> +Â Â Â Â * IOW, make sure we dont create a write-after-write time ordering constraint.
> +Â Â Â Â */
> +Â Â Â Âstruct hashtable *prev;
> +Â Â Â Âstruct hashtable *inprogress;
> +Â Â Â Â/*
> +Â Â Â Â * The primary write request is queued in this
> +Â Â Â Â * hashtable, and will be flushed to ramdisk when
> +Â Â Â Â * the checkpoint finishes.
> +Â Â Â Â */
> +Â Â Â Âstruct hashtable *primary_cache;
> +};
> +
> +int ramdisk_init(ramdisk_t *ramdisk);
> +void ramdisk_destroy(ramdisk_t *ramdisk);
> +
> +/*
> + * try to read from ramdisk. Return -1 if some sectors are not in
> + * ramdisk. Otherwise, return 0.
> + */
> +int ramdisk_read(ramdisk_t *ramdisk, uint64_t sector,
> +Â Â Â Â Â Â Â Â int nb_sectors, char *buf);
> +
> +/*
> + * cache the write requests; they will be flushed after a
> + * new checkpoint finishes
> + */
> +int ramdisk_cache_write_request(ramdisk_t *ramdisk, uint64_t sector,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âint nb_sectors, size_t sector_size,
> +Â Â Â Â Â Â Â Â Â Â Â Â Â Â Â Âchar* buf, const char *log_prefix);
> +
> +/* flush pended write requests to disk */
> +int ramdisk_flush_pended_requests(ramdisk_t *ramdisk);
> +/*
> + * flush cached write requests to disk. If WAW is detected, the cached
> + * write requests will be moved to pended queue. The pended write
> + * requests will be auto flushed after all inprogress write requests
> + * are flushed to disk. This function doesn't wait for all write requests
> + * to be flushed to disk.
> + */
> +int ramdisk_start_flush(ramdisk_t *ramdisk);
> +/*
> + * Return true if some write requests are inprogress or pended,
> + * otherwise return false
> + */
> +int ramdisk_writes_inflight(ramdisk_t *ramdisk);
> +
> Â#endif
> --
> 1.9.3
>
Acked-by: Shriram Rajagopalan <rshriram@xxxxxxxxx>
_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
http://lists.xen.org/xen-devel
|