# Streaming Parsers

Streaming parsers provide real-time parsing of continuous data streams, with built-in error handling and metadata generation. They enable real-time analysis of log files, command output, and other continuous data sources.

## Streaming Interface

Streaming parsers process iterables of strings rather than single strings, returning generators for memory-efficient processing.

```python { .api }
def parse(
    data: Iterable[str],
    quiet: bool = False,
    raw: bool = False,
    ignore_exceptions: bool = False
) -> Iterator[JSONDictType]:
    """
    Parse streaming data line by line.

    Parameters:
    - data: Iterable of strings (each line to parse)
    - quiet: Suppress warning messages if True
    - raw: Output preprocessed JSON if True
    - ignore_exceptions: Continue processing on parse errors if True

    Returns:
    - Iterator yielding parsed dictionaries for each line
    """
```

## Streaming Utilities

Core functions for validating input and handling streaming parser metadata.

```python { .api }
def streaming_input_type_check(data: Iterable[Union[str, bytes]]) -> None:
    """
    Ensure input data is an iterable, but not a string or bytes.

    Parameters:
    - data: Input data to validate

    Raises:
    - TypeError: If data is not a non-string iterable object
    """

def streaming_line_input_type_check(line: str) -> None:
    """
    Ensure each line is a string.

    Parameters:
    - line: Individual line to validate

    Raises:
    - TypeError: If line is not a 'str' object
    """

def stream_success(output_line: JSONDictType, ignore_exceptions: bool) -> JSONDictType:
    """
    Add a _jc_meta object to the output line when ignore_exceptions=True.

    Parameters:
    - output_line: Parsed output dictionary
    - ignore_exceptions: Whether to add success metadata

    Returns:
    - Dictionary with optional _jc_meta success field
    """

def stream_error(e: BaseException, line: str) -> JSONDictType:
    """
    Create an error _jc_meta field for failed parsing.

    Parameters:
    - e: Exception that occurred during parsing
    - line: Original line that failed to parse

    Returns:
    - Dictionary with _jc_meta error information
    """
```
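The behavior of the two metadata helpers can be pictured with a minimal, self-contained sketch. This is an illustrative reimplementation of the documented behavior, not `jc`'s actual source:

```python
# Illustrative sketch of the documented helper behavior (not jc's source)

def stream_success(output_line, ignore_exceptions):
    # Tag the line as successfully parsed only when the caller asked
    # for metadata via ignore_exceptions=True
    if ignore_exceptions:
        output_line.update({'_jc_meta': {'success': True}})
    return output_line

def stream_error(e, line):
    # Package the exception and the offending line so the consumer
    # can log or skip it without losing the rest of the stream
    return {
        '_jc_meta': {
            'success': False,
            'error': f'{type(e).__name__}: {e}',
            'line': line.strip(),
        }
    }

ok = stream_success({'time_ms': 11.6}, ignore_exceptions=True)
bad = stream_error(ValueError('bad field'), 'garbled input\n')
```

The key design point: success metadata is opt-in (only added under `ignore_exceptions=True`), while an error object replaces the parsed output entirely, carrying the original line for later inspection.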

## Streaming Decorator

Decorator function for creating streaming parsers with automatic error handling.

```python { .api }
def add_jc_meta(func: F) -> F:
    """
    Decorator for streaming parsers to add stream_success and stream_error handling.

    This decorator automatically wraps streaming parser functions to:
    - Add success metadata when ignore_exceptions=True
    - Handle exceptions and generate error metadata
    - Ensure consistent streaming parser behavior

    Parameters:
    - func: Streaming parser function to decorate

    Returns:
    - Decorated function with automatic metadata handling
    """
```
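The general shape of such a decorator can be sketched with a simplified, hypothetical implementation. This is not `jc`'s actual code (in particular, the real decorator also routes parse failures through `stream_error`; that path is omitted here), but it shows how a generator-wrapping decorator adds success metadata transparently:

```python
import functools

def add_jc_meta(func):
    # Hypothetical sketch: wrap a generator-based parser so that every
    # item it yields is tagged with success metadata when the caller
    # passed ignore_exceptions=True. Error routing is omitted for brevity.
    @functools.wraps(func)
    def wrapper(data, quiet=False, raw=False, ignore_exceptions=False):
        for output_line in func(data, quiet=quiet, raw=raw,
                                ignore_exceptions=ignore_exceptions):
            if ignore_exceptions and '_jc_meta' not in output_line:
                output_line['_jc_meta'] = {'success': True}
            yield output_line
    return wrapper

@add_jc_meta
def parse(data, quiet=False, raw=False, ignore_exceptions=False):
    # Trivial parser used only to exercise the decorator
    for line in data:
        yield {'processed': line.strip()}

results = list(parse(['a\n', 'b\n'], ignore_exceptions=True))
```

Because `wrapper` contains a `yield`, the decorated parser remains a generator function, so laziness and memory efficiency are preserved.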

## Available Streaming Parsers

Streaming variants of standard parsers (identified by the `-s` suffix):

### Log Analysis
- `syslog-s` - Stream syslog entries in real-time
- `syslog-bsd-s` - Stream BSD-style syslog entries
- `clf-s` - Stream Common Log Format web server logs
- `cef-s` - Stream Common Event Format security logs

### Network Monitoring
- `ping-s` - Stream ping command output for continuous monitoring
- `netstat-s` - Stream network connection status

### System Monitoring
- `top-s` - Stream top command output for real-time process monitoring
- `iostat-s` - Stream I/O statistics
- `vmstat-s` - Stream virtual memory statistics
- `mpstat-s` - Stream processor statistics
- `pidstat-s` - Stream process statistics

### Data Processing
- `csv-s` - Stream CSV data processing
- `asciitable-m` - Stream ASCII table processing

### Version Control
- `git-log-s` - Stream git log output

## Usage Examples

### Basic Streaming

```python
import jc

# Stream ping output
ping_data = [
    'PING example.com (93.184.216.34): 56 data bytes',
    '64 bytes from 93.184.216.34: icmp_seq=0 ttl=56 time=11.632 ms',
    '64 bytes from 93.184.216.34: icmp_seq=1 ttl=56 time=12.451 ms'
]

# Process each line as it becomes available
ping_stream = jc.parse('ping-s', ping_data)
for result in ping_stream:
    if 'time_ms' in result:
        print(f"Ping time: {result['time_ms']} ms")
```

### Real-time Log Monitoring

```python
import jc
import subprocess

# Monitor syslog in real-time
def monitor_syslog():
    proc = subprocess.Popen(['tail', '-f', '/var/log/syslog'],
                            stdout=subprocess.PIPE,
                            text=True)

    # Create streaming parser
    syslog_stream = jc.parse('syslog-s', iter(proc.stdout.readline, ''))

    for log_entry in syslog_stream:
        if log_entry.get('severity') == 'error':
            print(f"ERROR: {log_entry['message']}")
```

### Error Handling with Streaming

```python
import jc

# Stream with error handling
malformed_data = [
    'PING example.com (93.184.216.34): 56 data bytes',
    'this line will cause a parse error',
    '64 bytes from 93.184.216.34: icmp_seq=1 ttl=56 time=12.451 ms'
]

# Continue processing even with errors
ping_stream = jc.parse('ping-s', malformed_data, ignore_exceptions=True)
for result in ping_stream:
    if result.get('_jc_meta', {}).get('success') is False:
        print(f"Parse error: {result['_jc_meta']['error']}")
        print(f"Failed line: {result['_jc_meta']['line']}")
    elif 'time_ms' in result:
        print(f"Ping time: {result['time_ms']} ms")
```

### Custom Streaming Parser

```python
import jc.streaming
from jc.streaming import add_jc_meta

@add_jc_meta
def parse(data, quiet=False, raw=False, ignore_exceptions=False):
    """Custom streaming parser with automatic metadata handling"""

    # Validate input
    jc.streaming.streaming_input_type_check(data)

    for line in data:
        jc.streaming.streaming_line_input_type_check(line)

        # Parse logic here
        parsed_line = {'processed': line.strip()}

        yield parsed_line
```

### Memory-Efficient Processing

```python
import jc

# Process large files without loading them into memory
def process_large_csv(filename):
    with open(filename, 'r') as f:
        csv_stream = jc.parse('csv-s', f)

        total_rows = 0
        for row in csv_stream:
            total_rows += 1
            # Process each row without storing all data
            if total_rows % 1000 == 0:
                print(f"Processed {total_rows} rows")

        return total_rows
```
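The reason this stays memory-flat is that a generator pulls one line at a time from the file object and materializes only one result at a time. The same pattern can be shown with a stdlib-only sketch (no `jc` dependency; `parse_csv_stream` is a hypothetical stand-in for a `-s` parser):

```python
import io

# Stdlib-only sketch of the lazy streaming pattern: a generator pulls
# one line at a time from the file object, so peak memory stays flat
# no matter how long the input is.

def parse_csv_stream(fileobj, delimiter=','):
    # Yield one dict per data line; nothing is accumulated
    header = next(fileobj).rstrip('\n').split(delimiter)
    for line in fileobj:
        yield dict(zip(header, line.rstrip('\n').split(delimiter)))

# io.StringIO stands in for a large file on disk
data = io.StringIO('name,size\nalpha,10\nbeta,20\n')
rows = parse_csv_stream(data)
first = next(rows)                 # only one row materialized at a time
remaining = sum(1 for _ in rows)   # consume the rest lazily
```

Passing an open file object directly to the parser works because files are themselves lazy iterables of lines, which is exactly the interface streaming parsers expect.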

## Streaming Parser Metadata

When `ignore_exceptions=True`, streaming parsers add metadata to each output object:

```python
# Successful parsing
{
    'parsed_data': 'value',
    '_jc_meta': {
        'success': True
    }
}

# Failed parsing
{
    '_jc_meta': {
        'success': False,
        'error': 'ParseError: Invalid format',
        'line': 'original line that failed'
    }
}
```


This metadata enables robust error handling in streaming applications while maintaining processing continuity.
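A consumer can route results on this flag. A small stdlib-only sketch (the input dictionaries mirror the success/error shapes shown above; `split_results` is a hypothetical helper, not part of `jc`):

```python
# Route streamed results using the _jc_meta success flag

def split_results(stream):
    # Separate successfully parsed objects from error records so the
    # stream can keep flowing while failures are collected for review
    successes, errors = [], []
    for obj in stream:
        if obj.get('_jc_meta', {}).get('success') is False:
            errors.append(obj['_jc_meta'])
        else:
            successes.append(obj)
    return successes, errors

stream = [
    {'parsed_data': 'value', '_jc_meta': {'success': True}},
    {'_jc_meta': {'success': False,
                  'error': 'ParseError: Invalid format',
                  'line': 'original line that failed'}},
]
successes, errors = split_results(stream)
```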