Stream interface support for integrating Fluent Logger with Node.js streams and stream-based libraries.
Create writable streams that send data to Fluentd.
/**
* Create a writable stream interface for logging
* @param options - Stream configuration options
* @returns Writable stream that sends data to Fluentd
*/
FluentSender.prototype.toStream(options);
// Options can be:
// - string: label for the stream
// - object: { label: string, encoding?: string }

Usage Examples:
const logger = require('fluent-logger');
const sender = logger.createFluentSender('app', {
host: 'localhost',
port: 24224
});
// Create streams with labels
const stdout = sender.toStream('stdout');
const stderr = sender.toStream('stderr');
// Create stream with options
const accessLog = sender.toStream({
label: 'access',
encoding: 'utf8' // Default encoding
});
// Write to streams - each line becomes a log event
stdout.write('Application started successfully\n');
stderr.write('Warning: configuration file not found\n');
accessLog.write('192.168.1.1 - GET /api/users - 200\n');
// Streams automatically handle newline-separated events
stdout.write('First line\nSecond line\nThird line\n');
// This creates 3 separate log events

Use streams with Node.js Console for structured logging.
// Use fluent streams as Console output destinations
const Console = require('console').Console;
const fluentConsole = new Console(stdoutStream, stderrStream);

Usage Examples:
const { Console } = require('console');
const logger = require('fluent-logger');
const sender = logger.createFluentSender('console', {
host: 'log-server.example.com',
port: 24224
});
// Create fluent-backed console
const fluentConsole = new Console(
sender.toStream('stdout'),
sender.toStream('stderr')
);
// Use like regular console - output goes to Fluentd
fluentConsole.log('This goes to stdout stream');
fluentConsole.error('This goes to stderr stream');
fluentConsole.warn('Warnings go to stderr');
// Console methods work normally
fluentConsole.time('operation');
// ... some work ...
fluentConsole.timeEnd('operation');
// Structured data via console
fluentConsole.log(JSON.stringify({
event: 'user_login',
user_id: 12345,
timestamp: Date.now()
}));
// Graceful shutdown
setTimeout(() => {
sender.end();
}, 5000);

Redirect process stdout/stderr through Fluent Logger streams.
Usage Examples:
const logger = require('fluent-logger');
const sender = logger.createFluentSender('process', {
host: 'localhost',
port: 24224
});
// Backup original streams
const originalStdout = process.stdout.write;
const originalStderr = process.stderr.write;
// Create fluent streams
const fluentStdout = sender.toStream('stdout');
const fluentStderr = sender.toStream('stderr');
// Redirect process streams
process.stdout.write = fluentStdout.write.bind(fluentStdout);
process.stderr.write = fluentStderr.write.bind(fluentStderr);
// Now all console.log, console.error, etc. go through Fluent Logger
console.log('This message goes to Fluentd via stdout stream');
console.error('This error goes to Fluentd via stderr stream');
// Restore original streams when needed
/**
 * Undo the redirection of the process output streams.
 *
 * Puts the `write` functions saved in `originalStdout` and `originalStderr`
 * back onto `process.stdout` and `process.stderr`.
 */
function restoreStreams() {
  const savedWriters = [
    [process.stdout, originalStdout],
    [process.stderr, originalStderr],
  ];
  for (const [target, write] of savedWriters) {
    target.write = write;
  }
}
// Restore on exit
process.on('exit', restoreStreams);

Integrate with libraries that expect stream outputs.
Usage Examples:
const logger = require('fluent-logger');
const morgan = require('morgan'); // HTTP request logger middleware
const sender = logger.createFluentSender('http', {
host: 'localhost',
port: 24224
});
// Use with Express.js and Morgan
const express = require('express');
const app = express();
// Create access log stream
const accessLogStream = sender.toStream({
label: 'access',
encoding: 'utf8'
});
// Morgan logs to Fluent Logger
app.use(morgan('combined', { stream: accessLogStream }));
// Application routes
app.get('/', (req, res) => {
res.send('Hello World');
});
app.listen(3000, () => {
console.log('Server started on port 3000');
});
// Use with other stream-based loggers
const winston = require('winston');
const winstonLogger = winston.createLogger({
transports: [
new winston.transports.Stream({
stream: sender.toStream('winston')
})
]
});
winstonLogger.info('Winston message via stream');

Capture child process output through Fluent Logger streams.
Usage Examples:
const { spawn } = require('child_process');
const logger = require('fluent-logger');
const sender = logger.createFluentSender('subprocess', {
host: 'localhost',
port: 24224
});
// Create streams for child process output
const childStdout = sender.toStream('child_stdout');
const childStderr = sender.toStream('child_stderr');
// Spawn child process
const child = spawn('ls', ['-la', '/tmp'], {
stdio: ['ignore', 'pipe', 'pipe']
});
// Pipe child output to Fluent Logger
child.stdout.pipe(childStdout);
child.stderr.pipe(childStderr);
child.on('close', (code) => {
console.log(`Child process exited with code ${code}`);
// Log process completion
sender.emit('child_process', {
command: 'ls -la /tmp',
exit_code: code,
completed_at: new Date().toISOString()
});
});
// Multiple child processes
const commands = [
['ps', ['aux']],
['df', ['-h']],
['free', ['-h']]
];
commands.forEach(([cmd, args], index) => {
const child = spawn(cmd, args);
const stdout = sender.toStream(`${cmd}_stdout`);
const stderr = sender.toStream(`${cmd}_stderr`);
child.stdout.pipe(stdout);
child.stderr.pipe(stderr);
child.on('close', (code) => {
sender.emit('system_command', {
command: `${cmd} ${args.join(' ')}`,
exit_code: code,
sequence: index
});
});
});

Configure stream behavior and encoding.
interface StreamOptions {
label: string; // Required: label for log events
encoding?: string; // Default: 'UTF-8'
}
// Stream behavior:
// - Each line (delimited by \n) becomes a separate log event
// - Partial lines are buffered until newline is received
// - Log events have structure: { message: "line content" }

Usage Examples:
const logger = require('fluent-logger');
const sender = logger.createFluentSender('streams');
// Default UTF-8 encoding
const defaultStream = sender.toStream('default');
// Explicit encoding
const utf8Stream = sender.toStream({
label: 'utf8_logs',
encoding: 'utf8'
});
// ASCII encoding for simple logs
const asciiStream = sender.toStream({
label: 'ascii_logs',
encoding: 'ascii'
});
// Test different encodings
defaultStream.write('Default encoding: Hello π\n');
utf8Stream.write('UTF-8 encoding: Hello π\n');
asciiStream.write('ASCII encoding: Hello World\n');
// Buffer handling demonstration
const bufferTestStream = sender.toStream('buffer_test');
// Write partial line
bufferTestStream.write('Partial line without');
// Buffer holds: "Partial line without"
// Complete the line
bufferTestStream.write(' newline\n');
// Emits: { message: "Partial line without newline" }
// Multiple lines in one write
bufferTestStream.write('Line 1\nLine 2\nLine 3\n');
// Emits 3 separate events:
// { message: "Line 1" }
// { message: "Line 2" }
// { message: "Line 3" }

Handle stream errors and connection issues.
// Stream inherits from Node.js Writable stream
// Standard stream error events apply
stream.on('error', (error) => { });
stream.on('finish', () => { });
stream.on('close', () => { });

Usage Examples:
const logger = require('fluent-logger');
const sender = logger.createFluentSender('error-demo', {
host: 'unreliable-server.example.com',
port: 24224,
timeout: 2.0
});
const logStream = sender.toStream('app_logs');
// Handle stream errors
logStream.on('error', (error) => {
console.error('Stream error:', error.message);
// Implement fallback logging
console.log('Falling back to console logging');
});
logStream.on('finish', () => {
console.log('Stream finished');
});
logStream.on('close', () => {
console.log('Stream closed');
});
// Monitor underlying sender
sender.on('error', (error) => {
console.error('Sender error:', error.message);
});
sender.on('connect', () => {
console.log('Reconnected to Fluentd');
});
// Robust stream writing with error handling
/**
 * Write `data` to `stream` and report the outcome as a Promise.
 *
 * @param {import('stream').Writable} stream - Destination stream.
 * @param {string|Buffer} data - Chunk to write.
 * @returns {Promise<void>} Resolves once the stream has handled the chunk;
 *   rejects with the write error, or with `Error('Stream not writable')`
 *   when `stream.writable` is falsy.
 */
function safeWrite(stream, data) {
  return new Promise((resolve, reject) => {
    if (!stream.writable) {
      reject(new Error('Stream not writable'));
      return;
    }
    // The write callback runs for every write, whether or not write()
    // signalled backpressure, so it is the single place the promise settles.
    // No extra 'drain' listener is registered: 'drain' is never emitted after
    // a failed write, so such a listener would stay attached to the stream.
    stream.write(data, (error) => {
      if (error) {
        reject(error);
      } else {
        resolve();
      }
    });
  });
}
// Usage with error handling
/**
 * Example of writing one log line through `safeWrite` with a console fallback.
 *
 * Awaits the write to `logStream`; on success prints a confirmation, on
 * failure prints the error message and repeats the log line on the console.
 *
 * @returns {Promise<void>}
 */
async function logSafely() {
  try {
    await safeWrite(logStream, 'Important log message\n');
    console.log('Log written successfully');
  } catch (writeError) {
    console.error('Failed to write log:', writeError.message);
    // The stream write failed, so emit the same message on the console instead
    console.log('FALLBACK: Important log message');
  }
}
logSafely();

Stream-generated log events have a consistent format:
// Each line written to stream becomes:
{
message: "line content without newline"
}
// For label 'stdout', tag becomes: sender_tag_prefix.stdout
// For label 'access', tag becomes: sender_tag_prefix.access

Examples:
const sender = logger.createFluentSender('myapp');
const stream = sender.toStream('debug');
stream.write('User login successful\n');
// Generates event:
// Tag: myapp.debug
// Data: { message: "User login successful" }
stream.write('Error: connection timeout\nRetrying connection\n');
// Generates two events:
// 1. Tag: myapp.debug, Data: { message: "Error: connection timeout" }
// 2. Tag: myapp.debug, Data: { message: "Retrying connection" }