Go's channels are great for real concurrent programs. By real, I mean server or daemon processes that are supposed to run or listen permanently, without downtime. The posts on the Go blog (1) (2) are very useful for learning the best practices and understanding the merits of channels.

On the other hand, when I use Go as a C substitute for my algorithms, I deal with bulk data. Often, I have to process an array of data as fast as possible. Although concurrency is not parallelism, I use goroutines to better utilize the 8 (pseudo-)cores of my CPU. A system similar to the map-reduce framework often comes in handy: multiple heavy processors and multiple filters. The blog post about pipelines recommends quit channels for graceful shutdowns, but that shutdown is not as graceful as I want: using that system does not give me a guarantee of completion.

Let me elaborate with an example. Here are three functions:

  • process iterates over its assigned slice, processes each element, and sends the processed value to the out channel. When it completes its slice, it calls wg.Done().
  • filter reads values from the in channel and adds each one to the final list if it is eligible.
  • main is the glue code: it starts a goroutine for the filter, splits the data between two processors, and prints the final values when the processors are finished.
func filter(final *[]Type, in chan *Type) {
  for {
    v := <-in
    if verify(v) {
      *final = append(*final, *v)
    }
  }
}

func process(data []int, out chan *Type, wg *sync.WaitGroup) {
  for _, v := range data {
    out <- timeConsumingOperation(v)
  }
  wg.Done()
}

func main() {
  data := readData()
  pipe := make(chan *Type, 10)
  final := make([]Type, 0, 100)

  var wg sync.WaitGroup
  
  go filter(&final, pipe)

  half := len(data)/2
  wg.Add(2)
  go process(data[:half], pipe, &wg)
  go process(data[half:], pipe, &wg)

  wg.Wait()
  
  fmt.Println(final)
}

Well, as experienced gophers can easily see, it has a problem: it exits prematurely. We never wait for filter to finish, so main may print final while unprocessed values are still buffered in pipe, and it reads final while filter may still be appending to it.

Monitoring the completion of filter is not trivial. Since it runs in an infinite loop, we cannot simply use sync.WaitGroup. Let's start with the question: when should filter consider itself finished?

  • When all the processors are complete and there is no data left in the channel. Signaling filter about the completion of the processors looks like a perfect fit for the quit channels mentioned in the blog post: we can introduce a select statement and a quit channel, and let filter handle its own completion. However, I couldn't find an elegant, race-free solution using a quit channel (see the sketch below).
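
To show where it goes wrong, here is a minimal sketch of the quit-channel variant I tried (my own illustration using the same Type and verify placeholders as above, not code from the pipelines post):

func filter(final *[]Type, in chan *Type, quit chan bool) {
  for {
    select {
    case v := <-in:
      if verify(v) {
        *final = append(*final, *v)
      }
    case <-quit:
      // The race: values sent by the processors may still be buffered
      // in `in` when quit fires, and when both cases are ready select
      // picks one at random, so those values can be silently dropped.
      return
    }
  }
}

Even if main signals quit only after wg.Wait(), up to 10 processed values can still sit in pipe's buffer, and filter may take the quit case before draining them.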

I turned to the collective knowledge and searched for an elegant solution. I came across this very nice answer on Stack Overflow and learned that the receive operator can return two values: the next value on the channel and whether the channel is still open. Here is the table of possible outcomes for v, ok := <-myChannel:

                 len(myChannel) == 0             len(myChannel) > 0
  Channel open   blocks                          v == value, ok == true
  Channel closed v == ZeroValue, ok == false     v == value, ok == true
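
A minimal, runnable demonstration of the table (my own example, not taken from the answer):

package main

import "fmt"

func main() {
  ch := make(chan int, 2)
  ch <- 1
  ch <- 2
  close(ch)

  v, ok := <-ch
  fmt.Println(v, ok) // 1 true  -- closed, but a buffered value remains
  v, ok = <-ch
  fmt.Println(v, ok) // 2 true
  v, ok = <-ch
  fmt.Println(v, ok) // 0 false -- closed and drained: the zero value
}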

Therefore, the completion of the processors can be signaled elegantly over a single channel: main closes the data channel once the processors are done, and filter detects ok == false after the channel is drained.

The final code looks like this:

func filter(final *[]Type, in chan *Type, complete chan bool) {
  for {
    v, ok := <-in
    if !ok && v == nil { // NEW: in is closed and drained
      complete <- true   // NEW
      return             // NEW
    }
    if verify(v) {
      *final = append(*final, *v)
    }
  }
}

func process(data []int, out chan *Type, wg *sync.WaitGroup) {
  for _, v := range data {
    out <- timeConsumingOperation(v)
  }
  wg.Done()
}

func main() {
  data := readData()
  pipe := make(chan *Type, 10)
  final := make([]Type, 0, 100)
  complete := make(chan bool) // NEW

  var wg sync.WaitGroup

  go filter(&final, pipe, complete) // NEW: pass the complete channel

  half := len(data) / 2
  wg.Add(2)
  go process(data[:half], pipe, &wg)
  go process(data[half:], pipe, &wg)

  wg.Wait()

  close(pipe) // NEW
  <-complete  // NEW

  fmt.Println(final)
}

PS: While I was preparing the example, I realized that the zero value is a legitimate value for some types, especially built-ins like int and float64. I'm not yet sure how to handle those. Maybe I'll return to it in a later post.
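
To make the ambiguity concrete, here is a minimal example of my own: for a chan int, a received 0 can be a real value or the zero value of a drained channel, and only ok can tell them apart.

package main

import "fmt"

func main() {
  ch := make(chan int, 1)
  ch <- 0 // a producer legitimately sends zero
  close(ch)

  v, ok := <-ch
  fmt.Println(v, ok) // 0 true  -- a real value that happens to be 0
  v, ok = <-ch
  fmt.Println(v, ok) // 0 false -- the zero value of a drained channel
}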