add WebSocket support for real-time delivery updates with JWT authentication and automatic reconnection
Some checks failed
Build and Push Docker Images / build-backend (push) Has been cancelled
Build and Push Docker Images / build-frontend (push) Has been cancelled

This commit is contained in:
Egor Pozharov
2026-05-21 15:52:05 +06:00
parent c87aea47ce
commit d1efebbb34
16 changed files with 408 additions and 73 deletions

View File

@@ -2,10 +2,12 @@ package delivery
import (
"errors"
"log"
"net/http"
"time"
sqlc "github.com/chedius/delivery-tracker/internal/db/sqlc"
"github.com/chedius/delivery-tracker/internal/ws"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgtype"
@@ -13,6 +15,7 @@ import (
type Handler struct {
queries *sqlc.Queries
hub *ws.Hub
}
// DeliveryRequest represents the request body for creating or updating a delivery
@@ -38,8 +41,8 @@ type DeliveryRequest struct {
Comment string `json:"comment"`
}
func NewHandler(queries *sqlc.Queries) *Handler {
return &Handler{queries: queries}
func NewHandler(queries *sqlc.Queries, hub *ws.Hub) *Handler {
return &Handler{queries: queries, hub: hub}
}
// GET /api/deliveries/:id
@@ -139,6 +142,7 @@ func (h *Handler) CreateDelivery(c *gin.Context) {
return
}
h.hub.Broadcast(ws.NewEvent(ws.DeliveryCreated, res))
c.JSON(http.StatusOK, gin.H{"message": "Delivery created", "id": res.ID.String()})
}
@@ -199,6 +203,11 @@ func (h *Handler) UpdateDelivery(c *gin.Context) {
return
}
if updated, err := h.queries.GetDeliveryByID(c.Request.Context(), pgtype.UUID{Bytes: parsedID, Valid: true}); err == nil {
h.hub.Broadcast(ws.NewEvent(ws.DeliveryUpdated, updated))
} else {
log.Printf("delivery: failed to fetch updated delivery %s for ws broadcast: %v", id, err)
}
c.JSON(http.StatusOK, gin.H{"message": "Delivery updated"})
}
@@ -240,6 +249,7 @@ func (h *Handler) UpdateDeliveryStatus(c *gin.Context) {
return
}
h.hub.Broadcast(ws.NewEvent(ws.DeliveryStatusChanged, ws.StatusPayload{ID: id, Status: status}))
c.JSON(http.StatusOK, gin.H{"message": "Delivery status updated"})
}
@@ -263,6 +273,7 @@ func (h *Handler) DeleteDelivery(c *gin.Context) {
return
}
h.hub.Broadcast(ws.NewEvent(ws.DeliveryDeleted, ws.DeletePayload{ID: id}))
c.JSON(http.StatusOK, gin.H{"message": "Delivery deleted"})
}

View File

@@ -0,0 +1,74 @@
package ws
import (
"time"
"github.com/gorilla/websocket"
)
const (
writeWait = 10 * time.Second
pongWait = 60 * time.Second
pingPeriod = (pongWait * 9) / 10
)
type Client struct {
hub *Hub
conn *websocket.Conn
send chan []byte
}
func NewClient(hub *Hub, conn *websocket.Conn) *Client {
return &Client{
hub: hub,
conn: conn,
send: make(chan []byte, 256),
}
}
// ReadPump keeps the connection alive and handles pong frames.
// We don't expect real messages from the client (read-only WS).
func (c *Client) ReadPump() {
defer func() {
c.hub.Unregister(c)
c.conn.Close()
}()
c.conn.SetReadLimit(512)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
if _, _, err := c.conn.ReadMessage(); err != nil {
break
}
}
}
// WritePump sends messages from hub to the client and pings to keep alive.
func (c *Client) WritePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case msg, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, nil)
return
}
if err := c.conn.WriteMessage(websocket.TextMessage, msg); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}

View File

@@ -0,0 +1,40 @@
package ws
import (
"encoding/json"
"log"
)
type EventType string
const (
DeliveryCreated EventType = "delivery.created"
DeliveryUpdated EventType = "delivery.updated"
DeliveryStatusChanged EventType = "delivery.status_changed"
DeliveryDeleted EventType = "delivery.deleted"
)
type Event struct {
Type EventType `json:"type"`
Payload any `json:"payload"`
}
type StatusPayload struct {
ID string `json:"id"`
Status string `json:"status"`
}
type DeletePayload struct {
ID string `json:"id"`
}
// NewEvent serializes an event to JSON. Returns nil on marshal failure;
// callers should treat nil as "skip broadcast".
func NewEvent(eventType EventType, payload any) []byte {
data, err := json.Marshal(Event{Type: eventType, Payload: payload})
if err != nil {
log.Printf("ws: failed to marshal event %s: %v", eventType, err)
return nil
}
return data
}

View File

@@ -0,0 +1,41 @@
package ws
import (
"net/http"
"github.com/chedius/delivery-tracker/internal/auth"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
// HandleWS returns a Gin handler that upgrades HTTP to WebSocket
// after validating the JWT token from the ?token= query param.
func HandleWS(hub *Hub, jwtSecret []byte) gin.HandlerFunc {
return func(c *gin.Context) {
token := c.Query("token")
if token == "" {
c.JSON(http.StatusUnauthorized, gin.H{"error": "missing token"})
return
}
if _, err := auth.ParseToken(token, jwtSecret); err != nil {
c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token"})
return
}
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
return
}
client := NewClient(hub, conn)
hub.Register(client)
go client.WritePump()
go client.ReadPump()
}
}

View File

@@ -0,0 +1,48 @@
package ws
import "sync"
type Hub struct {
mu sync.RWMutex
clients map[*Client]struct{}
}
func NewHub() *Hub {
return &Hub{
clients: make(map[*Client]struct{}),
}
}
func (h *Hub) Register(c *Client) {
h.mu.Lock()
h.clients[c] = struct{}{}
h.mu.Unlock()
}
func (h *Hub) Unregister(c *Client) {
h.mu.Lock()
if _, ok := h.clients[c]; ok {
delete(h.clients, c)
close(c.send)
}
h.mu.Unlock()
}
func (h *Hub) Broadcast(msg []byte) {
if msg == nil {
return
}
h.mu.RLock()
defer h.mu.RUnlock()
for c := range h.clients {
select {
case c.send <- msg:
default:
// Client too slow, schedule disconnect
go func(c *Client) {
h.Unregister(c)
c.conn.Close()
}(c)
}
}
}