
Go Channels & Goroutines Development Guide

Library: Built-in Go concurrency primitives
Used In: MVP - Event processing, background workers
Purpose: Native Go concurrency for event-driven architecture


Where It's Used

  • Event processing (replacing Kafka for MVP)
  • Background workers (match computation, PostGIS sync)
  • Worker pools (controlled concurrency)
  • Graceful shutdown handling

Official Documentation
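
  • Effective Go, Concurrency: https://go.dev/doc/effective_go#concurrency
  • A Tour of Go, Concurrency: https://go.dev/tour/concurrency/1
  • sync package: https://pkg.go.dev/sync
  • context package: https://pkg.go.dev/context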


Key Concepts

1. Goroutines

// Start goroutine
go func() {
    // Work in background
    processEvent()
}()

// With function
go processEvent()

// With arguments
go processEvent(eventType, eventData)

2. Channels

// Unbuffered channel
ch := make(chan Event)

// Buffered channel
ch := make(chan Event, 100)

// Send to channel
ch <- event

// Receive from channel
event := <-ch

// Receive with ok check
event, ok := <-ch
if !ok {
    // Channel closed
}

// Close channel
close(ch)

3. Basic Event Processing

// Event structure
type EventType string

const (
    EventResourceFlowCreated EventType = "resource_flow_created"
    EventResourceFlowUpdated EventType = "resource_flow_updated"
    EventSiteCreated         EventType = "site_created"
)

type Event struct {
    Type    EventType
    Data    interface{}
    Time    time.Time
}

// Create event channel
var eventChan = make(chan Event, 100) // Buffered

// Background worker
go func() {
    for event := range eventChan {
        switch event.Type {
        case EventResourceFlowCreated:
            handleResourceFlowCreated(event.Data)
        case EventResourceFlowUpdated:
            handleResourceFlowUpdated(event.Data)
        case EventSiteCreated:
            handleSiteCreated(event.Data)
        }
    }
}()

// Publish event; this send blocks when the event buffer is full
func PublishEvent(eventType EventType, data interface{}) {
    eventChan <- Event{
        Type: eventType,
        Data: data,
        Time: time.Now(),
    }
}

4. Worker Pool Pattern

// Worker pool for controlled concurrency
type WorkerPool struct {
    workers    int
    eventChan  chan Event
    workerPool chan chan Event
    quit       chan bool
}

func NewWorkerPool(workers int, bufferSize int) *WorkerPool {
    return &WorkerPool{
        workers:    workers,
        eventChan:  make(chan Event, bufferSize),
        workerPool: make(chan chan Event, workers),
        quit:       make(chan bool),
    }
}

func (wp *WorkerPool) Start() {
    // Start workers
    for i := 0; i < wp.workers; i++ {
        worker := NewWorker(wp.workerPool)
        worker.Start()
    }
    
    // Dispatch events to workers
    go wp.dispatch()
}

func (wp *WorkerPool) dispatch() {
    for {
        select {
        case event := <-wp.eventChan:
            // Get available worker
            worker := <-wp.workerPool
            worker <- event
        case <-wp.quit:
            return
        }
    }
}

func (wp *WorkerPool) Publish(event Event) {
    wp.eventChan <- event
}

func (wp *WorkerPool) Stop() {
    // Stops the dispatcher only; the workers keep running. For a full
    // shutdown, track the workers and close each worker's quit channel too.
    close(wp.quit)
}

// Worker implementation
type Worker struct {
    workerPool chan chan Event
    eventChan  chan Event
    quit       chan bool
}

func NewWorker(workerPool chan chan Event) *Worker {
    return &Worker{
        workerPool: workerPool,
        eventChan:  make(chan Event),
        quit:       make(chan bool),
    }
}

func (w *Worker) Start() {
    go func() {
        for {
            w.workerPool <- w.eventChan // Register worker as available
            
            select {
            case event := <-w.eventChan:
                w.processEvent(event)
            case <-w.quit:
                return
            }
        }
    }()
}

func (w *Worker) processEvent(event Event) {
    // Process event
    switch event.Type {
    case EventResourceFlowCreated:
        handleResourceFlowCreated(event.Data)
    }
}
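
A minimal usage sketch for the pool above; someFlow stands in for a hypothetical payload:

func runPool() {
    pool := NewWorkerPool(5, 100) // 5 workers, buffer of 100 events
    pool.Start()

    // Publish blocks if the buffer is full
    pool.Publish(Event{
        Type: EventResourceFlowCreated,
        Data: someFlow, // hypothetical payload
        Time: time.Now(),
    })

    // Stop the dispatcher when finished (workers also need to be
    // stopped, see the note in Stop above)
    pool.Stop()
}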

5. Select Statement

// Select for multiple channels
select {
case event := <-eventChan:
    handleEvent(event)
case <-ctx.Done():
    return // Context cancelled
case <-time.After(5 * time.Second):
    // Timeout
}

// Non-blocking select with default
select {
case event := <-eventChan:
    handleEvent(event)
default:
    // No event available, continue
}

6. Context Usage

// Context for cancellation
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Worker with context
go func() {
    for {
        select {
        case event := <-eventChan:
            handleEvent(ctx, event)
        case <-ctx.Done():
            return // Context cancelled
        }
    }
}()

// Context with timeout per operation
func processEvent(ctx context.Context, event Event) error {
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()
    
    // Do work with timeout
    return doWork(ctx)
}

7. WaitGroup for Coordination

import "sync"

var wg sync.WaitGroup

// Wait for multiple goroutines
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func(id int) {
        defer wg.Done()
        processItem(id)
    }(i)
}

// Wait for all to complete
wg.Wait()

// With Go 1.25 WaitGroup.Go (if available)
var wg sync.WaitGroup
wg.Go(func() { processItem(1) })
wg.Go(func() { processItem(2) })
wg.Wait()

8. Once for Initialization

import "sync"

var once sync.Once

func initialize() {
    once.Do(func() {
        // This will only execute once
        setupDatabase()
        setupCache()
    })
}

MVP-Specific Patterns

Event Publisher Interface

// Interface for event publishing (allows swapping implementations)
type EventPublisher interface {
    Publish(ctx context.Context, event Event) error
}

// Channel-based implementation (MVP)
type ChannelEventPublisher struct {
    ch chan Event
}

func NewChannelEventPublisher(bufferSize int) *ChannelEventPublisher {
    return &ChannelEventPublisher{
        ch: make(chan Event, bufferSize),
    }
}

func (p *ChannelEventPublisher) Publish(ctx context.Context, event Event) error {
    select {
    case p.ch <- event:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// Start workers
func (p *ChannelEventPublisher) StartWorkers(ctx context.Context, numWorkers int, handler EventHandler) {
    for i := 0; i < numWorkers; i++ {
        go p.worker(ctx, handler)
    }
}

func (p *ChannelEventPublisher) worker(ctx context.Context, handler EventHandler) {
    for {
        select {
        case event := <-p.ch:
        if err := handler.Handle(ctx, event); err != nil {
                log.Printf("Error handling event: %v", err)
            }
        case <-ctx.Done():
            return
        }
    }
}

// Later: Redis Streams implementation (same interface)
type RedisEventPublisher struct {
    client *redis.Client
}

func (p *RedisEventPublisher) Publish(ctx context.Context, event Event) error {
    // Use Redis Streams
    return p.client.XAdd(ctx, &redis.XAddArgs{
        Stream: "events",
        Values: map[string]interface{}{
            "type": event.Type,
            "data": event.Data,
        },
    }).Err()
}

Event Handler Pattern

type EventHandler interface {
    Handle(ctx context.Context, event Event) error
}

// Resource flow handler
type ResourceFlowHandler struct {
    neo4jService   *Neo4jService
    postGISService *PostGISService
    matchService   *MatchService
}

func (h *ResourceFlowHandler) Handle(ctx context.Context, event Event) error {
    switch event.Type {
    case EventResourceFlowCreated:
        return h.handleResourceFlowCreated(ctx, event.Data)
    case EventResourceFlowUpdated:
        return h.handleResourceFlowUpdated(ctx, event.Data)
    default:
        return fmt.Errorf("unknown event type: %s", event.Type)
    }
}

func (h *ResourceFlowHandler) handleResourceFlowCreated(ctx context.Context, data interface{}) error {
    flow, ok := data.(ResourceFlow)
    if !ok {
        return fmt.Errorf("invalid event data")
    }
    
    // Sync to PostGIS
    if err := h.postGISService.SyncSite(ctx, flow.SiteID); err != nil {
        return err
    }
    
    // Trigger match computation in the background; ctx may already be
    // cancelled by the time this goroutine runs, so a detached context
    // can be preferable for work that must outlive the handler
    go h.matchService.ComputeMatches(ctx, flow.ID)
    
    return nil
}

Graceful Shutdown

func main() {
    // Setup services
    eventPub := NewChannelEventPublisher(100)
    
    // Start workers
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    
    handler := &ResourceFlowHandler{...}
    eventPub.StartWorkers(ctx, 5, handler)
    
    // Setup HTTP server
    srv := &http.Server{
        Addr:    ":8080",
        Handler: router,
    }
    
    // Start server in goroutine
    go func() {
        if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatal(err)
        }
    }()
    
    // Wait for interrupt signal
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
    <-quit
    
    // Graceful shutdown
    log.Println("Shutting down...")
    
    // Cancel context (stop workers)
    cancel()
    
    // Shutdown HTTP server
    ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    if err := srv.Shutdown(ctx); err != nil {
        log.Fatal(err)
    }
    
    log.Println("Server stopped")
}

Common Patterns

Pipeline Pattern

// Pipeline for data processing
func Pipeline(ctx context.Context, in <-chan Event) <-chan ProcessedEvent {
    out := make(chan ProcessedEvent)
    
    go func() {
        defer close(out)
        for event := range in {
            processed := processEvent(event)
            
            select {
            case out <- processed:
            case <-ctx.Done():
                return
            }
        }
    }()
    
    return out
}

// Fan-out pattern (multiple workers)
func FanOut(ctx context.Context, in <-chan Event, workers int) []<-chan Event {
    outputs := make([]<-chan Event, workers)
    for i := 0; i < workers; i++ {
        out := make(chan Event)
        outputs[i] = out
        
        go func() {
            defer close(out)
            for event := range in {
                select {
                case out <- event:
                case <-ctx.Done():
                    return
                }
            }
        }()
    }
    return outputs
}

// Fan-in pattern (merge multiple channels)
func FanIn(ctx context.Context, inputs ...<-chan Event) <-chan Event {
    out := make(chan Event)
    
    var wg sync.WaitGroup
    for _, in := range inputs {
        wg.Add(1)
        go func(ch <-chan Event) {
            defer wg.Done()
            for event := range ch {
                select {
                case out <- event:
                case <-ctx.Done():
                    return
                }
            }
        }(in)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}
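
A sketch of how the fan-out and fan-in stages might be combined (the source channel and the choice of three workers are assumptions):

func runStages(ctx context.Context, source <-chan Event) <-chan Event {
    // Spread incoming events over three concurrent consumers...
    workers := FanOut(ctx, source, 3)
    // ...and merge their outputs back into a single stream
    return FanIn(ctx, workers...)
}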

Rate Limiting

// Token bucket rate limiter using channels
type RateLimiter struct {
    ticker *time.Ticker
    limit  chan struct{}
}

func NewRateLimiter(rate time.Duration, burst int) *RateLimiter {
    rl := &RateLimiter{
        ticker: time.NewTicker(rate),
        limit:  make(chan struct{}, burst),
    }
    
    // Fill bucket
    go func() {
        for range rl.ticker.C {
            select {
            case rl.limit <- struct{}{}:
            default:
            }
        }
    }()
    
    return rl
}

func (rl *RateLimiter) Wait(ctx context.Context) error {
    select {
    case <-rl.limit:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}
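
A usage sketch, assuming the limiter paces a per-item call in a loop; Site, sites, and syncSite are hypothetical names:

func syncAll(ctx context.Context, sites []Site) error {
    // Roughly one token every 100ms, with bursts of up to 10
    rl := NewRateLimiter(100*time.Millisecond, 10)

    for _, site := range sites {
        if err := rl.Wait(ctx); err != nil {
            return err // context cancelled while waiting for a token
        }
        if err := syncSite(ctx, site); err != nil {
            return err
        }
    }
    return nil
}

Note that the limiter's ticker is never stopped in this sketch; a production version would expose a Stop method, or use golang.org/x/time/rate instead.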

Best Practices

  1. Close channels from the sender - signals completion so range loops and receivers can exit (see the sketch after this list)
  2. Use context for cancellation - graceful shutdown
  3. Check channel closure - value, ok := <-ch
  4. Avoid blocking on unbuffered channels - use select with timeout
  5. Use buffered channels - when producer/consumer speeds differ
  6. Worker pools - for controlled concurrency
  7. WaitGroups - for coordinating multiple goroutines
  8. Once - for initialization that should happen once
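
A minimal sketch illustrating items 1, 3, and 4: the sender closes the channel, the receiver checks for closure, and a send is bounded by a timeout (function names are illustrative):

// Producer owns the channel and closes it when finished
func produce(events []Event) <-chan Event {
    ch := make(chan Event, len(events))
    go func() {
        defer close(ch) // lets consumers' range loops exit
        for _, e := range events {
            ch <- e
        }
    }()
    return ch
}

// Consumer checks for closure explicitly
func consumeOne(ch <-chan Event) {
    event, ok := <-ch
    if !ok {
        return // channel closed, nothing more to do
    }
    handleEvent(event)
}

// Bounded send: give up if no receiver is ready within a second
func trySend(ch chan<- Event, e Event) error {
    select {
    case ch <- e:
        return nil
    case <-time.After(time.Second):
        return fmt.Errorf("send timed out")
    }
}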

Common Pitfalls

  1. Goroutine leaks - always ensure goroutines have a way to exit (see the sketch after this list)
  2. Deadlocks - a send with no receiver (or a receive with no sender) blocks forever; use select, buffering, or timeouts
  3. Race conditions - use sync primitives or channels
  4. Context propagation - pass context to goroutines
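
A minimal sketch of pitfall 1: the first version can leak because nothing may ever receive from the channel, while the second ties the goroutine's lifetime to a context (loadEvent is a hypothetical helper):

// Leaks: if the caller never reads from the returned channel,
// the goroutine blocks on the send forever
func leaky() <-chan Event {
    ch := make(chan Event)
    go func() {
        ch <- loadEvent() // may block forever
    }()
    return ch
}

// Fixed: the goroutine can always exit once the context is cancelled
func bounded(ctx context.Context) <-chan Event {
    ch := make(chan Event, 1) // buffer of 1 also lets the send complete
    go func() {
        select {
        case ch <- loadEvent():
        case <-ctx.Done():
        }
    }()
    return ch
}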

Tutorials & Resources