[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-changelog] Added asynchronous support to the blockstore.



ChangeSet 1.1336.1.1, 2005/03/22 15:50:43+00:00, jrb44@xxxxxxxxxxxxxxxxxx

        Added asynchronous support to the blockstore.



 blockstore.c |  531 ++++++++++++++++++++++++++++++++++++++++++++++++++---------
 blockstore.h |    4 
 2 files changed, 455 insertions(+), 80 deletions(-)


diff -Nru a/tools/blktap/blockstore.c b/tools/blktap/blockstore.c
--- a/tools/blktap/blockstore.c 2005-03-22 15:03:49 -05:00
+++ b/tools/blktap/blockstore.c 2005-03-22 15:03:49 -05:00
@@ -13,31 +13,73 @@
 #include <string.h>
 #include <sys/types.h>
 #include <sys/stat.h>
+#include <stdarg.h>
 #include "blockstore.h"
 
 #define BLOCKSTORE_REMOTE
+//#define BSDEBUG
 
-#ifdef BLOCKSTORE_REMOTE
+/*****************************************************************************
+ * Debugging
+ */
+#ifdef BSDEBUG
+void DB(char *format, ...)
+{
+    va_list args;
+    
+    va_start(args, format);
+    vfprintf(stderr, format, args);
+    va_end(args);
+}
+#else
+#define DB(format, ...) (void)0
+#endif
 
-//#define BSDEBUG
+#ifdef BLOCKSTORE_REMOTE
 
 #include <sys/socket.h>
 #include <sys/ioctl.h>
 #include <netinet/in.h>
 #include <netdb.h>
 
-#define ENTER_QUEUE_CR (void)0
-#define LEAVE_QUEUE_CR (void)0
+/*****************************************************************************
+ *                                                                           *
+ *****************************************************************************/
+
+/*****************************************************************************
+ * Network state                                                             *
+ *****************************************************************************/
 
+/* The individual disk servers we talk to. These will be referenced by
+ * an integer index into bsservers[].
+ */
 bsserver_t bsservers[MAX_SERVERS];
+
+/* The cluster map. This is indexed by an integer cluster number.
+ */
 bscluster_t bsclusters[MAX_CLUSTERS];
 
+/* Local socket.
+ */
 struct sockaddr_in sin_local;
 int bssock = 0;
 
+/*****************************************************************************
+ * Message queue management                                                  *
+ *****************************************************************************/
+
+/* Protects the queue manipulation critical regions.
+ */
+#define ENTER_QUEUE_CR (void)0
+#define LEAVE_QUEUE_CR (void)0
+
+/* A message queue entry. We allocate one of these for every request we send.
+ * Asynchronous reply reception also uses one of these.
+ */
 typedef struct bsq_t_struct {
     struct bsq_t_struct *prev;
     struct bsq_t_struct *next;
+    int status;
     int server;
     int length;
     struct msghdr msghdr;
@@ -46,8 +88,134 @@
     void *block;
 } bsq_t;
 
+#define BSQ_STATUS_MATCHED 1
+
+#define ENTER_LUID_CR (void)0
+#define LEAVE_LUID_CR (void)0
+
+static u64 luid_cnt = 0x1000ULL;
+u64 new_luid(void) {
+    u64 luid;
+    ENTER_LUID_CR;
+    luid = luid_cnt++;
+    LEAVE_LUID_CR;
+    return luid;
+}
+
+/* Queue of outstanding requests.
+ */
 bsq_t *bs_head = NULL;
 bsq_t *bs_tail = NULL;
+int bs_qlen = 0;
+
+/* Dump the request queue (length, then each entry's luid and server) to stderr.
+ */
+void queuedebug(char *msg) {
+    bsq_t *q;
+    ENTER_QUEUE_CR;
+    fprintf(stderr, "Q: %s len=%u\n", msg, bs_qlen);
+    for (q = bs_head; q; q = q->next) {
+        fprintf(stderr, "  luid=%016llx server=%u\n",
+                q->message.luid, q->server);
+    }
+    LEAVE_QUEUE_CR;
+}
+
+int enqueue(bsq_t *qe) {
+    ENTER_QUEUE_CR;
+    qe->next = NULL;
+    qe->prev = bs_tail;
+    if (!bs_head)
+        bs_head = qe;
+    else
+        bs_tail->next = qe;
+    bs_tail = qe;
+    bs_qlen++;
+    LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+    queuedebug("enqueue");
+#endif
+    return 0;
+}
+
+int dequeue(bsq_t *qe) {
+    bsq_t *q;
+    ENTER_QUEUE_CR;
+    for (q = bs_head; q; q = q->next) {
+        if (q == qe) {
+            if (q->prev)
+                q->prev->next = q->next;
+            else 
+                bs_head = q->next;
+            if (q->next)
+                q->next->prev = q->prev;
+            else
+                bs_tail = q->prev;
+            bs_qlen--;
+            goto found;
+        }
+    }
+
+    LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+    queuedebug("dequeue not found");
+#endif
+    return 0;
+
+    found:
+    LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+    queuedebug("dequeue not found");
+#endif
+    return 1;
+}
+
+bsq_t *queuesearch(bsq_t *qe) {
+    bsq_t *q;
+    ENTER_QUEUE_CR;
+    for (q = bs_head; q; q = q->next) {
+        if ((qe->server == q->server) &&
+            (qe->message.operation == q->message.operation) &&
+            (qe->message.luid == q->message.luid)) {
+
+            if ((q->message.operation == BSOP_READBLOCK) &&
+                ((q->message.flags & BSOP_FLAG_ERROR) == 0)) {
+                q->block = qe->block;
+                qe->block = NULL;
+            }
+            q->length = qe->length;
+            q->message.flags = qe->message.flags;
+            q->message.id = qe->message.id;
+            q->status |= BSQ_STATUS_MATCHED;
+
+            if (q->prev)
+                q->prev->next = q->next;
+            else 
+                bs_head = q->next;
+            if (q->next)
+                q->next->prev = q->prev;
+            else
+                bs_tail = q->prev;
+            q->next = NULL;
+            q->prev = NULL;
+            bs_qlen--;
+            goto found;
+        }
+    }
+
+    LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+    queuedebug("queuesearch not found");
+#endif
+    return NULL;
+
+    found:
+    LEAVE_QUEUE_CR;
+#ifdef BSDEBUG
+    queuedebug("queuesearch found");
+#endif
+    return q;
+}
 
 int send_message(bsq_t *qe) {
     int rc;
@@ -71,16 +239,21 @@
         qe->iov[1].iov_len = BLOCK_SIZE;
     }
 
-    rc = sendmsg(bssock, &(qe->msghdr), 0);
+    qe->message.luid = new_luid();
+
+    qe->status = 0;
+    if (enqueue(qe) < 0) {
+        fprintf(stderr, "Error enqueuing request.\n");
+        return -1;
+    }
+
+    DB("send_message to %d luid=%016llx\n", qe->server, qe->message.luid);
+    rc = sendmsg(bssock, &(qe->msghdr), MSG_DONTWAIT);
     //rc = sendto(bssock, (void *)&(qe->message), qe->length, 0,
     //           (struct sockaddr *)&(bsservers[qe->server].sin),
     //           sizeof(struct sockaddr_in));
     if (rc < 0)
         return rc;
-    
-    ENTER_QUEUE_CR;
-    
-    LEAVE_QUEUE_CR;
 
     return rc;
 }
@@ -115,22 +288,148 @@
     return rc;
 }
 
+int get_server_number(struct sockaddr_in *sin) {
+    int i;
+


-------------------------------------------------------
This SF.net email is sponsored by: 2005 Windows Mobile Application Contest
Submit applications for Windows Mobile(tm)-based Pocket PCs or Smartphones
for the chance to win $25,000 and application distribution. Enter today at
http://ads.osdn.com/?ad_id=6882&alloc_id=15148&op=click
_______________________________________________
Xen-changelog mailing list
Xen-changelog@xxxxxxxxxxxxxxxxxxxxx
https://lists.sourceforge.net/lists/listinfo/xen-changelog


 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.