or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

code-generation.md · index.md · presence.md · real-time.md · security.md · testing.md · web-foundation.md

docs/real-time.md

0

# Real-time Communication

1

2

Phoenix provides real-time communication through channels, which run over a persistent socket connection (WebSocket by default, with long polling available as a fallback transport) and enable bidirectional messaging between clients and servers. The system supports broadcasting, presence tracking, and custom message handling with built-in fault tolerance.

3

4

## Channel System

5

6

Channels enable persistent connections between clients and the server with topic-based message routing.

7

8

### Phoenix.Channel

9

10

```elixir { .api }

11

defmodule Phoenix.Channel do
  # Signature sketch of the channel behaviour and its public helpers.
  # `def name(types) :: return` lines are API notation, not compilable code.

  # A reply is either a bare status atom or a {status, payload} tuple,
  # e.g. `:ok` or `{:ok, %{id: 1}}`.
  @type reply :: atom | {atom, map}

  # Opaque reference returned by socket_ref/1 and consumed by reply/2.
  @type socket_ref ::
          {transport_pid :: pid, serializer :: module, topic :: binary, ref :: binary,
           join_ref :: binary}

  # Required callback
  @callback join(binary, map, Phoenix.Socket.t()) ::
              {:ok, Phoenix.Socket.t()}
              | {:ok, map, Phoenix.Socket.t()}
              | {:error, map}

  # Optional callbacks
  @callback handle_in(binary, map, Phoenix.Socket.t()) ::
              {:noreply, Phoenix.Socket.t()}
              | {:reply, reply, Phoenix.Socket.t()}
              | {:stop, term, Phoenix.Socket.t()}
              | {:stop, term, reply, Phoenix.Socket.t()}

  @callback handle_out(binary, map, Phoenix.Socket.t()) ::
              {:noreply, Phoenix.Socket.t()}
              | {:stop, term, Phoenix.Socket.t()}

  # To send data to the client from handle_info/2, call push/3 and
  # return {:noreply, socket}; there is no {:push, ...} return value.
  @callback handle_info(term, Phoenix.Socket.t()) ::
              {:noreply, Phoenix.Socket.t()}
              | {:stop, term, Phoenix.Socket.t()}

  @callback handle_call(term, GenServer.from(), Phoenix.Socket.t()) ::
              {:reply, term, Phoenix.Socket.t()}
              | {:noreply, Phoenix.Socket.t()}
              | {:stop, term, Phoenix.Socket.t()}

  @callback handle_cast(term, Phoenix.Socket.t()) ::
              {:noreply, Phoenix.Socket.t()}
              | {:stop, term, Phoenix.Socket.t()}

  @callback terminate(term, Phoenix.Socket.t()) :: term

  @callback code_change(term, Phoenix.Socket.t(), term) ::
              {:ok, Phoenix.Socket.t()} | {:error, term}

  # Public API (the topic is taken from the socket; arguments are
  # socket, event, payload)
  def broadcast(Phoenix.Socket.t(), binary, map) :: :ok | {:error, term}
  def broadcast!(Phoenix.Socket.t(), binary, map) :: :ok
  def broadcast_from(Phoenix.Socket.t(), binary, map) :: :ok | {:error, term}
  def broadcast_from!(Phoenix.Socket.t(), binary, map) :: :ok

  def push(Phoenix.Socket.t(), binary, map) :: :ok

  # Asynchronous replies: capture a ref with socket_ref/1 inside handle_in/3,
  # then call reply/2 later (possibly from another process).
  def reply(socket_ref, reply) :: :ok
  def socket_ref(Phoenix.Socket.t()) :: socket_ref
end

52

```

53

54

### Usage Examples

55

56

```elixir

57

defmodule MyAppWeb.RoomChannel do
  use Phoenix.Channel

  # Handle client joining a room.
  #
  # The room id and the user's display name are stored in the socket assigns
  # here because the callbacks below read `socket.assigns.room_id` and
  # `socket.assigns.user_name`; without these assigns they would raise KeyError.
  @impl true
  def join("room:" <> room_id, _params, socket) do
    user_id = socket.assigns.user_id

    if authorized?(user_id, room_id) do
      user = MyApp.Accounts.get_user!(user_id)

      # Defer the history push until after the join has completed;
      # push/3 cannot be used before the socket has joined.
      send(self(), :after_join)

      socket =
        socket
        |> assign(:room_id, room_id)
        |> assign(:user_name, user.name)

      {:ok, socket}
    else
      {:error, %{reason: "unauthorized"}}
    end
  end

  # Handle messages from clients
  @impl true
  def handle_in("new_message", %{"body" => body}, socket) do
    user = MyApp.Accounts.get_user!(socket.assigns.user_id)

    message = %{
      body: body,
      user: user.name,
      timestamp: DateTime.utc_now()
    }

    # Broadcast to all clients in this topic
    broadcast!(socket, "new_message", message)

    # Store message in database
    MyApp.Chat.create_message(socket.assigns.room_id, user.id, body)

    {:noreply, socket}
  end

  def handle_in("typing", %{"typing" => typing}, socket) do
    # broadcast_from!/3 skips the sender, so a client never sees its own
    # typing indicator echoed back.
    broadcast_from!(socket, "typing", %{
      user: socket.assigns.user_name,
      typing: typing
    })

    {:noreply, socket}
  end

  # Handle system messages
  @impl true
  def handle_info(:after_join, socket) do
    # Send recent messages to newly joined client
    messages = MyApp.Chat.list_recent_messages(socket.assigns.room_id)
    push(socket, "message_history", %{messages: messages})
    {:noreply, socket}
  end

  # Handle client disconnection
  @impl true
  def terminate(_reason, socket) do
    broadcast_from!(socket, "user_left", %{
      user: socket.assigns.user_name
    })

    :ok
  end

  defp authorized?(user_id, room_id) do
    MyApp.Chat.user_can_access_room?(user_id, room_id)
  end
end

117

```

118

119

## Socket Management

120

121

Sockets manage the persistent connection and multiplex multiple channels.

122

123

### Phoenix.Socket

124

125

```elixir { .api }

126

defmodule Phoenix.Socket do
  # Required callbacks

  # Authenticates and sets up the connection. Returning `:error` or
  # `{:error, reason}` rejects the connection.
  @callback connect(params :: map, Phoenix.Socket.t(), connect_info :: map) ::
              {:ok, Phoenix.Socket.t()} | {:error, term} | :error

  # Returns a string identifying the connection, or nil for an anonymous socket.
  @callback id(Phoenix.Socket.t()) :: binary | nil

  # Socket struct fields (with their defaults)
  defstruct assigns: %{},
            # socket assigns
            channel: nil,
            # current channel module
            channel_pid: nil,
            # channel process PID
            endpoint: nil,
            # endpoint module
            handler: nil,
            # socket handler module
            id: nil,
            # socket identifier
            joined: false,
            # whether socket has joined a topic
            join_ref: nil,
            # join reference
            private: %{},
            # map reserved for framework/library use
            pubsub_server: nil,
            # pubsub server name
            ref: nil,
            # message reference
            serializer: nil,
            # message serializer module
            topic: nil,
            # current topic
            transport: nil,
            # transport name (:websocket, :longpoll)
            transport_pid: nil

  # transport_pid: transport process PID

  @type t :: %__MODULE__{}
end

153

```

154

155

### Usage Examples

156

157

```elixir

158

defmodule MyAppWeb.UserSocket do
  use Phoenix.Socket

  # Topic patterns and the channel modules that serve them
  channel "room:*", MyAppWeb.RoomChannel
  channel "user:*", MyAppWeb.UserChannel
  channel "notifications:*", MyAppWeb.NotificationChannel

  # Authenticates the connection from the client-supplied token.
  # On success the verified user id is stored in the socket assigns.
  @impl true
  def connect(%{"token" => token}, socket, _connect_info) do
    verification = MyApp.Accounts.verify_socket_token(token)

    case verification do
      {:ok, user_id} -> {:ok, assign(socket, :user_id, user_id)}
      {:error, _reason} -> :error
    end
  end

  # Connections that do not supply a token are rejected.
  def connect(_params, _socket, _connect_info), do: :error

  # Socket identification for presence tracking
  @impl true
  def id(socket) do
    "user:#{socket.assigns.user_id}"
  end
end

183

```

184

185

## Broadcasting and PubSub

186

187

Phoenix provides a distributed PubSub system for broadcasting messages across processes and nodes.

188

189

### Broadcasting API

190

191

```elixir { .api }

192

# From Phoenix.Channel — called inside a channel. The topic is taken from the
# socket, so only the event name and the payload are passed.
def broadcast(socket, event, message)
def broadcast!(socket, event, message)
def broadcast_from(socket, event, message)
def broadcast_from!(socket, event, message)

# From Phoenix.Endpoint — callable from anywhere (controllers, contexts,
# GenServers) via the application's endpoint module; the topic is explicit.
# The `local_*` variants deliver only to subscribers on the current node.
@callback broadcast(topic :: binary, event :: binary, msg :: term) :: :ok | {:error, term}
@callback broadcast!(topic :: binary, event :: binary, msg :: term) :: :ok
@callback broadcast_from(from :: pid, topic :: binary, event :: binary, msg :: term) ::
            :ok | {:error, term}
@callback broadcast_from!(from :: pid, topic :: binary, event :: binary, msg :: term) :: :ok
@callback local_broadcast(topic :: binary, event :: binary, msg :: term) :: :ok
@callback local_broadcast_from(from :: pid, topic :: binary, event :: binary, msg :: term) ::
            :ok

205

```

206

207

### Usage Examples

208

209

```elixir

210

# Broadcasting from controllers

211

defmodule MyAppWeb.PostController do
  use Phoenix.Controller

  # Creates a post; on success the new post is broadcast on "posts:lobby"
  # and returned as JSON, on failure a 422 response carries the errors.
  def create(conn, %{"post" => post_params}) do
    case MyApp.Blog.create_post(post_params) do
      {:error, changeset} ->
        # NOTE(review): assumes `changeset` is JSON-encodable as-is — confirm,
        # or convert it to a plain map of errors before rendering.
        rejected = put_status(conn, :unprocessable_entity)
        json(rejected, %{errors: changeset})

      {:ok, post} ->
        payload = %{post: post}

        # Broadcast new post to all subscribers
        MyAppWeb.Endpoint.broadcast!("posts:lobby", "new_post", payload)

        json(conn, payload)
    end
  end
end

233

234

# Broadcasting from contexts

235

defmodule MyApp.Chat do
  # Persists a message and, on success, broadcasts it to every subscriber of
  # the room topic. Any result other than `{:ok, message}` is returned to the
  # caller unchanged and nothing is broadcast.
  #
  # `create_message/3` is not shown in this excerpt.
  def send_message(room_id, user_id, body) do
    with {:ok, message} <- create_message(room_id, user_id, body) do
      topic = "room:#{room_id}"
      MyAppWeb.Endpoint.broadcast!(topic, "new_message", %{message: message})
      {:ok, message}
    end
  end
end

252

253

# Broadcasting from GenServers

254

defmodule MyApp.GameServer do
  use GenServer

  # Publishes the new game state on the game's topic, then stores it in the
  # server state.
  @impl true
  def handle_cast({:update_game_state, game_state}, state) do
    topic = "game:#{state.game_id}"
    payload = %{state: game_state}

    # Broadcast game state changes
    MyAppWeb.Endpoint.broadcast!(topic, "state_updated", payload)

    updated = %{state | game_state: game_state}
    {:noreply, updated}
  end
end

268

```

269

270

## Transport Configuration

271

272

Phoenix supports multiple transport protocols with fallback mechanisms.

273

274

### Transport Types

275

276

```elixir

277

# In the endpoint module. The three declarations below are alternative
# configurations of the same socket path, not meant to be combined.

# Enable WebSocket with default options and disable long polling
socket "/socket", MyAppWeb.UserSocket, websocket: true, longpoll: false

# WebSocket configuration
socket "/socket", MyAppWeb.UserSocket,
  websocket: [timeout: 45_000, transport_log: false, subprotocols: ["mqtt", "v1.example.com"]]

# Long polling configuration
socket "/socket", MyAppWeb.UserSocket,
  longpoll: [
    window_ms: 10_000,
    pubsub_timeout_ms: 2_000,
    # max_age is in seconds: 1_209_600 s = 2 weeks
    crypto: [max_age: 1_209_600]
  ]

299

```

300

301

### Client-Side Connection

302

303

```javascript

304

// JavaScript client connection
import {Socket} from "phoenix"

// Log every socket event together with its data
const logSocketEvent = (kind, msg, data) => { console.log(`${kind}: ${msg}`, data) }

let socket = new Socket("/socket", {
  params: {token: userToken},
  logger: logSocketEvent
})

socket.connect()

// Join a channel
let channel = socket.channel("room:lobby", {})
const joinPush = channel.join()
joinPush.receive("ok", resp => { console.log("Joined successfully", resp) })
joinPush.receive("error", resp => { console.log("Unable to join", resp) })

// Handle messages
const onNewMessage = payload => {
  console.log("New message:", payload.body)
}
channel.on("new_message", onNewMessage)

// Send messages
channel.push("new_message", {body: "Hello, World!"})

327

```

328

329

## Message Serialization

330

331

Phoenix supports multiple message serializers for different transport needs.

332

333

### Phoenix.Socket.Serializer

334

335

```elixir { .api }

336

defmodule Phoenix.Socket.Serializer do
  # Encodes a broadcast for direct delivery to the transport (fastlaning),
  # without passing through the channel process.
  @callback fastlane!(Phoenix.Socket.Broadcast.t()) ::
              {:socket_push, :text, iodata}
              | {:socket_push, :binary, iodata}

  # Encodes an outgoing message or reply. The result is tagged with the
  # frame type so the transport knows whether to send text or binary.
  @callback encode!(Phoenix.Socket.Message.t() | Phoenix.Socket.Reply.t()) ::
              {:socket_push, :text, iodata}
              | {:socket_push, :binary, iodata}

  # Decodes an incoming frame into a message struct.
  @callback decode!(iodata, keyword) :: Phoenix.Socket.Message.t()
end

# Built-in serializers (both implement the behaviour above)
defmodule Phoenix.Socket.V1.JSONSerializer
defmodule Phoenix.Socket.V2.JSONSerializer

344

```

345

346

### Message Structure

347

348

```elixir { .api }

349

defmodule Phoenix.Socket.Message do
  # A single message exchanged over a socket. Every field defaults to nil.
  defstruct topic: nil,
            event: nil,
            payload: nil,
            ref: nil,
            join_ref: nil

  # topic    - topic string
  # event    - event string
  # payload  - message payload
  # ref      - message reference
  # join_ref - join reference
  @type t :: %__MODULE__{
          topic: binary,
          event: binary,
          payload: map,
          ref: binary | nil,
          join_ref: binary | nil
        }
end

366

```

367

368

## Error Handling and Lifecycle

369

370

```elixir

371

defmodule MyAppWeb.RoomChannel do
  use Phoenix.Channel

  # Logger.info/1 is a macro, so Logger must be required before it is used.
  require Logger

  # Handle join errors: reject unknown rooms and users without access,
  # otherwise keep the room in the socket assigns for later callbacks.
  @impl true
  def join("room:" <> room_id, _params, socket) do
    case MyApp.Chat.get_room(room_id) do
      nil ->
        {:error, %{reason: "room_not_found"}}

      room ->
        if MyApp.Chat.user_can_join?(socket.assigns.user_id, room) do
          {:ok, assign(socket, :room, room)}
        else
          {:error, %{reason: "access_denied"}}
        end
    end
  end

  # Handle message errors: reply to the client with either a success status
  # or the validation errors.
  #
  # `validate_message/1` is not shown in this excerpt.
  @impl true
  def handle_in("send_message", params, socket) do
    case validate_message(params) do
      {:ok, _message_params} ->
        # Process message
        {:reply, {:ok, %{status: "sent"}}, socket}

      {:error, errors} ->
        {:reply, {:error, %{errors: errors}}, socket}
    end
  end

  # Handle crashes gracefully
  @impl true
  def terminate(reason, socket) do
    Logger.info("Channel terminated: #{inspect(reason)}")

    # Cleanup resources
    MyApp.Chat.user_left_room(
      socket.assigns.user_id,
      socket.assigns.room.id
    )

    :ok
  end
end

414

```

415

416

## Testing Channels

417

418

```elixir

419

defmodule MyAppWeb.RoomChannelTest do
  use MyAppWeb.ChannelCase

  setup do
    user = insert(:user)
    room = insert(:room)

    # Build the socket directly with the assigns an authenticated connection
    # would carry. Going through connect/2 with `%{user_id: ...}` would be
    # rejected, because UserSocket.connect/3 only accepts a "token" param.
    socket = socket(MyAppWeb.UserSocket, "user:#{user.id}", %{user_id: user.id})

    {:ok, socket: socket, user: user, room: room}
  end

  test "joining a room", %{socket: socket, room: room} do
    {:ok, reply, socket} =
      subscribe_and_join(socket, MyAppWeb.RoomChannel, "room:#{room.id}")

    assert reply == %{}
    assert socket.joined
    assert socket.topic == "room:#{room.id}"
  end

  test "sending messages", %{socket: socket, room: room} do
    {:ok, _reply, socket} =
      subscribe_and_join(socket, MyAppWeb.RoomChannel, "room:#{room.id}")

    # The "new_message" handler returns {:noreply, socket}, so no reply is
    # sent for this push; assert on the resulting broadcast instead.
    push(socket, "new_message", %{"body" => "Hello"})

    assert_broadcast "new_message", %{body: "Hello"}
  end
end

454

```