Some Golang concurrency examples

Table of Contents

H202 example with Oxygen atom as leader

1type H2O2FactoryWithLeader struct {
2    oxygenMutex chan struct{}
3    precomH     chan chan struct{}
4    precomO     chan chan struct{}
5}
6
7func NewFactoryWithLeader() H2O2FactoryWithLeader {
8    h2o2f := H2O2FactoryWithLeader{
9        oxygenMutex: make(chan struct{}, 1),
10        precomH:     make(chan chan struct{}),
11        precomO:     make(chan chan struct{}),
12    }
13    h2o2f.oxygenMutex <- struct{}{} //mutex can be simulated by reading and writing to a channel
14    return h2o2f
15}
16
17func (h2o2f *H2O2FactoryWithLeader) hydrogen(bond func()) {
18    commit := make(chan struct{}) 
19    h2o2f.precomH <- commit       // Insert channel to leader for communication
20    <-commit                      // Listen for permission to bond
21    bond()                        
22    commit <- struct{}{}          // Notify that the bonding is done
23}
24
25func (h2o2f *H2O2FactoryWithLeader) oxygen(bond func()) {
26    // create communication channel for the case where oxygen atom is follower
27    commit := make(chan struct{}) 
28
29    select {
30    case <-h2o2f.oxygenMutex: // Leader
31        // Receive arrival requests from 2 hydrogen atoms, and 1 oxygen atom
32        h1 := <-h2o2f.precomH
33        h2 := <-h2o2f.precomH
34        o2 := <-h2o2f.precomO
35
36        // Tell the other atoms to start bonding
37        h1 <- struct{}{}
38        h2 <- struct{}{}
39        o2 <- struct{}{}
40
41        bond()
42
43        // Wait until the the other atoms to finish
44        <-h1
45        <-h2
46        <-o2
47
48        // Step down from being leader
49        h2o2f.oxygenMutex <- struct{}{}
50
51    case h2o2f.precomO <- commit: // Follower
52        <-commit             // Listen for permission to bond
53        bond()               
54        commit <- struct{}{} // Update that bonding has been completed
55    }
56}

Fan-in Fan-out (Example 1)

1type Event struct {
2    id       int64
3    procTime time.Duration
4}
5
6type Worker struct {
7    inputChan  <-chan Event
8    outputChan chan<- Event
9}
10
11type EventFunc func(Event) Event
12
13func main() {
14    numChannel := runtime.NumCPU()
15    processedChannel := make(chan Event, 5)
16    finalChannel := make(chan Event, 5)
17    inputStream := generateEvents()
18
19    var wg sync.WaitGroup
20    ctx, cancel := context.WithCancel(context.Background())
21    defer cancel()
22
23    // Start workers
24    for i := 0; i < numChannel; i++ {
25        wg.Add(1)
26        worker := Worker{
27            inputChan:  inputStream,
28            outputChan: processedChannel,
29        }
30        worker.start(ctx, &wg, func(e Event) Event {
31            time.Sleep(e.procTime)
32            return e
33        })
34    }
35
36    // Serialize processed events
37    serializer(processedChannel, finalChannel)
38
39    // Read and print events
40    readerDone := make(chan interface{})
41    go read(finalChannel, readerDone)
42
43    wg.Wait()
44    close(processedChannel)
45    <-readerDone
46}
47
48func read(finalChannel <-chan Event, readerDone chan interface{}) {
49    go func() {
50        for output := range finalChannel {
51            fmt.Printf("Event id: %d\n", output.id)
52        }
53        close(readerDone)
54    }()
55}
56
57func generateEvents() <-chan Event {
58    dataChan := make(chan Event)
59    go func() {
60        for i := 0; i <= 30; i++ {
61            event := Event{
62                id:       int64(i),
63                procTime: time.Duration(i%3+1) * time.Second,
64            }
65            dataChan <- event
66        }
67        close(dataChan)
68    }()
69    return dataChan
70}
71
72func serializer(processedChannel <-chan Event, finalChannel chan<- Event) {
73    eventMap := make(map[int64]Event)
74    var currentEventId int64 = 1
75
76    go func() {
77        for event := range processedChannel {
78            if event.id == currentEventId {
79                finalChannel <- event
80                currentEventId++
81                for {
82                    if next, found := eventMap[currentEventId]; found {
83                        finalChannel <- next
84                        delete(eventMap, currentEventId)
85                        currentEventId++
86                    } else {
87                        break
88                    }
89                }
90            } else {
91                eventMap[event.id] = event
92            }
93        }
94        close(finalChannel)
95    }()
96}
97
98func (w *Worker) start(ctx context.Context, wg *sync.WaitGroup, fn EventFunc) {
99    go func() {
100        defer wg.Done()
101        for {
102            select {
103            case e, more := <-w.inputChan:
104                if !more {
105                    return
106                }
107                select {
108                case w.outputChan <- fn(e):
109                case <-ctx.Done():
110                    return
111                }
112            case <-ctx.Done():
113                return
114            }
115        }
116    }()
117}

Fan-in Fan-out (Example 2)

1type Event struct {
2    id       int64
3    procTime time.Duration
4}
5
6type EventFunc func(Event) Event
7
8type worker struct {
9    inputCh  chan Event
10    outputCh chan Event
11}
12
13func newWorker() *worker {
14    return &worker{
15        inputCh:  make(chan Event),
16        outputCh: make(chan Event),
17    }
18}
19
20func (w *worker) start(
21    ctx context.Context,
22    fn EventFunc, workerQueue chan *worker, wg *sync.WaitGroup,
23) {
24    go func() {
25        defer func() {
26            close(w.outputCh)
27            wg.Done()
28        }()
29        for {
30            select {
31            case workerQueue <- w: // <-- sign up for work
32                e := <-w.inputCh
33                w.outputCh <- fn(e)
34            case <-ctx.Done():
35                return
36            }
37        }
38    }()
39}
40
41func orderedMux(
42    ctx context.Context, cancel context.CancelFunc,
43    inputCh chan Event, workerQueue chan *worker, workerOutputCh chan chan Event,
44) {
45    go func() {
46        for {
47            select {
48            case e, more := <-inputCh:
49                if !more {
50                    cancel()
51                    return
52                }
53                select {
54                case w := <-workerQueue:
55                    workerOutputCh <- w.outputCh
56                    w.inputCh <- e
57                case <-ctx.Done():
58                    return
59                }
60            case <-ctx.Done():
61                return
62            }
63        }
64    }()
65}
66
67func genEventsCh() chan Event {
68    dataCh := make(chan Event)
69    go func() {
70        counter := int64(1)
71        rand.Seed(time.Now().Unix())
72        for i := 0; i < 30; i++ {
73            dataCh <- Event{
74                id:       counter,
75                procTime: time.Duration(rand.Intn(100)) * time.Millisecond,
76            }
77            counter++
78        }
79        close(dataCh)
80    }()
81    return dataCh
82}
83
84func main() {
85    numWorkers := 10
86    ctx, cancel := context.WithCancel(context.Background())
87    workerQueue := make(chan *worker)
88    workerOutputCh := make(chan chan Event, numWorkers)
89    inputCh := genEventsCh()
90
91    orderedMux(ctx, cancel, inputCh, workerQueue, workerOutputCh)
92
93    var wg sync.WaitGroup
94    for i := 0; i < numWorkers; i++ {
95        wg.Add(1)
96        newWorker().start(ctx, func(e Event) Event {
97            time.Sleep(e.procTime)
98            return e
99        }, workerQueue, &wg)
100    }
101
102    readerDone := make(chan struct{})
103    go func() {
104        for outputCh := range workerOutputCh {
105            output, more := <-outputCh
106            if !more {
107                break
108            }
109            fmt.Printf("Event id: %d\n", output.id)
110        }
111        close(readerDone)
112    }()
113
114    wg.Wait()
115    close(workerOutputCh)
116    <-readerDone
117    cancel()
118}

Pipeline

1func main() {
2    inputsGenerated := generate()
3    multiplyRes := multiply(inputsGenerated, 2)
4    addRes := add(multiplyRes, 2)
5    for num := range addRes {
6        fmt.Println(num)
7    }
8}
9
10func generate() <-chan int {
11    ch := make(chan int)
12    go func() {
13        nums := [5]int{1, 2, 3, 4, 5}
14        for _, num := range nums {
15            ch <- num
16        }
17        close(ch)
18    }()
19    return ch
20}
21
22func multiply(input <-chan int, constant int) <-chan int {
23    output := make(chan int)
24    go func() {
25        for num := range input {
26            output <- num * constant
27        }
28        close(output)
29    }()
30    return output
31}
32
33func add(input <-chan int, constant int) <-chan int {
34    output := make(chan int)
35    go func() {
36        for num := range input {
37            output <- num + constant
38        }
39        close(output)
40    }()
41    return output
42}

Load balancer

1import (
2    "container/heap"
3    "fmt"
4)
5
6type Worker struct {
7    requests chan Request // Channel to receive requests
8    pending  int           // Number of pending requests
9    index    int           // Index in the heap
10}
11
12type Request struct {
13    payload string
14}
15
16// Pool is an array of workers, implementing heap.Interface
17type Pool []*Worker
18
19// Len is part of heap.Interface
20func (p Pool) Len() int {
21    return len(p)
22}
23
24// Less is part of heap.Interface
25// We want the worker with fewer pending jobs to come first
26func (p Pool) Less(i, j int) bool {
27    return p[i].pending < p[j].pending
28}
29
30// Swap is part of heap.Interface
31func (p Pool) Swap(i, j int) {
32    p[i], p[j] = p[j], p[i]
33    p[i].index = i
34    p[j].index = j
35}
36
37// Push is called when adding a new element into the heap
38func (p *Pool) Push(x any) {
39    n := len(*p)
40    worker := x.(*Worker)
41    worker.index = n
42    *p = append(*p, worker)
43}
44
45// Pop is called when removing the smallest element from the heap
46func (p *Pool) Pop() any {
47    old := *p
48    n := len(old)
49    worker := old[n-1]
50    worker.index = -1 // Mark as removed
51    *p = old[0 : n-1]
52    return worker
53}
54
55// Balancer struct holds the pool
56type Balancer struct {
57    pool Pool
58}
59
60// dispatch sends a request to the least loaded worker
61func (b *Balancer) dispatch(req Request) {
62    // Pick the least loaded worker
63    w := heap.Pop(&b.pool).(*Worker)
64    w.requests <- req
65    w.pending++
66    heap.Push(&b.pool, w)
67}
68
69// completed tells the balancer a worker has completed a job
70func (b *Balancer) completed(w *Worker) {
71    w.pending--
72    heap.Remove(&b.pool, w.index)
73    heap.Push(&b.pool, w)
74}
75
76func main() {
77    worker1 := &Worker{requests: make(chan Request, 10)}
78    worker2 := &Worker{requests: make(chan Request, 10)}
79    worker3 := &Worker{requests: make(chan Request, 10)}
80
81    b := &Balancer{
82        pool: []*Worker{worker1, worker2, worker3},
83    }
84
85    heap.Init(&b.pool)
86
87    b.dispatch(Request{payload: "task 1"})
88    b.dispatch(Request{payload: "task 2"})
89    b.dispatch(Request{payload: "task 3"})
90
91    b.completed(worker1)
92
93    fmt.Println("Worker 1 pending:", worker1.pending)
94    fmt.Println("Worker 2 pending:", worker2.pending)
95    fmt.Println("Worker 3 pending:", worker3.pending)
96}