package service import ( "log" "net/http" "sync" "time" "bugulma/backend/internal/domain" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" ) // WebSocketMessage is now defined in the domain package // WebSocketClient represents a connected WebSocket client type WebSocketClient struct { hub *WebSocketHub conn *websocket.Conn send chan domain.WebSocketMessage orgID string userID string } // WebSocketHub manages WebSocket connections and broadcasting type WebSocketHub struct { clients map[*WebSocketClient]bool broadcast chan domain.WebSocketMessage register chan *WebSocketClient unregister chan *WebSocketClient orgClients map[string]map[*WebSocketClient]bool // orgID -> clients mu sync.RWMutex } // BroadcastToOrganization sends a message to all clients connected to a specific organization func (h *WebSocketHub) BroadcastToOrganization(orgID string, message domain.WebSocketMessage) { h.mu.RLock() orgClients, exists := h.orgClients[orgID] h.mu.RUnlock() if !exists { return } h.mu.RLock() for client := range orgClients { select { case client.send <- message: default: // Client channel is full, close connection close(client.send) delete(orgClients, client) } } h.mu.RUnlock() } // NewWebSocketHub creates a new WebSocket hub func NewWebSocketHub() *WebSocketHub { return &WebSocketHub{ clients: make(map[*WebSocketClient]bool), broadcast: make(chan domain.WebSocketMessage), register: make(chan *WebSocketClient), unregister: make(chan *WebSocketClient), orgClients: make(map[string]map[*WebSocketClient]bool), } } // Run starts the WebSocket hub func (hub *WebSocketHub) Run() { for { select { case client := <-hub.register: hub.addClient(client) case client := <-hub.unregister: hub.removeClient(client) case message := <-hub.broadcast: hub.broadcastMessage(message) } } } // addClient registers a new WebSocket client func (hub *WebSocketHub) addClient(client *WebSocketClient) { hub.mu.Lock() defer hub.mu.Unlock() hub.clients[client] = true // Add to organization-specific clients if hub.orgClients[client.orgID] == nil { hub.orgClients[client.orgID] = make(map[*WebSocketClient]bool) } hub.orgClients[client.orgID][client] = true log.Printf("WebSocket client connected for org %s", client.orgID) } // removeClient unregisters a WebSocket client func (hub *WebSocketHub) removeClient(client *WebSocketClient) { hub.mu.Lock() defer hub.mu.Unlock() if _, ok := hub.clients[client]; ok { delete(hub.clients, client) close(client.send) // Remove from organization-specific clients if orgClients, exists := hub.orgClients[client.orgID]; exists { delete(orgClients, client) if len(orgClients) == 0 { delete(hub.orgClients, client.orgID) } } } log.Printf("WebSocket client disconnected for org %s", client.orgID) } // broadcastMessage sends a message to all clients func (hub *WebSocketHub) broadcastMessage(message domain.WebSocketMessage) { hub.mu.RLock() defer hub.mu.RUnlock() for client := range hub.clients { select { case client.send <- message: default: hub.removeClient(client) } } } // BroadcastToOrganization sends a message to all clients of a specific organization // BroadcastToUser sends a message to all clients of a specific user func (hub *WebSocketHub) BroadcastToUser(userID string, message domain.WebSocketMessage) { hub.mu.RLock() defer hub.mu.RUnlock() for client := range hub.clients { if client.userID == userID { select { case client.send <- message: default: hub.removeClient(client) } } } } // HandleConnection handles a new WebSocket connection func (hub *WebSocketHub) HandleConnection(c *gin.Context, orgID, userID string) { upgrader := websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { // In production, implement proper origin checking return true }, } conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { log.Printf("WebSocket upgrade failed: %v", err) return } client := &WebSocketClient{ hub: hub, conn: conn, send: make(chan domain.WebSocketMessage, 256), orgID: orgID, userID: userID, } hub.register <- client // Start client goroutines go client.writePump() go client.readPump() } // writePump handles outgoing messages to the client func (client *WebSocketClient) writePump() { ticker := time.NewTicker(54 * time.Second) defer func() { ticker.Stop() client.conn.Close() }() for { select { case message, ok := <-client.send: client.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) if !ok { client.conn.WriteMessage(websocket.CloseMessage, []byte{}) return } if err := client.conn.WriteJSON(message); err != nil { log.Printf("Error writing WebSocket message: %v", err) return } case <-ticker.C: client.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := client.conn.WriteMessage(websocket.PingMessage, nil); err != nil { return } } } } // readPump handles incoming messages from the client (mostly for ping/pong) func (client *WebSocketClient) readPump() { defer func() { client.hub.unregister <- client client.conn.Close() }() client.conn.SetReadLimit(512) client.conn.SetReadDeadline(time.Now().Add(60 * time.Second)) client.conn.SetPongHandler(func(string) error { client.conn.SetReadDeadline(time.Now().Add(60 * time.Second)) return nil }) for { _, _, err := client.conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("WebSocket error: %v", err) } break } } } // WebSocketService provides WebSocket functionality type WebSocketService struct { hub *WebSocketHub } // NewWebSocketService creates a new WebSocket service func NewWebSocketService() *WebSocketService { hub := NewWebSocketHub() go hub.Run() return &WebSocketService{ hub: hub, } } // GetHub returns the WebSocket hub func (ws *WebSocketService) GetHub() *WebSocketHub { return ws.hub } // BroadcastMatchUpdate broadcasts a match update to organization clients func (ws *WebSocketService) BroadcastMatchUpdate(orgID, matchID string, eventType string) { message := domain.WebSocketMessage{ Type: "match_updated", Payload: map[string]interface{}{ "match_id": matchID, "event": eventType, }, Timestamp: time.Now(), } ws.hub.BroadcastToOrganization(orgID, message) } // BroadcastNewMatch broadcasts a new match notification func (ws *WebSocketService) BroadcastNewMatch(orgID string, matchData map[string]interface{}) { message := domain.WebSocketMessage{ Type: "new_match", Payload: matchData, Timestamp: time.Now(), } ws.hub.BroadcastToOrganization(orgID, message) } // HandleConnection handles WebSocket connection upgrade func (ws *WebSocketService) HandleConnection(c *gin.Context) { orgID := c.Query("org") userID := c.Query("user") if orgID == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "org parameter required"}) return } ws.hub.HandleConnection(c, orgID, userID) }