# Streaming Parsers

Real-time parsing capabilities for processing continuous data streams, with built-in error handling and metadata generation. Streaming parsers enable real-time analysis of log files, command output, and other continuous data sources.

## Streaming Interface

Streaming parsers process iterables of strings rather than single strings, returning generators for memory-efficient processing.

```python { .api }
def parse(
    data: Iterable[str],
    quiet: bool = False,
    raw: bool = False,
    ignore_exceptions: bool = False
) -> Iterator[JSONDictType]:
    """
    Parse streaming data line by line.

    Parameters:
    - data: Iterable of strings (each line to parse)
    - quiet: Suppress warning messages if True
    - raw: Output preprocessed JSON if True
    - ignore_exceptions: Continue processing on parse errors if True

    Returns:
    - Iterator yielding parsed dictionaries for each line
    """
```
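
To illustrate the shape of this interface, here is a minimal sketch of a generator-based parser. `parse_kv_stream` and its `key=value` input format are hypothetical and not part of the library; `quiet` and `ignore_exceptions` are accepted only to mirror the signature above and are unused. The point is that a line is read from the source only when the consumer asks for the next result.

```python
from typing import Any, Dict, Iterable, Iterator

JSONDictType = Dict[str, Any]  # stand-in for the library's type alias


def parse_kv_stream(
    data: Iterable[str],
    quiet: bool = False,
    raw: bool = False,
    ignore_exceptions: bool = False,
) -> Iterator[JSONDictType]:
    """Hypothetical parser: turns 'key=value' lines into dictionaries."""
    for line in data:
        key, value = line.strip().split("=", 1)
        # Assumption for this sketch: raw=True keeps every value as a string,
        # otherwise purely numeric values are converted to int.
        if not raw and value.isdigit():
            yield {key: int(value)}
        else:
            yield {key: value}


def line_source() -> Iterator[str]:
    """Simulates a source that produces lines one at a time."""
    yield "a=1"
    yield "b=two"


stream = parse_kv_stream(line_source())
first = next(stream)   # only the first line has been read at this point
rest = list(stream)    # the remaining lines are read here
```

Because the parser is a generator, calling it does no work until iteration begins, which is what keeps memory use flat on long inputs.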

## Streaming Utilities

Core functions for validating input and handling streaming parser metadata.

```python { .api }
def streaming_input_type_check(data: Iterable[Union[str, bytes]]) -> None:
    """
    Ensure input data is an iterable, but not a string or bytes.

    Parameters:
    - data: Input data to validate

    Raises:
    - TypeError: If data is not a non-string iterable object
    """

def streaming_line_input_type_check(line: str) -> None:
    """
    Ensure each line is a string.

    Parameters:
    - line: Individual line to validate

    Raises:
    - TypeError: If line is not a 'str' object
    """

def stream_success(output_line: JSONDictType, ignore_exceptions: bool) -> JSONDictType:
    """
    Add _jc_meta object to output line if ignore_exceptions=True.

    Parameters:
    - output_line: Parsed output dictionary
    - ignore_exceptions: Whether to add success metadata

    Returns:
    - Dictionary with optional _jc_meta success field
    """

def stream_error(e: BaseException, line: str) -> JSONDictType:
    """
    Create an error _jc_meta field for failed parsing.

    Parameters:
    - e: Exception that occurred during parsing
    - line: Original line that failed to parse

    Returns:
    - Dictionary with _jc_meta error information
    """
```
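
The behavior described in these docstrings can be sketched in plain Python. The functions below are illustrative stand-ins with hypothetical names (`check_stream_input`, `check_line_input`, `mark_success`, `mark_error`), written from the docstrings and the metadata format documented in this file; they are not the library's source. Stripping the failed line's trailing newline is an assumption.

```python
from typing import Any, Dict

JSONDictType = Dict[str, Any]  # stand-in for the library's type alias


def check_stream_input(data: Any) -> None:
    """Reject str/bytes and anything that cannot be iterated."""
    if isinstance(data, (str, bytes)) or not hasattr(data, "__iter__"):
        raise TypeError("Input data must be a non-string iterable object.")


def check_line_input(line: Any) -> None:
    """Reject any line that is not a str."""
    if not isinstance(line, str):
        raise TypeError("Input line must be a 'str' object.")


def mark_success(output_line: JSONDictType, ignore_exceptions: bool) -> JSONDictType:
    """Attach success metadata only when ignore_exceptions is True."""
    if ignore_exceptions:
        output_line["_jc_meta"] = {"success": True}
    return output_line


def mark_error(e: BaseException, line: str) -> JSONDictType:
    """Build an error-only result in the 'ClassName: message' format."""
    return {
        "_jc_meta": {
            "success": False,
            "error": f"{e.__class__.__name__}: {e}",
            "line": line.strip(),  # assumption: trailing newline is dropped
        }
    }
```

Rejecting `str` and `bytes` explicitly matters because both are iterable: without the check, passing a single string would silently be parsed one character at a time.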

## Streaming Decorator

Decorator function for creating streaming parsers with automatic error handling.

```python { .api }
def add_jc_meta(func: F) -> F:
    """
    Decorator for streaming parsers to add stream_success and stream_error handling.

    This decorator automatically wraps streaming parser functions to:
    - Add success metadata when ignore_exceptions=True
    - Handle exceptions and generate error metadata
    - Ensure consistent streaming parser behavior

    Parameters:
    - func: Streaming parser function to decorate

    Returns:
    - Decorated function with automatic metadata handling
    """
```
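
One plausible way to build such a decorator is sketched below. It assumes a convention in which the wrapped generator yields a dictionary for a good line and an `(exception, line)` tuple for a bad one; that convention, the name `add_meta_sketch`, and the `parse_ints` parser are assumptions made for illustration and should not be read as the library's implementation. The sketch also only looks at `ignore_exceptions` when it is passed as a keyword argument.

```python
import functools
from typing import Any, Callable, Dict, Iterable, Iterator, Tuple, Union

JSONDictType = Dict[str, Any]
ParserOutput = Union[JSONDictType, Tuple[BaseException, str]]


def add_meta_sketch(
    func: Callable[..., Iterator[ParserOutput]]
) -> Callable[..., Iterator[JSONDictType]]:
    """Wrap a generator-based parser and attach _jc_meta to its output."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Iterator[JSONDictType]:
        ignore_exceptions = kwargs.get("ignore_exceptions", False)
        for value in func(*args, **kwargs):
            if isinstance(value, dict):
                # Good parse: add success metadata only in ignore mode
                if ignore_exceptions:
                    value["_jc_meta"] = {"success": True}
                yield value
            else:
                # Bad parse: the parser handed back (exception, line)
                exc, line = value
                yield {
                    "_jc_meta": {
                        "success": False,
                        "error": f"{type(exc).__name__}: {exc}",
                        "line": line,
                    }
                }

    return wrapper


@add_meta_sketch
def parse_ints(data: Iterable[str], ignore_exceptions: bool = False) -> Iterator[ParserOutput]:
    """Hypothetical parser: each line must be an integer."""
    for line in data:
        try:
            parsed = {"value": int(line)}
        except ValueError as exc:
            if not ignore_exceptions:
                raise
            yield (exc, line)
            continue
        yield parsed


results = list(parse_ints(["1", "x", "3"], ignore_exceptions=True))
```

The parser reports the failure as a value rather than raising because a generator that raises cannot be resumed; yielding the error is what lets the stream continue past the bad line.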

## Available Streaming Parsers

Streaming variants of standard parsers (identified by the `-s` suffix):

### Log Analysis
- `syslog-s` - Stream syslog entries in real-time
- `syslog-bsd-s` - Stream BSD-style syslog entries
- `clf-s` - Stream Common Log Format web server logs
- `cef-s` - Stream Common Event Format security logs

### Network Monitoring
- `ping-s` - Stream ping command output for continuous monitoring
- `netstat-s` - Stream network connection status

### System Monitoring
- `top-s` - Stream top command output for real-time process monitoring
- `iostat-s` - Stream I/O statistics
- `vmstat-s` - Stream virtual memory statistics
- `mpstat-s` - Stream processor statistics
- `pidstat-s` - Stream process statistics

### Data Processing
- `csv-s` - Stream CSV data processing
- `asciitable-m` - ASCII table processing (note the `-m` suffix: this name does not follow the `-s` streaming convention)

### Version Control
- `git-log-s` - Stream git log output

## Usage Examples

### Basic Streaming

```python
import jc

# Stream ping output
ping_data = [
    'PING example.com (93.184.216.34): 56 data bytes',
    '64 bytes from 93.184.216.34: icmp_seq=0 ttl=56 time=11.632 ms',
    '64 bytes from 93.184.216.34: icmp_seq=1 ttl=56 time=12.451 ms'
]

# Process each line as it becomes available
ping_stream = jc.parse('ping-s', ping_data)
for result in ping_stream:
    if 'time_ms' in result:
        print(f"Ping time: {result['time_ms']} ms")
```

### Real-time Log Monitoring

```python
import jc
import subprocess

# Monitor syslog in real-time
def monitor_syslog():
    proc = subprocess.Popen(['tail', '-f', '/var/log/syslog'],
                            stdout=subprocess.PIPE,
                            text=True)

    # Create streaming parser
    syslog_stream = jc.parse('syslog-s', iter(proc.stdout.readline, ''))

    for log_entry in syslog_stream:
        if log_entry.get('severity') == 'error':
            print(f"ERROR: {log_entry['message']}")
```
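
The example above feeds the parser with `iter(proc.stdout.readline, '')`, the two-argument form of Python's built-in `iter`. It calls `readline` repeatedly and stops when the call returns the sentinel `''`, which a text stream returns at end of input. The short sketch below shows the same idiom with `io.StringIO` standing in for the process pipe, so it runs without a subprocess.

```python
import io

# io.StringIO stands in for proc.stdout; like a pipe opened in text mode,
# its readline() returns '' once the stream is exhausted.
fake_stdout = io.StringIO("line one\nline two\n")

# Each iteration calls fake_stdout.readline(); iteration ends on ''.
lines = iter(fake_stdout.readline, "")
collected = [line.rstrip("\n") for line in lines]
```

With `tail -f` the pipe reaches end of input only when the process exits, so the monitoring loop runs until then.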

### Error Handling with Streaming

```python
import jc

# Stream with error handling
malformed_data = [
    'PING example.com (93.184.216.34): 56 data bytes',
    'this line will cause a parse error',
    '64 bytes from 93.184.216.34: icmp_seq=1 ttl=56 time=12.451 ms'
]

# Continue processing even with errors
ping_stream = jc.parse('ping-s', malformed_data, ignore_exceptions=True)
for result in ping_stream:
    if result.get('_jc_meta', {}).get('success') is False:
        print(f"Parse error: {result['_jc_meta']['error']}")
        print(f"Failed line: {result['_jc_meta']['line']}")
    elif 'time_ms' in result:
        print(f"Ping time: {result['time_ms']} ms")
```

### Custom Streaming Parser

```python
from jc.streaming import add_jc_meta
import jc.streaming

@add_jc_meta
def parse(data, quiet=False, raw=False, ignore_exceptions=False):
    """Custom streaming parser with automatic metadata handling"""

    # Validate input
    jc.streaming.streaming_input_type_check(data)

    for line in data:
        jc.streaming.streaming_line_input_type_check(line)

        # Parse logic here
        parsed_line = {'processed': line.strip()}

        yield parsed_line
```

### Memory-Efficient Processing

```python
import jc

# Process large files without loading into memory
def process_large_csv(filename):
    with open(filename, 'r') as f:
        csv_stream = jc.parse('csv-s', f)

        total_rows = 0
        for row in csv_stream:
            total_rows += 1
            # Process row without storing all data
            if total_rows % 1000 == 0:
                print(f"Processed {total_rows} rows")

    return total_rows
```
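
To make the memory claim concrete, the sketch below counts how many input lines are actually read when only the first two results are requested. `parse_rows` is a hypothetical stand-in for a streaming CSV parser and `counting_lines` is a helper invented for this demonstration; neither is part of the library. The input is an in-memory list purely so that the example is self-contained.

```python
import itertools
from typing import Dict, Iterable, Iterator, List


def counting_lines(lines: Iterable[str], consumed: List[int]) -> Iterator[str]:
    """Pass lines through while counting how many were actually read."""
    for line in lines:
        consumed[0] += 1
        yield line


def parse_rows(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Hypothetical streaming CSV parser: the first line is the header."""
    it = iter(lines)
    header_line = next(it, None)
    if header_line is None:
        return  # empty input yields nothing
    header = header_line.strip().split(",")
    for line in it:
        yield dict(zip(header, line.strip().split(",")))


consumed = [0]
source = ["name,qty"] + [f"item{i},{i}" for i in range(1000)]

stream = parse_rows(counting_lines(source, consumed))
first_two = list(itertools.islice(stream, 2))  # ask for two rows only
lines_read = consumed[0]  # header plus two data lines
```

Only three of the 1001 lines are touched, because each stage pulls from the previous one on demand.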

## Streaming Parser Metadata

When `ignore_exceptions=True`, streaming parsers add metadata to each output object:

```python
# Successful parsing
{
    'parsed_data': 'value',
    '_jc_meta': {
        'success': True
    }
}

# Failed parsing
{
    '_jc_meta': {
        'success': False,
        'error': 'ParseError: Invalid format',
        'line': 'original line that failed'
    }
}
```

Because a failed line yields an error object instead of raising, a consumer can log or skip the bad line and keep processing the rest of the stream.
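
As a small sketch of consuming this metadata, the helper below separates good results from failures. `split_results` is a hypothetical name, and the sample data simply reuses the two object shapes shown above. It collects everything into lists for clarity; a long-running consumer would more likely handle each item as it arrives.

```python
from typing import Any, Dict, Iterable, List, Tuple

Result = Dict[str, Any]


def split_results(stream: Iterable[Result]) -> Tuple[List[Result], List[Result]]:
    """Return (parsed items, failure metadata) from a metadata-tagged stream."""
    parsed: List[Result] = []
    failures: List[Result] = []
    for item in stream:
        meta = item.get("_jc_meta", {})
        if meta.get("success") is False:
            failures.append(meta)   # keep the error and the offending line
        else:
            parsed.append(item)
    return parsed, failures


sample = [
    {"parsed_data": "value", "_jc_meta": {"success": True}},
    {
        "_jc_meta": {
            "success": False,
            "error": "ParseError: Invalid format",
            "line": "original line that failed",
        }
    },
]

parsed, failures = split_results(sample)
```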