# Go Channels & Goroutines Development Guide **Library**: Built-in Go concurrency primitives **Used In**: MVP - Event processing, background workers **Purpose**: Native Go concurrency for event-driven architecture --- ## Where It's Used - **Event processing** (replacing Kafka for MVP) - **Background workers** (match computation, PostGIS sync) - **Worker pools** (controlled concurrency) - **Graceful shutdown** handling --- ## Official Documentation - **Go Concurrency**: https://go.dev/doc/effective_go#concurrency - **Go by Example - Goroutines**: https://gobyexample.com/goroutines - **Go by Example - Channels**: https://gobyexample.com/channels - **Go Tour - Concurrency**: https://go.dev/tour/concurrency/1 --- ## Key Concepts ### 1. Goroutines ```go // Start goroutine go func() { // Work in background processEvent() }() // With function go processEvent() // With arguments go processEvent(eventType, eventData) ``` ### 2. Channels ```go // Unbuffered channel ch := make(chan Event) // Buffered channel ch := make(chan Event, 100) // Send to channel ch <- event // Receive from channel event := <-ch // Receive with ok check event, ok := <-ch if !ok { // Channel closed } // Close channel close(ch) ``` ### 3. Basic Event Processing ```go // Event structure type EventType string const ( EventResourceFlowCreated EventType = "resource_flow_created" EventResourceFlowUpdated EventType = "resource_flow_updated" EventSiteCreated EventType = "site_created" ) type Event struct { Type EventType Data interface{} Time time.Time } // Create event channel var eventChan = make(chan Event, 100) // Buffered // Background worker go func() { for event := range eventChan { switch event.Type { case EventResourceFlowCreated: handleResourceFlowCreated(event.Data) case EventResourceFlowUpdated: handleResourceFlowUpdated(event.Data) case EventSiteCreated: handleSiteCreated(event.Data) } } }() // Publish event func PublishEvent(eventType EventType, data interface{}) { eventChan <- Event{ Type: eventType, Data: data, Time: time.Now(), } } ``` ### 4. Worker Pool Pattern ```go // Worker pool for controlled concurrency type WorkerPool struct { workers int eventChan chan Event workerPool chan chan Event quit chan bool } func NewWorkerPool(workers int, bufferSize int) *WorkerPool { return &WorkerPool{ workers: workers, eventChan: make(chan Event, bufferSize), workerPool: make(chan chan Event, workers), quit: make(chan bool), } } func (wp *WorkerPool) Start() { // Start workers for i := 0; i < wp.workers; i++ { worker := NewWorker(wp.workerPool) worker.Start() } // Dispatch events to workers go wp.dispatch() } func (wp *WorkerPool) dispatch() { for { select { case event := <-wp.eventChan: // Get available worker worker := <-wp.workerPool worker <- event case <-wp.quit: return } } } func (wp *WorkerPool) Publish(event Event) { wp.eventChan <- event } func (wp *WorkerPool) Stop() { close(wp.quit) } // Worker implementation type Worker struct { workerPool chan chan Event eventChan chan Event quit chan bool } func NewWorker(workerPool chan chan Event) *Worker { return &Worker{ workerPool: workerPool, eventChan: make(chan Event), quit: make(chan bool), } } func (w *Worker) Start() { go func() { for { w.workerPool <- w.eventChan // Register worker as available select { case event := <-w.eventChan: w.processEvent(event) case <-w.quit: return } } }() } func (w *Worker) processEvent(event Event) { // Process event switch event.Type { case EventResourceFlowCreated: handleResourceFlowCreated(event.Data) } } ``` ### 5. Select Statement ```go // Select for multiple channels select { case event := <-eventChan: handleEvent(event) case <-ctx.Done(): return // Context cancelled case <-time.After(5 * time.Second): // Timeout } // Non-blocking select with default select { case event := <-eventChan: handleEvent(event) default: // No event available, continue } ``` ### 6. Context Usage ```go // Context for cancellation ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // Worker with context go func() { for { select { case event := <-eventChan: handleEvent(ctx, event) case <-ctx.Done(): return // Context cancelled } } }() // Context with timeout per operation func processEvent(ctx context.Context, event Event) error { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() // Do work with timeout return doWork(ctx) } ``` ### 7. WaitGroup for Coordination ```go import "sync" var wg sync.WaitGroup // Wait for multiple goroutines for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() processItem(id) }(i) } // Wait for all to complete wg.Wait() // With Go 1.25 WaitGroup.Go (if available) var wg sync.WaitGroup wg.Go(func() { processItem(1) }) wg.Go(func() { processItem(2) }) wg.Wait() ``` ### 8. Once for Initialization ```go import "sync" var once sync.Once func initialize() { once.Do(func() { // This will only execute once setupDatabase() setupCache() }) } ``` --- ## MVP-Specific Patterns ### Event Publisher Interface ```go // Interface for event publishing (allows swapping implementations) type EventPublisher interface { Publish(ctx context.Context, event Event) error } // Channel-based implementation (MVP) type ChannelEventPublisher struct { ch chan Event } func NewChannelEventPublisher(bufferSize int) *ChannelEventPublisher { return &ChannelEventPublisher{ ch: make(chan Event, bufferSize), } } func (p *ChannelEventPublisher) Publish(ctx context.Context, event Event) error { select { case p.ch <- event: return nil case <-ctx.Done(): return ctx.Err() } } // Start workers func (p *ChannelEventPublisher) StartWorkers(ctx context.Context, numWorkers int, handler EventHandler) { for i := 0; i < numWorkers; i++ { go p.worker(ctx, handler) } } func (p *ChannelEventPublisher) worker(ctx context.Context, handler EventHandler) { for { select { case event := <-p.ch: if err := handler(ctx, event); err != nil { log.Printf("Error handling event: %v", err) } case <-ctx.Done(): return } } } // Later: Redis Streams implementation (same interface) type RedisEventPublisher struct { client *redis.Client } func (p *RedisEventPublisher) Publish(ctx context.Context, event Event) error { // Use Redis Streams return p.client.XAdd(ctx, &redis.XAddArgs{ Stream: "events", Values: map[string]interface{}{ "type": event.Type, "data": event.Data, }, }).Err() } ``` ### Event Handler Pattern ```go type EventHandler interface { Handle(ctx context.Context, event Event) error } // Resource flow handler type ResourceFlowHandler struct { neo4jService *Neo4jService postGISService *PostGISService matchService *MatchService } func (h *ResourceFlowHandler) Handle(ctx context.Context, event Event) error { switch event.Type { case EventResourceFlowCreated: return h.handleResourceFlowCreated(ctx, event.Data) case EventResourceFlowUpdated: return h.handleResourceFlowUpdated(ctx, event.Data) default: return fmt.Errorf("unknown event type: %s", event.Type) } } func (h *ResourceFlowHandler) handleResourceFlowCreated(ctx context.Context, data interface{}) error { flow, ok := data.(ResourceFlow) if !ok { return fmt.Errorf("invalid event data") } // Sync to PostGIS if err := h.postGISService.SyncSite(ctx, flow.SiteID); err != nil { return err } // Trigger match computation go h.matchService.ComputeMatches(ctx, flow.ID) return nil } ``` ### Graceful Shutdown ```go func main() { // Setup services eventPub := NewChannelEventPublisher(100) // Start workers ctx, cancel := context.WithCancel(context.Background()) defer cancel() handler := &ResourceFlowHandler{...} eventPub.StartWorkers(ctx, 5, handler) // Setup HTTP server srv := &http.Server{ Addr: ":8080", Handler: router, } // Start server in goroutine go func() { if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatal(err) } }() // Wait for interrupt signal quit := make(chan os.Signal, 1) signal.Notify(quit, os.Interrupt, os.Kill) <-quit // Graceful shutdown log.Println("Shutting down...") // Cancel context (stop workers) cancel() // Shutdown HTTP server ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := srv.Shutdown(ctx); err != nil { log.Fatal(err) } log.Println("Server stopped") } ``` --- ## Common Patterns ### Pipeline Pattern ```go // Pipeline for data processing func Pipeline(ctx context.Context, in <-chan Event) <-chan ProcessedEvent { out := make(chan ProcessedEvent) go func() { defer close(out) for event := range in { processed := processEvent(event) select { case out <- processed: case <-ctx.Done(): return } } }() return out } // Fan-out pattern (multiple workers) func FanOut(ctx context.Context, in <-chan Event, workers int) []<-chan Event { outputs := make([]<-chan Event, workers) for i := 0; i < workers; i++ { out := make(chan Event) outputs[i] = out go func() { defer close(out) for event := range in { select { case out <- event: case <-ctx.Done(): return } } }() } return outputs } // Fan-in pattern (merge multiple channels) func FanIn(ctx context.Context, inputs ...<-chan Event) <-chan Event { out := make(chan Event) var wg sync.WaitGroup for _, in := range inputs { wg.Add(1) go func(ch <-chan Event) { defer wg.Done() for event := range ch { select { case out <- event: case <-ctx.Done(): return } } }(in) } go func() { wg.Wait() close(out) }() return out } ``` ### Rate Limiting ```go // Token bucket rate limiter using channels type RateLimiter struct { ticker *time.Ticker limit chan struct{} } func NewRateLimiter(rate time.Duration, burst int) *RateLimiter { rl := &RateLimiter{ ticker: time.NewTicker(rate), limit: make(chan struct{}, burst), } // Fill bucket go func() { for range rl.ticker.C { select { case rl.limit <- struct{}{}: default: } } }() return rl } func (rl *RateLimiter) Wait(ctx context.Context) error { select { case <-rl.limit: return nil case <-ctx.Done(): return ctx.Err() } } ``` --- ## Best Practices 1. **Always close channels** - prevent goroutine leaks 2. **Use context for cancellation** - graceful shutdown 3. **Check channel closure** - `value, ok := <-ch` 4. **Avoid blocking on unbuffered channels** - use select with timeout 5. **Use buffered channels** - when producer/consumer speeds differ 6. **Worker pools** - for controlled concurrency 7. **WaitGroups** - for coordinating multiple goroutines 8. **Once** - for initialization that should happen once --- ## Common Pitfalls 1. **Goroutine leaks** - always ensure goroutines can exit 2. **Deadlocks** - be careful with channel operations 3. **Race conditions** - use sync primitives or channels 4. **Context propagation** - pass context to goroutines --- ## Tutorials & Resources - **Go Concurrency Patterns**: https://go.dev/blog/pipelines - **Advanced Go Concurrency**: https://go.dev/blog/context - **Go by Example**: https://gobyexample.com/channels - **Effective Go - Concurrency**: https://go.dev/doc/effective_go#concurrency