[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Xen-devel] [PATCH RFC 02/59] controller: Revamp communication structure



From: George Dunlap <george.dunlap@xxxxxxxxxx>

Two general-purpose channels rather than one per worker.

Set up two workers, gather and collate the information at the central
station.

Have the worker print out a report at start-of-day, so we get timing
information for the first actual report.

Catch SIGINT as a quick way, for testing, of managing tear-down
gracefully.

Signed-off-by: George Dunlap <george.dunlap@xxxxxxxxxx>
---
 main.go | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 78 insertions(+), 18 deletions(-)

diff --git a/main.go b/main.go
index 6e90754..0cb9f51 100644
--- a/main.go
+++ b/main.go
@@ -2,13 +2,18 @@ package main
 
 import (
        "fmt"
+       "os"
        "os/exec"
+       "os/signal"
        "encoding/json"
        "bufio"
        "io"
+       
 )
 
 type Worker struct {
+       Id int
+       
        c *exec.Cmd
 
        stdout io.ReadCloser
@@ -17,14 +22,14 @@ type Worker struct {
 }
 
 type WorkerReport struct {
+       Id int
        Now int
        Mops int
        MaxDelta int
 }
 
-func (w *Worker) Start() (err error) {
+func (w *Worker) Init() (err error) {
        w.c = exec.Command("../worker/worker-proc", "burnwait", "20", 
"20000000")
-       
 
        w.stdout, err = w.c.StdoutPipe()
        if err != nil {
@@ -32,47 +37,102 @@ func (w *Worker) Start() (err error) {
                return
        }
 
-       w.c.Start()
-
-       b, err := json.Marshal(WorkerReport{5,6,7})
-       fmt.Print("Example json: ", string(b))
-       
        return
 }
 
-func (w *Worker) Wait() {
-       w.c.Wait()
+func (w *Worker) Shutdown() {
+       w.c.Process.Kill()
 }
 
-func (w *Worker) Process() {
+func (w *Worker) Process(report chan WorkerReport, done chan bool) {
+       w.c.Start()
+
        scanner := bufio.NewScanner(w.stdout)
 
        for scanner.Scan() {
                s := scanner.Text()
                
-               fmt.Println("Got these bytes: ", s);
+               //fmt.Println("Got these bytes: ", s);
 
                if w.jsonStarted {
                        var r WorkerReport
-                       
                        json.Unmarshal([]byte(s), &r)
-                       fmt.Println(r)
+                       r.Id = w.Id
+                       report <- r
                } else {
                        if s == "START JSON" {
-                               fmt.Println("Got token to start parsing json")
+                               //fmt.Println("Got token to start parsing json")
                                w.jsonStarted = true
                        }
                }
        }
+
+       done <- true
+
+       w.c.Wait()
+}
+
+const (
+       USEC = 1000
+       MSEC = USEC * 1000
+       SEC = MSEC * 1000
+)
+
+type WorkerState struct {
+       Worker
+       LastReport WorkerReport
+}
+
+func Report(ws *WorkerState, r WorkerReport) {
+       //fmt.Println(r)
+
+       lr := ws.LastReport
+
+       if (lr.Now > 0) {
+               time := float64(r.Now - lr.Now) / SEC
+               mops := r.Mops - lr.Mops
+
+               tput := float64(mops) / time
+
+               fmt.Printf("%d Time: %2.3f Mops: %d Tput: %4.2f\n", r.Id, time, 
mops, tput);
+       }
+
+       ws.LastReport = r
 }
 
 func main() {
+       count := 2
+       
+       report := make(chan WorkerReport)
+       done := make(chan bool)
+       signals := make(chan os.Signal, 1)
 
-       w:=Worker{}
+       signal.Notify(signals, os.Interrupt)
        
-       w.Start()
+       i := 0
 
-       w.Process()
+       Workers := make([]WorkerState, count)
+       
+       for i = 0; i< count; i++ {
+               Workers[i].Id = i
+               
+               Workers[i].Init()
+               
+               go Workers[i].Process(report, done)
+       }
 
-       w.Wait()
+       for i > 0 {
+               select {
+               case r := <-report:
+                       Report(&Workers[r.Id], r)
+               case <-done:
+                       i--;
+                       fmt.Println(i, "workers left");
+               case <-signals:
+                       fmt.Println("SIGINT receieved, shutting down workers")
+                       for j := range Workers {
+                               Workers[j].Shutdown()
+                       }
+               }
+       }
 }
-- 
2.7.4


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxx
https://lists.xen.org/xen-devel

 


Rackspace

Lists.xenproject.org is hosted with RackSpace, monitoring our
servers 24x7x365 and backed by RackSpace's Fanatical Support®.