Curated library of 38 atomic skills, 7 personas, and 1 orchestrator for Elixir and Phoenix development. Organized by category: fundamentals, phoenix, database, testing, auth, infrastructure, quality, security, integrations, tooling, frameworks, personas, and orchestration. Covers core Elixir patterns, Phoenix LiveView, Ecto, OTP, Oban, testing, security, deployment, real-time, and modern tooling (Req, Swoosh, Cachex, Broadway, Ash).
73
91%
Does it follow best practices?
Impact
—
No eval scenarios have been run
Advisory
Suggest reviewing before use
Use this skill before writing ANY PubSub or real-time broadcast code.
mount — guard with if connected?(socket) to prevent duplicate subscriptionsbroadcast/2 helper that fires only on {:ok, result}handle_info/2 — update assigns immutably with update/3defmodule MyAppWeb.PostLive.Index do
use MyAppWeb, :live_view
@impl true
def mount(_params, _session, socket) do
if connected?(socket) do
Phoenix.PubSub.subscribe(MyApp.PubSub, "posts")
end
{:ok, assign(socket, :posts, list_posts())}
end
@impl true
def handle_info({:post_created, post}, socket) do
{:noreply, update(socket, :posts, fn posts -> [post | posts] end)}
end
@impl true
def handle_info({:post_updated, post}, socket) do
{:noreply,
update(socket, :posts, fn posts ->
Enum.map(posts, fn
p when p.id == post.id -> post
p -> p
end)
end)}
end
@impl true
def handle_info({:post_deleted, post}, socket) do
{:noreply,
update(socket, :posts, fn posts ->
Enum.reject(posts, &(&1.id == post.id))
end)}
end
endBroadcast from contexts, not LiveViews — keeps real-time logic in the business layer. Topic naming conventions:
"posts" — collection-wide; events: {:post_created, post}, {:post_updated, post}, {:post_deleted, post}"posts:#{post.id}" — specific resource; events: {:post_updated, post}, {:comment_added, comment}"users:#{user.id}" — user-scoped; events: {:notification, notification}, {:message_received, message}defmodule MyApp.Blog do
def create_post(attrs) do
%Post{}
|> Post.changeset(attrs)
|> Repo.insert()
|> broadcast(:post_created)
end
def update_post(%Post{} = post, attrs) do
post
|> Post.changeset(attrs)
|> Repo.update()
|> broadcast(:post_updated)
end
def delete_post(%Post{} = post) do
post
|> Repo.delete()
|> broadcast(:post_deleted)
end
# Only broadcast on success
defp broadcast({:ok, post}, event) do
Phoenix.PubSub.broadcast(MyApp.PubSub, "posts", {event, post})
{:ok, post}
end
defp broadcast({:error, changeset}, _event) do
{:error, changeset}
end
endTest by calling context functions and asserting the LiveView reflects the update — do not test PubSub.broadcast in isolation.
defmodule MyAppWeb.PostLive.IndexTest do
use MyAppWeb.ConnCase, async: true
import Phoenix.LiveViewTest
test "creates a post and LiveView updates in real time", %{conn: conn} do
{:ok, view, _html} = live(conn, ~p"/posts")
# Call the context function — it broadcasts internally
{:ok, post} = MyApp.Blog.create_post(%{title: "Hello", body: "World"})
# Assert the LiveView received and rendered the broadcast
assert render(view) =~ post.title
end
test "deletes a post and LiveView removes it", %{conn: conn} do
post = insert(:post)
{:ok, view, _html} = live(conn, ~p"/posts")
{:ok, _} = MyApp.Blog.delete_post(post)
refute render(view) =~ post.title
end
endif connected?(socket) only run after WebSocket upgrade, not on the initial static render.subscribe and broadcast match exactly (case-sensitive). Add a temporary IO.inspect in handle_info/2 to confirm the message is arriving.if connected?(socket) guard — the static render and the live render both subscribed.handle_info clause missing? An unhandled PubSub message will crash the LiveView process. Add a catch-all def handle_info(_, socket), do: {:noreply, socket} if other processes may send unexpected messages.| Predecessor | This Skill | Successor |
|---|---|---|
| phoenix-liveview-essentials | phoenix-pubsub-patterns | testing-essentials |