Go Channels & Goroutines Development Guide
Library: Built-in Go concurrency primitives
Used In: MVP - Event processing, background workers
Purpose: Native Go concurrency for event-driven architecture
Where It's Used
- Event processing (replacing Kafka for MVP)
- Background workers (match computation, PostGIS sync)
- Worker pools (controlled concurrency)
- Graceful shutdown handling
Official Documentation
- Go Concurrency: https://go.dev/doc/effective_go#concurrency
- Go by Example - Goroutines: https://gobyexample.com/goroutines
- Go by Example - Channels: https://gobyexample.com/channels
- Go Tour - Concurrency: https://go.dev/tour/concurrency/1
Key Concepts
1. Goroutines
// Start goroutine
go func() {
// Work in background
processEvent()
}()
// With function
go processEvent()
// With arguments
go processEvent(eventType, eventData)
2. Channels
// Unbuffered channel
ch := make(chan Event)
// Buffered channel
ch := make(chan Event, 100)
// Send to channel
ch <- event
// Receive from channel
event := <-ch
// Receive with ok check
event, ok := <-ch
if !ok {
// Channel closed
}
// Close channel
close(ch)
3. Basic Event Processing
// Event structure
type EventType string
const (
EventResourceFlowCreated EventType = "resource_flow_created"
EventResourceFlowUpdated EventType = "resource_flow_updated"
EventSiteCreated EventType = "site_created"
)
type Event struct {
Type EventType
Data interface{}
Time time.Time
}
// Create event channel
var eventChan = make(chan Event, 100) // Buffered
// Background worker
go func() {
for event := range eventChan {
switch event.Type {
case EventResourceFlowCreated:
handleResourceFlowCreated(event.Data)
case EventResourceFlowUpdated:
handleResourceFlowUpdated(event.Data)
case EventSiteCreated:
handleSiteCreated(event.Data)
}
}
}()
// Publish event
func PublishEvent(eventType EventType, data interface{}) {
eventChan <- Event{
Type: eventType,
Data: data,
Time: time.Now(),
}
}
4. Worker Pool Pattern
// Worker pool for controlled concurrency
type WorkerPool struct {
workers int
eventChan chan Event
workerPool chan chan Event
quit chan bool
}
func NewWorkerPool(workers int, bufferSize int) *WorkerPool {
return &WorkerPool{
workers: workers,
eventChan: make(chan Event, bufferSize),
workerPool: make(chan chan Event, workers),
quit: make(chan bool),
}
}
func (wp *WorkerPool) Start() {
// Start workers
for i := 0; i < wp.workers; i++ {
worker := NewWorker(wp.workerPool, wp.quit) // share the pool's quit channel so Stop() also stops workers
worker.Start()
}
// Dispatch events to workers
go wp.dispatch()
}
func (wp *WorkerPool) dispatch() {
for {
select {
case event := <-wp.eventChan:
// Get available worker
worker := <-wp.workerPool
worker <- event
case <-wp.quit:
return
}
}
}
func (wp *WorkerPool) Publish(event Event) {
wp.eventChan <- event
}
func (wp *WorkerPool) Stop() {
close(wp.quit) // stops the dispatcher and, via the shared quit channel, every worker
}
// Worker implementation
type Worker struct {
workerPool chan chan Event
eventChan chan Event
quit chan bool
}
func NewWorker(workerPool chan chan Event, quit chan bool) *Worker {
return &Worker{
workerPool: workerPool,
eventChan: make(chan Event),
quit: quit, // shared with the pool; closing it lets all workers exit
}
}
func (w *Worker) Start() {
go func() {
for {
w.workerPool <- w.eventChan // Register worker as available
select {
case event := <-w.eventChan:
w.processEvent(event)
case <-w.quit:
return
}
}
}()
}
func (w *Worker) processEvent(event Event) {
// Process event
switch event.Type {
case EventResourceFlowCreated:
handleResourceFlowCreated(event.Data)
}
}
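A minimal usage sketch of the pool above (the worker count, buffer size, and empty payload are illustrative):
pool := NewWorkerPool(4, 100)
pool.Start()

// Publish events as they occur
pool.Publish(Event{
    Type: EventResourceFlowCreated,
    Data: nil, // illustrative payload
    Time: time.Now(),
})

// On shutdown
pool.Stop()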
5. Select Statement
// Select for multiple channels
select {
case event := <-eventChan:
handleEvent(event)
case <-ctx.Done():
return // Context cancelled
case <-time.After(5 * time.Second):
// Timeout
}
// Non-blocking select with default
select {
case event := <-eventChan:
handleEvent(event)
default:
// No event available, continue
}
6. Context Usage
// Context for cancellation
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Worker with context
go func() {
for {
select {
case event := <-eventChan:
handleEvent(ctx, event)
case <-ctx.Done():
return // Context cancelled
}
}
}()
// Context with timeout per operation
func processEvent(ctx context.Context, event Event) error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// Do work with timeout
return doWork(ctx)
}
7. WaitGroup for Coordination
import "sync"
var wg sync.WaitGroup
// Wait for multiple goroutines
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
processItem(id)
}(i)
}
// Wait for all to complete
wg.Wait()
// With Go 1.25 WaitGroup.Go (if available)
var wg sync.WaitGroup
wg.Go(func() { processItem(1) })
wg.Go(func() { processItem(2) })
wg.Wait()
8. Once for Initialization
import "sync"
var once sync.Once
func initialize() {
once.Do(func() {
// This will only execute once
setupDatabase()
setupCache()
})
}
MVP-Specific Patterns
Event Publisher Interface
// Interface for event publishing (allows swapping implementations)
type EventPublisher interface {
Publish(ctx context.Context, event Event) error
}
// Channel-based implementation (MVP)
type ChannelEventPublisher struct {
ch chan Event
}
func NewChannelEventPublisher(bufferSize int) *ChannelEventPublisher {
return &ChannelEventPublisher{
ch: make(chan Event, bufferSize),
}
}
func (p *ChannelEventPublisher) Publish(ctx context.Context, event Event) error {
select {
case p.ch <- event:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// Start workers
func (p *ChannelEventPublisher) StartWorkers(ctx context.Context, numWorkers int, handler EventHandler) {
for i := 0; i < numWorkers; i++ {
go p.worker(ctx, handler)
}
}
func (p *ChannelEventPublisher) worker(ctx context.Context, handler EventHandler) {
for {
select {
case event := <-p.ch:
if err := handler.Handle(ctx, event); err != nil {
log.Printf("Error handling event: %v", err)
}
case <-ctx.Done():
return
}
}
}
// Later: Redis Streams implementation (same interface)
type RedisEventPublisher struct {
client *redis.Client
}
func (p *RedisEventPublisher) Publish(ctx context.Context, event Event) error {
// Use Redis Streams (event.Data would typically need to be serialized, e.g. to JSON, before being stored as a stream field)
return p.client.XAdd(ctx, &redis.XAddArgs{
Stream: "events",
Values: map[string]interface{}{
"type": event.Type,
"data": event.Data,
},
}).Err()
}
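Because callers depend only on the EventPublisher interface, the channel-based MVP implementation can later be swapped for the Redis Streams one without touching call sites. A minimal sketch of such a call site (ResourceFlowService and its CreateFlow method are hypothetical):
// A service publishes through the interface and never sees the concrete implementation.
type ResourceFlowService struct {
    publisher EventPublisher
}

func (s *ResourceFlowService) CreateFlow(ctx context.Context, flow ResourceFlow) error {
    // ... persist the flow ...
    return s.publisher.Publish(ctx, Event{
        Type: EventResourceFlowCreated,
        Data: flow,
        Time: time.Now(),
    })
}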
Event Handler Pattern
type EventHandler interface {
Handle(ctx context.Context, event Event) error
}
// Resource flow handler
type ResourceFlowHandler struct {
neo4jService *Neo4jService
postGISService *PostGISService
matchService *MatchService
}
func (h *ResourceFlowHandler) Handle(ctx context.Context, event Event) error {
switch event.Type {
case EventResourceFlowCreated:
return h.handleResourceFlowCreated(ctx, event.Data)
case EventResourceFlowUpdated:
return h.handleResourceFlowUpdated(ctx, event.Data)
default:
return fmt.Errorf("unknown event type: %s", event.Type)
}
}
func (h *ResourceFlowHandler) handleResourceFlowCreated(ctx context.Context, data interface{}) error {
flow, ok := data.(ResourceFlow)
if !ok {
return fmt.Errorf("invalid event data")
}
// Sync to PostGIS
if err := h.postGISService.SyncSite(ctx, flow.SiteID); err != nil {
return err
}
// Trigger match computation in the background
// (note: if ctx is request-scoped it may be cancelled before this finishes; a detached context can be safer)
go h.matchService.ComputeMatches(ctx, flow.ID)
return nil
}
Graceful Shutdown
func main() {
// Setup services
eventPub := NewChannelEventPublisher(100)
// Start workers
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
handler := &ResourceFlowHandler{...}
eventPub.StartWorkers(ctx, 5, handler)
// Setup HTTP server
srv := &http.Server{
Addr: ":8080",
Handler: router,
}
// Start server in goroutine
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal(err)
}
}()
// Wait for interrupt signal
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM) // SIGKILL cannot be caught, so listen for SIGTERM instead
<-quit
// Graceful shutdown
log.Println("Shutting down...")
// Cancel context (stop workers)
cancel()
// Shutdown HTTP server with a bounded timeout (use a fresh context so the worker context above is left untouched)
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
if err := srv.Shutdown(shutdownCtx); err != nil {
log.Fatal(err)
}
log.Println("Server stopped")
}
Common Patterns
Pipeline Pattern
// Pipeline for data processing
func Pipeline(ctx context.Context, in <-chan Event) <-chan ProcessedEvent {
out := make(chan ProcessedEvent)
go func() {
defer close(out)
for event := range in {
processed := processEvent(event)
select {
case out <- processed:
case <-ctx.Done():
return
}
}
}()
return out
}
// Fan-out pattern (multiple workers)
func FanOut(ctx context.Context, in <-chan Event, workers int) []<-chan Event {
outputs := make([]<-chan Event, workers)
for i := 0; i < workers; i++ {
out := make(chan Event)
outputs[i] = out
go func() {
defer close(out)
for event := range in {
select {
case out <- event:
case <-ctx.Done():
return
}
}
}()
}
return outputs
}
// Fan-in pattern (merge multiple channels)
func FanIn(ctx context.Context, inputs ...<-chan Event) <-chan Event {
out := make(chan Event)
var wg sync.WaitGroup
for _, in := range inputs {
wg.Add(1)
go func(ch <-chan Event) {
defer wg.Done()
for event := range ch {
select {
case out <- event:
case <-ctx.Done():
return
}
}
}(in)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
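A wiring sketch showing how the three stages compose (the event count is illustrative, and merging straight after the fan-out only shows how the signatures line up; in practice each branch would do its own work before the merge):
src := make(chan Event, 8)

branches := FanOut(ctx, src, 3)   // distribute events across 3 streams
merged := FanIn(ctx, branches...) // merge the streams back together
processed := Pipeline(ctx, merged) // transform Event -> ProcessedEvent

// Producer: publish a few events, then close the source so every
// downstream stage drains and closes in turn.
go func() {
    defer close(src)
    for i := 0; i < 5; i++ {
        src <- Event{Type: EventSiteCreated, Time: time.Now()}
    }
}()

for p := range processed {
    _ = p // consume processed events
}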
Rate Limiting
// Token bucket rate limiter using channels
type RateLimiter struct {
ticker *time.Ticker
limit chan struct{}
}
func NewRateLimiter(rate time.Duration, burst int) *RateLimiter {
rl := &RateLimiter{
ticker: time.NewTicker(rate),
limit: make(chan struct{}, burst),
}
// Refill the bucket on each tick (stop the ticker with rl.ticker.Stop() when the limiter is no longer needed)
go func() {
for range rl.ticker.C {
select {
case rl.limit <- struct{}{}:
default:
}
}
}()
return rl
}
func (rl *RateLimiter) Wait(ctx context.Context) error {
select {
case <-rl.limit:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
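A usage sketch that throttles event processing to roughly ten per second with bursts of five (the numbers and the consumeWithLimit name are illustrative; processEvent is the context-aware helper from the Context Usage section):
func consumeWithLimit(ctx context.Context, events <-chan Event) error {
    limiter := NewRateLimiter(100*time.Millisecond, 5)
    for event := range events {
        // Block until a token is available or the context is cancelled.
        if err := limiter.Wait(ctx); err != nil {
            return err
        }
        if err := processEvent(ctx, event); err != nil {
            log.Printf("process event: %v", err)
        }
    }
    return nil
}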
Best Practices
- Close channels from the sender side - receivers ranging over a channel only exit when it is closed, which prevents goroutine leaks
- Use context for cancellation - graceful shutdown
- Check channel closure - value, ok := <-ch
- Avoid blocking on unbuffered channels - use select with a timeout or default case (see the sketch after this list)
- Use buffered channels - when producer/consumer speeds differ
- Worker pools - for controlled concurrency
- WaitGroups - for coordinating multiple goroutines
- Once - for initialization that should happen once
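A minimal sketch of the timeout-guarded send and the closed-channel check from the list above (TrySend, Drain, and the timeout value are illustrative):
// Send without risking an indefinite block: give up after a timeout.
func TrySend(ch chan<- Event, event Event, timeout time.Duration) error {
    select {
    case ch <- event:
        return nil
    case <-time.After(timeout):
        return fmt.Errorf("send timed out after %s", timeout)
    }
}

// Receive until the sender closes the channel.
func Drain(ch <-chan Event) {
    for {
        event, ok := <-ch
        if !ok {
            return // channel closed by the sender
        }
        handleEvent(event)
    }
}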
Common Pitfalls
- Goroutine leaks - always ensure goroutines have a way to exit (see the sketch after this list)
- Deadlocks - an unbuffered send blocks until a receiver is ready; make sure every send has a matching receive or a select escape hatch
- Race conditions - protect shared state with sync primitives or channels, and run tests with go test -race
- Context propagation - pass the caller's context into goroutines so they stop when it is cancelled
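A sketch of the goroutine-leak pitfall and its fix (expensiveComputation is a hypothetical placeholder):
// Leaks: if the caller never receives from results, this goroutine
// blocks on the send forever and is never collected.
func leaky(results chan<- int) {
    go func() {
        results <- expensiveComputation()
    }()
}

// Fixed: the send can be abandoned once the context is cancelled,
// so the goroutine always has a way to exit.
func fixed(ctx context.Context, results chan<- int) {
    go func() {
        select {
        case results <- expensiveComputation():
        case <-ctx.Done():
        }
    }()
}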
Tutorials & Resources
- Go Concurrency Patterns (Pipelines and Cancellation): https://go.dev/blog/pipelines
- Go Concurrency Patterns (Context): https://go.dev/blog/context
- Go by Example: https://gobyexample.com/channels
- Effective Go - Concurrency: https://go.dev/doc/effective_go#concurrency