CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/hex-phoenix

Peace of mind from prototype to production - comprehensive web framework for Elixir

Overview
Eval results
Files

docs/real-time.md

Real-time Communication

Phoenix provides real-time communication through channels, which run over WebSocket (with long polling available as an alternative transport) and enable bidirectional messaging between clients and servers. The system supports broadcasting, presence tracking, and custom message handling with built-in fault tolerance.

Channel System

Channels enable persistent connections between clients and the server with topic-based message routing.

Phoenix.Channel

defmodule Phoenix.Channel do
  # API sketch of the Phoenix.Channel behaviour and its public functions.
  # The `def name(types) :: return` lines under "Public API" are signature
  # shorthand for documentation purposes, not compilable Elixir.

  # A reply is either a bare status atom (e.g. :ok) or a status with a payload
  # (e.g. {:ok, %{status: "sent"}}).
  @type reply :: atom | {atom, map}

  # Opaque reference to a client message, used to reply asynchronously.
  # Shape: {transport_pid, serializer, topic, ref, join_ref}
  @type socket_ref :: {pid, module, binary, binary, binary}

  # Required callback
  @callback join(binary, map, Phoenix.Socket.t()) ::
    {:ok, Phoenix.Socket.t()} |
    {:ok, map, Phoenix.Socket.t()} |
    {:error, map}

  # Optional callbacks
  @callback handle_in(binary, map, Phoenix.Socket.t()) ::
    {:noreply, Phoenix.Socket.t()} |
    {:reply, reply, Phoenix.Socket.t()} |
    {:stop, term, Phoenix.Socket.t()} |
    {:stop, term, reply, Phoenix.Socket.t()}

  @callback handle_out(binary, map, Phoenix.Socket.t()) ::
    {:noreply, Phoenix.Socket.t()} |
    {:stop, term, Phoenix.Socket.t()}

  # To send data to the client from handle_info/2, call push/3 and then
  # return {:noreply, socket}; there is no {:push, ...} return value.
  @callback handle_info(term, Phoenix.Socket.t()) ::
    {:noreply, Phoenix.Socket.t()} |
    {:stop, term, Phoenix.Socket.t()}

  @callback handle_call(term, GenServer.from(), Phoenix.Socket.t()) :: term

  @callback handle_cast(term, Phoenix.Socket.t()) :: term

  @callback terminate(term, Phoenix.Socket.t()) :: term

  @callback code_change(term, Phoenix.Socket.t(), term) :: {:ok, Phoenix.Socket.t()}

  # Public API
  # Arguments are (socket, event, payload); the topic comes from the socket.
  def broadcast(Phoenix.Socket.t(), binary, map) :: :ok | {:error, term}
  def broadcast!(Phoenix.Socket.t(), binary, map) :: :ok
  def broadcast_from(Phoenix.Socket.t(), binary, map) :: :ok | {:error, term}
  def broadcast_from!(Phoenix.Socket.t(), binary, map) :: :ok

  def push(Phoenix.Socket.t(), binary, map) :: :ok
  def reply(socket_ref, reply) :: :ok
  def socket_ref(Phoenix.Socket.t()) :: socket_ref
end

Usage Examples

defmodule MyAppWeb.RoomChannel do
  @moduledoc """
  Example chat-room channel for topics of the form `"room:<room_id>"`.

  Reads `:user_id` and `:user_name` from the socket assigns.
  NOTE(review): `:user_name` is not assigned anywhere in this example; it is
  assumed to be set when the socket connects — confirm against the socket module.
  """
  use Phoenix.Channel

  # Handle client joining a room.
  # The room id is stored in the assigns because the later callbacks read
  # `socket.assigns.room_id`.
  @impl true
  def join("room:" <> room_id, _params, socket) do
    if authorized?(socket.assigns.user_id, room_id) do
      # Defer the history push until after the join has completed.
      send(self(), :after_join)
      {:ok, assign(socket, :room_id, room_id)}
    else
      {:error, %{reason: "unauthorized"}}
    end
  end

  # Handle messages from clients
  @impl true
  def handle_in("new_message", %{"body" => body}, socket) do
    user = MyApp.Accounts.get_user!(socket.assigns.user_id)

    message = %{
      body: body,
      user: user.name,
      timestamp: DateTime.utc_now()
    }

    # Broadcast to all clients in this topic
    broadcast!(socket, "new_message", message)

    # Store message in database
    MyApp.Chat.create_message(socket.assigns.room_id, user.id, body)

    {:noreply, socket}
  end

  def handle_in("typing", %{"typing" => typing}, socket) do
    # broadcast_from!/3 is used so the typing notice is not echoed to the sender.
    broadcast_from!(socket, "typing", %{
      user: socket.assigns.user_name,
      typing: typing
    })

    {:noreply, socket}
  end

  # Handle system messages
  @impl true
  def handle_info(:after_join, socket) do
    # Send recent messages to newly joined client
    messages = MyApp.Chat.list_recent_messages(socket.assigns.room_id)
    push(socket, "message_history", %{messages: messages})
    {:noreply, socket}
  end

  # Handle client disconnection
  @impl true
  def terminate(_reason, socket) do
    broadcast_from!(socket, "user_left", %{
      user: socket.assigns.user_name
    })

    :ok
  end

  # Delegates the access decision to the chat context.
  defp authorized?(user_id, room_id) do
    MyApp.Chat.user_can_access_room?(user_id, room_id)
  end
end

Socket Management

Sockets manage the persistent connection and multiplex multiple channels.

Phoenix.Socket

defmodule Phoenix.Socket do
  # Sketch of the Phoenix.Socket behaviour and the socket struct.

  # Required callbacks
  # connect/3 receives (params, socket, connect_info). Besides a bare :error,
  # it may return {:error, term} to pass a reason along.
  @callback connect(map, Phoenix.Socket.t(), map) ::
    {:ok, Phoenix.Socket.t()} | {:error, term} | :error

  @callback id(Phoenix.Socket.t()) :: binary | nil

  # Socket struct fields
  # Fields with defaults are listed last, as the list syntax requires.
  defstruct [
    :id,            # socket identifier
    :channel,       # current channel module
    :channel_pid,   # channel process PID
    :endpoint,      # endpoint module
    :handler,       # socket handler module
    :join_ref,      # join reference
    :ref,           # message reference
    :pubsub_server, # pubsub server name
    :topic,         # current topic
    :transport,     # transport name (:websocket, :longpoll)
    :transport_pid, # transport process PID
    :serializer,    # message serializer module
    assigns: %{},   # socket assigns; starts as an empty map
    joined: false,  # whether socket has joined a topic
    private: %{}    # storage reserved for framework/library use
  ]

  @type t :: %__MODULE__{}
end

Usage Examples

defmodule MyAppWeb.UserSocket do
  use Phoenix.Socket

  # Topic patterns routed to their channel modules
  channel "room:*", MyAppWeb.RoomChannel
  channel "user:*", MyAppWeb.UserChannel
  channel "notifications:*", MyAppWeb.NotificationChannel

  # Authenticate the connection from the "token" param. On success the
  # verified user id is stored in the socket assigns.
  def connect(%{"token" => token}, socket, _connect_info) do
    verification = MyApp.Accounts.verify_socket_token(token)

    case verification do
      {:error, _reason} -> :error
      {:ok, user_id} -> {:ok, assign(socket, :user_id, user_id)}
    end
  end

  # Connections without a "token" param are refused.
  def connect(_params, _socket, _connect_info), do: :error

  # Socket id derived from the authenticated user's id
  def id(socket) do
    "user:#{socket.assigns.user_id}"
  end
end

Broadcasting and PubSub

Phoenix provides a distributed PubSub system for broadcasting messages across processes and nodes.

Broadcasting API

# From Phoenix.Channel
# These take the channel's socket; the topic is read from the socket, so only
# the event and the message are passed. The *_from variants do not deliver the
# message to the channel that owns the socket.
def broadcast(socket, event, message)
def broadcast!(socket, event, message)
def broadcast_from(socket, event, message)
def broadcast_from!(socket, event, message)

# From Phoenix.Endpoint
# These take an explicit topic (and, for the *_from variants, the pid of the
# sender as the first argument) and can be called from any process.
@callback broadcast(binary, binary, term) :: :ok | {:error, term}
@callback broadcast!(binary, binary, term) :: :ok
@callback broadcast_from(pid, binary, binary, term) :: :ok | {:error, term}
@callback broadcast_from!(pid, binary, binary, term) :: :ok
@callback local_broadcast(binary, binary, term) :: :ok
@callback local_broadcast_from(pid, binary, binary, term) :: :ok

Usage Examples

# Broadcasting from controllers
defmodule MyAppWeb.PostController do
  use Phoenix.Controller

  # Creates a post from the "post" params. On success the new post is
  # announced on "posts:lobby" and returned as JSON; on failure the changeset
  # is returned with a 422 status.
  def create(conn, %{"post" => attrs}) do
    result = MyApp.Blog.create_post(attrs)

    case result do
      {:error, changeset} ->
        rejected = put_status(conn, :unprocessable_entity)
        json(rejected, %{errors: changeset})

      {:ok, post} ->
        # Notify every subscriber of the lobby topic before responding.
        MyAppWeb.Endpoint.broadcast!("posts:lobby", "new_post", %{post: post})
        json(conn, %{post: post})
    end
  end
end

# Broadcasting from contexts
defmodule MyApp.Chat do
  # Persists a message and, when that succeeds, announces it to the room's
  # topic. Any other result of create_message/3 is returned unchanged.
  def send_message(room_id, user_id, body) do
    with {:ok, message} <- create_message(room_id, user_id, body) do
      topic = "room:#{room_id}"
      MyAppWeb.Endpoint.broadcast!(topic, "new_message", %{message: message})
      {:ok, message}
    end
  end
end

# Broadcasting from GenServers
defmodule MyApp.GameServer do
  use GenServer

  # Announces the new game state on the game's topic, then stores it in the
  # server state.
  def handle_cast({:update_game_state, game_state}, state) do
    topic = "game:#{state.game_id}"
    payload = %{state: game_state}
    MyAppWeb.Endpoint.broadcast!(topic, "state_updated", payload)

    updated = %{state | game_state: game_state}
    {:noreply, updated}
  end
end

Transport Configuration

Phoenix supports multiple transport protocols with fallback mechanisms.

Transport Types

# In endpoint configuration
# The three `socket` declarations below are separate variants of the same
# mount ("/socket" -> MyAppWeb.UserSocket), each showing a different way of
# configuring transports.

# Variant 1: enable WebSocket with default options and disable long polling.
socket "/socket", MyAppWeb.UserSocket,
  websocket: true,
  longpoll: false

# WebSocket configuration
# Variant 2: a keyword list in place of `true` enables the transport with
# custom options.
# NOTE(review): `timeout` is presumably in milliseconds — confirm against the
# Phoenix.Endpoint.socket/3 documentation.
socket "/socket", MyAppWeb.UserSocket,
  websocket: [
    timeout: 45_000,
    transport_log: false,
    subprotocols: ["mqtt", "v1.example.com"]
  ]

# Long polling configuration
# Variant 3: long-polling options. The `_ms` suffixes indicate millisecond
# values; `max_age` is in seconds (1_209_600 s = 14 days).
socket "/socket", MyAppWeb.UserSocket,
  longpoll: [
    window_ms: 10_000,
    pubsub_timeout_ms: 2_000,
    crypto: [
      max_age: 1_209_600  # 2 weeks
    ]
  ]

Client-Side Connection

// JavaScript client connection
import {Socket} from "phoenix"

// Logs every socket event together with its data
const logEvent = (kind, msg, data) => { console.log(`${kind}: ${msg}`, data) }

const socket = new Socket("/socket", {
  params: {token: userToken},
  logger: logEvent
})

socket.connect()

// Join a channel and report the outcome
const channel = socket.channel("room:lobby", {})
const joinPush = channel.join()
joinPush.receive("ok", resp => { console.log("Joined successfully", resp) })
joinPush.receive("error", resp => { console.log("Unable to join", resp) })

// Handle messages
const onNewMessage = payload => {
  console.log("New message:", payload.body)
}
channel.on("new_message", onNewMessage)

// Send messages
channel.push("new_message", {body: "Hello, World!"})

Message Serialization

Phoenix supports multiple message serializers for different transport needs.

Phoenix.Socket.Serializer

defmodule Phoenix.Socket.Serializer do
  # Behaviour implemented by socket message serializers.

  # Encodes a broadcast once so the same frame can be sent to every subscriber.
  @callback fastlane!(Phoenix.Socket.Broadcast.t()) ::
    {:socket_push, :text, iodata} | {:socket_push, :binary, iodata}

  # Encodes a message or a reply into a frame tagged with its opcode.
  @callback encode!(Phoenix.Socket.Message.t() | Phoenix.Socket.Reply.t()) ::
    {:socket_push, :text, iodata} | {:socket_push, :binary, iodata}

  # Decodes incoming data (with transport-supplied options) into a message.
  @callback decode!(iodata, keyword) :: Phoenix.Socket.Message.t()
end

# Built-in serializers
# Module names only; the bodies are omitted in this listing.
# NOTE(review): V1/V2 presumably correspond to versions of the channel wire
# protocol — confirm against the Phoenix documentation.
defmodule Phoenix.Socket.V1.JSONSerializer
defmodule Phoenix.Socket.V2.JSONSerializer

Message Structure

defmodule Phoenix.Socket.Message do
  # Message exchanged over a socket. Every field defaults to nil.
  #
  #   topic    - topic string
  #   event    - event string
  #   payload  - message payload
  #   ref      - message reference
  #   join_ref - join reference
  defstruct topic: nil, event: nil, payload: nil, ref: nil, join_ref: nil

  @type t :: %__MODULE__{
    topic: binary,
    event: binary,
    payload: map,
    ref: binary | nil,
    join_ref: binary | nil
  }
end

Error Handling and Lifecycle

defmodule MyAppWeb.RoomChannel do
  @moduledoc """
  Example channel showing error replies on join and on incoming messages,
  plus cleanup when the channel process terminates.

  `validate_message/1` is not shown in this example and must be provided by
  the application.
  """
  use Phoenix.Channel

  # Logger.info/1 is a macro, so Logger has to be required before use.
  require Logger

  # Handle join errors
  @impl true
  def join("room:" <> room_id, _params, socket) do
    case MyApp.Chat.get_room(room_id) do
      nil ->
        {:error, %{reason: "room_not_found"}}

      room ->
        if MyApp.Chat.user_can_join?(socket.assigns.user_id, room) do
          # Keep the room in the assigns; terminate/2 reads it for cleanup.
          {:ok, assign(socket, :room, room)}
        else
          {:error, %{reason: "access_denied"}}
        end
    end
  end

  # Handle message errors
  @impl true
  def handle_in("send_message", params, socket) do
    case validate_message(params) do
      {:ok, _message_params} ->
        # Process message
        {:reply, {:ok, %{status: "sent"}}, socket}

      {:error, errors} ->
        {:reply, {:error, %{errors: errors}}, socket}
    end
  end

  # Log the termination reason and release the user's room membership.
  # NOTE(review): terminate/2 is not guaranteed to run on every kind of exit;
  # do not rely on it as the only cleanup path — confirm against GenServer docs.
  @impl true
  def terminate(reason, socket) do
    Logger.info("Channel terminated: #{inspect(reason)}")

    # Cleanup resources
    MyApp.Chat.user_left_room(
      socket.assigns.user_id,
      socket.assigns.room.id
    )

    :ok
  end
end

Testing Channels

defmodule MyAppWeb.RoomChannelTest do
  use MyAppWeb.ChannelCase

  setup do
    user = insert(:user)
    room = insert(:room)

    # Build the socket directly with the assigns the channel needs.
    # Going through connect/2 with %{user_id: ...} would be refused, because
    # UserSocket.connect/3 only accepts params carrying a "token".
    socket = socket(MyAppWeb.UserSocket, "user:#{user.id}", %{user_id: user.id})

    {:ok, socket: socket, user: user, room: room}
  end

  test "joining a room", %{socket: socket, room: room} do
    {:ok, reply, socket} = subscribe_and_join(
      socket,
      MyAppWeb.RoomChannel,
      "room:#{room.id}"
    )

    assert reply == %{}
    # Requires join/3 to store the room under the :room assign.
    assert socket.assigns.room.id == room.id
  end

  test "sending messages", %{socket: socket, room: room} do
    {:ok, _, socket} = subscribe_and_join(
      socket,
      MyAppWeb.RoomChannel,
      "room:#{room.id}"
    )

    ref = push(socket, "new_message", %{"body" => "Hello"})

    # NOTE(review): assert_reply only passes when the "new_message" handler
    # replies with :ok; a handler returning {:noreply, socket} makes this
    # assertion time out — confirm against the channel under test.
    assert_reply ref, :ok

    assert_broadcast "new_message", %{body: "Hello"}
  end
end

Install with Tessl CLI

npx tessl i tessl/hex-phoenix

docs

code-generation.md

index.md

presence.md

real-time.md

security.md

testing.md

web-foundation.md

tile.json