tessl/pypi-jc

Converts the output of popular command-line tools and file-types to JSON.

Streaming Parsers

Streaming parsers process continuous data sources, such as log files and long-running command output, one line at a time, with built-in error handling and per-line metadata generation.

Streaming Interface

Streaming parsers process iterables of strings rather than single strings, returning generators for memory-efficient processing.

def parse(
    data: Iterable[str],
    quiet: bool = False,
    raw: bool = False,
    ignore_exceptions: bool = False
) -> Iterator[JSONDictType]:
    """
    Parse streaming data line by line.

    Parameters:
    - data: Iterable of strings (each line to parse)
    - quiet: Suppress warning messages if True
    - raw: Return unprocessed output (skip type conversions and other post-processing) if True
    - ignore_exceptions: Continue processing on parse errors if True

    Returns:
    - Iterator yielding parsed dictionaries for each line
    """
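The lazy, line-at-a-time behavior described above can be illustrated without jc itself. The following is a minimal sketch, assuming a hypothetical stand-in parser (`parse_key_value`) and a hypothetical tracked source (`tracked_lines`); neither is part of jc. It shows that a generator-based parser pulls input only as results are requested.

```python
from typing import Dict, Iterable, Iterator

def parse_key_value(data: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Hypothetical stand-in parser: yields one dict per 'key=value' line."""
    for line in data:
        key, _, value = line.strip().partition('=')
        yield {'key': key, 'value': value}

# Records each line at the moment the parser pulls it from the source
consumed = []

def tracked_lines() -> Iterator[str]:
    """Hypothetical source that logs consumption for demonstration."""
    for line in ['a=1', 'b=2', 'c=3']:
        consumed.append(line)
        yield line

stream = parse_key_value(tracked_lines())

# Requesting one result pulls exactly one line from the source
first = next(stream)
consumed_after_first = list(consumed)

# Draining the generator pulls the remaining lines
rest = list(stream)
```

Because nothing is read until a result is requested, the same pattern works for sources that never end, such as a pipe from a running command.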

Streaming Utilities

Core functions for validating input and handling streaming parser metadata.

def streaming_input_type_check(data: Iterable[Union[str, bytes]]) -> None:
    """
    Ensure input data is an iterable, but not a string or bytes.

    Parameters:
    - data: Input data to validate

    Raises:
    - TypeError: If data is not a non-string iterable object
    """

def streaming_line_input_type_check(line: str) -> None:
    """
    Ensure each line is a string.

    Parameters:
    - line: Individual line to validate

    Raises:
    - TypeError: If line is not a 'str' object
    """
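The two checks above can be approximated in plain Python. This is a stand-alone sketch of the documented behavior, not jc's source: the function names (`check_stream_input`, `check_stream_line`, `is_valid_stream`) and the error messages are assumptions made for illustration.

```python
def check_stream_input(data) -> None:
    """Reject anything that is not iterable, and also reject str and bytes."""
    if not hasattr(data, '__iter__') or isinstance(data, (str, bytes)):
        raise TypeError("Input data must be a non-string iterable object.")

def check_stream_line(line) -> None:
    """Reject any individual line that is not a str."""
    if not isinstance(line, str):
        raise TypeError("Input line must be a 'str' object.")

def is_valid_stream(data) -> bool:
    """Convenience wrapper that reports the result instead of raising."""
    try:
        check_stream_input(data)
    except TypeError:
        return False
    return True

text = 'first line\nsecond line'
whole_string_ok = is_valid_stream(text)            # a bare string is rejected
split_lines_ok = is_valid_stream(text.splitlines())  # a list of lines is accepted
```

The practical consequence is that a multi-line string has to be split (for example with `splitlines()`) before it is handed to a streaming parser.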

def stream_success(output_line: JSONDictType, ignore_exceptions: bool) -> JSONDictType:
    """
    Add _jc_meta object to output line if ignore_exceptions=True.

    Parameters:
    - output_line: Parsed output dictionary
    - ignore_exceptions: Whether to add success metadata

    Returns:
    - Dictionary with optional _jc_meta success field
    """

def stream_error(e: BaseException, line: str) -> JSONDictType:
    """
    Create an error _jc_meta field for failed parsing.

    Parameters:
    - e: Exception that occurred during parsing
    - line: Original line that failed to parse

    Returns:
    - Dictionary with _jc_meta error information
    """
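To make the shape of the metadata concrete, here is a small sketch of helpers that behave as the two docstrings above describe. The names `success_record` and `error_record` are hypothetical, and two details are assumptions rather than confirmed behavior: the error string is formatted as "ExceptionName: message", and the failed line is stripped of surrounding whitespace.

```python
from typing import Any, Dict

def success_record(output_line: Dict[str, Any], ignore_exceptions: bool) -> Dict[str, Any]:
    """Attach success metadata only when errors are being ignored."""
    if ignore_exceptions:
        output_line['_jc_meta'] = {'success': True}
    return output_line

def error_record(e: BaseException, line: str) -> Dict[str, Any]:
    """Build a metadata-only record describing a failed line."""
    return {
        '_jc_meta': {
            'success': False,
            'error': f'{e.__class__.__name__}: {e}',  # assumed format
            'line': line.strip(),                     # assumed stripping
        }
    }

plain = success_record({'value': 1}, ignore_exceptions=False)
tagged = success_record({'value': 1}, ignore_exceptions=True)
failed = error_record(ValueError('bad input'), 'not a number\n')
```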

Streaming Decorator

Decorator function for creating streaming parsers with automatic error handling.

def add_jc_meta(func: F) -> F:
    """
    Decorator for streaming parsers to add stream_success and stream_error handling.

    This decorator wraps a streaming parser generator function to:
    - Add success metadata to each yielded dictionary when ignore_exceptions=True
    - Convert each (exception, line) tuple yielded by the parser into error metadata
    - Ensure consistent streaming parser behavior

    Parameters:
    - func: Streaming parser function to decorate

    Returns:
    - Decorated function with automatic metadata handling
    """
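One way a decorator of this kind could work is sketched below. It is an illustration under stated assumptions, not jc's implementation: `add_meta` and `parse_ints` are hypothetical names, the wrapped parser is assumed to yield either a dict (success) or an `(exception, line)` tuple (failure), and `ignore_exceptions` is assumed to be passed as a keyword argument.

```python
from functools import wraps

def add_meta(func):
    """Hypothetical decorator: tags yielded dicts and converts error tuples."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        ignore_exceptions = kwargs.get('ignore_exceptions', False)
        for value in func(*args, **kwargs):
            if isinstance(value, dict):
                # Successful line: tag it only when errors are being ignored
                if ignore_exceptions:
                    value['_jc_meta'] = {'success': True}
                yield value
            else:
                # Failed line: the parser yielded (exception, line)
                exc, line = value
                yield {'_jc_meta': {
                    'success': False,
                    'error': f'{type(exc).__name__}: {exc}',
                    'line': line.strip(),
                }}
    return wrapper

@add_meta
def parse_ints(data, ignore_exceptions=False):
    """Hypothetical parser: one integer per line."""
    for line in data:
        try:
            yield {'value': int(line)}
        except ValueError as e:
            if not ignore_exceptions:
                raise
            yield (e, line)

results = list(parse_ints(['1', 'x', '3'], ignore_exceptions=True))
```

Keeping the metadata logic in a decorator means each parser only has to decide whether a line succeeded or failed; the output shape stays identical across parsers.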

Available Streaming Parsers

Streaming variants of standard parsers (identified by -s suffix):

Log Analysis

  • syslog-s - Stream syslog entries in real-time
  • syslog-bsd-s - Stream BSD-style syslog entries
  • clf-s - Stream Common Log Format web server logs
  • cef-s - Stream Common Event Format security logs

Network Monitoring

  • ping-s - Stream ping command output for continuous monitoring
  • netstat-s - Stream network connection status

System Monitoring

  • top-s - Stream top command output for real-time process monitoring
  • iostat-s - Stream I/O statistics
  • vmstat-s - Stream virtual memory statistics
  • mpstat-s - Stream processor statistics
  • pidstat-s - Stream process statistics

Data Processing

  • csv-s - Stream CSV data processing
  • asciitable-m - Multi-line ASCII table processing (the -m suffix marks a multi-line parser, not a streaming variant)

Version Control

  • git-log-s - Stream git log output
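Since streaming variants are identified by their suffix, a list of parser names can be filtered mechanically. The sketch below uses a hypothetical helper (`streaming_only`) and a hand-written sample of names; the set of parsers actually present depends on the installed jc version.

```python
def streaming_only(parser_names):
    """Keep only names that end with the '-s' streaming suffix."""
    return [name for name in parser_names if name.endswith('-s')]

# Hand-written sample for illustration, not an authoritative list
names = ['ping', 'ping-s', 'csv', 'csv-s', 'asciitable-m', 'syslog-bsd-s']
streaming_names = streaming_only(names)
```

Recent jc releases also provide a `streaming_parser_mod_list()` helper in the library API; where it is available, its output is a better source of truth than any static list.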

Usage Examples

Basic Streaming

import jc

# Stream ping output
ping_data = [
    'PING example.com (93.184.216.34): 56 data bytes',
    '64 bytes from 93.184.216.34: icmp_seq=0 ttl=56 time=11.632 ms',
    '64 bytes from 93.184.216.34: icmp_seq=1 ttl=56 time=12.451 ms'
]

# Process each line as it becomes available
ping_stream = jc.parse('ping-s', ping_data)
for result in ping_stream:
    if 'time_ms' in result:
        print(f"Ping time: {result['time_ms']} ms")

Real-time Log Monitoring

import jc
import subprocess

# Monitor syslog in real-time
def monitor_syslog():
    proc = subprocess.Popen(['tail', '-f', '/var/log/syslog'], 
                          stdout=subprocess.PIPE, 
                          text=True)
    
    # Create streaming parser
    syslog_stream = jc.parse('syslog-s', iter(proc.stdout.readline, ''))
    
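    for log_entry in syslog_stream:
        # Assumes entries carry an integer 'priority' field; syslog severity
        # is its low three bits, and 3 means "error"
        priority = log_entry.get('priority')
        if isinstance(priority, int) and priority % 8 == 3:
            print(f"ERROR: {log_entry.get('message')}")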
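The `iter(proc.stdout.readline, '')` call in the example above uses the two-argument form of `iter()`, which keeps calling the function until it returns the sentinel value. A minimal sketch of that idiom, using an in-memory `io.StringIO` in place of a real process pipe (the helper name `read_until_eof` is hypothetical):

```python
import io

def read_until_eof(stream):
    """Return an iterator that calls readline() until it returns ''."""
    return iter(stream.readline, '')

# StringIO stands in for proc.stdout so the sketch runs without a subprocess
fake_stdout = io.StringIO('line one\nline two\n')
lines = list(read_until_eof(fake_stdout))
```

With a live pipe, `readline()` blocks until the next line arrives, so the iterator delivers each line to the parser as soon as it is written and ends only when the process closes its output.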

Error Handling with Streaming

import jc

# Stream with error handling
malformed_data = [
    'PING example.com (93.184.216.34): 56 data bytes',
    'this line will cause a parse error',
    '64 bytes from 93.184.216.34: icmp_seq=1 ttl=56 time=12.451 ms'
]

# Continue processing even with errors
ping_stream = jc.parse('ping-s', malformed_data, ignore_exceptions=True)
for result in ping_stream:
    if result.get('_jc_meta', {}).get('success') is False:
        print(f"Parse error: {result['_jc_meta']['error']}")
        print(f"Failed line: {result['_jc_meta']['line']}")
    elif 'time_ms' in result:
        print(f"Ping time: {result['time_ms']} ms")

Custom Streaming Parser

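import jc.streaming
from jc.streaming import add_jc_meta, raise_or_yield

@add_jc_meta
def parse(data, quiet=False, raw=False, ignore_exceptions=False):
    """Custom streaming parser with automatic metadata handling"""

    # Validate that the input is a non-string iterable
    jc.streaming.streaming_input_type_check(data)

    for line in data:
        try:
            # Validate that each line is a str
            jc.streaming.streaming_line_input_type_check(line)

            # Parse logic here
            parsed_line = {'processed': line.strip()}

            yield parsed_line

        except Exception as e:
            # Re-raises unless ignore_exceptions=True; otherwise yields an
            # (exception, line) tuple that add_jc_meta turns into error metadata
            yield raise_or_yield(ignore_exceptions, e, line)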

Memory-Efficient Processing

import jc

# Process large files without loading into memory
def process_large_csv(filename):
    with open(filename, 'r') as f:
        csv_stream = jc.parse('csv-s', f)
        
        total_rows = 0
        for row in csv_stream:
            total_rows += 1
            # Process row without storing all data
            if total_rows % 1000 == 0:
                print(f"Processed {total_rows} rows")
    
    return total_rows
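When rows need to be handled in groups (for example, bulk inserts) rather than one at a time, a generator can be consumed in fixed-size batches without materializing the whole stream. This is a sketch using only the standard library; `in_batches` is a hypothetical helper, and the generator of dicts stands in for the output of a streaming parser.

```python
from itertools import islice

def in_batches(stream, size):
    """Yield lists of up to `size` items, reading lazily from `stream`."""
    iterator = iter(stream)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch

# Stand-in for a parser's output: a generator of row dictionaries
rows = ({'id': i} for i in range(5))
batches = list(in_batches(rows, 2))
```

At most `size` rows are held in memory at any moment, so peak memory use is bounded by the batch size rather than the file size.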

Streaming Parser Metadata

When ignore_exceptions=True, streaming parsers add metadata to each output object:

# Successful parsing
{
    'parsed_data': 'value',
    '_jc_meta': {
        'success': True
    }
}

# Failed parsing
{
    '_jc_meta': {
        'success': False,
        'error': 'ParseError: Invalid format',
        'line': 'original line that failed'
    }
}

This metadata enables robust error handling in streaming applications while maintaining processing continuity.
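A consumer can use the `_jc_meta` field to tally successes and collect failed lines in a single pass. The sketch below works on records shaped like the two examples above; `summarize` is a hypothetical helper, and the sample records are hand-written rather than real parser output.

```python
def summarize(results):
    """Return (success_count, failed_lines) for a stream of parsed records."""
    ok = 0
    failed_lines = []
    for record in results:
        meta = record.get('_jc_meta', {})
        if meta.get('success') is False:
            failed_lines.append(meta.get('line'))
        else:
            ok += 1
    return ok, failed_lines

# Hand-written sample records following the documented metadata shape
sample = [
    {'parsed_data': 'value', '_jc_meta': {'success': True}},
    {'_jc_meta': {'success': False,
                  'error': 'ParseError: Invalid format',
                  'line': 'original line that failed'}},
    {'parsed_data': 'other', '_jc_meta': {'success': True}},
]
ok_count, failed = summarize(sample)
```

Records without a `_jc_meta` field (the case when `ignore_exceptions` is not set) are counted as successes, because in that mode a parse failure raises instead of producing a record.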
