golang worker pool and jobs pattern
The worker pool pattern is where we have a pool of worker spin up and ready to received incoming work from a job queue.
We use waitgroup to wait for all the job to finish.
Next we have a job channel that receive jobs and then results channel that collects outputs from these diffferent jobs.
We create a pool of worker (not threads) that ready to read in jobs coming in. New jobs that came in, will call processJob and place results in result channel.
In the end, we print out all the data i the result channel.
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
ID int
Data string
Delay time.Duration
}
type WorkerPool struct {
jobs <-chan Job
results chan<- Job
workers int
wg sync.WaitGroup
ctx context.Context
}
func (wp *WorkerPool) startWorker() {
wp.wg.Add(1)
go func() {
defer wp.wg.Done()
for {
select {
case job, ok := <-wp.jobs:
if !ok {
return // Channel closed
}
wp.processJob(job)
case <-wp.ctx.Done():
return // Context cancelled
}
}
}()
}
func (wp *WorkerPool) processJob(job Job) {
fmt.Printf("Worker processing job %d: %s\n", job.ID, job.Data)
time.Sleep(job.Delay) // Simulate work
wp.results <- job
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
jobs := make(chan Job, 10)
results := make(chan Job, 10)
pool := &WorkerPool{
jobs: jobs,
results: results,
workers: 3,
ctx: ctx,
}
// Start workers
for i := 0; i < pool.workers; i++ {
pool.startWorker()
}
// Send jobs
go func() {
for i := 1; i <= 10; i++ {
jobs <- Job{
ID: i,
Data: fmt.Sprintf("Task-%d", i),
Delay: time.Duration(rand.Intn(2)) * time.Second,
}
}
close(jobs) // Signal no more jobs
}()
// Collect results
go func() {
for result := range results {
fmt.Printf("Completed job %d\n", result.ID)
}
}()
pool.wg.Wait() // Wait for all workers to finish
close(results)
}
Comments