Peace of mind from prototype to production - comprehensive web framework for Elixir
Phoenix provides sophisticated real-time communication through WebSocket channels, enabling bidirectional messaging between clients and servers. The system supports broadcasting, presence tracking, and custom message handling with built-in fault tolerance.
Channels enable persistent connections between clients and the server with topic-based message routing.
defmodule Phoenix.Channel do
# Required callbacks
@callback join(binary, map, Phoenix.Socket.t()) ::
{:ok, Phoenix.Socket.t()} |
{:ok, map, Phoenix.Socket.t()} |
{:error, map}
@callback handle_in(binary, map, Phoenix.Socket.t()) ::
{:noreply, Phoenix.Socket.t()} |
{:reply, map, Phoenix.Socket.t()} |
{:stop, term, Phoenix.Socket.t()} |
{:stop, term, map, Phoenix.Socket.t()}
# Optional callbacks
@callback handle_out(binary, map, Phoenix.Socket.t()) ::
{:noreply, Phoenix.Socket.t()} |
{:stop, term, Phoenix.Socket.t()}
@callback handle_info(term, Phoenix.Socket.t()) ::
{:noreply, Phoenix.Socket.t()} |
{:push, map, Phoenix.Socket.t()} |
{:stop, term, Phoenix.Socket.t()}
@callback handle_call(term, GenServer.from(), Phoenix.Socket.t()) :: term
@callback handle_cast(term, Phoenix.Socket.t()) :: term
@callback terminate(term, Phoenix.Socket.t()) :: term
@callback code_change(term, Phoenix.Socket.t(), term) :: {:ok, Phoenix.Socket.t()}
# Public API
def broadcast(Phoenix.Socket.t(), binary, map) :: :ok | {:error, term}
def broadcast!(Phoenix.Socket.t(), binary, map) :: :ok
def broadcast_from(Phoenix.Socket.t(), binary, map) :: :ok | {:error, term}
def broadcast_from!(Phoenix.Socket.t(), binary, map) :: :ok
def push(Phoenix.Socket.t(), binary, map) :: :ok
def reply({binary, binary}, map) :: :ok
def socket_ref(Phoenix.Socket.t()) :: {binary, binary}
enddefmodule MyAppWeb.RoomChannel do
use Phoenix.Channel
# Handle client joining a room
def join("room:" <> room_id, _params, socket) do
if authorized?(socket.assigns.user_id, room_id) do
send(self(), :after_join)
{:ok, socket}
else
{:error, %{reason: "unauthorized"}}
end
end
# Handle messages from clients
def handle_in("new_message", %{"body" => body}, socket) do
user = MyApp.Accounts.get_user!(socket.assigns.user_id)
message = %{
body: body,
user: user.name,
timestamp: DateTime.utc_now()
}
# Broadcast to all clients in this topic
broadcast!(socket, "new_message", message)
# Store message in database
MyApp.Chat.create_message(socket.assigns.room_id, user.id, body)
{:noreply, socket}
end
def handle_in("typing", %{"typing" => typing}, socket) do
broadcast_from!(socket, "typing", %{
user: socket.assigns.user_name,
typing: typing
})
{:noreply, socket}
end
# Handle system messages
def handle_info(:after_join, socket) do
# Send recent messages to newly joined client
messages = MyApp.Chat.list_recent_messages(socket.assigns.room_id)
push(socket, "message_history", %{messages: messages})
{:noreply, socket}
end
# Handle client disconnection
def terminate(reason, socket) do
broadcast_from!(socket, "user_left", %{
user: socket.assigns.user_name
})
:ok
end
defp authorized?(user_id, room_id) do
MyApp.Chat.user_can_access_room?(user_id, room_id)
end
endSockets manage the persistent connection and multiplex multiple channels.
defmodule Phoenix.Socket do
# Required callbacks
@callback connect(map, Phoenix.Socket.t(), map) ::
{:ok, Phoenix.Socket.t()} | :error
@callback id(Phoenix.Socket.t()) :: binary | nil
# Socket struct fields
defstruct [
:id, # socket identifier
:assigns, # socket assigns
:channel, # current channel module
:channel_pid, # channel process PID
:endpoint, # endpoint module
:handler, # socket handler module
:joined, # whether socket has joined a topic
:join_ref, # join reference
:ref, # message reference
:pubsub_server, # pubsub server name
:topic, # current topic
:transport, # transport name (:websocket, :longpoll)
:transport_pid, # transport process PID
:serializer # message serializer module
]
@type t :: %__MODULE__{}
enddefmodule MyAppWeb.UserSocket do
use Phoenix.Socket
# Channels
channel "room:*", MyAppWeb.RoomChannel
channel "user:*", MyAppWeb.UserChannel
channel "notifications:*", MyAppWeb.NotificationChannel
# Socket authentication
def connect(%{"token" => token}, socket, _connect_info) do
case MyApp.Accounts.verify_socket_token(token) do
{:ok, user_id} ->
socket = assign(socket, :user_id, user_id)
{:ok, socket}
{:error, _reason} ->
:error
end
end
def connect(_params, _socket, _connect_info), do: :error
# Socket identification for presence tracking
def id(socket), do: "user:#{socket.assigns.user_id}"
endPhoenix provides a distributed PubSub system for broadcasting messages across processes and nodes.
# From Phoenix.Channel
def broadcast(socket_or_endpoint, topic, event, message)
def broadcast!(socket_or_endpoint, topic, event, message)
def broadcast_from(socket_or_endpoint, pid, topic, event, message)
def broadcast_from!(socket_or_endpoint, pid, topic, event, message)
# From Phoenix.Endpoint
@callback broadcast(binary, binary, term) :: :ok | {:error, term}
@callback broadcast!(binary, binary, term) :: :ok
@callback broadcast_from(pid, binary, binary, term) :: :ok | {:error, term}
@callback broadcast_from!(pid, binary, binary, term) :: :ok
@callback local_broadcast(binary, binary, term) :: :ok
@callback local_broadcast_from(pid, binary, binary, term) :: :ok# Broadcasting from controllers
defmodule MyAppWeb.PostController do
use Phoenix.Controller
def create(conn, %{"post" => post_params}) do
case MyApp.Blog.create_post(post_params) do
{:ok, post} ->
# Broadcast new post to all subscribers
MyAppWeb.Endpoint.broadcast!(
"posts:lobby",
"new_post",
%{post: post}
)
json(conn, %{post: post})
{:error, changeset} ->
conn
|> put_status(:unprocessable_entity)
|> json(%{errors: changeset})
end
end
end
# Broadcasting from contexts
defmodule MyApp.Chat do
def send_message(room_id, user_id, body) do
case create_message(room_id, user_id, body) do
{:ok, message} ->
# Broadcast to all room subscribers
MyAppWeb.Endpoint.broadcast!(
"room:#{room_id}",
"new_message",
%{message: message}
)
{:ok, message}
error ->
error
end
end
end
# Broadcasting from GenServers
defmodule MyApp.GameServer do
use GenServer
def handle_cast({:update_game_state, game_state}, state) do
# Broadcast game state changes
MyAppWeb.Endpoint.broadcast!(
"game:#{state.game_id}",
"state_updated",
%{state: game_state}
)
{:noreply, %{state | game_state: game_state}}
end
endPhoenix supports multiple transport protocols with fallback mechanisms.
# In endpoint configuration
socket "/socket", MyAppWeb.UserSocket,
websocket: true,
longpoll: false
# WebSocket configuration
socket "/socket", MyAppWeb.UserSocket,
websocket: [
timeout: 45_000,
transport_log: false,
subprotocols: ["mqtt", "v1.example.com"]
]
# Long polling configuration
socket "/socket", MyAppWeb.UserSocket,
longpoll: [
window_ms: 10_000,
pubsub_timeout_ms: 2_000,
crypto: [
max_age: 1_209_600 # 2 weeks
]
]// JavaScript client connection
import {Socket} from "phoenix"
let socket = new Socket("/socket", {
params: {token: userToken},
logger: (kind, msg, data) => { console.log(`${kind}: ${msg}`, data) }
})
socket.connect()
// Join a channel
let channel = socket.channel("room:lobby", {})
channel.join()
.receive("ok", resp => { console.log("Joined successfully", resp) })
.receive("error", resp => { console.log("Unable to join", resp) })
// Handle messages
channel.on("new_message", payload => {
console.log("New message:", payload.body)
})
// Send messages
channel.push("new_message", {body: "Hello, World!"})Phoenix supports multiple message serializers for different transport needs.
defmodule Phoenix.Socket.Serializer do
@callback encode!(Phoenix.Socket.Message.t()) :: iodata
@callback decode!(iodata, keyword) :: Phoenix.Socket.Message.t()
end
# Built-in serializers
defmodule Phoenix.Socket.V1.JSONSerializer
defmodule Phoenix.Socket.V2.JSONSerializerdefmodule Phoenix.Socket.Message do
defstruct [
:topic, # topic string
:event, # event string
:payload, # message payload
:ref, # message reference
:join_ref # join reference
]
@type t :: %__MODULE__{
topic: binary,
event: binary,
payload: map,
ref: binary | nil,
join_ref: binary | nil
}
enddefmodule MyAppWeb.RoomChannel do
use Phoenix.Channel
# Handle join errors
def join("room:" <> room_id, _params, socket) do
case MyApp.Chat.get_room(room_id) do
nil ->
{:error, %{reason: "room_not_found"}}
room ->
if MyApp.Chat.user_can_join?(socket.assigns.user_id, room) do
{:ok, assign(socket, :room, room)}
else
{:error, %{reason: "access_denied"}}
end
end
end
# Handle message errors
def handle_in("send_message", params, socket) do
case validate_message(params) do
{:ok, message_params} ->
# Process message
{:reply, {:ok, %{status: "sent"}}, socket}
{:error, errors} ->
{:reply, {:error, %{errors: errors}}, socket}
end
end
# Handle crashes gracefully
def terminate(reason, socket) do
Logger.info("Channel terminated: #{inspect(reason)}")
# Cleanup resources
MyApp.Chat.user_left_room(
socket.assigns.user_id,
socket.assigns.room.id
)
:ok
end
enddefmodule MyAppWeb.RoomChannelTest do
use MyAppWeb.ChannelCase
setup do
user = insert(:user)
room = insert(:room)
{:ok, socket} = connect(MyAppWeb.UserSocket, %{user_id: user.id})
{:ok, socket: socket, user: user, room: room}
end
test "joining a room", %{socket: socket, room: room} do
{:ok, reply, socket} = subscribe_and_join(
socket,
MyAppWeb.RoomChannel,
"room:#{room.id}"
)
assert reply == %{}
assert socket.assigns.room.id == room.id
end
test "sending messages", %{socket: socket, room: room} do
{:ok, _, socket} = subscribe_and_join(
socket,
MyAppWeb.RoomChannel,
"room:#{room.id}"
)
ref = push(socket, "new_message", %{"body" => "Hello"})
assert_reply ref, :ok
assert_broadcast "new_message", %{body: "Hello"}
end
endInstall with Tessl CLI
npx tessl i tessl/hex-phoenix