Peace of mind from prototype to production - comprehensive web framework for Elixir
Phoenix.Presence provides distributed, real-time presence tracking that works across multiple nodes in a cluster. It tracks which users are online, what they're doing, and provides real-time updates when presence changes.
Presence tracking is built on Phoenix's PubSub system and provides fault-tolerant, eventually-consistent presence information across distributed nodes.
defmodule Phoenix.Presence do
  # API outline: the `def name(args) :: return` lines are signature summaries
  # for reference, not compilable Elixir.

  # Callbacks a presence module can implement.
  # init/1 seeds the state that is threaded through handle_metas/4.
  @callback init(term) :: {:ok, term}
  # fetch/2 receives only the topic and the presences map and returns the
  # (possibly enriched) presences map; it is not passed the init/1 state.
  @callback fetch(binary, map) :: map
  @callback handle_metas(binary, map, map, term) :: {:ok, term}

  # Tracking functions
  def track(Phoenix.Socket.t(), binary, map) :: {:ok, binary} | {:error, term}
  def track(pid, binary, binary, map) :: {:ok, binary} | {:error, term}
  def untrack(Phoenix.Socket.t(), binary) :: :ok
  def untrack(pid, binary, binary) :: :ok
  def update(Phoenix.Socket.t(), binary, map | (map -> map)) ::
        {:ok, binary} | {:error, term}
  def update(pid, binary, binary, map | (map -> map)) ::
        {:ok, binary} | {:error, term}

  # Query functions
  def list(Phoenix.Socket.t() | binary) :: map
  def get_by_key(Phoenix.Socket.t() | binary, binary) :: map

  # Note: merge/2 and diff/2 are provided by Phoenix JavaScript client, not Elixir server
end
# Presence metadata structure
@type presence_map :: %{
binary => %{
metas: [map]
}
}
@type presence_diff :: %{
joins: presence_map,
leaves: presence_map
}

Create a presence module for your application using the Phoenix.Presence behaviour.
# lib/my_app/presence.ex
defmodule MyApp.Presence do
@moduledoc """
Provides presence tracking for users across channels and processes.
"""
use Phoenix.Presence, otp_app: :my_app,
pubsub_server: MyApp.PubSub
# Starts the state handed to `handle_metas/4` as an empty map.
@impl true
def init(_opts), do: {:ok, %{}}
# Enriches every presence entry with user information.
#
# The behaviour callback is `fetch/2` (topic, presences). A three-argument
# version marked `@impl true` matches no callback and would never be invoked.
#
# Presence keys are expected to be stringified integer user ids;
# `String.to_integer/1` raises for any other key.
@impl true
def fetch(_topic, presences) do
  users =
    presences
    |> Map.keys()
    |> Enum.map(&String.to_integer/1)
    |> MyApp.Accounts.get_users_map()

  Map.new(presences, fn {key, %{metas: metas}} ->
    {key, %{metas: enrich_metas(metas, users[String.to_integer(key)])}}
  end)
end
# Re-broadcasts the joins and leaves of a presence change on the topic as a
# "presence_diff" event and returns the state unchanged.
#
# NOTE(review): Phoenix.Presence already broadcasts "presence_diff" for every
# change, so subscribers presumably receive each diff twice — confirm whether
# this extra broadcast is intended.
@impl true
def handle_metas(topic, %{joins: joins, leaves: leaves}, _presences, state) do
  MyAppWeb.Endpoint.broadcast(topic, "presence_diff", %{
    joins: joins,
    leaves: leaves
  })

  {:ok, state}
end
# Adds the user's name and avatar URL to each meta. When no user was found
# (nil), the metas are returned untouched.
defp enrich_metas(metas, nil), do: metas

defp enrich_metas(metas, user) do
  for meta <- metas do
    Map.merge(meta, %{user_name: user.name, avatar_url: user.avatar_url})
  end
end
end

# lib/my_app/application.ex
defmodule MyApp.Application do
use Application
# Starts the application's supervision tree under MyApp.Supervisor with a
# one-for-one restart strategy.
def start(_type, _args) do
  Supervisor.start_link(
    [
      MyApp.Repo,
      {Phoenix.PubSub, name: MyApp.PubSub},
      # Presence is supervised alongside PubSub, which it is configured to use.
      MyApp.Presence,
      MyAppWeb.Endpoint
    ],
    strategy: :one_for_one,
    name: MyApp.Supervisor
  )
end
end

Use presence tracking within channels to monitor user activity and provide real-time updates.
defmodule MyAppWeb.RoomChannel do
use Phoenix.Channel
alias MyApp.Presence
# Joins "room:<id>" when the socket's user may access the room. Presence
# tracking is deferred to the :after_join message sent to the channel itself.
def join("room:" <> room_id, _params, socket) do
  current_user = socket.assigns.user

  if authorized?(current_user, room_id) do
    send(self(), :after_join)
    joined_socket = assign(socket, :room_id, room_id)
    {:ok, joined_socket}
  else
    {:error, %{reason: "unauthorized"}}
  end
end
# Runs after a successful join: tracks the user under their stringified id,
# then pushes the current presence list to the client that just joined.
def handle_info(:after_join, socket) do
  presence_key = to_string(socket.assigns.user.id)

  meta = %{
    online_at: inspect(System.system_time(:second)),
    status: "online",
    device: get_device_type(socket)
  }

  {:ok, _} = Presence.track(socket, presence_key, meta)

  push(socket, "presence_state", Presence.list(socket))
  {:noreply, socket}
end
# "update_status": stores one of the accepted statuses in the user's presence
# metadata. Any other payload gets an error reply instead of crashing the
# channel with a FunctionClauseError.
def handle_in("update_status", %{"status" => status}, socket)
    when status in ["online", "away", "busy"] do
  user_id = to_string(socket.assigns.user.id)

  {:ok, _} = Presence.update(socket, user_id, fn meta -> Map.put(meta, :status, status) end)

  {:noreply, socket}
end

def handle_in("update_status", _payload, socket) do
  {:reply, {:error, %{reason: "invalid status"}}, socket}
end

# "set_typing": stores the typing flag in the user's presence metadata. The
# value comes from the client and is sent to every subscriber, so only
# booleans are accepted.
def handle_in("set_typing", %{"typing" => typing}, socket) when is_boolean(typing) do
  user_id = to_string(socket.assigns.user.id)

  {:ok, _} = Presence.update(socket, user_id, fn meta -> Map.put(meta, :typing, typing) end)

  {:noreply, socket}
end

def handle_in("set_typing", _payload, socket) do
  {:reply, {:error, %{reason: "invalid typing flag"}}, socket}
end
# Nothing to clean up: presence is automatically removed when the channel
# process exits. The socket is unused, so it is underscored to avoid a
# compiler warning.
def terminate(_reason, _socket), do: :ok
# Classifies the client as "tablet", "mobile", "desktop" or "unknown" from
# its user agent string.
#
# `%Phoenix.Socket{}` has no `connect_info` field, so `socket.connect_info`
# would raise. The user agent is read from `socket.assigns[:user_agent]`,
# which the socket's `connect/3` must set from its `connect_info` argument.
# When it is missing the result is "unknown".
#
# Tablets are checked first because iPad user agents also contain "Mobile".
defp get_device_type(socket) do
  case socket.assigns[:user_agent] do
    ua when is_binary(ua) ->
      cond do
        String.contains?(ua, ["Tablet", "iPad"]) -> "tablet"
        String.contains?(ua, ["Mobile", "Android", "iPhone"]) -> "mobile"
        true -> "desktop"
      end

    _ ->
      "unknown"
  end
end
# Delegates the room access decision to MyApp.Rooms.
defp authorized?(user, room_id), do: MyApp.Rooms.user_can_access?(user, room_id)
end

// JavaScript client handling presence
import {Socket, Presence} from "phoenix"
// Connect the socket, authenticating with the user's token.
const socket = new Socket("/socket", {params: {token: userToken}})
socket.connect()

const channel = socket.channel("room:lobby", {})

// Local copy of the presence state, replaced on every server event.
let presences = {}

// Full snapshot sent by the server after joining.
channel.on("presence_state", (state) => {
  presences = Presence.syncState(presences, state)
  renderUsers(presences)
})

// Incremental joins/leaves.
channel.on("presence_diff", (diff) => {
  presences = Presence.syncDiff(presences, diff)
  renderUsers(presences)
})
// Builds one display entry per presence key, using the first meta for the
// user's details, then hands the list to the UI.
function renderUsers(presences) {
  const toUser = (id, {metas: [first, ...rest]}) => ({
    id,
    name: first.user_name,
    avatar: first.avatar_url,
    status: first.status,
    typing: first.typing,
    onlineAt: first.online_at,
    // One meta per session/device.
    count: rest.length + 1
  })

  const userList = Presence.list(presences, toUser)
  updateUserInterface(userList)
}
channel.join()

Track users across multiple devices and sessions:
defmodule MyAppWeb.UserChannel do
use Phoenix.Channel
alias MyApp.Presence
# Joins "user:<id>" only when <id> is the socket user's own id. Device
# tracking is deferred to the {:track_device, device_id} message.
#
# The topic suffix is client-controlled, so it is compared as a string.
# `String.to_integer/1` would raise on a non-numeric suffix.
def join("user:" <> user_id, %{"device_id" => device_id}, socket) do
  if to_string(socket.assigns.user.id) == user_id do
    send(self(), {:track_device, device_id})
    {:ok, assign(socket, :device_id, device_id)}
  else
    {:error, %{reason: "unauthorized"}}
  end
end
# Delay between :heartbeat messages, in milliseconds.
@heartbeat_interval_ms 30_000

# Tracks this device under "<user_id>:<device_id>" so each device of a user
# gets its own presence entry, then starts the heartbeat loop. Without this
# first message the :heartbeat clause below would never run.
def handle_info({:track_device, device_id}, socket) do
  user_id = to_string(socket.assigns.user.id)
  now = System.system_time(:second)

  {:ok, _} =
    Presence.track(socket, "#{user_id}:#{device_id}", %{
      user_id: user_id,
      device_id: device_id,
      device_type: get_device_type(socket),
      online_at: now,
      last_seen: now
    })

  Process.send_after(self(), :heartbeat, @heartbeat_interval_ms)
  {:noreply, socket}
end

# Periodic heartbeat: refreshes :last_seen and schedules the next heartbeat.
def handle_info(:heartbeat, socket) do
  user_id = to_string(socket.assigns.user.id)
  device_id = socket.assigns.device_id

  {:ok, _} =
    Presence.update(socket, "#{user_id}:#{device_id}", fn meta ->
      Map.put(meta, :last_seen, System.system_time(:second))
    end)

  Process.send_after(self(), :heartbeat, @heartbeat_interval_ms)
  {:noreply, socket}
end

# Classifies the client from the user agent stored in
# `socket.assigns[:user_agent]`, which the socket's `connect/3` must set.
# Returns "unknown" when it is missing. Tablets are checked first because
# iPad user agents also contain "Mobile".
defp get_device_type(socket) do
  case socket.assigns[:user_agent] do
    ua when is_binary(ua) ->
      cond do
        String.contains?(ua, ["Tablet", "iPad"]) -> "tablet"
        String.contains?(ua, ["Mobile", "Android", "iPhone"]) -> "mobile"
        true -> "desktop"
      end

    _ ->
      "unknown"
  end
end
end

Aggregate presence data across different contexts:
defmodule MyApp.PresenceAggregator do
@moduledoc """
Aggregates presence data across multiple topics and provides
consolidated views of user activity.
"""
alias MyApp.Presence
# Lists the users present in a room, one entry per distinct user id.
def users_online_in_room(room_id) do
  room_presences = Presence.list("room:#{room_id}")
  users = extract_users(room_presences)
  Enum.uniq_by(users, fn user -> user.id end)
end
# Returns the rooms from MyApp.Rooms.list_user_rooms/1 in which the user
# currently has a presence entry keyed by their stringified id.
def user_active_rooms(user_id) do
  rooms = MyApp.Rooms.list_user_rooms(user_id)

  Enum.filter(rooms, fn room ->
    room_presences = Presence.list("room:#{room.id}")
    Map.has_key?(room_presences, to_string(user_id))
  end)
end
# Intended to return the number of distinct presence keys across all "room:"
# topics.
#
# NOTE(review): this pipeline looks unrunnable as written. It passes the
# PubSub node name to :pg.get_local_members/1, which expects a process group
# name and returns pids, and then calls String.starts_with?/2 on those pids.
# There appears to be no public API for enumerating every tracked topic;
# the list of room topics probably has to come from the application (for
# example from MyApp.Rooms) — confirm and rewrite.
def global_user_count do
# Count unique users across all presence topics
Phoenix.PubSub.node_name(MyApp.PubSub)
|> :pg.get_local_members()
|> Enum.filter(&String.starts_with?(&1, "room:"))
|> Enum.flat_map(&Presence.list/1)
|> Enum.map(fn {user_id, _} -> user_id end)
|> Enum.uniq()
|> length()
end
# Summarises a room's presences: number of presence keys, metas per device,
# metas per status, and how many keys have at least one typing meta.
def room_statistics(room_id) do
  room_presences = Presence.list("room:#{room_id}")

  Map.new(
    total_users: map_size(room_presences),
    devices: count_devices(room_presences),
    statuses: count_statuses(room_presences),
    typing_users: count_typing_users(room_presences)
  )
end
# Builds one summary map per presence entry from its first meta.
#
# Room metas are tracked with :status and :device and carry no :user_id, so
# strict `meta.user_id` / `meta.device_type` access would raise KeyError.
# The id falls back to the presence key, the device is read from either
# :device_type or :device, and missing fields become nil.
defp extract_users(presences) do
  Enum.map(presences, fn {key, %{metas: [meta | _]}} ->
    %{
      id: Map.get(meta, :user_id, key),
      name: meta[:user_name],
      status: meta[:status],
      device: meta[:device_type] || meta[:device]
    }
  end)
end
# Counts metas per device type across all presence entries.
#
# The device is read from :device_type, falling back to :device because room
# metas use the latter. Metas with neither key are counted under nil.
defp count_devices(presences) do
  presences
  |> Enum.flat_map(fn {_id, %{metas: metas}} -> metas end)
  |> Enum.frequencies_by(fn meta -> meta[:device_type] || meta[:device] end)
end
# Counts metas per :status value across all presence entries. Every meta
# must have a :status key.
defp count_statuses(presences) do
  all_metas = Enum.flat_map(presences, fn {_key, %{metas: metas}} -> metas end)
  Enum.frequencies_by(all_metas, fn meta -> meta.status end)
end
# Counts the presence entries in which at least one meta has a truthy
# :typing value.
defp count_typing_users(presences) do
  Enum.count(presences, fn {_key, %{metas: metas}} ->
    Enum.any?(metas, fn meta -> meta[:typing] end)
  end)
end
end

Integrate presence tracking with LiveView for real-time UI updates:
defmodule MyAppWeb.RoomLive do
use MyAppWeb, :live_view
alias MyApp.Presence
# Loads the user from the session. Once the LiveView is connected it
# subscribes to the room topic and tracks the user there. It then assigns the
# room id, the user, the raw presences and the formatted list of online
# users.
@impl true
def mount(%{"id" => room_id}, session, socket) do
  user = get_user_from_session(session)
  topic = "room:#{room_id}"

  if connected?(socket) do
    MyAppWeb.Endpoint.subscribe(topic)

    Presence.track(self(), topic, to_string(user.id), %{
      user_name: user.name,
      online_at: System.system_time(:second),
      status: "online"
    })
  end

  presences = Presence.list(topic)

  {:ok,
   assign(socket,
     room_id: room_id,
     user: user,
     presences: presences,
     online_users: format_presences(presences)
   )}
end
# Refreshes the presence assigns whenever a "presence_diff" arrives on the
# subscribed room topic.
#
# The Elixir Presence module has no merge/2 (diff merging lives in the
# JavaScript client), so the current list is re-read from Presence instead
# of patching the previous one with the diff payload.
@impl true
def handle_info(%{event: "presence_diff"}, socket) do
  presences = Presence.list("room:#{socket.assigns.room_id}")

  socket =
    socket
    |> assign(:presences, presences)
    |> assign(:online_users, format_presences(presences))

  {:noreply, socket}
end
# Renders the room page: a heading with the number of online users, one row
# per entry in @online_users (status marker, name, and a device count shown
# only when device_count is greater than 1), and a placeholder chat area.
@impl true
def render(assigns) do
~H"""
<div class="room">
<div class="online-users">
<h3>Online Users (<%= length(@online_users) %>)</h3>
<div :for={user <- @online_users} class="user">
<span class={"status status-#{user.status}"}></span>
<%= user.name %>
<span class="device-count" :if={user.device_count > 1}>
(<%= user.device_count %> devices)
</span>
</div>
</div>
<div class="chat-area">
<!-- Chat interface -->
</div>
</div>
"""
end
# Converts the presences map into the list rendered by the template: the
# name and status come from the first meta, and device_count is the number
# of metas for that key.
#
# Presence.list/2 with a callback exists only in the JavaScript client, so
# the map is traversed with Enum.map/2.
defp format_presences(presences) do
  Enum.map(presences, fn {_id, %{metas: [first | _] = metas}} ->
    %{
      name: first.user_name,
      status: first.status,
      device_count: length(metas)
    }
  end)
end
end

Test presence functionality in your channels and LiveViews:
defmodule MyAppWeb.PresenceTest do
use MyAppWeb.ChannelCase
alias MyApp.Presence
# Creates a user and a socket connected as that user, and exposes both to
# every test through the context.
setup do
  user = user_fixture()
  {:ok, socket} = connect(MyAppWeb.UserSocket, %{user_id: user.id})

  [socket: socket, user: user]
end
test "tracks user presence on join", %{socket: socket, user: user} do
  {:ok, _, socket} = subscribe_and_join(socket, MyAppWeb.RoomChannel, "room:lobby")
  user_id = to_string(user.id)

  # Tracking happens in the channel's asynchronous :after_join handler, which
  # pushes "presence_state" only after tracking. Wait for that push so the
  # read below cannot run before the user is tracked.
  assert_push "presence_state", %{}

  assert %{^user_id => %{metas: [%{status: "online"} | _]}} = Presence.list(socket)
end
test "updates presence metadata", %{socket: socket, user: user} do
  {:ok, _, socket} = subscribe_and_join(socket, MyAppWeb.RoomChannel, "room:lobby")
  user_id = to_string(user.id)

  push(socket, "update_status", %{"status" => "away"})

  # push/3 is asynchronous and a successful "update_status" sends no reply.
  # A synchronous call into the channel process returns only after the
  # messages queued before it (including the push) have been handled.
  _ = :sys.get_state(socket.channel_pid)

  assert %{^user_id => %{metas: [%{status: "away"} | _]}} = Presence.list(socket)
end
test "removes presence on leave", %{socket: socket, user: user} do
  {:ok, _, socket} = subscribe_and_join(socket, MyAppWeb.RoomChannel, "room:lobby")
  user_id = to_string(user.id)

  # Wait for the push sent after tracking so the user is tracked before we
  # assert on it.
  assert_push "presence_state", %{}
  assert Map.has_key?(Presence.list(socket), user_id)

  # Unlinking is a no-op when the processes are not linked; it keeps the
  # channel's shutdown from taking the test process down with it.
  Process.unlink(socket.channel_pid)
  ref = leave(socket)
  assert_reply ref, :ok

  # Presence is removed asynchronously once the channel process exits; the
  # leave diff on the subscribed topic signals that it has happened.
  assert_broadcast "presence_diff", %{leaves: %{^user_id => _}}

  refute Map.has_key?(Presence.list("room:lobby"), user_id)
end
end

defmodule MyApp.OptimizedPresence do
use Phoenix.Presence, otp_app: :my_app,
pubsub_server: MyApp.PubSub
# Enriches presences with cached user data and caps the metas kept per key.
#
# The behaviour callback is `fetch/2` (topic, presences); a three-argument
# version marked `@impl true` matches no callback. Presence keys are expected
# to be stringified integer user ids.
@impl true
def fetch(_topic, presences) do
  # Resolve all users in one batched lookup instead of one per presence.
  users =
    presences
    |> Map.keys()
    |> Enum.map(&String.to_integer/1)
    |> Enum.uniq()
    |> get_cached_users()

  Map.new(presences, fn {key, %{metas: metas}} ->
    user = users[String.to_integer(key)]

    enhanced_metas =
      metas
      |> Enum.map(&enhance_meta(&1, user))
      # Keep at most five metas per key to bound memory use.
      |> limit_metas(5)

    {key, %{metas: enhanced_metas}}
  end)
end
# Looks the users up through MyApp.Cache; the caching strategy (ETS,
# GenServer, etc.) is left to that module.
defp get_cached_users(user_ids), do: MyApp.Cache.get_users(user_ids)
# Adds the user's name and avatar URL to a meta; a nil user leaves the meta
# unchanged.
defp enhance_meta(meta, nil), do: meta

defp enhance_meta(meta, user) do
  meta
  |> Map.put(:user_name, user.name)
  |> Map.put(:avatar_url, user.avatar_url)
end
# Keeps at most `limit` metas. Enum.take/2 already returns the whole list
# when it is shorter than `limit`, so no O(n) length/1 guard is needed.
defp limit_metas(metas, limit) when is_list(metas), do: Enum.take(metas, limit)
defp limit_metas(metas, _limit), do: metas
end

Install with Tessl CLI
npx tessl i tessl/hex-phoenix