
[Xen-changelog] Added concurrency niceness to the bottom end of the blockstore.



ChangeSet 1.1363, 2005/03/24 18:50:40+00:00, jrb44@xxxxxxxxxxxxxxxxxx

        Added concurrency niceness to the bottom end of the blockstore.
        
        Signed-off-by: James Bulpin <James.Bulpin@xxxxxxxxxxxx>



 blockstore.c |  120 +++++++++++++++++++++++++++++++++++++++++++++++++++++------
 blockstore.h |    4 -
 2 files changed, 111 insertions(+), 13 deletions(-)
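Editor's note: the heart of this change is a predicate-guarded condition
variable handshake between the new receiver thread (RECV_NOTIFY) and
requesters blocked waiting for replies (RECV_AWAIT). Below is a minimal
standalone sketch of that handshake. It follows the patch's naming, but
tightens the await side into the textbook while-loop form: POSIX allows
pthread_cond_wait() to wake spuriously, and a plain if-test also leaves
the flag set when a sleeping waiter is woken. The two demo threads are
illustrative only, not part of the patch.

#include <pthread.h>
#include <stdio.h>

static int notify = 0;                      /* predicate guarded by the mutex */
static pthread_mutex_t ptmutex_notify = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  ptcv_notify    = PTHREAD_COND_INITIALIZER;

/* Receiver side: record the event and wake one waiter. Setting the flag
 * under the mutex means a notification sent before anyone is waiting is
 * not lost -- the next waiter sees notify == 1 and returns immediately. */
static void recv_notify(void)
{
    pthread_mutex_lock(&ptmutex_notify);
    notify = 1;
    pthread_cond_signal(&ptcv_notify);
    pthread_mutex_unlock(&ptmutex_notify);
}

/* Requester side: consume a pending notification, or block for one. */
static void recv_await(void)
{
    pthread_mutex_lock(&ptmutex_notify);
    while (!notify)                         /* loop absorbs spurious wakeups */
        pthread_cond_wait(&ptcv_notify, &ptmutex_notify);
    notify = 0;                             /* consume exactly one event */
    pthread_mutex_unlock(&ptmutex_notify);
}

static void *receiver(void *arg)
{
    recv_notify();                          /* stands in for receive_loop()
                                               matching a reply */
    return NULL;
}

int main(void)
{
    pthread_t t;
    pthread_create(&t, NULL, receiver, NULL);
    recv_await();                           /* returns once the reply is matched */
    pthread_join(t, NULL);
    puts("reply matched");
    return 0;
}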


diff -Nru a/tools/blktap/blockstore.c b/tools/blktap/blockstore.c
--- a/tools/blktap/blockstore.c 2005-03-24 17:02:36 -05:00
+++ b/tools/blktap/blockstore.c 2005-03-24 17:02:36 -05:00
@@ -15,10 +15,11 @@
 #include <sys/stat.h>
 #include <stdarg.h>
 #include "blockstore.h"
+#include <pthread.h>
 #include "parallax-threaded.h"
 
 #define BLOCKSTORE_REMOTE
-//#define BSDEBUG
+#define BSDEBUG
 
 /*****************************************************************************
  * Debugging
@@ -27,7 +28,7 @@
 void DB(char *format, ...)
 {
     va_list args;
-    
+    fprintf(stderr, "[%05u] ", (int)pthread_getspecific(tid_key));
     va_start(args, format);
     vfprintf(stderr, format, args);
     va_end(args);
@@ -44,10 +45,6 @@
 #include <netdb.h>
 
 /*****************************************************************************
- *                                                                           *
- *****************************************************************************/
-
-/*****************************************************************************
  * Network state                                                             *
  *****************************************************************************/
 
@@ -71,8 +68,30 @@
 
 /* Protects the queue manipulation critical regions.
  */
-#define ENTER_QUEUE_CR (void)0
-#define LEAVE_QUEUE_CR (void)0
+pthread_mutex_t ptmutex_queue;
+#define ENTER_QUEUE_CR pthread_mutex_lock(&ptmutex_queue)
+#define LEAVE_QUEUE_CR pthread_mutex_unlock(&ptmutex_queue)
+
+pthread_mutex_t ptmutex_recv;
+#define ENTER_RECV_CR pthread_mutex_lock(&ptmutex_recv)
+#define LEAVE_RECV_CR pthread_mutex_unlock(&ptmutex_recv)
+
+int notify = 0;
+pthread_mutex_t ptmutex_notify;
+pthread_cond_t ptcv_notify;
+#define RECV_NOTIFY { \
+    pthread_mutex_lock(&ptmutex_notify); \
+    notify = 1; \
+    pthread_cond_signal(&ptcv_notify); \
+    pthread_mutex_unlock(&ptmutex_notify); }
+#define RECV_AWAIT { \
+    pthread_mutex_lock(&ptmutex_notify); \
+    if (notify) \
+        notify = 0; \
+    else \
+        pthread_cond_wait(&ptcv_notify, &ptmutex_notify); \
+    pthread_mutex_unlock(&ptmutex_notify); }
+    
 
 /* A message queue entry. We allocate one of these for every request we send.
  * Asynchronous reply reception also uses one of these.
@@ -91,8 +110,9 @@
 
 #define BSQ_STATUS_MATCHED 1
 
-#define ENTER_LUID_CR (void)0
-#define LEAVE_LUID_CR (void)0
+pthread_mutex_t ptmutex_luid;
+#define ENTER_LUID_CR pthread_mutex_lock(&ptmutex_luid)
+#define LEAVE_LUID_CR pthread_mutex_unlock(&ptmutex_luid)
 
 static u64 luid_cnt = 0x1000ULL;
 u64 new_luid(void) {
@@ -218,6 +238,10 @@
     return q;
 }
 
+/*****************************************************************************
+ * Network communication                                                     *
+ *****************************************************************************/
+
 int send_message(bsq_t *qe) {
     int rc;
 
@@ -331,7 +355,7 @@
 bsq_t *recv_any(void) {
     struct sockaddr_in from;
     int rc;
-
+    
     DB("ENTER recv_any\n");
 
     rx_qe.msghdr.msg_name = &from;
@@ -361,6 +385,7 @@
         perror("recv_any");
         return NULL;
     }
+
     rx_qe.length = rc;    
     rx_qe.server = get_server_number(&from);
 
@@ -395,8 +420,13 @@
         return numreqs;
     }
 
+    RECV_AWAIT;
+
+    /*
     rxagain:
+    ENTER_RECV_CR;
     q = recv_any();
+    LEAVE_RECV_CR;
     if (!q)
         return -1;
 
@@ -406,11 +436,42 @@
         fprintf(stderr, "Unmatched RX\n");
         goto rxagain;
     }
+    */
 
     goto checkmatch;
 
 }
 
+/* receive loop
+ */
+void *receive_loop(void *arg)
+{
+    bsq_t *q, *m;
+
+    for(;;) {
+        q = recv_any();
+        if (!q) {
+            fprintf(stderr, "recv_any error\n");
+        }
+        else {
+            m = queuesearch(q);
+            recv_recycle_buffer(q);
+            if (!m) {
+                fprintf(stderr, "Unmatched RX\n");
+            }
+            else {
+                DB("RX MATCH");
+                RECV_NOTIFY;
+            }
+        }
+    }
+}
+pthread_t pthread_recv;
+
+/*****************************************************************************
+ * Reading                                                                   *
+ *****************************************************************************/
+
 void *readblock_indiv(int server, u64 id) {
     void *block;
     bsq_t *qe;
@@ -538,6 +599,10 @@
     return block;
 }
 
+/*****************************************************************************
+ * Writing                                                                   *
+ *****************************************************************************/
+
 bsq_t *writeblock_indiv(int server, u64 id, void *block) {
 
     bsq_t *qe;
@@ -663,6 +728,10 @@
     return -1;
 }
 
+/*****************************************************************************
+ * Allocation                                                                *
+ *****************************************************************************/
+
 /**
  * allocblock: write a new block to disk
  *   @block: pointer to block
@@ -791,6 +860,9 @@
 
 #else /* /BLOCKSTORE_REMOTE */
 
+/*****************************************************************************
+ * Local storage version                                                     *
+ *****************************************************************************/
  
 /**
  * readblock: read a block from disk
@@ -923,6 +995,10 @@
 
 #endif /* BLOCKSTORE_REMOTE */
 
+/*****************************************************************************
+ * Memory management                                                         *
+ *****************************************************************************/
+
 /**
  * newblock: get a new in-memory block set to zeros
  *
@@ -1053,6 +1129,10 @@
     printf("Total of %Ld ids on freelist.\n", total);
 }
 
+/*****************************************************************************
+ * Initialisation                                                            *
+ *****************************************************************************/
+
 int __init_blockstore(void)
 {
     int i;
@@ -1062,6 +1142,13 @@
     
 #ifdef BLOCKSTORE_REMOTE
     struct hostent *addr;
+
+    pthread_mutex_init(&ptmutex_queue, NULL);
+    pthread_mutex_init(&ptmutex_luid, NULL);
+    pthread_mutex_init(&ptmutex_recv, NULL);
+    pthread_mutex_init(&ptmutex_notify, NULL);
+    pthread_cond_init(&ptcv_notify, NULL);
+
     bsservers[0].hostname = "firebug.cl.cam.ac.uk";
     bsservers[1].hostname = "planb.cl.cam.ac.uk";
     bsservers[2].hostname = "simcity.cl.cam.ac.uk";
@@ -1137,6 +1224,8 @@
         return -1;
     }
 
+    pthread_create(&pthread_recv, NULL, receive_loop, NULL);
+
 #else /* /BLOCKSTORE_REMOTE */
     block_fp = open("blockstore.dat", O_RDWR | O_CREAT | O_LARGEFILE, 0644);
 
@@ -1169,4 +1258,13 @@
         
 #endif /*  BLOCKSTORE_REMOTE */   
     return 0;
+}
+
+void __exit_blockstore(void)
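(The remainder of the patch was truncated by the list software.)

A smaller but handy change: with BSDEBUG now enabled, DB() prefixes every
debug line with a per-thread tag pulled from thread-specific data, so
interleaved output from the new receiver thread stays readable. tid_key
itself is not created in this patch (presumably it comes from
parallax-threaded.h), so the sketch below creates its own key; the key
setup and worker tags here are illustrative assumptions, not the patch's
actual initialisation.

#include <pthread.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>

static pthread_key_t tid_key;

static void DB(char *format, ...)
{
    va_list args;
    /* Same idea as the patched DB(): tag every line with a small integer
     * identifying the calling thread. The value is smuggled through the
     * void * slot directly rather than heap-allocated.                  */
    fprintf(stderr, "[%05u] ",
            (unsigned)(uintptr_t)pthread_getspecific(tid_key));
    va_start(args, format);
    vfprintf(stderr, format, args);
    va_end(args);
}

static void *worker(void *arg)
{
    pthread_setspecific(tid_key, arg);      /* stash this thread's tag */
    DB("hello from a worker\n");
    return NULL;
}

int main(void)
{
    pthread_t t1, t2;
    pthread_key_create(&tid_key, NULL);     /* no destructor needed for an
                                               integer tag */
    pthread_create(&t1, NULL, worker, (void *)(uintptr_t)1);
    pthread_create(&t2, NULL, worker, (void *)(uintptr_t)2);
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return 0;
}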

