0
# Session and Messaging
1
2
Jupyter protocol message handling, serialization, authentication, and session management. Provides a complete protocol implementation with message signing, security features, and message lifecycle management for kernel communication.
3
4
## Capabilities
5
6
### Session Management
7
8
The `Session` class handles Jupyter protocol message creation, serialization, authentication, and communication over ZMQ sockets.
9
10
```python { .api }
11
class Session:
12
"""Handles Jupyter protocol message creation and communication."""
13
14
def msg(self, msg_type, content=None, parent=None, header=None, metadata=None):
15
"""
16
Create a Jupyter protocol message.
17
18
Parameters:
19
- msg_type (str): Type of message (e.g., 'execute_request')
20
- content (dict): Message content payload
21
- parent (dict): Parent message for threading
22
- header (dict): Message header (auto-generated if None)
23
- metadata (dict): Message metadata
24
25
Returns:
26
dict: Complete Jupyter protocol message
27
"""
28
29
def sign(self, msg_list):
30
"""
31
Sign a message list for authentication.
32
33
Parameters:
34
- msg_list (list): List of message parts to sign
35
36
Returns:
37
bytes: Hex-encoded HMAC digest of the message parts (b'' when no signing key is configured)
38
"""
39
40
def serialize(self, msg, ident=None):
41
"""
42
Serialize a message for transmission over ZMQ.
43
44
Parameters:
45
- msg (dict): Message to serialize
46
- ident (bytes): ZMQ identity for routing
47
48
Returns:
49
list: List of bytes for the ZMQ multipart message, laid out as [*idents, b'<IDS|MSG>', signature, header, parent_header, metadata, content]
50
"""
51
52
def deserialize(self, msg_list, content=True, copy=True):
53
"""
54
Deserialize a message received from ZMQ.
55
56
Parameters:
57
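- msg_list (list): Message parts (bytes) starting at the signature, i.e. with routing identities and the b'<IDS|MSG>' delimiter already stripped (see feed_identities)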
58
- content (bool): Parse message content if True
59
- copy (bool): Copy message data if True
60
61
Returns:
62
dict: Deserialized message dictionary
63
"""
64
65
def send(self, socket, msg_or_type, content=None, parent=None,
66
ident=None, buffers=None, track=False, header=None, metadata=None):
67
"""
68
Send a message over a ZMQ socket.
69
70
Parameters:
71
- socket (zmq.Socket): ZMQ socket to send on
72
- msg_or_type (dict | str): Complete message or message type
73
- content (dict): Message content (if msg_or_type is string)
74
- parent (dict): Parent message for threading
75
- ident (bytes): ZMQ identity for routing
76
- buffers (list): Binary buffers to send
77
- track (bool): Track message delivery if True
78
- header (dict): Message header
79
- metadata (dict): Message metadata
80
81
Returns:
82
MessageTracker | None: Tracker for message delivery
83
"""
84
85
def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
86
"""
87
Receive a message from a ZMQ socket.
88
89
Parameters:
90
- socket (zmq.Socket): ZMQ socket to receive from
91
- mode (int): ZMQ receive mode (0=blocking, zmq.NOBLOCK=non-blocking)
92
- content (bool): Parse message content if True
93
- copy (bool): Copy message data if True
94
95
Returns:
96
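tuple: (idents, msg) - routing identities and the deserialized message dict; (None, None) if no message is available in non-blocking mode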
97
"""
98
99
def clone(self):
100
"""
101
Create a copy of this session.
102
103
Returns:
104
Session: New session instance with same configuration
105
"""
106
107
# Configuration properties
108
username = 'username' # Username for message headers
109
session = 'session-id' # Session identifier
110
signature_scheme = 'hmac-sha256' # Message signing scheme
111
key = b'' # Signing key
112
packer = None # Message serialization function
113
unpacker = None # Message deserialization function
114
```
115
116
### Session Factory
117
118
Base class for objects that need session management capabilities.
119
120
```python { .api }
121
class SessionFactory:
122
"""Base class for configurables that have a Session."""
123
124
session = None # Session instance for message handling
125
```
126
127
### Message Structure
128
129
The `Message` class represents Jupyter protocol messages with proper structure and validation.
130
131
```python { .api }
132
class Message:
133
"""Jupyter protocol message representation."""
134
135
def __init__(self, msg_dict):
136
"""
137
Create message from dictionary.
138
139
Parameters:
140
- msg_dict (dict): Message dictionary
141
"""
142
143
@property
144
def header(self):
145
"""Message header dictionary."""
146
147
@property
148
def parent_header(self):
149
"""Parent message header for threading."""
150
151
@property
152
def metadata(self):
153
"""Message metadata dictionary."""
154
155
@property
156
def content(self):
157
"""Message content payload."""
158
159
@property
160
def buffers(self):
161
"""Binary message buffers."""
162
```
163
164
### Message Utilities
165
166
Utility functions for working with Jupyter protocol messages.
167
168
```python { .api }
169
def msg_header(msg_id, msg_type, username, session, date=None, version=None):
170
"""
171
Create a message header.
172
173
Parameters:
174
- msg_id (str): Unique message identifier
175
- msg_type (str): Type of message
176
- username (str): Username creating the message
177
- session (str): Session identifier
178
- date (datetime): Message timestamp (current time if None)
179
- version (str): Protocol version
180
181
Returns:
182
dict: Message header dictionary
183
"""
184
185
def extract_header(msg_or_header):
186
"""
187
Extract header from message or return header directly.
188
189
Parameters:
190
- msg_or_header (dict): Message dict or header dict
191
192
Returns:
193
dict: Message header
194
"""
195
196
def utcnow():
197
"""
198
Get current UTC timestamp for messages.
199
200
Returns:
201
datetime: Current UTC datetime
202
"""
203
204
def new_id():
205
"""
206
Generate a new unique message ID.
207
208
Returns:
209
str: Unique message identifier
210
"""
211
```
212
213
### JSON Utilities
214
215
Specialized JSON handling for Jupyter messages with support for dates and numpy arrays.
216
217
```python { .api }
218
# JSON serialization functions optimized for Jupyter messages
219
def json_default(obj):
220
"""Default JSON serializer for special types."""
221
222
def json_packer(obj):
223
"""Pack objects to JSON with special type handling."""
224
225
def json_unpacker(s):
226
"""Unpack JSON with special type handling."""
227
```
228
229
## Usage Examples
230
231
### Basic Session Usage
232
233
```python
234
from jupyter_client.session import Session
235
import zmq
236
237
# Create ZMQ context and sockets
238
context = zmq.Context()
239
socket = context.socket(zmq.REQ)
240
socket.connect('tcp://127.0.0.1:50001')
241
242
# Create session
243
session = Session()
244
245
# Create and send a message
246
msg = session.msg('kernel_info_request')
247
session.send(socket, msg)
248
249
# Receive reply
250
idents, reply = session.recv(socket, mode=0)  # recv() returns (idents, msg); mode=0 blocks until the reply arrives
251
print(f"Kernel info: {reply['content']}")
252
253
# Clean up
254
socket.close()
255
context.term()
256
```
257
258
### Message Creation and Structure
259
260
```python
261
from jupyter_client.session import Session
262
263
session = Session(username='user', session='test-session')
264
265
# Create execute request message
266
execute_msg = session.msg(
267
'execute_request',
268
content={
269
'code': 'print("Hello, World!")',
270
'silent': False,
271
'store_history': True,
272
'user_expressions': {},
273
'allow_stdin': False,
274
'stop_on_error': True
275
}
276
)
277
278
print(f"Message ID: {execute_msg['header']['msg_id']}")
279
print(f"Message type: {execute_msg['header']['msg_type']}")
280
print(f"Content: {execute_msg['content']}")
281
282
# Create reply message
283
reply = session.msg(
284
'execute_reply',
285
content={
286
'status': 'ok',
287
'execution_count': 1,
288
'user_expressions': {}
289
},
290
parent=execute_msg
291
)
292
293
print(f"Reply parent: {reply['parent_header']['msg_id']}")
294
```
295
296
### Message Serialization and Signing
297
298
```python
299
from jupyter_client.session import Session
300
import zmq
301
302
# Create session with authentication
303
session = Session(
304
key=b'secret-key-for-signing',
305
signature_scheme='hmac-sha256'
306
)
307
308
# Create message
309
msg = session.msg('status', content={'execution_state': 'busy'})
310
311
# Serialize message
312
serialized = session.serialize(msg)
313
print(f"Serialized parts: {len(serialized)}")
314
315
# Sign message manually
316
signature = session.sign(serialized[2:]) # Skip the delimiter and signature parts
317
print(f"Signature: {signature.decode()}")  # sign() already returns the hex digest as bytes
318
319
# Deserialize message
320
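_, msg_parts = session.feed_identities(serialized)  # Strip routing identities and the b'<IDS|MSG>' delimiter
deserialized = session.deserialize(msg_parts)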
321
print(f"Deserialized type: {deserialized['header']['msg_type']}")
322
```
323
324
### Session Communication Pattern
325
326
```python
327
from jupyter_client import KernelManager
from jupyter_client.session import Session
328
import zmq
329
330
# Start kernel and get connection info
331
km = KernelManager()
332
km.start_kernel()
333
connection_info = km.get_connection_info()
334
335
# Create session matching kernel's session
336
session = Session(
337
key=connection_info['key'],  # get_connection_info() already returns the key as bytes
338
signature_scheme=connection_info['signature_scheme']
339
)
340
341
# Connect to shell channel
342
context = zmq.Context()
343
shell_socket = context.socket(zmq.DEALER)
344
shell_socket.connect(f"tcp://127.0.0.1:{connection_info['shell_port']}")
345
346
# Send kernel info request
347
kernel_info_msg = session.msg('kernel_info_request')
348
session.send(shell_socket, kernel_info_msg)
349
350
# Receive reply
351
idents, reply = session.recv(shell_socket, mode=0)  # recv() returns (idents, msg); mode=0 blocks until the reply arrives
352
print(f"Kernel language: {reply['content']['language_info']['name']}")
353
354
# Clean up
355
shell_socket.close()
356
context.term()
357
km.shutdown_kernel()
358
```
359
360
### Custom Message Types
361
362
```python
363
from jupyter_client.session import Session
364
365
session = Session()
366
367
# Create custom message type
368
custom_msg = session.msg(
369
'custom_request',
370
content={
371
'operation': 'process_data',
372
'parameters': {
373
'input_file': '/path/to/data.csv',
374
'output_format': 'json'
375
}
376
},
377
metadata={
378
'priority': 'high',
379
'timeout': 30
380
}
381
)
382
383
print(f"Custom message: {custom_msg}")
384
385
# Handle custom reply
386
def handle_custom_reply(msg):
387
if msg['header']['msg_type'] == 'custom_reply':
388
content = msg['content']
389
if content['status'] == 'ok':
390
print(f"Operation completed: {content['result']}")
391
else:
392
print(f"Operation failed: {content['error']}")
393
```
394
395
### Thread-Safe Session Usage
396
397
```python
398
import threading
399
from jupyter_client.session import Session
400
import zmq
401
402
class SessionWorker(threading.Thread):
403
def __init__(self, session_config, socket_url):
404
super().__init__()
405
self.session = Session(**session_config)
406
self.socket_url = socket_url
407
self.running = True
408
409
def run(self):
410
context = zmq.Context()
411
socket = context.socket(zmq.DEALER)
412
socket.connect(self.socket_url)
413
414
while self.running:
415
try:
416
# Non-blocking receive
417
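idents, msg = self.session.recv(socket, mode=zmq.NOBLOCK)
417
if msg is not None: self.handle_message(msg)  # recv() returns (None, None) rather than raising when nothing is pending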
419
except zmq.Again:
420
# No message available
421
continue
422
except Exception as e:
423
print(f"Error receiving message: {e}")
424
425
socket.close()
426
context.term()
427
428
def handle_message(self, msg):
429
print(f"Received: {msg['header']['msg_type']}")
430
431
def send_message(self, msg_type, content=None):
432
# This would need socket access - shown for concept
433
msg = self.session.msg(msg_type, content=content)
434
# session.send(socket, msg)
435
436
# Create worker with session config
437
session_config = {
438
'username': 'worker',
439
'session': 'worker-session'
440
}
441
442
worker = SessionWorker(session_config, 'tcp://127.0.0.1:50001')
443
worker.start()
444
445
# Use worker...
446
447
worker.running = False
448
worker.join()
449
```
450
451
### Message Buffer Handling
452
453
```python
454
from jupyter_client.session import Session
455
import numpy as np
456
457
session = Session()
458
459
# Create message with binary buffers
460
data = np.array([1, 2, 3, 4, 5])
461
msg = session.msg(
    'data_message',
    content={
        'shape': data.shape,
        'dtype': str(data.dtype)
    }
)
# Session.msg() takes no `buffers` argument; buffers travel as extra frames
buffers = [data.tobytes()]

print(f"Message has {len(buffers)} buffers")

# Serialize with buffers: serialize() emits only the signed JSON frames,
# so append the raw buffers after them, as Session.send() does
serialized = session.serialize(msg) + buffers
print(f"Serialized message parts: {len(serialized)}")

# Deserialize and reconstruct data
_, msg_parts = session.feed_identities(serialized)  # Strip identities and delimiter
deserialized = session.deserialize(msg_parts)
if deserialized['buffers']:
    buffer_data = deserialized['buffers'][0]
    reconstructed = np.frombuffer(buffer_data, dtype=data.dtype)
    print(f"Reconstructed data: {reconstructed}")
482
```