Converts the output of popular command-line tools and file-types to JSON.
—
Real-time parsing capabilities for processing continuous data streams, with built-in error handling and metadata generation. Streaming parsers enable real-time analysis of log files, command output, and other continuous data sources.
Streaming parsers process iterables of strings rather than single strings, returning generators for memory-efficient processing.
def parse(
data: Iterable[str],
quiet: bool = False,
raw: bool = False,
ignore_exceptions: bool = False
) -> Iterator[JSONDictType]:
"""
Parse streaming data line by line.
Parameters:
- data: Iterable of strings (each line to parse)
- quiet: Suppress warning messages if True
- raw: Output preprocessed JSON if True
- ignore_exceptions: Continue processing on parse errors if True
Returns:
- Iterator yielding parsed dictionaries for each line
"""Core functions for validating input and handling streaming parser metadata.
def streaming_input_type_check(data: Iterable[Union[str, bytes]]) -> None:
"""
Ensure input data is an iterable, but not a string or bytes.
Parameters:
- data: Input data to validate
Raises:
- TypeError: If data is not a non-string iterable object
"""
def streaming_line_input_type_check(line: str) -> None:
"""
Ensure each line is a string.
Parameters:
- line: Individual line to validate
Raises:
- TypeError: If line is not a 'str' object
"""
def stream_success(output_line: JSONDictType, ignore_exceptions: bool) -> JSONDictType:
"""
Add _jc_meta object to output line if ignore_exceptions=True.
Parameters:
- output_line: Parsed output dictionary
- ignore_exceptions: Whether to add success metadata
Returns:
- Dictionary with optional _jc_meta success field
"""
def stream_error(e: BaseException, line: str) -> JSONDictType:
"""
Create an error _jc_meta field for failed parsing.
Parameters:
- e: Exception that occurred during parsing
- line: Original line that failed to parse
Returns:
- Dictionary with _jc_meta error information
"""Decorator function for creating streaming parsers with automatic error handling.
def add_jc_meta(func: F) -> F:
"""
Decorator for streaming parsers to add stream_success and stream_error handling.
This decorator automatically wraps streaming parser functions to:
- Add success metadata when ignore_exceptions=True
- Handle exceptions and generate error metadata
- Ensure consistent streaming parser behavior
Parameters:
- func: Streaming parser function to decorate
Returns:
- Decorated function with automatic metadata handling
"""Streaming variants of standard parsers (identified by -s suffix):
syslog-s - Stream syslog entries in real-timesyslog-bsd-s - Stream BSD-style syslog entriesclf-s - Stream Common Log Format web server logscef-s - Stream Common Event Format security logsping-s - Stream ping command output for continuous monitoringnetstat-s - Stream network connection statustop-s - Stream top command output for real-time process monitoringiostat-s - Stream I/O statisticsvmstat-s - Stream virtual memory statisticsmpstat-s - Stream processor statisticspidstat-s - Stream process statisticscsv-s - Stream CSV data processingasciitable-m - Stream ASCII table processinggit-log-s - Stream git log outputimport jc
# Stream ping output
ping_data = [
'PING example.com (93.184.216.34): 56 data bytes',
'64 bytes from 93.184.216.34: icmp_seq=0 ttl=56 time=11.632 ms',
'64 bytes from 93.184.216.34: icmp_seq=1 ttl=56 time=12.451 ms'
]
# Process each line as it becomes available
ping_stream = jc.parse('ping-s', ping_data)
for result in ping_stream:
if 'time_ms' in result:
print(f"Ping time: {result['time_ms']} ms")import jc
import subprocess
# Monitor syslog in real-time
def monitor_syslog():
proc = subprocess.Popen(['tail', '-f', '/var/log/syslog'],
stdout=subprocess.PIPE,
text=True)
# Create streaming parser
syslog_stream = jc.parse('syslog-s', iter(proc.stdout.readline, ''))
for log_entry in syslog_stream:
if log_entry.get('severity') == 'error':
print(f"ERROR: {log_entry['message']}")import jc
# Stream with error handling
malformed_data = [
'PING example.com (93.184.216.34): 56 data bytes',
'this line will cause a parse error',
'64 bytes from 93.184.216.34: icmp_seq=1 ttl=56 time=12.451 ms'
]
# Continue processing even with errors
ping_stream = jc.parse('ping-s', malformed_data, ignore_exceptions=True)
for result in ping_stream:
if result.get('_jc_meta', {}).get('success') == False:
print(f"Parse error: {result['_jc_meta']['error']}")
print(f"Failed line: {result['_jc_meta']['line']}")
elif 'time_ms' in result:
print(f"Ping time: {result['time_ms']} ms")from jc.streaming import add_jc_meta
import jc.streaming
@add_jc_meta
def parse(data, quiet=False, raw=False, ignore_exceptions=False):
"""Custom streaming parser with automatic metadata handling"""
# Validate input
jc.streaming.streaming_input_type_check(data)
for line in data:
jc.streaming.streaming_line_input_type_check(line)
# Parse logic here
parsed_line = {'processed': line.strip()}
yield parsed_line

import jc
# Process large files without loading into memory
def process_large_csv(filename):
with open(filename, 'r') as f:
csv_stream = jc.parse('csv-s', f)
total_rows = 0
for row in csv_stream:
total_rows += 1
# Process row without storing all data
if total_rows % 1000 == 0:
print(f"Processed {total_rows} rows")
return total_rows

When ignore_exceptions=True, streaming parsers add metadata to each output object:
# Successful parsing
{
'parsed_data': 'value',
'_jc_meta': {
'success': True
}
}
# Failed parsing
{
'_jc_meta': {
'success': False,
'error': 'ParseError: Invalid format',
'line': 'original line that failed'
}
}

This metadata enables robust error handling in streaming applications while maintaining processing continuity.
Install with Tessl CLI
npx tessl i tessl/pypi-jc