0
# Real-time Communication
1
2
Phoenix provides sophisticated real-time communication through WebSocket channels, enabling bidirectional messaging between clients and servers. The system supports broadcasting, presence tracking, and custom message handling with built-in fault tolerance.
3
4
## Channel System
5
6
Channels enable persistent connections between clients and the server with topic-based message routing.
7
8
### Phoenix.Channel
9
10
```elixir { .api }
11
defmodule Phoenix.Channel do
  # Shared types
  # A reply is either a bare status atom or a {status, response} tuple.
  @type reply :: atom | {atom, map}
  # {transport_pid, serializer, topic, ref, join_ref}
  @type socket_ref :: {pid, module, binary, binary, binary}

  # Required callback
  @callback join(binary, map, Phoenix.Socket.t()) ::
              {:ok, Phoenix.Socket.t()} |
              {:ok, map, Phoenix.Socket.t()} |
              {:error, map}

  # Optional callbacks
  @callback handle_in(binary, map, Phoenix.Socket.t()) ::
              {:noreply, Phoenix.Socket.t()} |
              {:reply, reply, Phoenix.Socket.t()} |
              {:stop, term, Phoenix.Socket.t()} |
              {:stop, term, reply, Phoenix.Socket.t()}

  @callback handle_out(binary, map, Phoenix.Socket.t()) ::
              {:noreply, Phoenix.Socket.t()} |
              {:stop, term, Phoenix.Socket.t()}

  # There is no {:push, ...} return value: to send a message to the client
  # from handle_info/2, call push/3 and then return {:noreply, socket}.
  @callback handle_info(term, Phoenix.Socket.t()) ::
              {:noreply, Phoenix.Socket.t()} |
              {:stop, term, Phoenix.Socket.t()}

  @callback handle_call(term, GenServer.from(), Phoenix.Socket.t()) :: term

  @callback handle_cast(term, Phoenix.Socket.t()) :: term

  @callback terminate(term, Phoenix.Socket.t()) :: term

  @callback code_change(term, Phoenix.Socket.t(), term) :: {:ok, Phoenix.Socket.t()}

  # Public API
  # The broadcast functions send to the topic the socket has joined.
  def broadcast(Phoenix.Socket.t(), binary, map) :: :ok | {:error, term}
  def broadcast!(Phoenix.Socket.t(), binary, map) :: :ok
  def broadcast_from(Phoenix.Socket.t(), binary, map) :: :ok | {:error, term}
  def broadcast_from!(Phoenix.Socket.t(), binary, map) :: :ok

  def push(Phoenix.Socket.t(), binary, map) :: :ok

  # socket_ref/1 captures a reference to the current push so that reply/2
  # can answer it later, possibly from another process.
  def reply(socket_ref, reply) :: :ok
  def socket_ref(Phoenix.Socket.t()) :: socket_ref
end
52
```
53
54
### Usage Examples
55
56
```elixir
57
defmodule MyAppWeb.RoomChannel do
  @moduledoc """
  Example chat-room channel for topics of the form `"room:<room_id>"`.

  Expects `socket.assigns.user_id` to have been set when the socket connected.
  `join/3` stores `:room_id` and `:user_name` in the socket assigns so that the
  other callbacks can read them.
  """
  use Phoenix.Channel

  # Handle client joining a room
  @impl true
  def join("room:" <> room_id, _params, socket) do
    user_id = socket.assigns.user_id

    if authorized?(user_id, room_id) do
      user = MyApp.Accounts.get_user!(user_id)

      # Defer sending history until the join has completed; push/3 cannot be
      # used from inside join/3.
      send(self(), :after_join)

      # Every later callback reads these from socket.assigns, so they must be
      # assigned here.
      socket =
        socket
        |> assign(:room_id, room_id)
        |> assign(:user_name, user.name)

      {:ok, socket}
    else
      {:error, %{reason: "unauthorized"}}
    end
  end

  # Handle messages from clients
  @impl true
  def handle_in("new_message", %{"body" => body}, socket) do
    %{room_id: room_id, user_id: user_id, user_name: user_name} = socket.assigns

    message = %{
      body: body,
      user: user_name,
      timestamp: DateTime.utc_now()
    }

    # Broadcast to all clients in this topic
    broadcast!(socket, "new_message", message)

    # Store message in database
    MyApp.Chat.create_message(room_id, user_id, body)

    {:noreply, socket}
  end

  def handle_in("typing", %{"typing" => typing}, socket) do
    # broadcast_from!/3 skips the sender, who already knows they are typing.
    broadcast_from!(socket, "typing", %{
      user: socket.assigns.user_name,
      typing: typing
    })

    {:noreply, socket}
  end

  # Handle system messages
  @impl true
  def handle_info(:after_join, socket) do
    # Send recent messages to newly joined client
    messages = MyApp.Chat.list_recent_messages(socket.assigns.room_id)
    push(socket, "message_history", %{messages: messages})
    {:noreply, socket}
  end

  # Handle client disconnection
  @impl true
  def terminate(_reason, socket) do
    broadcast_from!(socket, "user_left", %{
      user: socket.assigns.user_name
    })

    :ok
  end

  defp authorized?(user_id, room_id) do
    MyApp.Chat.user_can_access_room?(user_id, room_id)
  end
end
117
```
118
119
## Socket Management
120
121
Sockets manage the persistent connection and multiplex multiple channels.
122
123
### Phoenix.Socket
124
125
```elixir { .api }
126
defmodule Phoenix.Socket do
127
# Required callbacks
128
@callback connect(map, Phoenix.Socket.t(), map) ::
129
{:ok, Phoenix.Socket.t()} | :error
130
131
@callback id(Phoenix.Socket.t()) :: binary | nil
132
133
# Socket struct fields
134
defstruct [
135
:id, # socket identifier
136
:assigns, # socket assigns
137
:channel, # current channel module
138
:channel_pid, # channel process PID
139
:endpoint, # endpoint module
140
:handler, # socket handler module
141
:joined, # whether socket has joined a topic
142
:join_ref, # join reference
143
:ref, # message reference
144
:pubsub_server, # pubsub server name
145
:topic, # current topic
146
:transport, # transport name (:websocket, :longpoll)
147
:transport_pid, # transport process PID
148
:serializer # message serializer module
149
]
150
151
@type t :: %__MODULE__{}
152
end
153
```
154
155
### Usage Examples
156
157
```elixir
158
defmodule MyAppWeb.UserSocket do
159
use Phoenix.Socket
160
161
# Channels
162
channel "room:*", MyAppWeb.RoomChannel
163
channel "user:*", MyAppWeb.UserChannel
164
channel "notifications:*", MyAppWeb.NotificationChannel
165
166
# Socket authentication
167
def connect(%{"token" => token}, socket, _connect_info) do
168
case MyApp.Accounts.verify_socket_token(token) do
169
{:ok, user_id} ->
170
socket = assign(socket, :user_id, user_id)
171
{:ok, socket}
172
173
{:error, _reason} ->
174
:error
175
end
176
end
177
178
def connect(_params, _socket, _connect_info), do: :error
179
180
# Socket id: a topic identifying all sockets of this user (e.g. to broadcast "disconnect" to them)
181
def id(socket), do: "user:#{socket.assigns.user_id}"
182
end
183
```
184
185
## Broadcasting and PubSub
186
187
Phoenix provides a distributed PubSub system for broadcasting messages across processes and nodes.
188
189
### Broadcasting API
190
191
```elixir { .api }
192
# From Phoenix.Channel (the topic is taken from the socket; the *_from
# variants exclude the calling channel process from the broadcast)
def broadcast(socket, event, message)
def broadcast!(socket, event, message)
def broadcast_from(socket, event, message)
def broadcast_from!(socket, event, message)
197
198
# From Phoenix.Endpoint
199
@callback broadcast(binary, binary, term) :: :ok | {:error, term}
200
@callback broadcast!(binary, binary, term) :: :ok
201
@callback broadcast_from(pid, binary, binary, term) :: :ok | {:error, term}
202
@callback broadcast_from!(pid, binary, binary, term) :: :ok
203
@callback local_broadcast(binary, binary, term) :: :ok
204
@callback local_broadcast_from(pid, binary, binary, term) :: :ok
205
```
206
207
### Usage Examples
208
209
```elixir
210
# Broadcasting from controllers
211
defmodule MyAppWeb.PostController do
212
use Phoenix.Controller
213
214
def create(conn, %{"post" => post_params}) do
215
case MyApp.Blog.create_post(post_params) do
216
{:ok, post} ->
217
# Broadcast new post to all subscribers
218
MyAppWeb.Endpoint.broadcast!(
219
"posts:lobby",
220
"new_post",
221
%{post: post}
222
)
223
224
json(conn, %{post: post})
225
226
{:error, changeset} ->
227
conn
228
|> put_status(:unprocessable_entity)
229
|> json(%{errors: changeset})
230
end
231
end
232
end
233
234
# Broadcasting from contexts
235
defmodule MyApp.Chat do
236
def send_message(room_id, user_id, body) do
237
case create_message(room_id, user_id, body) do
238
{:ok, message} ->
239
# Broadcast to all room subscribers
240
MyAppWeb.Endpoint.broadcast!(
241
"room:#{room_id}",
242
"new_message",
243
%{message: message}
244
)
245
{:ok, message}
246
247
error ->
248
error
249
end
250
end
251
end
252
253
# Broadcasting from GenServers
254
defmodule MyApp.GameServer do
255
use GenServer
256
257
def handle_cast({:update_game_state, game_state}, state) do
258
# Broadcast game state changes
259
MyAppWeb.Endpoint.broadcast!(
260
"game:#{state.game_id}",
261
"state_updated",
262
%{state: game_state}
263
)
264
265
{:noreply, %{state | game_state: game_state}}
266
end
267
end
268
```
269
270
## Transport Configuration
271
272
Phoenix supports multiple transport protocols with fallback mechanisms.
273
274
### Transport Types
275
276
```elixir
277
# In endpoint configuration
278
socket "/socket", MyAppWeb.UserSocket,
279
websocket: true,
280
longpoll: false
281
282
# WebSocket configuration
283
socket "/socket", MyAppWeb.UserSocket,
284
websocket: [
285
timeout: 45_000,
286
transport_log: false,
287
subprotocols: ["mqtt", "v1.example.com"]
288
]
289
290
# Long polling configuration
291
socket "/socket", MyAppWeb.UserSocket,
292
longpoll: [
293
window_ms: 10_000,
294
pubsub_timeout_ms: 2_000,
295
crypto: [
296
max_age: 1_209_600 # 2 weeks
297
]
298
]
299
```
300
301
### Client-Side Connection
302
303
```javascript
304
// JavaScript client connection
305
import {Socket} from "phoenix"
306
307
let socket = new Socket("/socket", {
308
params: {token: userToken},
309
logger: (kind, msg, data) => { console.log(`${kind}: ${msg}`, data) }
310
})
311
312
socket.connect()
313
314
// Join a channel
315
let channel = socket.channel("room:lobby", {})
316
channel.join()
317
.receive("ok", resp => { console.log("Joined successfully", resp) })
318
.receive("error", resp => { console.log("Unable to join", resp) })
319
320
// Handle messages
321
channel.on("new_message", payload => {
322
console.log("New message:", payload.body)
323
})
324
325
// Send messages
326
channel.push("new_message", {body: "Hello, World!"})
327
```
328
329
## Message Serialization
330
331
Phoenix supports multiple message serializers for different transport needs.
332
333
### Phoenix.Socket.Serializer
334
335
```elixir { .api }
336
defmodule Phoenix.Socket.Serializer do
  # Encodes a broadcast once so the same frame can be sent to every subscriber.
  @callback fastlane!(Phoenix.Socket.Broadcast.t()) ::
              {:socket_push, :text, iodata} |
              {:socket_push, :binary, iodata}

  # Encodes an outgoing message or reply into a transport frame.
  @callback encode!(Phoenix.Socket.Message.t() | Phoenix.Socket.Reply.t()) ::
              {:socket_push, :text, iodata} |
              {:socket_push, :binary, iodata}

  # Decodes an incoming frame; the keyword list carries transport options.
  @callback decode!(iodata, keyword) :: Phoenix.Socket.Message.t()
end
340
341
# Built-in serializers
342
defmodule Phoenix.Socket.V1.JSONSerializer
343
defmodule Phoenix.Socket.V2.JSONSerializer
344
```
345
346
### Message Structure
347
348
```elixir { .api }
349
defmodule Phoenix.Socket.Message do
350
defstruct [
351
:topic, # topic string
352
:event, # event string
353
:payload, # message payload
354
:ref, # message reference
355
:join_ref # join reference
356
]
357
358
@type t :: %__MODULE__{
359
topic: binary,
360
event: binary,
361
payload: map,
362
ref: binary | nil,
363
join_ref: binary | nil
364
}
365
end
366
```
367
368
## Error Handling and Lifecycle
369
370
```elixir
371
defmodule MyAppWeb.RoomChannel do
  @moduledoc """
  Example channel showing error replies on join and on incoming events, plus
  cleanup in `terminate/2`.

  `validate_message/1` is an application-defined helper that is not shown here;
  it is expected to return `{:ok, params}` or `{:error, errors}`.
  """
  use Phoenix.Channel

  # Logger.info/1 is a macro, so the module must be required before use.
  require Logger

  # Handle join errors
  @impl true
  def join("room:" <> room_id, _params, socket) do
    case MyApp.Chat.get_room(room_id) do
      nil ->
        {:error, %{reason: "room_not_found"}}

      room ->
        if MyApp.Chat.user_can_join?(socket.assigns.user_id, room) do
          # Keep the room in assigns; terminate/2 reads it for cleanup.
          {:ok, assign(socket, :room, room)}
        else
          {:error, %{reason: "access_denied"}}
        end
    end
  end

  # Handle message errors
  @impl true
  def handle_in("send_message", params, socket) do
    case validate_message(params) do
      {:ok, _message_params} ->
        # Process message
        {:reply, {:ok, %{status: "sent"}}, socket}

      {:error, errors} ->
        {:reply, {:error, %{errors: errors}}, socket}
    end
  end

  # Handle crashes gracefully
  @impl true
  def terminate(reason, socket) do
    Logger.info("Channel terminated: #{inspect(reason)}")

    # Cleanup resources
    MyApp.Chat.user_left_room(
      socket.assigns.user_id,
      socket.assigns.room.id
    )

    :ok
  end
end
414
```
415
416
## Testing Channels
417
418
```elixir
419
defmodule MyAppWeb.RoomChannelTest do
  use MyAppWeb.ChannelCase

  setup do
    user = insert(:user)
    room = insert(:room)

    # Build the socket directly with the assigns the channel needs.
    # Going through connect/2 would require a valid "token" param, because
    # UserSocket.connect/3 returns :error for any other params.
    socket = socket(MyAppWeb.UserSocket, "user:#{user.id}", %{user_id: user.id})

    {:ok, socket: socket, user: user, room: room}
  end

  test "joining a room", %{socket: socket, room: room} do
    {:ok, reply, socket} =
      subscribe_and_join(
        socket,
        MyAppWeb.RoomChannel,
        "room:#{room.id}"
      )

    assert reply == %{}
    assert socket.assigns.room.id == room.id
  end

  test "sending messages", %{socket: socket, room: room} do
    {:ok, _, socket} =
      subscribe_and_join(
        socket,
        MyAppWeb.RoomChannel,
        "room:#{room.id}"
      )

    # "send_message" is the event whose handler replies; a handler that
    # returns {:noreply, socket} would make assert_reply time out.
    ref = push(socket, "send_message", %{"body" => "Hello"})
    assert_reply ref, :ok, %{status: "sent"}

    # For handlers that broadcast, use e.g.:
    #   assert_broadcast "new_message", %{body: "Hello"}
  end
end
454
```