or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

advanced-configuration.md core-logging.md error-handling.md event-time.md index.md stream-integration.md winston-integration.md
tile.json

docs/stream-integration.md

Stream Integration

Stream interface support for integrating Fluent Logger with Node.js streams and stream-based libraries.

Capabilities

Stream Creation

Create writable streams that send data to Fluentd.

/**
 * Create a writable stream interface for logging
 * @param options - Stream configuration options
 * @returns Writable stream that sends data to Fluentd
 */
FluentSender.prototype.toStream(options);

// Options can be:
// - string: label for the stream
// - object: { label: string, encoding?: string }

Usage Examples:

const logger = require('fluent-logger');

const sender = logger.createFluentSender('app', {
  host: 'localhost',
  port: 24224
});

// Create streams with labels  
const stdout = sender.toStream('stdout');
const stderr = sender.toStream('stderr');

// Create stream with options
const accessLog = sender.toStream({
  label: 'access',
  encoding: 'utf8'  // Default encoding
});

// Write to streams - each line becomes a log event
stdout.write('Application started successfully\n');
stderr.write('Warning: configuration file not found\n');
accessLog.write('192.168.1.1 - GET /api/users - 200\n');

// Streams automatically handle newline-separated events
stdout.write('First line\nSecond line\nThird line\n');
// This creates 3 separate log events

Console Integration

Use streams with Node.js Console for structured logging.

// Use fluent streams as Console output destinations
const Console = require('console').Console;
const fluentConsole = new Console(stdoutStream, stderrStream);

Usage Examples:

const { Console } = require('console');
const logger = require('fluent-logger');

const sender = logger.createFluentSender('console', {
  host: 'log-server.example.com',
  port: 24224
});

// Create fluent-backed console
const fluentConsole = new Console(
  sender.toStream('stdout'),
  sender.toStream('stderr')
);

// Use like regular console - output goes to Fluentd
fluentConsole.log('This goes to stdout stream');
fluentConsole.error('This goes to stderr stream');
fluentConsole.warn('Warnings go to stderr');

// Console methods work normally
fluentConsole.time('operation');
// ... some work ...
fluentConsole.timeEnd('operation');

// Structured data via console
fluentConsole.log(JSON.stringify({
  event: 'user_login',
  user_id: 12345,
  timestamp: Date.now()
}));

// Graceful shutdown
setTimeout(() => {
  sender.end();
}, 5000);

Process Output Redirection

Redirect process stdout/stderr through Fluent Logger streams.

Usage Examples:

const logger = require('fluent-logger');

const sender = logger.createFluentSender('process', {
  host: 'localhost',
  port: 24224
});

// Backup original streams
const originalStdout = process.stdout.write;
const originalStderr = process.stderr.write;

// Create fluent streams
const fluentStdout = sender.toStream('stdout');
const fluentStderr = sender.toStream('stderr');

// Redirect process streams
process.stdout.write = fluentStdout.write.bind(fluentStdout);
process.stderr.write = fluentStderr.write.bind(fluentStderr);

// Now all console.log, console.error, etc. go through Fluent Logger
console.log('This message goes to Fluentd via stdout stream');
console.error('This error goes to Fluentd via stderr stream');

/**
 * Undo the redirection: reinstall the saved `write` functions
 * (`originalStdout` / `originalStderr`) on the process output streams.
 */
function restoreStreams() {
  const savedWriters = [
    [process.stderr, originalStderr],
    [process.stdout, originalStdout],
  ];

  // The two assignments are independent, so order does not matter.
  for (const [target, writer] of savedWriters) {
    target.write = writer;
  }
}

// Restore on exit
process.on('exit', restoreStreams);

Library Integration

Integrate with libraries that expect stream outputs.

Usage Examples:

const logger = require('fluent-logger');
const morgan = require('morgan'); // HTTP request logger middleware

const sender = logger.createFluentSender('http', {
  host: 'localhost',
  port: 24224
});

// Use with Express.js and Morgan
const express = require('express');
const app = express();

// Create access log stream
const accessLogStream = sender.toStream({
  label: 'access',
  encoding: 'utf8'
});

// Morgan logs to Fluent Logger
app.use(morgan('combined', { stream: accessLogStream }));

// Application routes
app.get('/', (req, res) => {
  res.send('Hello World');
});

app.listen(3000, () => {
  console.log('Server started on port 3000');
});

// Use with other stream-based loggers
const winston = require('winston');

const winstonLogger = winston.createLogger({
  transports: [
    new winston.transports.Stream({
      stream: sender.toStream('winston')
    })
  ]
});

winstonLogger.info('Winston message via stream');

Child Process Integration

Capture child process output through Fluent Logger streams.

Usage Examples:

const { spawn } = require('child_process');
const logger = require('fluent-logger');

const sender = logger.createFluentSender('subprocess', {
  host: 'localhost',
  port: 24224
});

// Create streams for child process output
const childStdout = sender.toStream('child_stdout');
const childStderr = sender.toStream('child_stderr');

// Spawn child process
const child = spawn('ls', ['-la', '/tmp'], {
  stdio: ['ignore', 'pipe', 'pipe']
});

// Pipe child output to Fluent Logger
child.stdout.pipe(childStdout);
child.stderr.pipe(childStderr);

child.on('close', (code) => {
  console.log(`Child process exited with code ${code}`);
  
  // Log process completion
  sender.emit('child_process', {
    command: 'ls -la /tmp',
    exit_code: code,
    completed_at: new Date().toISOString()
  });
});

// Multiple child processes
const commands = [
  ['ps', ['aux']],
  ['df', ['-h']],
  ['free', ['-h']]
];

commands.forEach(([cmd, args], index) => {
  const child = spawn(cmd, args);
  
  const stdout = sender.toStream(`${cmd}_stdout`);
  const stderr = sender.toStream(`${cmd}_stderr`);
  
  child.stdout.pipe(stdout);
  child.stderr.pipe(stderr);
  
  child.on('close', (code) => {
    sender.emit('system_command', {
      command: `${cmd} ${args.join(' ')}`,
      exit_code: code,
      sequence: index
    });
  });
});

Stream Configuration Options

Configure stream behavior and encoding.

interface StreamOptions {
  label: string;                    // Required: label for log events
  encoding?: string;                // Default: 'UTF-8'
}

// Stream behavior:
// - Each line (delimited by \n) becomes a separate log event
// - Partial lines are buffered until newline is received
// - Log events have structure: { message: "line content" }

Usage Examples:

const logger = require('fluent-logger');
const sender = logger.createFluentSender('streams');

// Default UTF-8 encoding
const defaultStream = sender.toStream('default');

// Explicit encoding
const utf8Stream = sender.toStream({
  label: 'utf8_logs',
  encoding: 'utf8'
});

// ASCII encoding for simple logs
const asciiStream = sender.toStream({
  label: 'ascii_logs', 
  encoding: 'ascii'
});

// Test different encodings
defaultStream.write('Default encoding: Hello 🌍\n');
utf8Stream.write('UTF-8 encoding: Hello 🌍\n');
asciiStream.write('ASCII encoding: Hello World\n');

// Buffer handling demonstration
const bufferTestStream = sender.toStream('buffer_test');

// Write partial line
bufferTestStream.write('Partial line without');
// Buffer holds: "Partial line without"

// Complete the line
bufferTestStream.write(' newline\n');
// Emits: { message: "Partial line without newline" }

// Multiple lines in one write
bufferTestStream.write('Line 1\nLine 2\nLine 3\n');
// Emits 3 separate events:
// { message: "Line 1" }
// { message: "Line 2" } 
// { message: "Line 3" }

Error Handling with Streams

Handle stream errors and connection issues.

// The stream returned by toStream() is a Node.js Writable stream,
// so the standard Writable events ('error', 'finish', 'close') apply
stream.on('error', (error) => { });
stream.on('finish', () => { });
stream.on('close', () => { });

Usage Examples:

const logger = require('fluent-logger');

const sender = logger.createFluentSender('error-demo', {
  host: 'unreliable-server.example.com',
  port: 24224,
  timeout: 2.0
});

const logStream = sender.toStream('app_logs');

// Handle stream errors
logStream.on('error', (error) => {
  console.error('Stream error:', error.message);
  
  // Implement fallback logging
  console.log('Falling back to console logging');
});

logStream.on('finish', () => {
  console.log('Stream finished');
});

logStream.on('close', () => {
  console.log('Stream closed');
});

// Monitor underlying sender
sender.on('error', (error) => {
  console.error('Sender error:', error.message);
});

sender.on('connect', () => {
  console.log('Reconnected to Fluentd');
});

// Robust stream writing with error handling
/**
 * Write `data` to `stream` and report the outcome as a Promise.
 *
 * @param {import('stream').Writable} stream - Destination stream.
 * @param {string|Buffer} data - Chunk to write.
 * @returns {Promise<void>} Resolves once the write callback reports success;
 *   rejects with the callback's error, or with `Error('Stream not writable')`
 *   when `stream.writable` is falsy.
 */
function safeWrite(stream, data) {
  return new Promise((resolve, reject) => {
    if (!stream.writable) {
      reject(new Error('Stream not writable'));
      return;
    }

    // The write callback fires when this chunk has been handled, whether or
    // not write() returned false because of backpressure. It is therefore the
    // only completion signal needed. Listening for 'drain' as well would leave
    // a stale listener behind whenever the write fails, because no 'drain'
    // event follows a failed write.
    stream.write(data, (error) => {
      if (error) {
        reject(error);
        return;
      }
      resolve();
    });
  });
}

// Usage with error handling
/**
 * Example caller: send one line through `safeWrite` to `logStream` and fall
 * back to the console if the write is rejected.
 */
async function logSafely() {
  const line = 'Important log message\n';

  // Report the failure, then emit the message on the console instead.
  const reportFailure = (cause) => {
    console.error('Failed to write log:', cause.message);
    console.log('FALLBACK: Important log message');
  };

  try {
    await safeWrite(logStream, line);
    console.log('Log written successfully');
  } catch (cause) {
    reportFailure(cause);
  }
}

logSafely();

Stream Event Data Format

Stream-generated log events have a consistent format:

// Each line written to stream becomes:
{
  message: "line content without newline"
}

// For label 'stdout', tag becomes: sender_tag_prefix.stdout
// For label 'access', tag becomes: sender_tag_prefix.access

Examples:

// Self-contained example: the sender tag prefix ('myapp') and the stream
// label ('debug') are joined with a dot to form the event tag.
const logger = require('fluent-logger');

const sender = logger.createFluentSender('myapp');
const stream = sender.toStream('debug');

stream.write('User login successful\n');
// Generates event:
// Tag: myapp.debug
// Data: { message: "User login successful" }

stream.write('Error: connection timeout\nRetrying connection\n');
// Generates two events:
// 1. Tag: myapp.debug, Data: { message: "Error: connection timeout" }
// 2. Tag: myapp.debug, Data: { message: "Retrying connection" }