0
# Distributed Presence Tracking
1
2
Phoenix.Presence provides distributed, real-time presence tracking that works across multiple nodes in a cluster. It tracks which users are online, what they're doing, and provides real-time updates when presence changes.
3
4
## Presence System Overview
5
6
Presence tracking is built on Phoenix's PubSub system and provides fault-tolerant, eventually-consistent presence information across distributed nodes.
7
8
### Phoenix.Presence
9
10
```elixir { .api }
11
defmodule Phoenix.Presence do
  # Callbacks
  #
  # `fetch/2` receives the topic and the raw presences and returns the
  # (optionally enriched) presences; `use Phoenix.Presence` injects a default
  # that returns them unchanged. `init/1` and `handle_metas/4` are optional
  # and only needed when the presence module itself reacts to joins/leaves.
  @callback fetch(topic :: binary, presences :: map) :: map
  @callback init(state :: term) :: {:ok, term}
  @callback handle_metas(topic :: binary, diff :: map, presences :: map, state :: term) ::
              {:ok, term}

  # Tracking functions (socket form uses the socket's topic and channel pid)
  def track(Phoenix.Socket.t(), key :: binary, meta :: map) :: {:ok, binary} | {:error, term}
  def track(pid, topic :: binary, key :: binary, meta :: map) :: {:ok, binary} | {:error, term}

  def untrack(Phoenix.Socket.t(), key :: binary) :: :ok
  def untrack(pid, topic :: binary, key :: binary) :: :ok

  def update(Phoenix.Socket.t(), key :: binary, map | (map -> map)) ::
        {:ok, binary} | {:error, term}
  def update(pid, topic :: binary, key :: binary, map | (map -> map)) ::
        {:ok, binary} | {:error, term}

  # Query functions
  def list(Phoenix.Socket.t() | binary) :: map
  # Returns the presence entry for `key`, or `[]` when the key is not tracked
  def get_by_key(Phoenix.Socket.t() | binary, key :: binary) :: map | []

  # Note: there is no server-side merge/diff API. Clients keep their local
  # state in sync with the JavaScript `Presence.syncState/2` and
  # `Presence.syncDiff/2` helpers; Elixir processes re-read `list/1`.
end

# Presence metadata structure: one entry per key, one meta per tracked process
@type presence_map :: %{
        binary => %{
          metas: [map]
        }
      }

# Payload of the "presence_diff" event
@type presence_diff :: %{
        joins: presence_map,
        leaves: presence_map
      }
47
```
48
49
## Setting Up Presence
50
51
Create a presence module for your application using the Phoenix.Presence behaviour.
52
53
### Creating a Presence Module
54
55
```elixir
56
# lib/my_app/presence.ex
57
defmodule MyApp.Presence do
  @moduledoc """
  Provides presence tracking for users across channels and processes.

  Presence keys are expected to start with the user's integer id, e.g. `"42"`
  or `"42:device-abc"`. Entries whose key does not start with an integer are
  returned from `fetch/2` without enrichment.
  """
  use Phoenix.Presence,
    otp_app: :my_app,
    pubsub_server: MyApp.PubSub

  @impl true
  def init(_opts) do
    {:ok, %{}}
  end

  @doc """
  Enriches every meta of every presence with the user's name and avatar.

  All users are loaded with a single `MyApp.Accounts.get_users_map/1` call,
  which is expected to return a map keyed by integer user id.
  """
  @impl true
  def fetch(_topic, presences) do
    users =
      presences
      |> Map.keys()
      |> Enum.map(&user_id_from_key/1)
      |> Enum.reject(&is_nil/1)
      |> Enum.uniq()
      |> MyApp.Accounts.get_users_map()

    for {key, %{metas: metas}} <- presences, into: %{} do
      {key, %{metas: enrich_metas(metas, users[user_id_from_key(key)])}}
    end
  end

  @doc """
  Forwards joins/leaves to local Elixir subscribers of `"proxy:<topic>"`.

  Phoenix.Presence already broadcasts the `"presence_diff"` event on `topic`
  itself, so broadcasting it again from here would deliver every diff to
  clients more than once. A separate proxy topic keeps these tuple messages
  away from channel processes subscribed to `topic`.
  """
  @impl true
  def handle_metas(topic, %{joins: joins, leaves: leaves}, _presences, state) do
    message = {__MODULE__, {:diff, topic, %{joins: joins, leaves: leaves}}}
    Phoenix.PubSub.local_broadcast(MyApp.PubSub, "proxy:#{topic}", message)

    {:ok, state}
  end

  # Extracts the leading integer of a presence key; nil when there is none.
  defp user_id_from_key(key) do
    case Integer.parse(key) do
      {id, _rest} -> id
      :error -> nil
    end
  end

  # Unknown users keep their metas untouched.
  defp enrich_metas(metas, nil), do: metas

  defp enrich_metas(metas, user) do
    extra = %{user_name: user.name, avatar_url: user.avatar_url}
    Enum.map(metas, &Map.merge(&1, extra))
  end
end
101
```
102
103
### Supervisor Configuration
104
105
```elixir
106
# lib/my_app/application.ex
107
defmodule MyApp.Application do
  @moduledoc false
  use Application

  @impl true
  def start(_type, _args) do
    Supervisor.start_link(children(), strategy: :one_for_one, name: MyApp.Supervisor)
  end

  # Children in start order: Presence sits after PubSub (which it is
  # configured to use) and before the Endpoint.
  defp children do
    [
      MyApp.Repo,
      {Phoenix.PubSub, name: MyApp.PubSub},
      # Add presence to supervision tree
      MyApp.Presence,
      MyAppWeb.Endpoint
    ]
  end
end
122
```
123
124
## Tracking Presence in Channels
125
126
Use presence tracking within channels to monitor user activity and provide real-time updates.
127
128
### Channel Integration
129
130
```elixir
131
defmodule MyAppWeb.RoomChannel do
  @moduledoc """
  Room channel that tracks each joined user with `MyApp.Presence`.

  Device detection reads `socket.assigns[:user_agent]`. `%Phoenix.Socket{}`
  has no `connect_info` field, so the user agent must be stored at connect
  time: enable `connect_info: [:user_agent]` for the socket in the endpoint
  and assign `connect_info[:user_agent]` in `UserSocket.connect/3`.
  """
  use Phoenix.Channel

  alias MyApp.Presence

  @statuses ["online", "away", "busy"]

  @impl true
  def join("room:" <> room_id, _params, socket) do
    if authorized?(socket.assigns.user, room_id) do
      # Tracking needs the joined socket, so defer it until join/3 returned.
      send(self(), :after_join)
      {:ok, assign(socket, :room_id, room_id)}
    else
      {:error, %{reason: "unauthorized"}}
    end
  end

  @impl true
  def handle_info(:after_join, socket) do
    # Track user presence
    {:ok, _ref} =
      Presence.track(socket, user_key(socket), %{
        online_at: inspect(System.system_time(:second)),
        status: "online",
        device: get_device_type(socket)
      })

    # Send current presence list to newly joined user
    push(socket, "presence_state", Presence.list(socket))

    {:noreply, socket}
  end

  @impl true
  def handle_in("update_status", %{"status" => status}, socket) when status in @statuses do
    {:ok, _ref} = Presence.update(socket, user_key(socket), &Map.put(&1, :status, status))
    {:noreply, socket}
  end

  def handle_in("set_typing", %{"typing" => typing}, socket) do
    # Typing indicator lives in the presence metadata
    {:ok, _ref} = Presence.update(socket, user_key(socket), &Map.put(&1, :typing, typing))
    {:noreply, socket}
  end

  @impl true
  def terminate(_reason, _socket) do
    # Presence is automatically cleaned up when the channel process exits
    :ok
  end

  # Presence key for the current user.
  defp user_key(socket), do: to_string(socket.assigns.user.id)

  # Classifies the client from its user agent. Tablets are checked first
  # because iPad user agents also contain "Mobile"; Android tablets are
  # recognised by "Android" without "Mobile".
  defp get_device_type(socket) do
    case socket.assigns[:user_agent] do
      ua when is_binary(ua) ->
        cond do
          String.contains?(ua, ["Tablet", "iPad"]) -> "tablet"
          String.contains?(ua, "Android") and not String.contains?(ua, "Mobile") -> "tablet"
          String.contains?(ua, ["Mobile", "Android", "iPhone"]) -> "mobile"
          true -> "desktop"
        end

      _ ->
        "unknown"
    end
  end

  defp authorized?(user, room_id) do
    MyApp.Rooms.user_can_access?(user, room_id)
  end
end
205
```
206
207
### Client-Side Presence Handling
208
209
```javascript
210
// Client-side presence handling with the Phoenix JavaScript client
import {Socket, Presence} from "phoenix"

const socket = new Socket("/socket", {params: {token: userToken}})
socket.connect()

const channel = socket.channel("room:lobby", {})

// Local copy of the presence map, kept in sync by the two handlers below
let presences = {}

// Turns one presence entry into the object consumed by the UI.
// `metas` has one element per open session/device of the user.
const toUser = (id, {metas}) => {
  const primary = metas[0]
  const others = metas.slice(1)

  return {
    id: id,
    name: primary.user_name,
    avatar: primary.avatar_url,
    status: primary.status,
    typing: primary.typing,
    onlineAt: primary.online_at,
    count: others.length + 1 // Multiple sessions/devices
  }
}

function renderUsers(current) {
  // Update UI with user list
  updateUserInterface(Presence.list(current, toUser))
}

// Full state sent once after joining
channel.on("presence_state", (state) => {
  presences = Presence.syncState(presences, state)
  renderUsers(presences)
})

// Incremental joins/leaves
channel.on("presence_diff", (diff) => {
  presences = Presence.syncDiff(presences, diff)
  renderUsers(presences)
})

channel.join()
247
```
248
249
## Advanced Presence Patterns
250
251
### Multi-Device Tracking
252
253
Track users across multiple devices and sessions:
254
255
```elixir
256
defmodule MyAppWeb.UserChannel do
  @moduledoc """
  Per-user channel that tracks every connected device separately.

  Each device is tracked under the key `"<user_id>:<device_id>"` and refreshes
  its `last_seen` timestamp on a periodic heartbeat.
  """
  use Phoenix.Channel

  alias MyApp.Presence

  @heartbeat_interval 30_000

  @impl true
  def join("user:" <> user_id, %{"device_id" => device_id}, socket) do
    # Compare as strings: the topic suffix is client input and
    # String.to_integer/1 would raise on anything that is not an integer.
    if to_string(socket.assigns.user.id) == user_id do
      send(self(), {:track_device, device_id})
      {:ok, assign(socket, :device_id, device_id)}
    else
      {:error, %{reason: "unauthorized"}}
    end
  end

  # Joins without a device id are rejected instead of crashing the join.
  def join("user:" <> _user_id, _params, _socket) do
    {:error, %{reason: "device_id required"}}
  end

  @impl true
  def handle_info({:track_device, device_id}, socket) do
    user_id = to_string(socket.assigns.user.id)
    now = System.system_time(:second)

    # Use device_id as part of the tracking key for multi-device support
    {:ok, _ref} =
      Presence.track(socket, presence_key(socket), %{
        user_id: user_id,
        device_id: device_id,
        device_type: get_device_type(socket),
        online_at: now,
        last_seen: now
      })

    # Start the heartbeat loop; each heartbeat schedules the next one.
    schedule_heartbeat()

    {:noreply, socket}
  end

  # Periodic heartbeat to update last_seen
  def handle_info(:heartbeat, socket) do
    {:ok, _ref} =
      Presence.update(socket, presence_key(socket), fn meta ->
        Map.put(meta, :last_seen, System.system_time(:second))
      end)

    schedule_heartbeat()
    {:noreply, socket}
  end

  defp presence_key(socket) do
    "#{socket.assigns.user.id}:#{socket.assigns.device_id}"
  end

  defp schedule_heartbeat do
    Process.send_after(self(), :heartbeat, @heartbeat_interval)
  end

  # Classifies the client from the user agent stored in the socket assigns
  # (assign it in UserSocket.connect/3 from connect_info[:user_agent]).
  # Tablets are checked first because iPad user agents also contain "Mobile".
  defp get_device_type(socket) do
    case socket.assigns[:user_agent] do
      ua when is_binary(ua) ->
        cond do
          String.contains?(ua, ["Tablet", "iPad"]) -> "tablet"
          String.contains?(ua, ["Mobile", "Android", "iPhone"]) -> "mobile"
          true -> "desktop"
        end

      _ ->
        "unknown"
    end
  end
end
298
```
299
300
### Presence Aggregation
301
302
Aggregate presence data across different contexts:
303
304
```elixir
305
defmodule MyApp.PresenceAggregator do
  @moduledoc """
  Aggregates presence data across multiple topics and provides
  consolidated views of user activity.

  All functions read room topics (`"room:<id>"`), where the presence key is
  the user id and metas carry `:status`, `:device` and, once enriched by the
  presence module's `fetch/2`, `:user_name`.
  """

  alias MyApp.Presence

  @doc "Returns one summary map per user currently present in the room."
  def users_online_in_room(room_id) do
    room_id
    |> room_topic()
    |> Presence.list()
    |> extract_users()
  end

  @doc "Returns the rooms of `user_id` in which the user is currently present."
  def user_active_rooms(user_id) do
    key = to_string(user_id)

    user_id
    |> MyApp.Rooms.list_user_rooms()
    |> Enum.filter(fn room ->
      room.id
      |> room_topic()
      |> Presence.list()
      |> Map.has_key?(key)
    end)
  end

  @doc """
  Counts unique users across the given rooms.

  Phoenix.Presence offers no way to enumerate tracked topics, so the rooms to
  inspect must be supplied.

  NOTE(review): the default, `MyApp.Rooms.list_room_ids/0`, is a placeholder
  for however the application lists its rooms - adjust to your context.
  """
  def global_user_count(room_ids \\ MyApp.Rooms.list_room_ids()) do
    room_ids
    |> Enum.flat_map(fn room_id ->
      room_id |> room_topic() |> Presence.list() |> Map.keys()
    end)
    |> MapSet.new()
    |> MapSet.size()
  end

  @doc "Returns user, device, status and typing counts for the room."
  def room_statistics(room_id) do
    presences = room_id |> room_topic() |> Presence.list()

    %{
      total_users: map_size(presences),
      devices: count_devices(presences),
      statuses: count_statuses(presences),
      typing_users: count_typing_users(presences)
    }
  end

  defp room_topic(room_id), do: "room:#{room_id}"

  # The presence key is the user id; the first meta describes the user.
  defp extract_users(presences) do
    for {user_id, %{metas: [meta | _]}} <- presences do
      %{
        id: user_id,
        name: meta[:user_name],
        status: meta[:status],
        device: meta[:device]
      }
    end
  end

  # Sessions per device type; sessions tracked without one count as "unknown".
  defp count_devices(presences) do
    presences
    |> all_metas()
    |> Enum.frequencies_by(&Map.get(&1, :device, "unknown"))
  end

  # Sessions per status.
  defp count_statuses(presences) do
    presences
    |> all_metas()
    |> Enum.frequencies_by(& &1[:status])
  end

  # Users with at least one session currently typing.
  defp count_typing_users(presences) do
    Enum.count(presences, fn {_id, %{metas: metas}} ->
      Enum.any?(metas, & &1[:typing])
    end)
  end

  defp all_metas(presences) do
    Enum.flat_map(presences, fn {_id, %{metas: metas}} -> metas end)
  end
end
386
```
387
388
## Presence with LiveView
389
390
Integrate presence tracking with LiveView for real-time UI updates:
391
392
```elixir
393
defmodule MyAppWeb.RoomLive do
  @moduledoc """
  LiveView showing who is online in a room.

  The server-side presence API has no merge function (state merging exists
  only in the JavaScript client), so on every `"presence_diff"` the view
  re-reads the room's presences with `Presence.list/1`.
  """
  use MyAppWeb, :live_view

  alias MyApp.Presence

  @impl true
  def mount(%{"id" => room_id}, session, socket) do
    user = get_user_from_session(session)
    topic = room_topic(room_id)

    # Only the connected (websocket) mount subscribes and tracks; the initial
    # static render must not register a presence.
    if connected?(socket) do
      # Subscribe to presence changes
      MyAppWeb.Endpoint.subscribe(topic)

      # Track user presence
      {:ok, _ref} =
        Presence.track(self(), topic, to_string(user.id), %{
          user_name: user.name,
          online_at: System.system_time(:second),
          status: "online"
        })
    end

    socket =
      socket
      |> assign(:room_id, room_id)
      |> assign(:user, user)
      |> assign_presences(Presence.list(topic))

    {:ok, socket}
  end

  @impl true
  def handle_info(%{event: "presence_diff"}, socket) do
    presences = socket.assigns.room_id |> room_topic() |> Presence.list()
    {:noreply, assign_presences(socket, presences)}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <div class="room">
      <div class="online-users">
        <h3>Online Users (<%= length(@online_users) %>)</h3>
        <div :for={user <- @online_users} class="user">
          <span class={"status status-#{user.status}"}></span>
          <%= user.name %>
          <span class="device-count" :if={user.device_count > 1}>
            (<%= user.device_count %> devices)
          </span>
        </div>
      </div>

      <div class="chat-area">
        <!-- Chat interface -->
      </div>
    </div>
    """
  end

  defp room_topic(room_id), do: "room:#{room_id}"

  # Keeps the raw presences and their display form in sync.
  defp assign_presences(socket, presences) do
    socket
    |> assign(:presences, presences)
    |> assign(:online_users, format_presences(presences))
  end

  # One display entry per user; the first meta supplies name and status and
  # the number of metas is the number of open sessions/devices.
  defp format_presences(presences) do
    for {_id, %{metas: [first | _] = metas}} <- presences do
      %{
        name: first[:user_name],
        status: first[:status],
        device_count: length(metas)
      }
    end
  end
end
470
```
471
472
## Testing Presence
473
474
Test presence functionality in your channels and LiveViews:
475
476
```elixir
477
defmodule MyAppWeb.PresenceTest do
  use MyAppWeb.ChannelCase

  alias MyApp.Presence

  setup do
    user = user_fixture()
    {:ok, socket} = connect(MyAppWeb.UserSocket, %{user_id: user.id})
    %{socket: socket, user: user}
  end

  # Joins the lobby and waits for the "presence_state" push. The channel
  # tracks the user in an :after_join message handled after join/3 returns,
  # so without this wait the assertions would race against the tracking.
  defp join_lobby(socket) do
    {:ok, _reply, socket} = subscribe_and_join(socket, MyAppWeb.RoomChannel, "room:lobby")
    assert_push "presence_state", %{}
    socket
  end

  test "tracks user presence on join", %{socket: socket, user: user} do
    socket = join_lobby(socket)
    user_id = to_string(user.id)

    # User should be tracked
    assert %{^user_id => %{metas: [%{status: "online"} | _]}} = Presence.list(socket)
  end

  test "updates presence metadata", %{socket: socket, user: user} do
    socket = join_lobby(socket)
    user_id = to_string(user.id)

    # Update status
    push(socket, "update_status", %{"status" => "away"})

    # push/3 is asynchronous and the handler does not reply; the diff
    # broadcast is the signal that the update has been applied.
    assert_broadcast "presence_diff", %{
      joins: %{^user_id => %{metas: [%{status: "away"} | _]}}
    }

    # Presence should be updated
    assert %{^user_id => %{metas: [%{status: "away"} | _]}} = Presence.list(socket)
  end

  test "removes presence on leave", %{socket: socket, user: user} do
    socket = join_lobby(socket)
    user_id = to_string(user.id)

    # Initially present
    assert Presence.list(socket) |> Map.has_key?(user_id)

    # The channel is linked to the test process; unlink so that its shutdown
    # on leave does not take the test down with it.
    Process.unlink(socket.channel_pid)

    # Leave channel
    ref = leave(socket)
    assert_reply ref, :ok

    # Cleanup happens asynchronously once the channel process exits
    assert_broadcast "presence_diff", %{leaves: %{^user_id => _}}

    # Should no longer be present
    refute Presence.list("room:lobby") |> Map.has_key?(user_id)
  end
end
526
```
527
528
## Performance Considerations
529
530
### Optimizing Presence
531
532
```elixir
533
defmodule MyApp.OptimizedPresence do
  @moduledoc """
  Presence module whose `fetch/2` loads all users in one cached lookup and
  caps the number of metas returned per key.

  Keys are expected to start with the user's integer id; entries with other
  keys are returned without enrichment.
  """
  use Phoenix.Presence,
    otp_app: :my_app,
    pubsub_server: MyApp.PubSub

  # Upper bound on metas returned per presence key
  @max_metas 5

  @impl true
  def fetch(_topic, presences) do
    # Batch the user lookup for better performance
    users =
      presences
      |> Map.keys()
      |> Enum.map(&user_id_from_key/1)
      |> Enum.reject(&is_nil/1)
      |> Enum.uniq()
      |> get_cached_users()

    Map.new(presences, fn {key, %{metas: metas}} ->
      user = users[user_id_from_key(key)]

      enhanced_metas =
        metas
        |> limit_metas(@max_metas)
        |> Enum.map(&enhance_meta(&1, user))

      {key, %{metas: enhanced_metas}}
    end)
  end

  # Extracts the leading integer of a presence key; nil when there is none.
  # Integer.parse/1 is used because String.to_integer/1 raises on keys that
  # are not purely numeric.
  defp user_id_from_key(key) do
    case Integer.parse(key) do
      {id, _rest} -> id
      :error -> nil
    end
  end

  defp get_cached_users(user_ids) do
    # Implement caching strategy (ETS, GenServer, etc.)
    MyApp.Cache.get_users(user_ids)
  end

  defp enhance_meta(meta, nil), do: meta

  defp enhance_meta(meta, user) do
    Map.merge(meta, %{
      user_name: user.name,
      avatar_url: user.avatar_url
    })
  end

  # Enum.take/2 already returns shorter lists unchanged, so no length check
  # is needed.
  defp limit_metas(metas, limit), do: Enum.take(metas, limit)
end
582
```