turash/bugulma/backend/internal/service/websocket_service.go
Damir Mukimov 000eab4740
Major repository reorganization and missing backend endpoints implementation
Repository Structure:
- Move files from cluttered root directory into organized structure
- Create archive/ for archived data and scraper results
- Create bugulma/ for the complete application (frontend + backend)
- Create data/ for sample datasets and reference materials
- Create docs/ for comprehensive documentation structure
- Create scripts/ for utility scripts and API tools

Backend Implementation:
- Implement 3 missing backend endpoints identified in gap analysis:
  * GET /api/v1/organizations/{id}/matching/direct - Direct symbiosis matches
  * GET /api/v1/users/me/organizations - User organizations
  * POST /api/v1/proposals/{id}/status - Update proposal status
- Add complete proposal domain model, repository, and service layers
- Create database migration for proposals table
- Fix CLI server command registration issue

API Documentation:
- Add comprehensive proposals.md API documentation
- Update README.md with Users and Proposals API sections
- Document all request/response formats, error codes, and business rules

Code Quality:
- Follow existing Go backend architecture patterns
- Add proper error handling and validation
- Match frontend expected response schemas
- Maintain clean separation of concerns (handler -> service -> repository)
2025-11-25 06:01:16 +01:00

296 lines
7.0 KiB
Go

package service
import (
"log"
"net/http"
"sync"
"time"
"bugulma/backend/internal/domain"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
// WebSocketMessage is now defined in the domain package
// WebSocketClient represents a connected WebSocket client
type WebSocketClient struct {
hub *WebSocketHub
conn *websocket.Conn
send chan domain.WebSocketMessage
orgID string
userID string
}
// WebSocketHub manages WebSocket connections and broadcasting
type WebSocketHub struct {
clients map[*WebSocketClient]bool
broadcast chan domain.WebSocketMessage
register chan *WebSocketClient
unregister chan *WebSocketClient
orgClients map[string]map[*WebSocketClient]bool // orgID -> clients
mu sync.RWMutex
}
// BroadcastToOrganization sends a message to all clients connected to a specific organization
func (h *WebSocketHub) BroadcastToOrganization(orgID string, message domain.WebSocketMessage) {
h.mu.RLock()
orgClients, exists := h.orgClients[orgID]
h.mu.RUnlock()
if !exists {
return
}
h.mu.RLock()
for client := range orgClients {
select {
case client.send <- message:
default:
// Client channel is full, close connection
close(client.send)
delete(orgClients, client)
}
}
h.mu.RUnlock()
}
// NewWebSocketHub creates a new WebSocket hub
func NewWebSocketHub() *WebSocketHub {
return &WebSocketHub{
clients: make(map[*WebSocketClient]bool),
broadcast: make(chan domain.WebSocketMessage),
register: make(chan *WebSocketClient),
unregister: make(chan *WebSocketClient),
orgClients: make(map[string]map[*WebSocketClient]bool),
}
}
// Run starts the WebSocket hub
func (hub *WebSocketHub) Run() {
for {
select {
case client := <-hub.register:
hub.addClient(client)
case client := <-hub.unregister:
hub.removeClient(client)
case message := <-hub.broadcast:
hub.broadcastMessage(message)
}
}
}
// addClient registers a new WebSocket client
func (hub *WebSocketHub) addClient(client *WebSocketClient) {
hub.mu.Lock()
defer hub.mu.Unlock()
hub.clients[client] = true
// Add to organization-specific clients
if hub.orgClients[client.orgID] == nil {
hub.orgClients[client.orgID] = make(map[*WebSocketClient]bool)
}
hub.orgClients[client.orgID][client] = true
log.Printf("WebSocket client connected for org %s", client.orgID)
}
// removeClient unregisters a WebSocket client
func (hub *WebSocketHub) removeClient(client *WebSocketClient) {
hub.mu.Lock()
defer hub.mu.Unlock()
if _, ok := hub.clients[client]; ok {
delete(hub.clients, client)
close(client.send)
// Remove from organization-specific clients
if orgClients, exists := hub.orgClients[client.orgID]; exists {
delete(orgClients, client)
if len(orgClients) == 0 {
delete(hub.orgClients, client.orgID)
}
}
}
log.Printf("WebSocket client disconnected for org %s", client.orgID)
}
// broadcastMessage sends a message to all clients
func (hub *WebSocketHub) broadcastMessage(message domain.WebSocketMessage) {
hub.mu.RLock()
defer hub.mu.RUnlock()
for client := range hub.clients {
select {
case client.send <- message:
default:
hub.removeClient(client)
}
}
}
// BroadcastToOrganization sends a message to all clients of a specific organization
// BroadcastToUser sends a message to all clients of a specific user
func (hub *WebSocketHub) BroadcastToUser(userID string, message domain.WebSocketMessage) {
hub.mu.RLock()
defer hub.mu.RUnlock()
for client := range hub.clients {
if client.userID == userID {
select {
case client.send <- message:
default:
hub.removeClient(client)
}
}
}
}
// HandleConnection handles a new WebSocket connection
func (hub *WebSocketHub) HandleConnection(c *gin.Context, orgID, userID string) {
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
// In production, implement proper origin checking
return true
},
}
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Printf("WebSocket upgrade failed: %v", err)
return
}
client := &WebSocketClient{
hub: hub,
conn: conn,
send: make(chan domain.WebSocketMessage, 256),
orgID: orgID,
userID: userID,
}
hub.register <- client
// Start client goroutines
go client.writePump()
go client.readPump()
}
// writePump handles outgoing messages to the client
func (client *WebSocketClient) writePump() {
ticker := time.NewTicker(54 * time.Second)
defer func() {
ticker.Stop()
client.conn.Close()
}()
for {
select {
case message, ok := <-client.send:
client.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
client.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
if err := client.conn.WriteJSON(message); err != nil {
log.Printf("Error writing WebSocket message: %v", err)
return
}
case <-ticker.C:
client.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := client.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
// readPump handles incoming messages from the client (mostly for ping/pong)
func (client *WebSocketClient) readPump() {
defer func() {
client.hub.unregister <- client
client.conn.Close()
}()
client.conn.SetReadLimit(512)
client.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
client.conn.SetPongHandler(func(string) error {
client.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
for {
_, _, err := client.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("WebSocket error: %v", err)
}
break
}
}
}
// WebSocketService provides WebSocket functionality
type WebSocketService struct {
hub *WebSocketHub
}
// NewWebSocketService creates a new WebSocket service
func NewWebSocketService() *WebSocketService {
hub := NewWebSocketHub()
go hub.Run()
return &WebSocketService{
hub: hub,
}
}
// GetHub returns the WebSocket hub
func (ws *WebSocketService) GetHub() *WebSocketHub {
return ws.hub
}
// BroadcastMatchUpdate broadcasts a match update to organization clients
func (ws *WebSocketService) BroadcastMatchUpdate(orgID, matchID string, eventType string) {
message := domain.WebSocketMessage{
Type: "match_updated",
Payload: map[string]interface{}{
"match_id": matchID,
"event": eventType,
},
Timestamp: time.Now(),
}
ws.hub.BroadcastToOrganization(orgID, message)
}
// BroadcastNewMatch broadcasts a new match notification
func (ws *WebSocketService) BroadcastNewMatch(orgID string, matchData map[string]interface{}) {
message := domain.WebSocketMessage{
Type: "new_match",
Payload: matchData,
Timestamp: time.Now(),
}
ws.hub.BroadcastToOrganization(orgID, message)
}
// HandleConnection handles WebSocket connection upgrade
func (ws *WebSocketService) HandleConnection(c *gin.Context) {
orgID := c.Query("org")
userID := c.Query("user")
if orgID == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "org parameter required"})
return
}
ws.hub.HandleConnection(c, orgID, userID)
}