Curated library of 38 atomic skills, 7 personas, and 1 orchestrator for Elixir and Phoenix development. Organized by category: fundamentals, phoenix, database, testing, auth, infrastructure, quality, security, integrations, tooling, frameworks, personas, and orchestration. Covers core Elixir patterns, Phoenix LiveView, Ecto, OTP, Oban, testing, security, deployment, real-time, and modern tooling (Req, Swoosh, Cachex, Broadway, Ash).
73
91%
Does it follow best practices?
Impact
—
No eval scenarios have been run
Advisory
Suggest reviewing before use
- `Broadway.Message.failed/2` for errors — never raise in `handle_message/3`
- `handle_failed/2` — dead-letter handling must be explicit for every pipeline
- `:status` in `start_link` — set `:max_restarts`, `:max_seconds` for production resilience
- `Broadway.Test.push_message/2` — verify each message type including failures

Follow these steps in order when building a new Broadway pipeline:
1. Add `broadway` (and any producer library) to `mix.exs`
2. Implement the `handle_message/3` and `handle_batch/4` callbacks
3. Add the pipeline to the supervision tree in `application.ex`
4. Test with `Broadway.Test` helpers before scaling concurrency
5. Confirm `handle_failed/2` fires
6. Monitor with `broadway_dashboard`; see Broadway Telemetry docs and broadway_dashboard

Producer libraries: For SQS use `broadway_sqs`, for Kafka use `broadway_kafka`, for RabbitMQ use `broadway_rabbitmq`. See each library's README for producer-specific configuration.
# mix.exs
defp deps do
[
  {:broadway, "~> 1.0"},
  # Producer library for the SQS-backed example pipeline (BroadwaySQS.Producer).
  # Swap for :broadway_kafka / :broadway_rabbitmq when using another source.
  {:broadway_sqs, "~> 0.7"},
  # HTTP client used by ExAws, which broadway_sqs talks to SQS through.
  {:hackney, "~> 1.9"},
  {:broadway_dashboard, "~> 0.3"} # Optional: LiveDashboard integration
]
end

defmodule MyApp.MessagePipeline do
use Broadway
# Starts the pipeline under its own module name: one SQS producer, ten
# processors and a single :default batcher (5 workers, batches of up to 100
# messages, flushed after 2000 ms at the latest). The argument is ignored.
#
# NOTE(review): System.get_env/1 returns nil when SQS_QUEUE_URL is unset, so
# the producer would be configured with `queue_url: nil` — confirm that the
# variable is always present in every environment that starts this pipeline.
def start_link(_opts) do
  sqs_producer = {BroadwaySQS.Producer, queue_url: System.get_env("SQS_QUEUE_URL")}

  pipeline_opts = [
    name: __MODULE__,
    producer: [module: sqs_producer],
    processors: [default: [concurrency: 10]],
    batchers: [default: [concurrency: 5, batch_size: 100, batch_timeout: 2000]]
  ]

  Broadway.start_link(__MODULE__, pipeline_opts)
end
# Runs process/1 on the message payload. On success the payload is replaced
# with the processed result and the message is routed to the :default batcher;
# on error the message is marked as failed with the returned reason.
@impl true
def handle_message(_processor, message, _context) do
  message.data
  |> process()
  |> case do
    {:error, reason} ->
      Broadway.Message.failed(message, reason)

    {:ok, result} ->
      replaced = Broadway.Message.update_data(message, fn _previous -> result end)
      Broadway.Message.put_batcher(replaced, :default)
  end
end
# Logs every failed message and forwards its payload, together with the
# failure reason, to the dead-letter queue. Returns the messages unchanged.
@impl true
def handle_failed(messages, _context) do
  # Logger.error/1 is a macro, so Logger has to be required before use.
  # Requiring it here keeps this callback self-contained.
  require Logger

  Enum.each(messages, fn message ->
    Logger.error("Message failed: #{inspect(message.data)}")
    DeadLetterQueue.send(message.data, failure_reason(message.status))
  end)

  messages
end

# Extracts the reason from a Broadway message status. The status is a tuple
# (`{:failed, reason}` after Broadway.Message.failed/2, or
# `{kind, reason, stacktrace}` when a callback raised, threw or exited), so it
# cannot be read with `status.reason`.
defp failure_reason({:failed, reason}), do: reason

defp failure_reason({kind, reason, _stacktrace}) when kind in [:error, :throw, :exit],
  do: {kind, reason}

defp failure_reason(other), do: other
# Bulk-inserts the payloads of a :default batch and returns the messages so
# they are acknowledged as successful.
@impl true
def handle_batch(:default, messages, _batch_info, _context) do
  rows = Enum.map(messages, & &1.data)

  # insert_all returns `{count, returned}` and raises on failure; it never
  # returns `{:error, reason}` (assuming MyApp.Repo is an Ecto repo). The old
  # `{:error, reason}` branch was therefore dead code, and it sat behind a
  # `{_count, _}` pattern that matches any two-element tuple anyway.
  # An exception raised here is rescued by Broadway, which marks the whole
  # batch as failed, so the messages still reach handle_failed/2.
  {_count, _returned} = MyApp.Repo.insert_all(MyApp.Record, rows)

  messages
end
# Normalises a raw payload into a map stamped with :processed_at.
#
# Accepts a map (a string "body", if present, is truncated to 10_000
# characters; a nil body becomes "") or a JSON-encoded binary, which is decoded
# and processed again. Always returns `{:ok, map}` or `{:error, reason}` so
# that handle_message/3 can mark bad input as failed instead of crashing on a
# missing function clause.
defp process(%{"body" => body} = data) when is_binary(body) or is_nil(body) do
  sanitized = %{data | "body" => String.slice(body || "", 0, 10_000)}
  {:ok, Map.put(sanitized, :processed_at, DateTime.utc_now())}
end

# A body that is neither text nor nil cannot be truncated safely.
defp process(%{"body" => _non_text}), do: {:error, :invalid_body}

defp process(data) when is_map(data) do
  {:ok, Map.put(data, :processed_at, DateTime.utc_now())}
end

defp process(data) when is_binary(data) do
  case Jason.decode(data) do
    {:ok, parsed} -> process(parsed)
    {:error, _} -> {:error, :invalid_json}
  end
end

# nil, numbers, lists (including JSON that decodes to a list), etc.
defp process(_other), do: {:error, :unsupported_payload}
end

# lib/my_app/application.ex
def start(_type, _args) do
# The pipeline is listed as a regular child; each child is restarted on its
# own (:one_for_one) if it crashes.
supervised = [
  # ...
  MyApp.MessagePipeline
]

supervisor_opts = [strategy: :one_for_one]
Supervisor.start_link(supervised, supervisor_opts)
end

defmodule MyApp.MessagePipelineTest do
use ExUnit.Case

# Broadway 1.0 exposes its test helpers on the Broadway module itself
# (Broadway.test_message/3 and Broadway.test_batch/3); there is no
# Broadway.Test module to import. test_message/3 returns a reference and the
# test process later receives `{:ack, ref, successful, failed}` once the
# message has gone through the whole pipeline.

test "processes a single message" do
  ref = Broadway.test_message(MyApp.MessagePipeline, %{id: 1, value: "hello"})
  assert_receive {:ack, ^ref, [%{data: %{id: 1}}], []}
end

test "marks malformed messages as failed" do
  ref = Broadway.test_message(MyApp.MessagePipeline, nil)
  assert_receive {:ack, ^ref, [], [_failed]}
end
end

Broadway does not provide a built-in backoff/requeue mechanism at the message level. For retry logic:

- Wrap `process/1` in a retry library (e.g., Retry) and let failures bubble to `handle_failed/2`.

@impl true
def handle_message(_, message, _context) do
# The retry counter is read from the message metadata and defaults to 0 when
# the key is absent.
retries_so_far = Map.get(message.metadata, :retry_count, 0)

case process(message.data) do
  {:ok, result} ->
    Broadway.Message.update_data(message, fn _previous -> result end)

  {:error, reason} ->
    # Fewer than 3 recorded retries: tag the failure as retryable; otherwise
    # tag it as having exhausted its retries.
    failure_tag = if retries_so_far < 3, do: :retryable, else: :max_retries_exceeded
    Broadway.Message.failed(message, {failure_tag, reason})
end
end

Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
  module: {
    BroadwaySQS.Producer,
    queue_url: System.get_env("SQS_QUEUE_URL"),
    # Receive tuning is read from the producer options themselves, not from
    # the nested :config list.
    max_number_of_messages: 10,
    wait_time_seconds: 20,
    # :config only overrides the ExAws client settings (region, credentials…).
    config: [region: "us-west-2"]
  },
  concurrency: 1
],
processors: [
  default: [
    concurrency: 10,
    max_demand: 10,
    min_demand: 5
  ]
],
batchers: [
  default: [
    concurrency: 5,
    batch_size: 100,
    batch_timeout: 5_000
  ]
]
)

Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
  module: {
    BroadwayKafka.Producer,
    # BroadwayKafka takes the bootstrap servers under the required :hosts key
    # (host/port pairs, or a "host:port,host:port" string); it has no
    # :brokers option.
    hosts: [localhost: 9092],
    group_id: "my_consumer_group",
    topics: ["my-topic"]
  }
],
processors: [
  default: [concurrency: 10]
],
batchers: [
  default: [concurrency: 5, batch_size: 100, batch_timeout: 5_000]
]
)

Broadway emits telemetry events for message processing and batching. Attach handlers via `:telemetry.attach_many/4` in your application startup:
:telemetry.attach_many(
"broadway-handler",
# Event names as emitted by Broadway 1.x: per-message events live under
# [:broadway, :processor, :message, ...] and batch handling under
# [:broadway, :batch_processor, ...]. A handler attached to a name Broadway
# never emits is silently never called.
[
  [:broadway, :processor, :message, :start],
  [:broadway, :processor, :message, :stop],
  [:broadway, :processor, :message, :exception],
  [:broadway, :batch_processor, :start],
  [:broadway, :batch_processor, :stop]
],
&MyApp.Telemetry.handle_event/4,
%{}
)

Optionally visualise metrics with `broadway_dashboard`. See the Broadway Telemetry guide for full event names and metadata shapes.
# Both entries go into the option list passed to Broadway.start_link/2.
# CPU-bound: fewer workers, lower demand
# I/O-bound: more workers, higher demand
processors: [
  default: [
    concurrency: System.schedulers_online() * 2, # multiply by 4 for heavy I/O
    max_demand: 50 # raise to 100 for I/O-bound
  ]
],
batchers: [
  default: [
    concurrency: System.schedulers_online(),
    batch_size: 100,
    batch_timeout: 5_000
  ]
]