WebSocket Real Time Communication Guide



This content originally appeared on DEV Community and was authored by member_a07758c4

As a junior computer science student, I have always been fascinated by real-time communication technologies. During my exploration of modern web development, I discovered that WebSocket technology opens up a whole new world of possibilities for creating interactive, responsive applications. This journey led me to understand the complete implementation from handshake protocol to message broadcasting.

Project Information
🚀 Hyperlane Framework: GitHub Repository
📧 Author Contact: root@ltpp.vip
📖 Documentation: Official Docs

Understanding WebSocket Fundamentals

Over ten years of learning to program, I have found that WebSocket represents a shift from the traditional request-response pattern to persistent, bidirectional communication. Unlike HTTP, where the client initiates every exchange, WebSocket lets either party send a message at any time.

The beauty of WebSocket lies in its simplicity and efficiency. Once the initial handshake is complete, each message carries only a small frame header (2 to 14 bytes, depending on payload length and masking) instead of a full set of HTTP headers, which makes it a good fit for real-time applications like chat systems, live updates, and collaborative tools.
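
To make the per-message overhead concrete, here is a minimal sketch that computes the size of a frame header from the layout in RFC 6455, section 5.2. The function name `frame_header_len` is my own and is not part of Hyperlane or any other library.

```rust
/// Size in bytes of a WebSocket frame header (RFC 6455, section 5.2).
/// `masked` is true for client-to-server frames, which carry a 4-byte masking key.
/// Hypothetical helper for illustration only.
fn frame_header_len(payload_len: u64, masked: bool) -> usize {
    // Two fixed bytes: FIN/RSV/opcode, then the MASK bit plus a 7-bit length field.
    let base = 2;
    // Payloads longer than 125 bytes need an extended length field.
    let extended = if payload_len <= 125 {
        0
    } else if payload_len <= u16::MAX as u64 {
        2 // 16-bit extended payload length
    } else {
        8 // 64-bit extended payload length
    };
    let mask_key = if masked { 4 } else { 0 };
    base + extended + mask_key
}
```

A short server-to-client chat message therefore costs 2 bytes of framing, and even the largest client-to-server frame costs 14.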

use hyperlane::*;
use hyperlane_macros::*;
use tokio::sync::{broadcast, RwLock};
use std::sync::Arc;
use std::collections::HashMap;
use serde::{Deserialize, Serialize};

// WebSocket connection manager
struct WebSocketManager {
    connections: Arc<RwLock<HashMap<String, ConnectionInfo>>>,
    message_broadcaster: broadcast::Sender<BroadcastMessage>,
    connection_stats: Arc<RwLock<ConnectionStats>>,
}

#[derive(Clone, Debug)]
struct ConnectionInfo {
    id: String,
    user_id: Option<String>,
    room_id: Option<String>,
    connected_at: chrono::DateTime<chrono::Utc>,
    last_ping: chrono::DateTime<chrono::Utc>,
    message_count: u64,
}

#[derive(Clone, Debug)]
struct BroadcastMessage {
    message_type: MessageType,
    sender_id: Option<String>,
    target: BroadcastTarget,
    payload: serde_json::Value,
    timestamp: chrono::DateTime<chrono::Utc>,
}

#[derive(Clone, Debug)]
enum MessageType {
    Chat,
    SystemNotification,
    UserJoined,
    UserLeft,
    Ping,
    Pong,
}

#[derive(Clone, Debug)]
enum BroadcastTarget {
    All,
    Room(String),
    User(String),
    ExceptSender(String),
}

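// Serialize is needed because the stats endpoint returns this struct as JSON
// (the DateTime field requires chrono's "serde" feature).
#[derive(Clone, Debug, Default, Serialize)]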
struct ConnectionStats {
    total_connections: u64,
    active_connections: u64,
    peak_connections: u64,
    total_messages: u64,
    messages_per_second: f64,
    last_stats_update: Option<chrono::DateTime<chrono::Utc>>,
}

impl WebSocketManager {
    fn new() -> Self {
        let (broadcaster, _) = broadcast::channel(1000);

        Self {
            connections: Arc::new(RwLock::new(HashMap::new())),
            message_broadcaster: broadcaster,
            connection_stats: Arc::new(RwLock::new(ConnectionStats::default())),
        }
    }

    async fn register_connection(&self, connection_id: String, user_id: Option<String>) -> broadcast::Receiver<BroadcastMessage> {
        let connection_info = ConnectionInfo {
            id: connection_id.clone(),
            user_id: user_id.clone(),
            room_id: None,
            connected_at: chrono::Utc::now(),
            last_ping: chrono::Utc::now(),
            message_count: 0,
        };

        {
            let mut connections = self.connections.write().await;
            connections.insert(connection_id.clone(), connection_info);
        }

        // Update statistics
        {
            let mut stats = self.connection_stats.write().await;
            stats.total_connections += 1;
            stats.active_connections += 1;
            if stats.active_connections > stats.peak_connections {
                stats.peak_connections = stats.active_connections;
            }
        }

        // Broadcast user joined message
        if let Some(user_id) = user_id {
            let join_message = BroadcastMessage {
                message_type: MessageType::UserJoined,
                sender_id: Some(user_id.clone()),
                target: BroadcastTarget::ExceptSender(connection_id.clone()),
                payload: serde_json::json!({
                    "user_id": user_id,
                    "message": format!("User {} joined", user_id)
                }),
                timestamp: chrono::Utc::now(),
            };

            let _ = self.message_broadcaster.send(join_message);
        }

        self.message_broadcaster.subscribe()
    }

    async fn unregister_connection(&self, connection_id: &str) {
        let connection_info = {
            let mut connections = self.connections.write().await;
            connections.remove(connection_id)
        };

        if let Some(info) = connection_info {
            // Update statistics
            {
                let mut stats = self.connection_stats.write().await;
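                // saturating_sub guards against underflow if the counter is already zero
                stats.active_connections = stats.active_connections.saturating_sub(1);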
            }

            // Broadcast user left message
            if let Some(user_id) = info.user_id {
                let leave_message = BroadcastMessage {
                    message_type: MessageType::UserLeft,
                    sender_id: Some(user_id.clone()),
                    target: BroadcastTarget::All,
                    payload: serde_json::json!({
                        "user_id": user_id,
                        "message": format!("User {} left", user_id)
                    }),
                    timestamp: chrono::Utc::now(),
                };

                let _ = self.message_broadcaster.send(leave_message);
            }
        }
    }

    async fn join_room(&self, connection_id: &str, room_id: String) -> Result<(), String> {
        let mut connections = self.connections.write().await;
        if let Some(connection) = connections.get_mut(connection_id) {
            connection.room_id = Some(room_id.clone());

            // Broadcast room join message
            let join_message = BroadcastMessage {
                message_type: MessageType::SystemNotification,
                sender_id: connection.user_id.clone(),
                target: BroadcastTarget::Room(room_id.clone()),
                payload: serde_json::json!({
                    "type": "room_join",
                    "user_id": connection.user_id,
                    "room_id": room_id,
                    "message": format!("User joined room {}", room_id)
                }),
                timestamp: chrono::Utc::now(),
            };

            let _ = self.message_broadcaster.send(join_message);
            Ok(())
        } else {
            Err("Connection not found".to_string())
        }
    }

    async fn leave_room(&self, connection_id: &str) -> Result<(), String> {
        let mut connections = self.connections.write().await;
        if let Some(connection) = connections.get_mut(connection_id) {
            if let Some(room_id) = connection.room_id.take() {
                // Broadcast room leave message
                let leave_message = BroadcastMessage {
                    message_type: MessageType::SystemNotification,
                    sender_id: connection.user_id.clone(),
                    target: BroadcastTarget::Room(room_id.clone()),
                    payload: serde_json::json!({
                        "type": "room_leave",
                        "user_id": connection.user_id,
                        "room_id": room_id,
                        "message": format!("User left room {}", room_id)
                    }),
                    timestamp: chrono::Utc::now(),
                };

                let _ = self.message_broadcaster.send(leave_message);
            }
            Ok(())
        } else {
            Err("Connection not found".to_string())
        }
    }

    async fn send_message(&self, connection_id: &str, message: IncomingMessage) -> Result<(), String> {
        let sender_info = {
            let mut connections = self.connections.write().await;
            if let Some(connection) = connections.get_mut(connection_id) {
                connection.message_count += 1;
                connection.last_ping = chrono::Utc::now();
                Some((connection.user_id.clone(), connection.room_id.clone()))
            } else {
                None
            }
        };

        if let Some((sender_id, room_id)) = sender_info {
            let broadcast_message = BroadcastMessage {
                message_type: MessageType::Chat,
                sender_id: sender_id.clone(),
                target: match message.target.as_str() {
                    "all" => BroadcastTarget::All,
                    "room" => BroadcastTarget::Room(room_id.unwrap_or_else(|| "general".to_string())),
                    user_id => BroadcastTarget::User(user_id.to_string()),
                },
                payload: serde_json::json!({
                    "type": "chat_message",
                    "sender_id": sender_id,
                    "content": message.content,
                    "timestamp": chrono::Utc::now().timestamp()
                }),
                timestamp: chrono::Utc::now(),
            };

            // Update message statistics
            {
                let mut stats = self.connection_stats.write().await;
                stats.total_messages += 1;

                // Average message rate since the first recorded message;
                // `last_stats_update` is set once and serves as the start time.
                let now = chrono::Utc::now();
                if let Some(started_at) = stats.last_stats_update {
                    let elapsed_secs = now.signed_duration_since(started_at).num_seconds();
                    if elapsed_secs > 0 {
                        stats.messages_per_second = stats.total_messages as f64 / elapsed_secs as f64;
                    }
                } else {
                    stats.last_stats_update = Some(now);
                }
            }

            let _ = self.message_broadcaster.send(broadcast_message);
            Ok(())
        } else {
            Err("Connection not found".to_string())
        }
    }

    async fn ping_connection(&self, connection_id: &str) -> Result<(), String> {
        let mut connections = self.connections.write().await;
        if let Some(connection) = connections.get_mut(connection_id) {
            connection.last_ping = chrono::Utc::now();
            Ok(())
        } else {
            Err("Connection not found".to_string())
        }
    }

    async fn get_connection_stats(&self) -> ConnectionStats {
        self.connection_stats.read().await.clone()
    }

    async fn get_room_users(&self, room_id: &str) -> Vec<String> {
        let connections = self.connections.read().await;
        connections.values()
            .filter(|conn| conn.room_id.as_deref() == Some(room_id))
            .filter_map(|conn| conn.user_id.clone())
            .collect()
    }

    async fn cleanup_stale_connections(&self, timeout_seconds: i64) {
        let now = chrono::Utc::now();
        let mut stale_connections = Vec::new();

        {
            let connections = self.connections.read().await;
            for (id, info) in connections.iter() {
                let duration = now.signed_duration_since(info.last_ping);
                if duration.num_seconds() > timeout_seconds {
                    stale_connections.push(id.clone());
                }
            }
        }

        for connection_id in stale_connections {
            self.unregister_connection(&connection_id).await;
        }
    }
}

#[derive(Deserialize)]
struct IncomingMessage {
    content: String,
    target: String, // "all", "room", or specific user_id
}

#[derive(Serialize)]
struct OutgoingMessage {
    message_type: String,
    sender_id: Option<String>,
    content: serde_json::Value,
    timestamp: i64,
}

static WEBSOCKET_MANAGER: once_cell::sync::Lazy<WebSocketManager> =
    once_cell::sync::Lazy::new(WebSocketManager::new);

// WebSocket endpoint implementation
#[websocket]
async fn websocket_endpoint(ctx: Context) {
    let query_params = ctx.get_query_params().await;
    let user_id = query_params.get("user_id").cloned();
    let connection_id = format!("conn_{}", uuid::Uuid::new_v4());

    // Register connection and get message receiver
    let mut message_receiver = WEBSOCKET_MANAGER.register_connection(connection_id.clone(), user_id.clone()).await;

    // Send welcome message
    let welcome_message = OutgoingMessage {
        message_type: "welcome".to_string(),
        sender_id: None,
        content: serde_json::json!({
            "connection_id": connection_id,
            "message": "Connected successfully"
        }),
        timestamp: chrono::Utc::now().timestamp(),
    };

    ctx.send_websocket_message(serde_json::to_string(&welcome_message).unwrap()).await;

    // Handle incoming and outgoing messages
    loop {
        tokio::select! {
            // Handle incoming WebSocket messages
            incoming = ctx.receive_websocket_message() => {
                match incoming {
                    Ok(message) => {
                        if let Ok(parsed_message) = serde_json::from_str::<IncomingMessage>(&message) {
                            if let Err(e) = WEBSOCKET_MANAGER.send_message(&connection_id, parsed_message).await {
                                eprintln!("Error sending message: {}", e);
                                break;
                            }
                        } else if message == "ping" {
                            // Handle ping
                            let _ = WEBSOCKET_MANAGER.ping_connection(&connection_id).await;
                            ctx.send_websocket_message("pong".to_string()).await;
                        }
                    }
                    Err(_) => {
                        // Connection closed
                        break;
                    }
                }
            }

            // Handle broadcast messages
            broadcast_msg = message_receiver.recv() => {
                match broadcast_msg {
                    Ok(msg) => {
                        // Check if this message should be sent to this connection
                        let should_send = match &msg.target {
                            BroadcastTarget::All => true,
                            BroadcastTarget::Room(room_id) => {
                                let connections = WEBSOCKET_MANAGER.connections.read().await;
                                if let Some(conn) = connections.get(&connection_id) {
                                    conn.room_id.as_ref() == Some(room_id)
                                } else {
                                    false
                                }
                            }
                            BroadcastTarget::User(target_user_id) => {
                                let connections = WEBSOCKET_MANAGER.connections.read().await;
                                if let Some(conn) = connections.get(&connection_id) {
                                    conn.user_id.as_ref() == Some(target_user_id)
                                } else {
                                    false
                                }
                            }
                            BroadcastTarget::ExceptSender(sender_conn_id) => {
                                connection_id != *sender_conn_id
                            }
                        };

                        if should_send {
                            let outgoing_message = OutgoingMessage {
                                message_type: format!("{:?}", msg.message_type).to_lowercase(),
                                sender_id: msg.sender_id,
                                content: msg.payload,
                                timestamp: msg.timestamp.timestamp(),
                            };

                            ctx.send_websocket_message(serde_json::to_string(&outgoing_message).unwrap()).await;
                        }
                    }
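                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        // This receiver fell behind and missed messages; keep the connection open
                        eprintln!("Connection {} lagged, skipped {} messages", connection_id, skipped);
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        // Broadcast channel closed
                        break;
                    }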
                }
            }
        }
    }

    // Clean up connection
    WEBSOCKET_MANAGER.unregister_connection(&connection_id).await;
}

// REST endpoints for WebSocket management
#[post]
async fn join_room_endpoint(ctx: Context) {
    let connection_id = ctx.get_route_param("connection_id").await.unwrap_or_default();
    let request_body: Vec<u8> = ctx.get_request_body().await;
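    let room_request: JoinRoomRequest = match serde_json::from_slice(&request_body) {
        Ok(request) => request,
        Err(_) => {
            // Reject malformed JSON with a 400 instead of panicking
            ctx.set_response_status_code(400)
                .await
                .set_response_header(CONTENT_TYPE, APPLICATION_JSON)
                .await
                .set_response_body(r#"{"error": "Invalid request body"}"#)
                .await;
            return;
        }
    };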

    match WEBSOCKET_MANAGER.join_room(&connection_id, room_request.room_id).await {
        Ok(()) => {
            ctx.set_response_status_code(200)
                .await
                .set_response_header(CONTENT_TYPE, APPLICATION_JSON)
                .await
                .set_response_body(r#"{"status": "joined"}"#)
                .await;
        }
        Err(error) => {
            ctx.set_response_status_code(400)
                .await
                .set_response_header(CONTENT_TYPE, APPLICATION_JSON)
                .await
                .set_response_body(format!(r#"{{"error": "{}"}}"#, error))
                .await;
        }
    }
}

#[derive(Deserialize)]
struct JoinRoomRequest {
    room_id: String,
}

#[post]
async fn leave_room_endpoint(ctx: Context) {
    let connection_id = ctx.get_route_param("connection_id").await.unwrap_or_default();

    match WEBSOCKET_MANAGER.leave_room(&connection_id).await {
        Ok(()) => {
            ctx.set_response_status_code(200)
                .await
                .set_response_header(CONTENT_TYPE, APPLICATION_JSON)
                .await
                .set_response_body(r#"{"status": "left"}"#)
                .await;
        }
        Err(error) => {
            ctx.set_response_status_code(400)
                .await
                .set_response_header(CONTENT_TYPE, APPLICATION_JSON)
                .await
                .set_response_body(format!(r#"{{"error": "{}"}}"#, error))
                .await;
        }
    }
}

#[get]
async fn websocket_stats_endpoint(ctx: Context) {
    let stats = WEBSOCKET_MANAGER.get_connection_stats().await;

    ctx.set_response_status_code(200)
        .await
        .set_response_header(CONTENT_TYPE, APPLICATION_JSON)
        .await
        .set_response_body(serde_json::to_string(&stats).unwrap())
        .await;
}

#[get]
async fn room_users_endpoint(ctx: Context) {
    let room_id = ctx.get_route_param("room_id").await.unwrap_or_default();
    let users = WEBSOCKET_MANAGER.get_room_users(&room_id).await;

    let response = serde_json::json!({
        "room_id": room_id,
        "users": users,
        "user_count": users.len()
    });

    ctx.set_response_status_code(200)
        .await
        .set_response_header(CONTENT_TYPE, APPLICATION_JSON)
        .await
        .set_response_body(serde_json::to_string(&response).unwrap())
        .await;
}

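// Periodic cleanup task; it must be spawned once at startup to take effect,
// for example with `tokio::spawn(start_cleanup_task())`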
async fn start_cleanup_task() {
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));

    loop {
        interval.tick().await;
        WEBSOCKET_MANAGER.cleanup_stale_connections(300).await; // 5 minutes timeout
    }
}
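
The endpoint above decides inline, per broadcast message, whether the current connection should receive it. One way to make that rule easier to test is to pull it into a pure function. The sketch below is an assumption on my part, not something the framework provides: `Recipient` and `should_deliver` are hypothetical names, and `BroadcastTarget` is repeated only so the block stands on its own.

```rust
#[derive(Clone, Debug, PartialEq)]
enum BroadcastTarget {
    All,
    Room(String),
    User(String),
    ExceptSender(String),
}

/// The subset of per-connection state the delivery decision needs (hypothetical type).
struct Recipient<'a> {
    connection_id: &'a str,
    user_id: Option<&'a str>,
    room_id: Option<&'a str>,
}

/// Returns true when a message addressed to `target` should reach `recipient`.
fn should_deliver(target: &BroadcastTarget, recipient: &Recipient<'_>) -> bool {
    match target {
        BroadcastTarget::All => true,
        // Room messages reach only connections currently in that room.
        BroadcastTarget::Room(room) => recipient.room_id == Some(room.as_str()),
        // Direct messages reach every connection of the target user.
        BroadcastTarget::User(user) => recipient.user_id == Some(user.as_str()),
        // Everyone except the connection that triggered the message.
        BroadcastTarget::ExceptSender(sender) => recipient.connection_id != sender.as_str(),
    }
}
```

With this shape, the connection loop would take one read lock, build a `Recipient` from its own `ConnectionInfo`, and call `should_deliver`, and the routing rules could be unit tested without opening a socket.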

Advanced WebSocket Features

In my exploration of WebSocket technology, I discovered several advanced features that make real-time applications more robust and scalable:

  1. Connection Pooling: Managing multiple connections efficiently
  2. Message Broadcasting: Distributing messages to multiple clients
  3. Room Management: Organizing users into logical groups
  4. Heartbeat Mechanism: Detecting and handling connection failures
  5. Message Queuing: Handling offline users and message persistence

These features transform simple WebSocket connections into powerful real-time communication systems capable of supporting complex applications like collaborative editors, multiplayer games, and live streaming platforms.
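
Of the five features, message queuing is the one the manager code does not cover, so here is a minimal sketch of how it might look. It assumes an in-memory, bounded mailbox per user that drops the oldest message when full; `OfflineQueue` and its methods are hypothetical names, and real persistence would need a database or a message broker instead.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical bounded mailbox for users with no active connection.
struct OfflineQueue {
    per_user_capacity: usize,
    pending: HashMap<String, VecDeque<String>>,
}

impl OfflineQueue {
    fn new(per_user_capacity: usize) -> Self {
        Self { per_user_capacity, pending: HashMap::new() }
    }

    /// Store a serialized message; drop the oldest one when the mailbox is full.
    fn enqueue(&mut self, user_id: &str, message: String) {
        if self.per_user_capacity == 0 {
            return; // queuing disabled
        }
        let queue = self.pending.entry(user_id.to_string()).or_default();
        if queue.len() == self.per_user_capacity {
            queue.pop_front();
        }
        queue.push_back(message);
    }

    /// Remove and return everything queued for a user, oldest first.
    fn drain(&mut self, user_id: &str) -> Vec<String> {
        self.pending
            .remove(user_id)
            .map(|queue| queue.into_iter().collect())
            .unwrap_or_default()
    }
}
```

The idea would be to call `enqueue` when a `User` target has no live connection and `drain` right after that user registers a new one. The capacity bound keeps memory use predictable at the cost of losing the oldest messages.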

Performance Considerations

Through my testing and optimization work, I learned that WebSocket performance depends on several factors:

  • Message Serialization: Efficient encoding/decoding of messages
  • Connection Management: Proper cleanup and resource management
  • Broadcasting Strategy: Optimized message distribution algorithms
  • Memory Usage: Careful management of connection state and message buffers
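
As one illustration of the serialization and broadcasting points, a message can be encoded once and the resulting bytes shared between recipients, rather than encoded again for every connection. The sketch below uses only the standard library; `fan_out` is a hypothetical helper, and I have not measured how much this saves in practice.

```rust
use std::sync::Arc;

/// Hypothetical helper: hand the same encoded payload to `recipients` connections.
fn fan_out(encoded: String, recipients: usize) -> Vec<Arc<str>> {
    // The payload is moved into one shared allocation;
    // each clone below only increments a reference count.
    let shared: Arc<str> = Arc::from(encoded);
    (0..recipients).map(|_| Arc::clone(&shared)).collect()
}
```

Every element of the returned vector points at the same buffer, so the cost of reaching one more recipient is a pointer copy instead of another serialization pass.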

The framework I’ve been studying handles these concerns elegantly, providing high-performance WebSocket support with minimal overhead and maximum scalability.

This article documents my journey as a junior student exploring WebSocket technology and real-time communication. Through practical implementation and testing, I gained deep insights into the challenges and solutions of building real-time web applications. I hope my experience can help other students understand this powerful technology.

For more information, please visit Hyperlane GitHub page or contact the author: root@ltpp.vip

