Go: Broadcast channels?

https://science.mroman.ch/gobroadcastchannels.html

In Go channels are a very key and convenient mechanism for communication. However, if we want to pass a message to multiple listeners then we need multiple channels. That is, writes to channels aren’t broadcast. As soon as one listener reads a value from the channel that value is removed from the channel and other listeners listening on the same channel will not see that value. If we want to send a message (or value) to multiple listeners we need to write that message to (N) different channels.

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. func main() {
  7. ch := make(chan int)
  8. var wg sync.WaitGroup
  9. wg.Add(2)
  10. go func() {
  11. for v := range ch {
  12. fmt.Println("A", v)
  13. }
  14. wg.Done()
  15. }()
  16. go func() {
  17. for v := range ch {
  18. fmt.Println("B", v)
  19. }
  20. wg.Done()
  21. }()
  22. ch <- 1
  23. ch <- 2
  24. close(ch)
  25. wg.Wait()
  26. }

The above example will print two lines with the second field being 1 and 2. Depending on the exact scheduling it might print A 1 \ B 2, or A 1 \ A 2 or B 1 \ B 2 or any other such variant but each go routine will print one line as each go routine sees one value. If we want each value to be seen by all listening go routines we need more channels.

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. func main() {
  7. chA := make(chan int)
  8. chB := make(chan int)
  9. var wg sync.WaitGroup
  10. wg.Add(2)
  11. go func() {
  12. for v := range chA {
  13. fmt.Println("A", v)
  14. }
  15. wg.Done()
  16. }()
  17. go func() {
  18. for v := range chB {
  19. fmt.Println("B", v)
  20. }
  21. wg.Done()
  22. }()
  23. for i := 0; i < 10; i++ {
  24. chA <- i
  25. chB <- i
  26. }
  27. close(chA)
  28. close(chB)
  29. wg.Wait()
  30. }

Now each listener sees every value. We’re not quite happy with this though. Now we have to keep track of many channels and what if we want to dynamically add or remove listeners? Let’s look at one first intermediate improvement.

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. func main() {
  7. chA := make(chan int)
  8. chB := make(chan int)
  9. **chBroadcast := make(chan int)**
  10. var wg sync.WaitGroup
  11. wg.Add(3)
  12. **go func() {
  13. for v := range chBroadcast {
  14. chA <- v
  15. chB <- v
  16. }
  17. close(chA)
  18. close(chB)
  19. wg.Done()
  20. }()**
  21. go func() {
  22. for v := range chA {
  23. fmt.Println("A", v)
  24. }
  25. wg.Done()
  26. }()
  27. go func() {
  28. for v := range chB {
  29. fmt.Println("B", v)
  30. }
  31. wg.Done()
  32. }()
  33. for i := 0; i < 10; i++ {
  34. **chBroadcast <- i**
  35. }
  36. close(chBroadcast)
  37. wg.Wait()
  38. }

Notice the difference? Now we only have to write our values to one single channel chBroadcast. This is now our broadcast channel. But now let’s get more complicated with dynamically adding and removing listeners!

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. type BroadcastService struct {
  7. // This is the channel the service will listen on...
  8. chBroadcast chan int
  9. // and forward it to these.
  10. chListeners []chan int
  11. // Requests for new listeners to be added...
  12. chNewRequests chan (chan int)
  13. // Requests for listeners to be removed...
  14. chRemoveRequests chan (chan int)
  15. }
  16. // Create a new BroadcastService.
  17. func NewBroadcastService() *BroadcastService {
  18. return &BroadcastService{
  19. chBroadcast: make(chan int),
  20. chListeners: make([]chan int, 3),
  21. chNewRequests: make(chan (chan int)),
  22. chRemoveRequests: make(chan (chan int)),
  23. }
  24. }
  25. // This creates a new listener and returns the channel a goroutine
  26. // should listen on.
  27. func (bs *BroadcastService) Listener() chan int {
  28. ch := make(chan int)
  29. bs.chNewRequests <- ch
  30. return ch
  31. }
  32. // This removes a listener.
  33. func (bs *BroadcastService) RemoveListener(ch chan int) {
  34. bs.chRemoveRequests <- ch
  35. }
  36. func (bs *BroadcastService) addListener(ch chan int) {
  37. for i, v := range bs.chListeners {
  38. if v == nil {
  39. bs.chListeners[i] = ch
  40. return
  41. }
  42. }
  43. bs.chListeners = append(bs.chListeners, ch)
  44. }
  45. func (bs *BroadcastService) removeListener(ch chan int) {
  46. for i, v := range bs.chListeners {
  47. if v == ch {
  48. bs.chListeners[i] = nil
  49. // important to close! otherwise the goroutine listening on it
  50. // might block forever!
  51. close(ch)
  52. return
  53. }
  54. }
  55. }
  56. func (bs *BroadcastService) Run() chan int {
  57. go func() {
  58. for {
  59. // process requests for new listeners or removal of listeners
  60. select {
  61. case newCh := <-bs.chNewRequests:
  62. bs.addListener(newCh)
  63. case removeCh := <-bs.chRemoveRequests:
  64. bs.removeListener(removeCh)
  65. case v, ok := <-bs.chBroadcast:
  66. // terminate everything if the input channel is closed
  67. if !ok {
  68. goto terminate
  69. }
  70. // forward the value to all channels
  71. for _, dstCh := range bs.chListeners {
  72. if dstCh == nil {
  73. continue
  74. }
  75. dstCh <- v
  76. }
  77. }
  78. }
  79. terminate:
  80. // close all listeners
  81. for _, dstCh := range bs.chListeners {
  82. if dstCh == nil {
  83. continue
  84. }
  85. close(dstCh)
  86. }
  87. }()
  88. return bs.chBroadcast
  89. }
  90. func main() {
  91. bs := NewBroadcastService()
  92. chBroadcast := bs.Run()
  93. chA := bs.Listener()
  94. chB := bs.Listener()
  95. var wg sync.WaitGroup
  96. wg.Add(2)
  97. go func() {
  98. for v := range chA {
  99. fmt.Println("A", v)
  100. }
  101. wg.Done()
  102. }()
  103. go func() {
  104. for v := range chB {
  105. fmt.Println("B", v)
  106. }
  107. wg.Done()
  108. }()
  109. for i := 0; i < 3; i++ {
  110. chBroadcast <- i
  111. }
  112. bs.RemoveListener(chA)
  113. for i := 3; i < 6; i++ {
  114. chBroadcast <- i
  115. }
  116. close(chBroadcast)
  117. wg.Wait()
  118. }
ft_authoradmin  ft_create_time2019-06-05 11:19
 ft_update_time2019-06-05 11:20