2017-08-25 79 views
0

我从google io 2010中获取loadbalancer代码,并为优先级队列和Balancer的同步锁定添加了实现。我故意设置workFn函数的延迟大于requester,所以我可以看到待定值将如何增加。我在cli中运行它,注意到在所有工人启动后,程序停止并等待所有工人的值为并且什么也没有显示。我无法弄清楚错误在哪里,有时候completed只会调用一次或两次。看起来<-b.done在选择的情况下没有正确处理。简单负载均衡器无法正常工作

package main 

import (
    "container/heap" 
    "fmt" 
    "math/rand" 
    "os" 
    "sync" 
    "time" 
) 

var nWorker int32 = 6 

func main() { 
    rchanel := make(chan Request) 
    workers := Pool{ 
     {make(chan Request), 0, 0}, 
     {make(chan Request), 0, 1}, 
     {make(chan Request), 0, 2}, 
     {make(chan Request), 0, 3}, 
     {make(chan Request), 0, 4}, 
     {make(chan Request), 0, 5}, 
    } 
    doneChan := make(chan *Worker) 
    balancer := Balancer{workers, sync.Mutex{}, doneChan} 
    for _, elem := range workers { 
     go elem.work(doneChan) 
    } 
    go balancer.balance(rchanel) 
    go requester(rchanel) 

    var input string 
    fmt.Scanln(&input) 
} 

type Request struct { 
    fn func() int 
    c chan int 
} 

func requester(work chan Request) { 
    c := make(chan int) 
    for { 
     time.Sleep(time.Duration(rand.Int31n(nWorker)) * 2e4) 
     work <- Request{workFn, c} 
     go func() { 
      result := <-c 
      fmt.Fprintf(os.Stderr, "Done: %v \n", result) 
     }() 
    } 
} 

func workFn() int { 
    val := rand.Int31n(nWorker) 
    time.Sleep(time.Duration(val) * 2e8) 
    return int(val) 
} 

type Worker struct { 
    requests chan Request 
    pending int 
    index int 
} 

func (w *Worker) work(done chan *Worker) { 
    for { 
     req := <-w.requests 
     req.c <- req.fn() 
     done <- w 
    } 
} 

type Pool []*Worker 

func (p Pool) Less(i, j int) bool { 
    return p[i].pending < p[j].pending 
} 
func (p Pool) Swap(i, j int) { 
    p[i], p[j] = p[j], p[i] 
    p[i].index = i 
    p[j].index = j 
} 
func (p Pool) Len() int { return len(p) } 
func (p *Pool) Push(x interface{}) { 
    n := len(*p) 
    worker := x.(*Worker) 
    worker.index = n 
    *p = append(*p, worker) 
} 
func (p *Pool) Pop() interface{} { 
    old := *p 
    n := len(old) 
    item := old[n-1] 
    item.index = -1 
    *p = old[0 : n-1] 
    return item 
} 

type Balancer struct { 
    pool Pool 
    mu sync.Mutex 
    done chan *Worker 
} 

func (b *Balancer) dispatch(req Request) { 
    b.mu.Lock() 
    w := heap.Pop(&b.pool).(*Worker) 
    w.requests <- req 
    w.pending++ 
    heap.Push(&b.pool, w) 
    b.mu.Unlock() 
} 
func (b *Balancer) completed(w *Worker) { 
    b.mu.Lock() 
    w.pending-- 
    heap.Remove(&b.pool, w.index) 
    heap.Push(&b.pool, w) 
    b.mu.Unlock() 
} 

func (b *Balancer) balance(work chan Request) { 
    for { 
     select { 
     case req := <-work: 
      b.dispatch(req) 
      b.printStatus() 
     case w := <-b.done: 
      b.completed(w) 
      b.printStatus() 
     } 
    } 
} 

func (b *Balancer) printStatus() { 
    fmt.Fprintf(os.Stderr, "Status: %v %v %v %v %v %v\n", b.pool[0].pending, b.pool[1].pending, b.pool[2].pending, b.pool[3].pending, b.pool[4].pending, b.pool[5].pending) 
} 
+0

需要注意的是,这段代码会在'Worker.work'上泄漏goroutines。首先观察。仍在审查寻找问题的代码。 – RayfenWindspear

回答

1

的问题是,balance()够程最终被挡在dispatch()w.requests <- req在同一时间,具体Worker被阻断work()done <- w,生产运行balance()的够程死锁。

这里是你需要的修复。 balance()需要在内部使用goroutines。这将解决这个问题,因为现在如果dispatch()completed()中的例程阻止无关紧要,balance()的主程序将从channel继续selects。

注:这不适用于操场,因为它永远持续下去。

func (b *Balancer) balance(work chan Request) { 
    for { 
     select { 
     case req := <-work: 
      go func() { 
       b.dispatch(req) 
       b.printStatus() 
      }() 
     case w := <-b.done: 
      go func() { 
       b.completed(w) 
       b.printStatus() 
      }() 
     } 
    } 
} 

现在printStatus通话可以同时进行,它需要利用mutex的一样好,甚至你会得到随机panic秒。

func (b *Balancer) printStatus() { 
    b.mu.Lock() 
    fmt.Fprintf(os.Stderr, "Status: %v %v %v %v %v %v\n", b.pool[0].pending, b.pool[1].pending, b.pool[2].pending, b.pool[3].pending, b.pool[4].pending, b.pool[5].pending) 
    b.mu.Unlock() 
} 

现在,如果我可以弄清楚为什么pending值只是不断增加......据我所知,应该Worker.work()永远只允许pending01因为Worker必须等待对done <- w然后才能从dispatch()获得另一个Request。我相信这是理想的结果,但不是吗?

+0

嗨@RayfenWindspear,感谢您的解释,是有意增加了待处理的请求。但我仍然不明白为什么我们在'dispatch'里面被阻塞,它是完全不同的通道,'done <-w'和'w.requests <-req',并且'select'在调度时不会再执行一次行动,我说得对吗? –

+1

@MaksymVolodin正确的,只有一个例程在运行,它会阻止等待特定的工作人员检索'Request',而工人被阻止尝试'完成< - w'。实际上我不确定这是一个很好的例子,因为很难找到涉及到这么多渠道的事情,并且以不直观的方式进行交互。我没有必要去看一个频道,只需要一遍又一遍地检查它的定义,看看它究竟做了多少次。确实是一段艰难的代码。这并不意味着它不正确。 – RayfenWindspear