Concurrency is perhaps the Go programming languages greatest strength, however, the official advice is to use it sparingly. To treat it with suspicion when you actually see it in the wild.
Whilst I understand this advice, it does feel like a shame that you rarely get to use it. I found myself needing to write a pipeline of tasks, and channels seemed like a perfect fit.
This series will go through some of the common patterns and concurrent algorithms used in creating streaming data architectures.
Another, more recent feature of the Go programming language is Generics. Another feature we’re told to avoid if possible. Which, again, I agree with, but feels a shame. It just so happens I think data pipelines are also a pretty good use case for generics. The aim of this mini-series will be to create a framework for creating generic, streaming pipelines.
The first pattern we will take a look at is the ‘fan-in’ pattern.
Fan In
Fan in is perhaps the simplest, iterate through each input channel, take each output and stream it into a single output channel:
func MergeIn[T any](streams ...<-chan T) chan T {
out := make(chan T)
var wg sync.WaitGroup
output := func(stream <-chan T) {
defer wg.Done()
for val := range stream {
out <- val
}
}
wg.Add(len(streams))
for _, stream := range streams {
go output(stream)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
Let’s go through this example line by line.
First of all, you will notice we’re using the new generics language feature, telling the compiler to accept the type T, which can be anything. This is useful because now our pipeline can be re-used for various types and use cases. The function then takes in a slice of channels, using a variadic argument, where we can pass in as many arguments as we like. These are then treated as a slice.
Secondly, we create an empty ‘output’ channel, this will store the data inputted into each of the input channels. We then create a WaitGroup. A WaitGroup in Golang is a syncronisation primitive from the ‘sync’ package. This allows us to create a sort of semaphore data structure. Whereby we tell our code to wait until a certain amount of actions have taken place before moving on. In this case, we want to wait until each of the input streams have been closed.
Our callback function output
iterates through each stream (passed as an argument). Performing a ‘range’ on a channel in Go blocks until the channel is closed. Once that input channel is closed, defer wg.Done()
is called. This decrements the WaitGroup’s counter. Once wg.Done()
is called to an equal amount of times as wg.Add()
is called, then wg.Wait()
unblocks, and the program can continue as it’s no longer waiting for channels to close.
Below our callback function, we call wg.Add(len(streams))
as mentioned above. This tells the WaitGroup to wait for however many input streams have been provided. Next, we iterate through each input stream and pass them into the callback function using a Go routine. Without using a Go routine here, the process would block on the first channel, and the channels after that would simply be ignored and the program would block indefinitely.
Introducing the go routine for each channel would mean that the function would return before finishing with each of the input streams. Which is why the WaitGroup is vital here.
Finally, we have another Go routine which waits for the WaitGroup to be complete, and then closes the output channel. This is important because we don’t want to block the output channel from being returned for further processing in the next stages of a pipeline pattern. But, we do need to close the output channel when all of the input channels have been read. Otherwise, any code reading from the returned ‘output’ channel, would also never get the signal to complete.
func main() {
a := make(chan int)
b := make(chan int)
c := make(chan int)
go func() {
a <- 1
a <- 2
a <- 3
}()
output := MergeIn(a, b, c)
for val := range output {
log.Println("value:", val)
}
}
And there we have it! You might consider this to be a needlessly lengthy post for the sakes of 23 lines of code. But, the reason Go programmers are urged to exercise caution when it comes to concurrency, is that it’s behaviour is far less obvious than regular procedural, syncronous code. It’s important to understand, and try to imagine the various possible states a concurrent data structure could be in at any given time. Which means a mere 23 lines of code has a complexity orders of magnitudes greater than it’s non-concurrent equivalent.
I hope you found this useful, in the next post I will be looking at ‘fan-out’, which has some quirks and complexities of it’s own.