golang worker pool and jobs pattern

The worker pool pattern is where we have a pool of workers spun up and ready to receive incoming work from a job queue.

We use a sync.WaitGroup to wait for all the workers to finish processing their jobs.

Next we have a jobs channel that receives jobs and a results channel that collects the outputs from these different jobs.

We create a pool of workers (goroutines, not OS threads) that are ready to read incoming jobs. Each job that comes in is passed to processJob, which places the result in the results channel.

In the end, we print out all the data in the results channel.


package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// Job is a single unit of work that is queued for the worker pool.
type Job struct {
    // ID identifies the job in the progress and completion messages.
    ID    int
    // Data is a label printed when a worker starts on the job.
    Data  string
    // Delay is the duration of the simulated work for this job.
    Delay time.Duration
}

// WorkerPool holds the state shared by a set of worker goroutines that
// pull jobs from one channel and push finished jobs to another.
type WorkerPool struct {
    // jobs is the receive-only queue workers read from; a worker exits
    // once this channel is closed and drained.
    jobs    <-chan Job
    // results is the send-only channel that processed jobs are written to.
    results chan<- Job
    // workers is the number of workers main starts for this pool.
    workers int
    // wg tracks running workers: startWorker adds to it and each worker
    // marks itself done when its goroutine returns.
    wg      sync.WaitGroup
    // ctx lets idle workers stop waiting for jobs once it is cancelled.
    // NOTE(review): Go convention is to pass a context as a parameter
    // rather than store it in a struct; kept here because callers set it.
    ctx     context.Context
}

func (wp *WorkerPool) startWorker() {
    wp.wg.Add(1)
    go func() {
        defer wp.wg.Done()
        for {
            select {
            case job, ok := <-wp.jobs:
                if !ok {
                    return // Channel closed
                }
                wp.processJob(job)
            case <-wp.ctx.Done():
                return // Context cancelled
            }
        }
    }()
}

func (wp *WorkerPool) processJob(job Job) {
    fmt.Printf("Worker processing job %d: %s\n", job.ID, job.Data)
    time.Sleep(job.Delay) // Simulate work
    wp.results <- job
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    jobs := make(chan Job, 10)
    results := make(chan Job, 10)

    pool := &WorkerPool{
        jobs:    jobs,
        results: results,
        workers: 3,
        ctx:     ctx,
    }

    // Start workers
    for i := 0; i < pool.workers; i++ {
        pool.startWorker()
    }

    // Send jobs
    go func() {
        for i := 1; i <= 10; i++ {
            jobs <- Job{
                ID:    i,
                Data:  fmt.Sprintf("Task-%d", i),
                Delay: time.Duration(rand.Intn(2)) * time.Second,
            }
        }
        close(jobs) // Signal no more jobs
    }()

    // Collect results
    go func() {
        for result := range results {
            fmt.Printf("Completed job %d\n", result.ID)
        }
    }()

    pool.wg.Wait() // Wait for all workers to finish
    close(results)
}





Comments

Popular posts from this blog

gemini cli getting file not defined error

NodeJS: Error: spawn EINVAL in window for node version 20.20 and 18.20

vllm : Failed to infer device type