When it comes to performance and efficient use of the resources available in modern systems, Go is hard to beat. Just ask Docker.
In concurrent applications, worker pools play a crucial role in managing and distributing work across a fixed number of workers. A common scenario where worker pools come in handy is parallel processing, such as handling the incoming requests of a web server.
In this guide, we will dive further into the world of concurrent programming in the Go programming language and learn what worker pools are and how we can utilize them.
Introduction to Workers and Worker Pools
What Is a Worker?
In concurrent programming, a worker refers to a goroutine (or thread) that is responsible for executing a specific task or function concurrently.
Workers are useful because they can carry out independent units of work in parallel.
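As a quick illustration, a worker can be modeled as nothing more than a goroutine draining a channel of jobs. The following minimal sketch (the jobs channel and its string payloads are purely illustrative and not part of the later example) shows a single worker executing each unit of work it receives:
package main

import "fmt"

func main() {
    // A channel of jobs for the worker to drain; the job strings are illustrative.
    jobs := make(chan string, 3)
    done := make(chan struct{})

    // The worker: a goroutine that executes each unit of work it receives.
    go func() {
        for job := range jobs {
            fmt.Println("processing:", job)
        }
        close(done)
    }()

    jobs <- "resize image"
    jobs <- "send email"
    jobs <- "write audit log"
    close(jobs)

    // Wait until the worker has finished all jobs.
    <-done
}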
What Is a Worker Pool?
A worker pool, on the other hand, refers to a collection of worker goroutines that can work together to process a batch of tasks in an efficient manner.
Worker pools allow us to limit the number of concurrent goroutines that execute tasks, preventing excessive resource consumption. This helps control the level of parallelism of a program.
Some common characteristics of worker pools include the following (a bare-bones sketch follows the list):
- Fixed Pool Size – Worker pools have a fixed number of worker goroutines, and tasks are distributed among these workers. This avoids the overhead of dynamically creating and destroying a goroutine for each task.
- Task Queue – Tasks are typically organized in a queue or a channel. Workers retrieve the tasks from the queue and execute them concurrently.
- Load Balancing – Worker pools distribute the tasks evenly among the workers, ensuring that the workload is balanced.
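To make these characteristics concrete, here is that bare-bones sketch. It is separate from the fuller example later in this guide, and the names tasks and numWorkers are purely illustrative. A buffered channel serves as the task queue, and a fixed number of workers share it, so whichever worker is free picks up the next task:
package main

import (
    "fmt"
    "sync"
)

func main() {
    const numWorkers = 3        // fixed pool size
    tasks := make(chan int, 10) // the task queue

    var wg sync.WaitGroup

    // Start a fixed number of workers that all read from the same queue,
    // so the load is spread across whichever worker is free.
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for t := range tasks {
                fmt.Printf("worker %d handled task %d\n", id, t)
            }
        }(w)
    }

    // Enqueue the tasks, then close the queue so the workers exit
    // once it has been drained.
    for t := 1; t <= 9; t++ {
        tasks <- t
    }
    close(tasks)
    wg.Wait()
}
Because every worker reads from the same channel, the nine tasks are balanced across the three workers without any extra bookkeeping.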
Let us move on to the steps of working with worker pools in Go.
Worker Pools In Go: Create a Worker Pool
Let us start by creating a simple worker pool. We use goroutines and channels, which are the most fundamental and powerful primitives for concurrent programming in Go.
Take a look at the following code:
package main

import (
    "fmt"
    "sync"
)

// Worker represents a single worker in the pool.
type Worker struct {
    ID     int
    TaskCh chan Task
    QuitCh chan bool
}

// Task represents a unit of work executed by a worker.
type Task func()

// NewWorker creates and returns a worker with the given ID and
// initializes its channels.
func NewWorker(id int) *Worker {
    return &Worker{
        ID:     id,
        TaskCh: make(chan Task),
        QuitCh: make(chan bool),
    }
}

// Start registers the worker with the WaitGroup and launches a goroutine
// that executes tasks from the shared task channel until the channel is
// closed or a quit signal is received.
func (w *Worker) Start(wg *sync.WaitGroup, taskCh <-chan Task) {
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            select {
            case task, ok := <-taskCh:
                if !ok {
                    // The task channel was closed: no more work is coming.
                    return
                }
                task()
            case <-w.QuitCh:
                return
            }
        }
    }()
}

// Stop signals the worker to stop processing tasks.
func (w *Worker) Stop() {
    go func() {
        w.QuitCh <- true
    }()
}

// WorkerPool represents a pool of workers that share one task channel.
type WorkerPool struct {
    Workers []*Worker
    TaskCh  chan Task
    wg      sync.WaitGroup
}

// NewWorkerPool creates a pool with the specified number of workers and
// starts each of them on the shared task channel.
func NewWorkerPool(numWorkers int) *WorkerPool {
    taskCh := make(chan Task)
    pool := &WorkerPool{
        Workers: make([]*Worker, numWorkers),
        TaskCh:  taskCh,
    }
    for i := 0; i < numWorkers; i++ {
        pool.Workers[i] = NewWorker(i)
        pool.Workers[i].Start(&pool.wg, taskCh)
    }
    return pool
}

// AddTask submits a task to the pool by sending it on the task channel.
func (p *WorkerPool) AddTask(task Task) {
    p.TaskCh <- task
}

// Wait closes the task channel so the workers drain the remaining tasks,
// waits for their goroutines to finish, and then stops each worker.
func (p *WorkerPool) Wait() {
    close(p.TaskCh)
    p.wg.Wait()
    for _, worker := range p.Workers {
        worker.Stop()
    }
}

func main() {
    // Create a pool with three workers and submit five tasks.
    pool := NewWorkerPool(3)
    for i := 0; i < 5; i++ {
        taskID := i
        pool.AddTask(func() {
            fmt.Printf("Task %d executed by worker\n", taskID)
        })
    }
    pool.Wait()
}
- In the given code, we start by creating a “Worker” struct which represents a worker in the worker pool. The worker has fields for the ID, a channel to receive the tasks (TaskCh), and a channel to signal the worker to quit (QuitCh).
- Next, we define a “Task” function type which represents a unit of work that is executed by a worker in the pool.
- In the next step, we create a “NewWorker” function which is a constructor function that helps us to create and return a new worker with a given ID. The function also initializes the worker’s channels.
- In the Start() method, we implement the functionality to start a goroutine that listens for tasks on the task channel and executes them. To coordinate the worker’s lifecycle, we register the worker with the sync.WaitGroup (wg.Add(1)) before launching the goroutine and call wg.Done() when it returns.
- The next step is implementing the worker’s Stop method. The method is defined on the “Worker” type, and its role is to send a stop signal on the quit channel, instructing the worker to stop processing tasks.
- Following that is the definition of the “WorkerPool” struct, which represents a pool of workers. It contains a slice of worker instances, a task channel to receive tasks, and the sync.WaitGroup (wg) that helps coordinate the workers.
- Next, we set up the “NewWorkerPool” constructor function that creates and returns a new worker pool with the specified number of workers.
- The “AddTask” method allows us to add a task to the worker pool by sending it to the task channel.
- Finally, we have the “Wait” method, which closes the task channel to signal that no more tasks will be added, waits for the worker goroutines to finish using the “sync.WaitGroup”, and then stops each worker.
- In main, we create a new worker pool with three workers by calling NewWorkerPool(3). We then add five tasks to the worker pool using anonymous functions.
- Each task simply prints a message with its task ID.
- Finally, we call pool.Wait() to wait for all tasks to complete before shutting down.
Caution
One of the biggest potential pitfalls of worker pools is deadlock. We cannot cover deadlocks in depth in this post, but for the example shown above, ensure the following (a short deadlock sketch follows this list):
- The “Start” method of the worker accepts the task channel as an argument which ensures that we can close it externally.
- Create the task channel once and pass it to each worker. We also make sure to close the shared channel in the “Wait” method, which signals that no more tasks will be added.
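To illustrate why closing the shared channel matters, here is a minimal sketch, separate from the pool above, of the deadlock that occurs when the task channel is never closed: every worker blocks on a receive, main blocks in wg.Wait(), and the Go runtime aborts with “fatal error: all goroutines are asleep - deadlock!”.
package main

import "sync"

func main() {
    tasks := make(chan func())
    var wg sync.WaitGroup

    // Three workers, all ranging over the same unclosed channel.
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for task := range tasks {
                task()
            }
        }()
    }

    tasks <- func() {}

    // close(tasks) // forgetting this close leaves every worker blocked on
    // a receive while main blocks in wg.Wait(): a deadlock.
    wg.Wait()
}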
Conclusion
In this tutorial, we explored the basics of implementing and working with worker pools in Go, which play a critical part in concurrent applications. Be cautious of deadlocks.