https://gist.github.com/rushilgupta/228dfdf379121cb9426d5e90d34c5b96

INTRO

Concurrency is a domain I have wanted to explore for a long time because locks and race conditions have always intimidated me. I recall somebody recommending golang's concurrency patterns with the one-liner "you share the data and not the variables".

Amused by that, I searched for "concurrency in golang" and bumped into this awesome slide deck by Rob Pike: https://talks.golang.org/2012/waza.slide#1, which does a great job of explaining channels, concurrency patterns, and a mini load-balancer architecture (it also explains the above one-liner).

Let’s dig in:

Goroutines

Slide #32:

  • Goroutines’re a bit like threads, but they’re much cheaper.
  • When a goroutine blocks, that thread blocks but no other goroutine blocks.
  • You spawn a go-routine using go func().

Cool, let's try it out.

  package main

  import (
      "fmt"
  )

  func func1() {
      fmt.Println("boom")
  }

  func main() {
      go func1()
  }
  go build main.go
  ./main

Huh, it outputs nothing.
So we look back at the second point about goroutines:

  • “When a goroutine blocks, that thread blocks but no other goroutine blocks.”

Umm… so the goroutine running func1() was spawned, but main returned (and a Go program exits without waiting for its goroutines) before func1() got a chance to execute?

Let’s add some delay.

  package main

  import (
      "fmt"
      "time"
  )

  func func1() {
      fmt.Println("boom")
  }

  func main() {
      go func1()
      time.Sleep(time.Millisecond)
  }
  boom

Awesome! Now we know that if we give a goroutine enough time to execute, it can perform some of our work concurrently.
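(Aside: sleeping is a guess, not a guarantee. The idiomatic way to wait is a sync.WaitGroup, which lets main block until the goroutine finishes. A minimal sketch:)

  package main

  import (
      "fmt"
      "sync"
  )

  func func1(wg *sync.WaitGroup) {
      // mark this goroutine as done when it returns
      defer wg.Done()
      fmt.Println("boom")
  }

  func main() {
      var wg sync.WaitGroup
      // tell the WaitGroup to expect one goroutine
      wg.Add(1)
      go func1(&wg)
      // block until func1 calls Done
      wg.Wait()
  }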
It'd be really great if the main program could communicate with func1() and share some data structures.

Channels

Channels are the links through which we can achieve exactly that.

  • You write to a channel using channel <- data.
  • You read from a channel using data := <-channel.
  • A read only succeeds when something has been written; otherwise it blocks (and waits for the next write). Likewise, a write to an unbuffered channel blocks until somebody reads.
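That blocking is easy to demonstrate: if nobody ever writes, a read just hangs, and the runtime aborts the program. A minimal sketch:

  package main

  import "fmt"

  func main() {
      ch := make(chan string)
      // nobody ever writes to ch, so this read blocks forever;
      // the runtime detects this and crashes with:
      // "fatal error: all goroutines are asleep - deadlock!"
      fmt.Println(<-ch)
  }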

So is it a shared queue?

Let’s see

  package main

  import (
      "fmt"
  )

  func func1(channel chan string) {
      fmt.Println("haha")
      channel <- "boom"
  }

  func main() {
      channel := make(chan string)
      go func1(channel)
      fmt.Println(<-channel)
  }
  haha
  boom

Indeed!

Yep, fmt.Println(<-channel) blocked the main program and waited for func1().
Our goroutine wrote "boom" to the shared queue and the main program read from it.
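Strictly speaking, an unbuffered channel like the one above is a rendezvous point rather than a queue: each write blocks until a read takes it. To get actual queue behavior you can give the channel a buffer, as in this minimal sketch:

  package main

  import "fmt"

  func main() {
      // a buffered channel acts like a bounded queue:
      // writes only block once the buffer (capacity 2) is full
      queue := make(chan string, 2)
      queue <- "boom"
      queue <- "boom again" // no reader yet, still doesn't block
      fmt.Println(<-queue)  // boom
      fmt.Println(<-queue)  // boom again
  }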

Multiple channels

Why not! We can call this function again with a second channel.
We'll then have 2 links through which main can communicate with 2 goroutines.

  package main

  import (
      "fmt"
  )

  func func1(channel chan string) {
      fmt.Println("haha")
      channel <- "boom"
  }

  func main() {
      channel1 := make(chan string)
      channel2 := make(chan string)
      go func1(channel1)
      go func1(channel2)
      fmt.Println("channel one sends ", <-channel1)
      fmt.Println("channel two sends ", <-channel2)
  }
  haha
  haha
  channel one sends boom
  channel two sends boom

Switch

Slide #34:
Wouldn't it be great if we had some kind of a switch for goroutines?
We do!
It's called select!

select polls each channel; whichever channel is ready to be read gets selected, and we print its data.
Now, the interesting part is the non-determinism of the order.

Since the two goroutines have barely had a chance to run yet (and fmt.Println() is a comparatively slow I/O operation), it's almost certain that neither channel will be ready on the first iteration, so the default case fires.
However, we can't really guarantee which channel will have data available first on the following iterations. That is something the scheduler decides, and if both channels are ready, select picks one at random.

The print order depends on which goroutine gets executed first.

  package main

  import (
      "fmt"
      "time"
  )

  func func1(channel chan string) {
      channel <- "boom"
  }

  func main() {
      channel1 := make(chan string)
      channel2 := make(chan string)
      go func1(channel1)
      go func1(channel2)
      count := 0
      for count < 3 {
          select {
          case v := <-channel1:
              fmt.Println("channel 1 sends", v)
          case v := <-channel2:
              fmt.Println("channel 2 sends", v)
          default: // optional
              fmt.Println("neither channel was ready")
          }
          time.Sleep(time.Millisecond)
          count++
      }
  }
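One possible run (the exact order varies between runs; the first poll usually hits default because neither goroutine has reached its send yet, and the last two lines can flip):

  neither channel was ready
  channel 1 sends boom
  channel 2 sends boom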

Load balancer architecture:

  • Slide #45:

    |Client1|            |Load |  <-DONE--  |Worker1| processing R3 coming from Client1
    |Client2|  --REQ->   |Blncr|  --WOK->   |Worker2| processing R8 coming from Client2
                                            |Worker3| processing R5 coming from Client1
             <-RESP-- response of R4 to Client2

Data Flow

  • k clients pack a value x into a Request object and send it to the REQ channel.
  • The load balancer blocks on the REQ channel, listening for Request(s).
  • The load balancer chooses a worker and sends the Request to that worker's channel, WOK(i).
  • The worker receives the Request and processes x (say, calculates sin(x), lol).
  • The worker updates the load balancer through the DONE channel; the LB uses this for load-balancing.
  • The worker writes the sin(x) value to the RESP response channel (enclosed in the Request object).

Channels in play

  • one central REQ channel (Type: Request)
  • n WOK channels, one per worker in the n-sized pool (Type: Request; each lives inside a Work struct)
  • k RESP channels, one per client (Type: float64)
  • one DONE channel shared by all workers (Type: *Work)

Client and Request

  • Each client is a goroutine running a forever loop.
  • In that loop, it spawns requests that are sent to the central REQ channel linked to the LB.
  • For responses, all requests from a given client share one RESP channel.
  type Request struct {
      data int
      resp chan float64
  }

  func createAndRequest(req chan Request) {
      resp := make(chan float64)
      // spawn requests indefinitely
      for {
          // wait before the next request
          time.Sleep(time.Duration(rand.Int63n(int64(time.Millisecond))))
          req <- Request{int(rand.Int31n(90)), resp}
          // read the value from the RESP channel
          <-resp
      }
  }

Worker and processing

  • Each worker is a goroutine running a forever loop.
  • In that loop, each worker blocks on its own channel, waiting for a Request object to process.
  • A worker can take multiple requests; pending keeps track of how many requests are queued in the worker's channel or being executed.
  type Work struct {
      // heap index
      idx int
      // WOK channel
      wok chan Request
      // number of pending requests this worker is working on
      pending int
  }

  func (w *Work) doWork(done chan *Work) {
      // the worker works indefinitely
      for {
          // extract a request from the WOK channel
          req := <-w.wok
          // write to the RESP channel
          req.resp <- math.Sin(float64(req.data))
          // write to the DONE channel
          done <- w
      }
  }

Balancer data structures

  • The crux of the Balancer is a heap (Pool) that balances based on the number of pending requests.
  • The DONE channel is used to notify the Balancer that a worker has finished, so its pending counter can be decremented.
  type Pool []*Work

  type Balancer struct {
      // a pool of workers
      pool Pool
      done chan *Work
  }

  func InitBalancer() *Balancer {
      done := make(chan *Work, nWorker)
      // create nWorker WOK channels
      b := &Balancer{make(Pool, 0, nWorker), done}
      for i := 0; i < nWorker; i++ {
          w := &Work{wok: make(chan Request, nRequester)}
          // put them in the heap
          heap.Push(&b.pool, w)
          go w.doWork(b.done)
      }
      return b
  }

Heap implementations

  • It sucks, but golang wants you to implement Len, Less, Swap, Push, and Pop yourself to satisfy the heap.Interface.
  • Copied shamelessly from GitHub (see the references below).
  func (p Pool) Len() int { return len(p) }

  func (p Pool) Less(i, j int) bool {
      return p[i].pending < p[j].pending
  }

  func (p *Pool) Swap(i, j int) {
      a := *p
      a[i], a[j] = a[j], a[i]
      a[i].idx = i
      a[j].idx = j
  }

  func (p *Pool) Push(x interface{}) {
      n := len(*p)
      item := x.(*Work)
      item.idx = n
      *p = append(*p, item)
  }

  func (p *Pool) Pop() interface{} {
      old := *p
      n := len(old)
      item := old[n-1]
      item.idx = -1 // for safety
      *p = old[0 : n-1]
      return item
  }
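To sanity-check that ordering, here's a quick hypothetical snippet (not part of the LB; it reuses the Pool and Work types above): the worker with the fewest pending requests should pop out first.

  func main() {
      pool := make(Pool, 0)
      // push three workers with different loads
      for _, pending := range []int{3, 1, 2} {
          heap.Push(&pool, &Work{pending: pending})
      }
      // the least loaded worker surfaces first
      w := heap.Pop(&pool).(*Work)
      fmt.Println(w.pending) // prints 1
  }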

Load balancing (channels)

  • If the central REQ channel has a request coming in from a client, dispatch it to the least loaded worker and update the heap.
  • If the DONE channel reports back, a request assigned to some WOK(i) has finished.
  func (b *Balancer) balance(req chan Request) {
      for {
          select {
          // extract a request from the REQ channel
          case request := <-req:
              b.dispatch(request)
          // read from the DONE channel
          case w := <-b.done:
              b.completed(w)
          }
          b.print()
      }
  }

  func (b *Balancer) dispatch(req Request) {
      // grab the least loaded worker
      w := heap.Pop(&b.pool).(*Work)
      w.wok <- req
      w.pending++
      // put it back into the heap while it is working
      heap.Push(&b.pool, w)
  }

  func (b *Balancer) completed(w *Work) {
      w.pending--
      // remove it from the heap
      heap.Remove(&b.pool, w.idx)
      // put it back
      heap.Push(&b.pool, w)
  }
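As a side note, popping a worker and pushing it back is just one way to restore heap order after its pending count changes; heap.Fix(&b.pool, w.idx) from container/heap would achieve the same re-ordering in place.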

Gluing the code

  • Imports and main
  • Adding a print
  package main

  import (
      "container/heap"
      "fmt"
      "math"
      "math/rand"
      "time"
  )

  const nRequester = 100
  const nWorker = 10

  func (b *Balancer) print() {
      sum := 0
      sumsq := 0
      // print pending stats for each worker
      for _, w := range b.pool {
          fmt.Printf("%d ", w.pending)
          sum += w.pending
          sumsq += w.pending * w.pending
      }
      // print the average and variance for the worker pool
      avg := float64(sum) / float64(len(b.pool))
      variance := float64(sumsq)/float64(len(b.pool)) - avg*avg
      fmt.Printf(" %.2f %.2f\n", avg, variance)
  }

  func main() {
      work := make(chan Request)
      for i := 0; i < nRequester; i++ {
          go createAndRequest(work)
      }
      InitBalancer().balance(work)
  }
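To try it yourself, one layout (an assumption, not part of the original gist) is to paste all the snippets into a single main.go, keep only this final main(), and run:

  go run main.go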

Output

  • Here you can see the number of pending tasks per worker.
  • Since the work is just computing a sine value, I had to reduce the sleep time at the client level before they fire the next request.
  w0 w1 w2 w3 w4 w5 w6 w7 w8 w9   avg  variance
   5  6  8  8  8  8  8  8  8  8   7.50 1.05
   4  6  8  8  8  8  8  8  8  8   7.40 1.64
   3  6  8  8  8  8  8  8  8  8   7.30 2.41
   2  6  8  8  8  8  8  8  8  8   7.20 3.36
   1  6  8  8  8  8  8  8  8  8   7.10 4.49
   1  5  8  8  8  8  8  8  8  8   7.00 4.80
   1  5  8  8  7  8  8  8  8  8   6.90 4.69
   1  5  8  8  6  8  8  8  8  8   6.80 4.76
   1  4  8  8  6  8  8  8  8  8   6.70 5.21
   1  4  8  8  6  8  8  8  8  7   6.60 5.04
   1  4  8  7  6  8  8  8  8  7   6.50 4.85
   1  4  8  7  6  8  8  8  7  7   6.40 4.64
   1  4  7  7  6  8  8  8  7  7   6.30 4.41

Footnote

  • Although this is still a single-process LB, it makes you appreciate the flexibility of asynchronous behavior.
  • How channels communicate in the form of a lightweight queue, and how tasks are offloaded as goroutines, is pretty amazing to me.
  • Also, all the book-keeping of acquiring/releasing locks is hidden from the programmer, and all you need to focus on is "sharing data using channels and not the variables" ;).
  • Event-driven architecture is an amazing concept. There's a nice writeup on event-driven architecture I read on hackernews the other day that tells you when and when not to use it: https://herbertograca.com/2017/10/05/event-driven-architecture/

References
