CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/hex-phoenix

Peace of mind from prototype to production - comprehensive web framework for Elixir

Overview
Eval results
Files

presence.mddocs/

Distributed Presence Tracking

Phoenix.Presence provides distributed, real-time presence tracking that works across multiple nodes in a cluster. It tracks which users are online, what they're doing, and provides real-time updates when presence changes.

Presence System Overview

Presence tracking is built on Phoenix's PubSub system and provides fault-tolerant, eventually-consistent presence information across distributed nodes.

Phoenix.Presence

defmodule Phoenix.Presence do
  # Required callbacks
  @callback init(term) :: {:ok, term}
  @callback fetch(binary, map, term) :: map
  @callback handle_metas(binary, map, map, term) :: {:ok, term}

  # Tracking functions
  def track(Phoenix.Socket.t(), binary, map) :: {:ok, binary} | {:error, term}
  def track(pid, binary, binary, map) :: {:ok, binary} | {:error, term}

  def untrack(Phoenix.Socket.t(), binary) :: :ok
  def untrack(pid, binary, binary) :: :ok

  def update(Phoenix.Socket.t(), binary, map | (map -> map)) ::
    {:ok, binary} | {:error, term}
  def update(pid, binary, binary, map | (map -> map)) ::
    {:ok, binary} | {:error, term}

  # Query functions
  def list(Phoenix.Socket.t() | binary) :: map
  def get_by_key(Phoenix.Socket.t() | binary, binary) :: map

  # Note: merge/2 and diff/2 are provided by Phoenix JavaScript client, not Elixir server
end

# Presence metadata structure
@type presence_map :: %{
  binary => %{
    metas: [map]
  }
}

@type presence_diff :: %{
  joins: presence_map,
  leaves: presence_map
}

Setting Up Presence

Create a presence module for your application using the Phoenix.Presence behaviour.

Creating a Presence Module

# lib/my_app/presence.ex
defmodule MyApp.Presence do
  @moduledoc """
  Provides presence tracking for users across channels and processes.
  """
  use Phoenix.Presence, otp_app: :my_app,
                        pubsub_server: MyApp.PubSub

  @impl true
  def init(_opts) do
    {:ok, %{}}
  end

  @impl true
  def fetch(_topic, presences, state) do
    # Enrich presence data with user information
    user_ids = presences |> Map.keys() |> Enum.map(&String.to_integer/1)
    users = MyApp.Accounts.get_users_map(user_ids)

    for {key, %{metas: metas}} <- presences, into: %{} do
      {key, %{metas: enrich_metas(metas, users[String.to_integer(key)])}}
    end
  end

  @impl true
  def handle_metas(topic, %{joins: joins, leaves: leaves}, presences, state) do
    # Broadcast presence changes
    MyAppWeb.Endpoint.broadcast(topic, "presence_diff", %{
      joins: joins,
      leaves: leaves
    })

    {:ok, state}
  end

  defp enrich_metas(metas, user) when user != nil do
    Enum.map(metas, fn meta ->
      meta
      |> Map.put(:user_name, user.name)
      |> Map.put(:avatar_url, user.avatar_url)
    end)
  end

  defp enrich_metas(metas, _user), do: metas
end

Supervisor Configuration

# lib/my_app/application.ex
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      MyApp.Repo,
      {Phoenix.PubSub, name: MyApp.PubSub},
      MyApp.Presence,  # Add presence to supervision tree
      MyAppWeb.Endpoint
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Tracking Presence in Channels

Use presence tracking within channels to monitor user activity and provide real-time updates.

Channel Integration

defmodule MyAppWeb.RoomChannel do
  use Phoenix.Channel

  alias MyApp.Presence

  def join("room:" <> room_id, _params, socket) do
    if authorized?(socket.assigns.user, room_id) do
      send(self(), :after_join)
      {:ok, assign(socket, :room_id, room_id)}
    else
      {:error, %{reason: "unauthorized"}}
    end
  end

  def handle_info(:after_join, socket) do
    user_id = to_string(socket.assigns.user.id)

    # Track user presence
    {:ok, _} = Presence.track(socket, user_id, %{
      online_at: inspect(System.system_time(:second)),
      status: "online",
      device: get_device_type(socket)
    })

    # Send current presence list to newly joined user
    presences = Presence.list(socket)
    push(socket, "presence_state", presences)

    {:noreply, socket}
  end

  def handle_in("update_status", %{"status" => status}, socket) when status in ["online", "away", "busy"] do
    user_id = to_string(socket.assigns.user.id)

    # Update presence metadata
    {:ok, _} = Presence.update(socket, user_id, fn meta ->
      Map.put(meta, :status, status)
    end)

    {:noreply, socket}
  end

  def handle_in("set_typing", %{"typing" => typing}, socket) do
    user_id = to_string(socket.assigns.user.id)

    # Update typing indicator in presence
    {:ok, _} = Presence.update(socket, user_id, fn meta ->
      Map.put(meta, :typing, typing)
    end)

    {:noreply, socket}
  end

  def terminate(_reason, socket) do
    # Presence is automatically cleaned up when the channel process exits
    :ok
  end

  defp get_device_type(socket) do
    case get_in(socket.connect_info, [:user_agent]) do
      ua when is_binary(ua) ->
        cond do
          String.contains?(ua, ["Mobile", "Android", "iPhone"]) -> "mobile"
          String.contains?(ua, ["Tablet", "iPad"]) -> "tablet"
          true -> "desktop"
        end
      _ -> "unknown"
    end
  end

  defp authorized?(user, room_id) do
    MyApp.Rooms.user_can_access?(user, room_id)
  end
end

Client-Side Presence Handling

// JavaScript client handling presence
import {Socket, Presence} from "phoenix"

let socket = new Socket("/socket", {params: {token: userToken}})
socket.connect()

let channel = socket.channel("room:lobby", {})
let presences = {}

channel.on("presence_state", state => {
  presences = Presence.syncState(presences, state)
  renderUsers(presences)
})

channel.on("presence_diff", diff => {
  presences = Presence.syncDiff(presences, diff)
  renderUsers(presences)
})

function renderUsers(presences) {
  let userList = Presence.list(presences, (id, {metas: [first, ...rest]}) => {
    return {
      id: id,
      name: first.user_name,
      avatar: first.avatar_url,
      status: first.status,
      typing: first.typing,
      onlineAt: first.online_at,
      count: rest.length + 1  // Multiple sessions/devices
    }
  })

  // Update UI with user list
  updateUserInterface(userList)
}

channel.join()

Advanced Presence Patterns

Multi-Device Tracking

Track users across multiple devices and sessions:

defmodule MyAppWeb.UserChannel do
  use Phoenix.Channel
  alias MyApp.Presence

  def join("user:" <> user_id, %{"device_id" => device_id}, socket) do
    if socket.assigns.user.id == String.to_integer(user_id) do
      send(self(), {:track_device, device_id})
      {:ok, assign(socket, :device_id, device_id)}
    else
      {:error, %{reason: "unauthorized"}}
    end
  end

  def handle_info({:track_device, device_id}, socket) do
    user_id = to_string(socket.assigns.user.id)

    # Use device_id as the tracking key for multi-device support
    {:ok, _} = Presence.track(socket, "#{user_id}:#{device_id}", %{
      user_id: user_id,
      device_id: device_id,
      device_type: get_device_type(socket),
      online_at: System.system_time(:second),
      last_seen: System.system_time(:second)
    })

    {:noreply, socket}
  end

  # Periodic heartbeat to update last_seen
  def handle_info(:heartbeat, socket) do
    user_id = to_string(socket.assigns.user.id)
    device_id = socket.assigns.device_id

    {:ok, _} = Presence.update(socket, "#{user_id}:#{device_id}", fn meta ->
      Map.put(meta, :last_seen, System.system_time(:second))
    end)

    # Schedule next heartbeat
    Process.send_after(self(), :heartbeat, 30_000)
    {:noreply, socket}
  end
end

Presence Aggregation

Aggregate presence data across different contexts:

defmodule MyApp.PresenceAggregator do
  @moduledoc """
  Aggregates presence data across multiple topics and provides
  consolidated views of user activity.
  """

  alias MyApp.Presence

  def users_online_in_room(room_id) do
    "room:#{room_id}"
    |> Presence.list()
    |> extract_users()
    |> Enum.uniq_by(& &1.id)
  end

  def user_active_rooms(user_id) do
    # Find all rooms where user is present
    MyApp.Rooms.list_user_rooms(user_id)
    |> Enum.filter(fn room ->
      "room:#{room.id}"
      |> Presence.list()
      |> Map.has_key?(to_string(user_id))
    end)
  end

  def global_user_count do
    # Count unique users across all presence topics
    Phoenix.PubSub.node_name(MyApp.PubSub)
    |> :pg.get_local_members()
    |> Enum.filter(&String.starts_with?(&1, "room:"))
    |> Enum.flat_map(&Presence.list/1)
    |> Enum.map(fn {user_id, _} -> user_id end)
    |> Enum.uniq()
    |> length()
  end

  def room_statistics(room_id) do
    presences = Presence.list("room:#{room_id}")

    %{
      total_users: map_size(presences),
      devices: count_devices(presences),
      statuses: count_statuses(presences),
      typing_users: count_typing_users(presences)
    }
  end

  defp extract_users(presences) do
    Enum.map(presences, fn {_id, %{metas: [meta | _]}} ->
      %{
        id: meta.user_id,
        name: meta.user_name,
        status: meta.status,
        device: meta.device_type
      }
    end)
  end

  defp count_devices(presences) do
    presences
    |> Enum.flat_map(fn {_id, %{metas: metas}} -> metas end)
    |> Enum.group_by(& &1.device_type)
    |> Enum.map(fn {device, instances} -> {device, length(instances)} end)
    |> Enum.into(%{})
  end

  defp count_statuses(presences) do
    presences
    |> Enum.flat_map(fn {_id, %{metas: metas}} -> metas end)
    |> Enum.group_by(& &1.status)
    |> Enum.map(fn {status, instances} -> {status, length(instances)} end)
    |> Enum.into(%{})
  end

  defp count_typing_users(presences) do
    presences
    |> Enum.count(fn {_id, %{metas: metas}} ->
      Enum.any?(metas, & &1[:typing])
    end)
  end
end

Presence with LiveView

Integrate presence tracking with LiveView for real-time UI updates:

defmodule MyAppWeb.RoomLive do
  use MyAppWeb, :live_view

  alias MyApp.Presence

  @impl true
  def mount(%{"id" => room_id}, session, socket) do
    user = get_user_from_session(session)

    if connected?(socket) do
      # Subscribe to presence changes
      MyAppWeb.Endpoint.subscribe("room:#{room_id}")

      # Track user presence
      Presence.track(self(), "room:#{room_id}", to_string(user.id), %{
        user_name: user.name,
        online_at: System.system_time(:second),
        status: "online"
      })
    end

    presences = Presence.list("room:#{room_id}")

    socket =
      socket
      |> assign(:room_id, room_id)
      |> assign(:user, user)
      |> assign(:presences, presences)
      |> assign(:online_users, format_presences(presences))

    {:ok, socket}
  end

  @impl true
  def handle_info(%{event: "presence_diff", payload: diff}, socket) do
    presences = Presence.merge(socket.assigns.presences, diff)

    socket =
      socket
      |> assign(:presences, presences)
      |> assign(:online_users, format_presences(presences))

    {:noreply, socket}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <div class="room">
      <div class="online-users">
        <h3>Online Users (<%= length(@online_users) %>)</h3>
        <div :for={user <- @online_users} class="user">
          <span class={"status status-#{user.status}"}></span>
          <%= user.name %>
          <span class="device-count" :if={user.device_count > 1}>
            (<%= user.device_count %> devices)
          </span>
        </div>
      </div>

      <div class="chat-area">
        <!-- Chat interface -->
      </div>
    </div>
    """
  end

  defp format_presences(presences) do
    Presence.list(presences, fn _id, %{metas: metas} ->
      %{
        name: List.first(metas).user_name,
        status: List.first(metas).status,
        device_count: length(metas)
      }
    end)
  end
end

Testing Presence

Test presence functionality in your channels and LiveViews:

defmodule MyAppWeb.PresenceTest do
  use MyAppWeb.ChannelCase

  alias MyApp.Presence

  setup do
    user = user_fixture()
    {:ok, socket} = connect(MyAppWeb.UserSocket, %{user_id: user.id})
    %{socket: socket, user: user}
  end

  test "tracks user presence on join", %{socket: socket, user: user} do
    {:ok, _, socket} = subscribe_and_join(socket, MyAppWeb.RoomChannel, "room:lobby")

    # User should be tracked
    presences = Presence.list(socket)
    user_id = to_string(user.id)

    assert Map.has_key?(presences, user_id)
    assert presences[user_id].metas |> List.first() |> Map.get(:status) == "online"
  end

  test "updates presence metadata", %{socket: socket, user: user} do
    {:ok, _, socket} = subscribe_and_join(socket, MyAppWeb.RoomChannel, "room:lobby")

    # Update status
    push(socket, "update_status", %{"status" => "away"})

    # Presence should be updated
    presences = Presence.list(socket)
    user_id = to_string(user.id)

    assert presences[user_id].metas |> List.first() |> Map.get(:status) == "away"
  end

  test "removes presence on leave", %{socket: socket, user: user} do
    {:ok, _, socket} = subscribe_and_join(socket, MyAppWeb.RoomChannel, "room:lobby")
    user_id = to_string(user.id)

    # Initially present
    assert Presence.list(socket) |> Map.has_key?(user_id)

    # Leave channel
    leave(socket)

    # Should no longer be present
    refute Presence.list("room:lobby") |> Map.has_key?(user_id)
  end
end

Performance Considerations

Optimizing Presence

defmodule MyApp.OptimizedPresence do
  use Phoenix.Presence, otp_app: :my_app,
                        pubsub_server: MyApp.PubSub

  @impl true
  def fetch(topic, presences, state) do
    # Batch database queries for better performance
    user_ids =
      presences
      |> Map.keys()
      |> Enum.map(&String.to_integer/1)
      |> Enum.uniq()

    # Use GenServer or ETS cache for frequently accessed data
    users = get_cached_users(user_ids)

    for {key, %{metas: metas}} <- presences, into: %{} do
      user_id = String.to_integer(key)
      user = users[user_id]

      enhanced_metas =
        metas
        |> Enum.map(&enhance_meta(&1, user))
        |> limit_metas(5)  # Limit metadata to prevent memory issues

      {key, %{metas: enhanced_metas}}
    end
  end

  defp get_cached_users(user_ids) do
    # Implement caching strategy (ETS, GenServer, etc.)
    MyApp.Cache.get_users(user_ids)
  end

  defp enhance_meta(meta, user) when user != nil do
    Map.merge(meta, %{
      user_name: user.name,
      avatar_url: user.avatar_url
    })
  end

  defp enhance_meta(meta, _user), do: meta

  defp limit_metas(metas, limit) when length(metas) > limit do
    metas |> Enum.take(limit)
  end

  defp limit_metas(metas, _limit), do: metas
end

Install with Tessl CLI

npx tessl i tessl/hex-phoenix

docs

code-generation.md

index.md

presence.md

real-time.md

security.md

testing.md

web-foundation.md

tile.json