Some Golang concurrency examples
H202 example with Oxygen atom as leader
1type H2O2FactoryWithLeader struct {
2 oxygenMutex chan struct{}
3 precomH chan chan struct{}
4 precomO chan chan struct{}
5}
6
7func NewFactoryWithLeader() H2O2FactoryWithLeader {
8 h2o2f := H2O2FactoryWithLeader{
9 oxygenMutex: make(chan struct{}, 1),
10 precomH: make(chan chan struct{}),
11 precomO: make(chan chan struct{}),
12 }
13 h2o2f.oxygenMutex <- struct{}{} //mutex can be simulated by reading and writing to a channel
14 return h2o2f
15}
16
17func (h2o2f *H2O2FactoryWithLeader) hydrogen(bond func()) {
18 commit := make(chan struct{})
19 h2o2f.precomH <- commit // Insert channel to leader for communication
20 <-commit // Listen for permission to bond
21 bond()
22 commit <- struct{}{} // Notify that the bonding is done
23}
24
25func (h2o2f *H2O2FactoryWithLeader) oxygen(bond func()) {
26 // create communication channel for the case where oxygen atom is follower
27 commit := make(chan struct{})
28
29 select {
30 case <-h2o2f.oxygenMutex: // Leader
31 // Receive arrival requests from 2 hydrogen atoms, and 1 oxygen atom
32 h1 := <-h2o2f.precomH
33 h2 := <-h2o2f.precomH
34 o2 := <-h2o2f.precomO
35
36 // Tell the other atoms to start bonding
37 h1 <- struct{}{}
38 h2 <- struct{}{}
39 o2 <- struct{}{}
40
41 bond()
42
43 // Wait until the the other atoms to finish
44 <-h1
45 <-h2
46 <-o2
47
48 // Step down from being leader
49 h2o2f.oxygenMutex <- struct{}{}
50
51 case h2o2f.precomO <- commit: // Follower
52 <-commit // Listen for permission to bond
53 bond()
54 commit <- struct{}{} // Update that bonding has been completed
55 }
56}
Fan-in Fan-out (Example 1)
1type Event struct {
2 id int64
3 procTime time.Duration
4}
5
6type Worker struct {
7 inputChan <-chan Event
8 outputChan chan<- Event
9}
10
11type EventFunc func(Event) Event
12
13func main() {
14 numChannel := runtime.NumCPU()
15 processedChannel := make(chan Event, 5)
16 finalChannel := make(chan Event, 5)
17 inputStream := generateEvents()
18
19 var wg sync.WaitGroup
20 ctx, cancel := context.WithCancel(context.Background())
21 defer cancel()
22
23 // Start workers
24 for i := 0; i < numChannel; i++ {
25 wg.Add(1)
26 worker := Worker{
27 inputChan: inputStream,
28 outputChan: processedChannel,
29 }
30 worker.start(ctx, &wg, func(e Event) Event {
31 time.Sleep(e.procTime)
32 return e
33 })
34 }
35
36 // Serialize processed events
37 serializer(processedChannel, finalChannel)
38
39 // Read and print events
40 readerDone := make(chan interface{})
41 go read(finalChannel, readerDone)
42
43 wg.Wait()
44 close(processedChannel)
45 <-readerDone
46}
47
48func read(finalChannel <-chan Event, readerDone chan interface{}) {
49 go func() {
50 for output := range finalChannel {
51 fmt.Printf("Event id: %d\n", output.id)
52 }
53 close(readerDone)
54 }()
55}
56
57func generateEvents() <-chan Event {
58 dataChan := make(chan Event)
59 go func() {
60 for i := 0; i <= 30; i++ {
61 event := Event{
62 id: int64(i),
63 procTime: time.Duration(i%3+1) * time.Second,
64 }
65 dataChan <- event
66 }
67 close(dataChan)
68 }()
69 return dataChan
70}
71
72func serializer(processedChannel <-chan Event, finalChannel chan<- Event) {
73 eventMap := make(map[int64]Event)
74 var currentEventId int64 = 1
75
76 go func() {
77 for event := range processedChannel {
78 if event.id == currentEventId {
79 finalChannel <- event
80 currentEventId++
81 for {
82 if next, found := eventMap[currentEventId]; found {
83 finalChannel <- next
84 delete(eventMap, currentEventId)
85 currentEventId++
86 } else {
87 break
88 }
89 }
90 } else {
91 eventMap[event.id] = event
92 }
93 }
94 close(finalChannel)
95 }()
96}
97
98func (w *Worker) start(ctx context.Context, wg *sync.WaitGroup, fn EventFunc) {
99 go func() {
100 defer wg.Done()
101 for {
102 select {
103 case e, more := <-w.inputChan:
104 if !more {
105 return
106 }
107 select {
108 case w.outputChan <- fn(e):
109 case <-ctx.Done():
110 return
111 }
112 case <-ctx.Done():
113 return
114 }
115 }
116 }()
117}
Fan-in Fan-out (Example 2)
1type Event struct {
2 id int64
3 procTime time.Duration
4}
5
6type EventFunc func(Event) Event
7
8type worker struct {
9 inputCh chan Event
10 outputCh chan Event
11}
12
13func newWorker() *worker {
14 return &worker{
15 inputCh: make(chan Event),
16 outputCh: make(chan Event),
17 }
18}
19
20func (w *worker) start(
21 ctx context.Context,
22 fn EventFunc, workerQueue chan *worker, wg *sync.WaitGroup,
23) {
24 go func() {
25 defer func() {
26 close(w.outputCh)
27 wg.Done()
28 }()
29 for {
30 select {
31 case workerQueue <- w: // <-- sign up for work
32 e := <-w.inputCh
33 w.outputCh <- fn(e)
34 case <-ctx.Done():
35 return
36 }
37 }
38 }()
39}
40
41func orderedMux(
42 ctx context.Context, cancel context.CancelFunc,
43 inputCh chan Event, workerQueue chan *worker, workerOutputCh chan chan Event,
44) {
45 go func() {
46 for {
47 select {
48 case e, more := <-inputCh:
49 if !more {
50 cancel()
51 return
52 }
53 select {
54 case w := <-workerQueue:
55 workerOutputCh <- w.outputCh
56 w.inputCh <- e
57 case <-ctx.Done():
58 return
59 }
60 case <-ctx.Done():
61 return
62 }
63 }
64 }()
65}
66
67func genEventsCh() chan Event {
68 dataCh := make(chan Event)
69 go func() {
70 counter := int64(1)
71 rand.Seed(time.Now().Unix())
72 for i := 0; i < 30; i++ {
73 dataCh <- Event{
74 id: counter,
75 procTime: time.Duration(rand.Intn(100)) * time.Millisecond,
76 }
77 counter++
78 }
79 close(dataCh)
80 }()
81 return dataCh
82}
83
84func main() {
85 numWorkers := 10
86 ctx, cancel := context.WithCancel(context.Background())
87 workerQueue := make(chan *worker)
88 workerOutputCh := make(chan chan Event, numWorkers)
89 inputCh := genEventsCh()
90
91 orderedMux(ctx, cancel, inputCh, workerQueue, workerOutputCh)
92
93 var wg sync.WaitGroup
94 for i := 0; i < numWorkers; i++ {
95 wg.Add(1)
96 newWorker().start(ctx, func(e Event) Event {
97 time.Sleep(e.procTime)
98 return e
99 }, workerQueue, &wg)
100 }
101
102 readerDone := make(chan struct{})
103 go func() {
104 for outputCh := range workerOutputCh {
105 output, more := <-outputCh
106 if !more {
107 break
108 }
109 fmt.Printf("Event id: %d\n", output.id)
110 }
111 close(readerDone)
112 }()
113
114 wg.Wait()
115 close(workerOutputCh)
116 <-readerDone
117 cancel()
118}
Pipeline
1func main() {
2 inputsGenerated := generate()
3 multiplyRes := multiply(inputsGenerated, 2)
4 addRes := add(multiplyRes, 2)
5 for num := range addRes {
6 fmt.Println(num)
7 }
8}
9
10func generate() <-chan int {
11 ch := make(chan int)
12 go func() {
13 nums := [5]int{1, 2, 3, 4, 5}
14 for _, num := range nums {
15 ch <- num
16 }
17 close(ch)
18 }()
19 return ch
20}
21
22func multiply(input <-chan int, constant int) <-chan int {
23 output := make(chan int)
24 go func() {
25 for num := range input {
26 output <- num * constant
27 }
28 close(output)
29 }()
30 return output
31}
32
33func add(input <-chan int, constant int) <-chan int {
34 output := make(chan int)
35 go func() {
36 for num := range input {
37 output <- num + constant
38 }
39 close(output)
40 }()
41 return output
42}
Load balancer
1import (
2 "container/heap"
3 "fmt"
4)
5
6type Worker struct {
7 requests chan Request // Channel to receive requests
8 pending int // Number of pending requests
9 index int // Index in the heap
10}
11
12type Request struct {
13 payload string
14}
15
16// Pool is an array of workers, implementing heap.Interface
17type Pool []*Worker
18
19// Len is part of heap.Interface
20func (p Pool) Len() int {
21 return len(p)
22}
23
24// Less is part of heap.Interface
25// We want the worker with fewer pending jobs to come first
26func (p Pool) Less(i, j int) bool {
27 return p[i].pending < p[j].pending
28}
29
30// Swap is part of heap.Interface
31func (p Pool) Swap(i, j int) {
32 p[i], p[j] = p[j], p[i]
33 p[i].index = i
34 p[j].index = j
35}
36
37// Push is called when adding a new element into the heap
38func (p *Pool) Push(x any) {
39 n := len(*p)
40 worker := x.(*Worker)
41 worker.index = n
42 *p = append(*p, worker)
43}
44
45// Pop is called when removing the smallest element from the heap
46func (p *Pool) Pop() any {
47 old := *p
48 n := len(old)
49 worker := old[n-1]
50 worker.index = -1 // Mark as removed
51 *p = old[0 : n-1]
52 return worker
53}
54
55// Balancer struct holds the pool
56type Balancer struct {
57 pool Pool
58}
59
60// dispatch sends a request to the least loaded worker
61func (b *Balancer) dispatch(req Request) {
62 // Pick the least loaded worker
63 w := heap.Pop(&b.pool).(*Worker)
64 w.requests <- req
65 w.pending++
66 heap.Push(&b.pool, w)
67}
68
69// completed tells the balancer a worker has completed a job
70func (b *Balancer) completed(w *Worker) {
71 w.pending--
72 heap.Remove(&b.pool, w.index)
73 heap.Push(&b.pool, w)
74}
75
76func main() {
77 worker1 := &Worker{requests: make(chan Request, 10)}
78 worker2 := &Worker{requests: make(chan Request, 10)}
79 worker3 := &Worker{requests: make(chan Request, 10)}
80
81 b := &Balancer{
82 pool: []*Worker{worker1, worker2, worker3},
83 }
84
85 heap.Init(&b.pool)
86
87 b.dispatch(Request{payload: "task 1"})
88 b.dispatch(Request{payload: "task 2"})
89 b.dispatch(Request{payload: "task 3"})
90
91 b.completed(worker1)
92
93 fmt.Println("Worker 1 pending:", worker1.pending)
94 fmt.Println("Worker 2 pending:", worker2.pending)
95 fmt.Println("Worker 3 pending:", worker3.pending)
96}