# pg-query-stream

pg-query-stream provides memory-efficient streaming of PostgreSQL query results using server-side cursors. It extends Node.js Readable streams to handle massive result sets without loading all data into memory, making it ideal for ETL operations and processing large tables.

## Package Information

- **Package Name**: pg-query-stream
- **Package Type**: npm
- **Language**: TypeScript
- **Installation**: `npm install pg-query-stream`
- **Peer Dependency**: Requires `pg` (^8), the PostgreSQL client library

## Core Imports

```typescript
import QueryStream from "pg-query-stream";
```

For CommonJS:

```javascript
const QueryStream = require("pg-query-stream");
```

## Basic Usage

```typescript
import pg from "pg";
import QueryStream from "pg-query-stream";

// With no arguments, connection settings come from the PG* environment variables
const client = new pg.Client();
await client.connect(); // top-level await requires an ES module context

// Stream a large result set without buffering it all in memory
const query = new QueryStream('SELECT * FROM large_table', []);
const stream = client.query(query);

stream.on('data', (row) => {
  console.log(row); // Process each row as it arrives
});

stream.on('end', () => {
  console.log('Stream completed');
  client.end();
});

stream.on('error', (err) => {
  console.error('Stream error:', err);
  client.end(); // close the connection on failure too
});
```

## Architecture

pg-query-stream is built on these key components:

- **Cursor-based Streaming**: Uses PostgreSQL cursors via `pg-cursor` for efficient memory usage
- **Stream Integration**: Extends the Node.js `Readable` stream (in object mode) for standard stream operations
- **pg Client Integration**: Implements pg's `Submittable` interface, so a `QueryStream` can be passed directly to `client.query()`
- **Configurable Batching**: Controls memory usage through batch size and high water mark settings
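
The components above fit together roughly as follows. This is a self-contained sketch of the pattern, not the library's source: an object-mode `Readable` whose `_read(size)` asks a cursor for `size` rows, pushes them, and ends the stream when a batch comes back short. `FakeCursor` and `CursorStream` are hypothetical names; `FakeCursor` is an in-memory stand-in that imitates `pg-cursor`'s callback-style `read(rowCount, callback)`, so no database is needed.

```typescript
import { Readable } from "stream";

type Row = Record<string, unknown>;
type ReadCallback = (err: Error | null, rows: Row[]) => void;

// Hypothetical in-memory stand-in for pg-cursor's read(rowCount, callback)
class FakeCursor {
  private offset = 0;
  public reads = 0; // number of batches requested so far

  constructor(private readonly rows: Row[]) {}

  read(rowCount: number, cb: ReadCallback): void {
    this.reads++;
    const batch = this.rows.slice(this.offset, this.offset + rowCount);
    this.offset += batch.length;
    // Answer asynchronously, as a network round trip would
    setImmediate(() => cb(null, batch));
  }
}

// Hypothetical cursor-backed stream: one cursor read per _read() call
class CursorStream extends Readable {
  constructor(private readonly cursor: FakeCursor, batchSize: number) {
    // objectMode: each chunk is a row; highWaterMark doubles as the batch size
    super({ objectMode: true, highWaterMark: batchSize });
  }

  _read(size: number): void {
    this.cursor.read(size, (err, rows) => {
      if (err) {
        this.destroy(err);
        return;
      }
      for (const row of rows) this.push(row);
      // A short batch means the cursor is exhausted
      if (rows.length < size) this.push(null);
    });
  }
}

async function demo(): Promise<{ ids: number[]; reads: number }> {
  const source = Array.from({ length: 25 }, (_, i) => ({ id: i }));
  const cursor = new FakeCursor(source);
  const ids: number[] = [];
  for await (const row of new CursorStream(cursor, 10)) {
    ids.push((row as { id: number }).id);
  }
  return { ids, reads: cursor.reads };
}

demo().then(({ ids, reads }) => console.log(ids.length, reads)); // 25 3
```

Because `Readable` does not call `_read()` again until the previous call has pushed data, only one cursor read is in flight at a time, which is what keeps memory bounded.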

## Capabilities

### Query Stream Creation

Create a streaming query that reads results in batches using PostgreSQL cursors.

```typescript { .api }
class QueryStream extends Readable implements Submittable {
  constructor(
    text: string,
    values?: any[],
    config?: QueryStreamConfig
  );
}
```

### Stream Configuration

Configure memory usage and result format for the query stream.

```typescript { .api }
interface QueryStreamConfig {
  batchSize?: number;     // Rows fetched per cursor read; when set, also used as the stream's highWaterMark
  highWaterMark?: number; // Stream buffer size in rows (defaults to 100); used as the fetch size when batchSize is unset
  rowMode?: 'array';      // Return rows as arrays instead of objects
  types?: any;            // Custom type parsers for result columns
}
```
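
To make the interaction between `batchSize` and `highWaterMark` concrete, the helper below mirrors how current pg-query-stream releases appear to resolve them in the constructor (`batchSize || highWaterMark`, with `highWaterMark` defaulting to 100). Treat this as an assumption to verify against the version you install; `effectiveBatchSize` and `BatchingOptions` are hypothetical names, not package exports.

```typescript
// Hypothetical subset of QueryStreamConfig relevant to batching
interface BatchingOptions {
  batchSize?: number;
  highWaterMark?: number;
}

// Hypothetical helper: the stream buffer size, which is also the number of rows
// requested from the cursor per read (assumed to match pg-query-stream's constructor)
function effectiveBatchSize(config: BatchingOptions = {}): number {
  const { batchSize, highWaterMark = 100 } = config;
  return batchSize || highWaterMark;
}

console.log(effectiveBatchSize());                                     // 100
console.log(effectiveBatchSize({ highWaterMark: 25 }));                // 25
console.log(effectiveBatchSize({ batchSize: 50, highWaterMark: 25 })); // 50
```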

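### Database Integration

Submit the query to a PostgreSQL connection for execution. The pg client calls `submit()` itself when the stream is passed to `client.query()`, so application code normally does not call it directly.

```typescript { .api }
submit(connection: Connection): void;
```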
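
Because `QueryStream` implements `Submittable`, `client.query(stream)` returns the very same stream object and later calls its `submit()` with the underlying connection. The sketch below models only that contract. `Submittable` and `Connection` are local stand-ins rather than imports from `pg`, and `MiniClient` and `RecordingQuery` are hypothetical: the real client queues the query and submits it once the connection is free, whereas `MiniClient` submits immediately.

```typescript
// Local stand-ins for pg's types (illustrative, not imported from pg)
interface Connection {
  name: string;
}

interface Submittable {
  submit(connection: Connection): void;
}

// Hypothetical simplification of pg.Client's handling of submittables
class MiniClient {
  private queue: Submittable[] = [];

  constructor(private readonly connection: Connection) {}

  query<T extends Submittable>(submittable: T): T {
    this.queue.push(submittable);
    this.pulse();
    return submittable; // the caller gets back the same object (e.g. the stream)
  }

  // Hand the next queued query to the connection (immediately, unlike pg)
  private pulse(): void {
    const next = this.queue.shift();
    if (next) next.submit(this.connection);
  }
}

// Hypothetical submittable that records which connection it was given
class RecordingQuery implements Submittable {
  submittedTo: Connection | null = null;
  submit(connection: Connection): void {
    this.submittedTo = connection;
  }
}

const client = new MiniClient({ name: "conn-1" });
const q = new RecordingQuery();
const returned = client.query(q);
console.log(returned === q, q.submittedTo?.name); // true conn-1
```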

### Stream Properties

Access underlying cursor and result metadata.

```typescript { .api }
cursor: any;  // The underlying pg-cursor instance
_result: any; // Query result metadata from pg-cursor
```

## Usage Examples

### Basic Streaming

```typescript
import pg from "pg";
import QueryStream from "pg-query-stream";

const pool = new pg.Pool();

pool.connect((err, client, done) => {
  if (err) throw err;

  const query = new QueryStream('SELECT * FROM users', []);
  const stream = client.query(query);

  stream.on('data', (row) => {
    // Process each user row
    console.log(`User: ${row.name}, Email: ${row.email}`);
  });

  // Return the client to the pool once every row has been read
  stream.on('end', () => done());
  stream.on('error', (err) => {
    // Passing the error to done() discards the client instead of reusing it
    done(err);
    console.error('Stream error:', err);
  });
});
```

134

135

### With Configuration Options

136

137

```typescript

138

const query = new QueryStream(

139

'SELECT id, data FROM large_table WHERE active = $1',

140

[true],

141

{

142

batchSize: 50, // Fetch 50 rows at a time

143

highWaterMark: 25, // Buffer up to 25 rows

144

rowMode: 'array' // Return [id, data] instead of {id, data}

145

}

146

);

147

148

const stream = client.query(query);

149

stream.on('data', (row) => {

150

const [id, data] = row; // Array destructuring

151

console.log(`ID: ${id}, Data: ${data}`);

152

});

153

```

### Async Iteration (Node.js 10+)

```typescript
const query = new QueryStream('SELECT * FROM products', []);
const stream = client.query(query);

for await (const row of stream) {
  // Process each product
  console.log(`Product: ${row.name}, Price: ${row.price}`);
}
```
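
Async iteration also makes it straightforward to group rows into fixed-size chunks, for example to write them in bulk during an ETL job. `chunked` is a hypothetical helper, not part of pg-query-stream, and `Readable.from` stands in for `client.query(new QueryStream(...))` so the sketch runs without a database.

```typescript
import { Readable } from "stream";

// Hypothetical helper: collect items from any async iterable into arrays of `size`
async function* chunked<T>(source: AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
  let chunk: T[] = [];
  for await (const item of source) {
    chunk.push(item);
    if (chunk.length === size) {
      yield chunk;
      chunk = [];
    }
  }
  if (chunk.length > 0) yield chunk; // flush the final partial chunk
}

async function demo(): Promise<number[]> {
  // Stand-in for a QueryStream: an object-mode Readable of rows
  const rows = Readable.from(Array.from({ length: 7 }, (_, i) => ({ id: i })));
  const sizes: number[] = [];
  for await (const batch of chunked(rows, 3)) {
    // e.g. insert `batch` into another table with a single statement
    sizes.push(batch.length);
  }
  return sizes;
}

demo().then((sizes) => console.log(sizes)); // [ 3, 3, 1 ]
```

Because `for await` only pulls the next row when the loop body is ready for it, backpressure still applies while each chunk is being processed.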

### Piping to Transform Streams

```typescript
import { Transform } from 'stream';

const query = new QueryStream('SELECT * FROM orders', []);
const stream = client.query(query);

const transform = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    // Transform each order
    const transformed = {
      orderId: row.id,
      total: parseFloat(row.total),
      date: new Date(row.created_at)
    };
    callback(null, transformed);
  }
});

stream.pipe(transform).on('data', (order) => {
  console.log('Processed order:', order);
});
```
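
`pipe()` does not forward errors between stages or destroy the other streams when one of them fails. A sketch using `pipeline` from `stream/promises` (Node.js 15+) handles both; `Readable.from` stands in for the query stream, and the `OrderRow` and `Order` types and the collecting `Writable` are illustrative.

```typescript
import { Readable, Transform, Writable } from "stream";
import { pipeline } from "stream/promises";

// Illustrative row and output shapes
interface OrderRow { id: number; total: string; created_at: string }
interface Order { orderId: number; total: number; date: Date }

async function demo(): Promise<Order[]> {
  // Stand-in for client.query(new QueryStream('SELECT * FROM orders', []))
  const source = Readable.from([
    { id: 1, total: "19.99", created_at: "2024-01-01T00:00:00Z" },
    { id: 2, total: "5.00", created_at: "2024-01-02T00:00:00Z" },
  ] as OrderRow[]);

  const toOrder = new Transform({
    objectMode: true,
    transform(row: OrderRow, _encoding, callback) {
      callback(null, { orderId: row.id, total: parseFloat(row.total), date: new Date(row.created_at) });
    },
  });

  const orders: Order[] = [];
  const sink = new Writable({
    objectMode: true,
    write(order: Order, _encoding, callback) {
      orders.push(order);
      callback();
    },
  });

  // Rejects if any stage fails, after destroying every stage
  await pipeline(source, toOrder, sink);
  return orders;
}

demo().then((orders) => console.log(orders.map((o) => o.total))); // [ 19.99, 5 ]
```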

### Custom Type Parsing

```typescript
import pg from "pg";
import QueryStream from "pg-query-stream";

const customTypes = {
  getTypeParser: (oid: number, format?: any) => {
    // Custom parser for specific PostgreSQL types
    // (pg already returns timestamptz as a Date by default; this only illustrates the hook)
    if (oid === 1184) { // timestamptz
      return (value: string) => new Date(value);
    }
    // Defer to pg's built-in parsers so other columns (int4, bool, ...) are still converted
    return pg.types.getTypeParser(oid, format);
  }
};

const query = new QueryStream(
  'SELECT created_at, data FROM events',
  [],
  { types: customTypes }
);
```
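
If only a few types need custom handling, one option is to layer overrides on top of a fallback lookup so every other column keeps its usual parsing. In an application the fallback would typically be `pg.types`, which exposes `getTypeParser(oid, format)`; the sketch uses a small hypothetical `fallback` object so it runs on its own, and `withOverrides` is an illustrative name.

```typescript
type Parser = (value: string) => unknown;

interface TypeSource {
  getTypeParser(oid: number, format?: string): Parser;
}

// Hypothetical helper: consult `overrides` by OID first, then defer to `fallback`
function withOverrides(fallback: TypeSource, overrides: Partial<Record<number, Parser>>): TypeSource {
  return {
    getTypeParser(oid: number, format?: string): Parser {
      return overrides[oid] ?? fallback.getTypeParser(oid, format);
    },
  };
}

// Hypothetical stand-in for pg.types: parse int4 (OID 23), pass everything else through
const fallback: TypeSource = {
  getTypeParser: (oid) => (oid === 23 ? (v: string) => parseInt(v, 10) : (v: string) => v),
};

const types = withOverrides(fallback, {
  1184: (v: string) => new Date(v), // timestamptz
});

console.log(types.getTypeParser(23)("42"));                                     // 42
console.log(types.getTypeParser(1184)("2024-01-01T00:00:00Z") instanceof Date); // true
console.log(types.getTypeParser(25)("hello"));                                  // hello
```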

## Stream Events

pg-query-stream emits all standard Node.js Readable stream events:

- **'data'**: Emitted for each row returned from the query
- **'end'**: Emitted when all rows have been read
- **'error'**: Emitted when a query or cursor error occurs
- **'close'**: Emitted once the stream has been destroyed, which happens automatically after `'end'` or `'error'`
- **'readable'**: Emitted when data is available to read
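
With `autoDestroy` enabled, a stream is destroyed right after it ends, so `'close'` follows `'end'`. The sketch records the event order on an object-mode `Readable.from` stream (Node.js 14+, where `autoDestroy` is on by default) standing in for a query stream; it illustrates Node.js stream semantics rather than anything specific to pg-query-stream, and `recordEvents` is a hypothetical name.

```typescript
import { Readable } from "stream";

// Hypothetical helper: record the order in which a stream emits events
function recordEvents(rows: object[]): Promise<string[]> {
  return new Promise((resolve, reject) => {
    const events: string[] = [];
    const stream = Readable.from(rows); // object mode, autoDestroy on by default

    stream.on("data", () => events.push("data"));
    stream.on("end", () => events.push("end"));
    stream.on("error", reject);
    stream.on("close", () => {
      events.push("close");
      resolve(events);
    });
  });
}

recordEvents([{ id: 1 }, { id: 2 }]).then((events) => console.log(events.join(" -> ")));
// data -> data -> end -> close
```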

## Memory Management

The stream keeps its memory footprint small through:

- **Cursor-based fetching**: Only one batch of rows is fetched from the server at a time
- **Configurable batch size**: Controls how many rows are fetched at once
- **Stream backpressure**: Stops requesting rows from the cursor while the internal buffer is full, so a slow consumer slows fetching instead of growing memory
- **Automatic cleanup**: The cursor is closed when the stream ends or is destroyed
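
The backpressure behavior can be observed directly. In this sketch a generator stands in for the cursor and counts how many rows it has produced; while nothing consumes the stream, pulling stops once the buffer holds `highWaterMark` rows. The generator and the numbers are illustrative; pg-query-stream relies on the same `Readable` buffering around its cursor reads.

```typescript
import { Readable } from "stream";

async function demo(): Promise<{ pulledBeforeConsuming: number; consumed: number }> {
  const total = 1000;
  let pulled = 0;

  // Stand-in for a cursor: produces rows on demand and counts each one
  async function* rows() {
    for (let id = 0; id < total; id++) {
      pulled++;
      yield { id };
    }
  }

  const stream = Readable.from(rows(), { highWaterMark: 5 }); // object mode by default
  stream.read(0); // ask the stream to fill its buffer without consuming anything

  await new Promise<void>((resolve) => setTimeout(resolve, 50)); // give it time to over-fetch if it would
  const pulledBeforeConsuming = pulled;

  let consumed = 0;
  for await (const _row of stream) consumed++;
  return { pulledBeforeConsuming, consumed };
}

// pulledBeforeConsuming is bounded by highWaterMark rather than by the 1000 available rows
demo().then((r) => console.log(r));
```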

## Error Handling

```typescript
// my_table is a placeholder; TABLE itself is a reserved word in PostgreSQL
const query = new QueryStream('SELECT * FROM my_table', []);
const stream = client.query(query);

stream.on('error', (err) => {
  console.error('Query error:', err.message);
  // Stream automatically destroys itself on error
});

// Handle connection errors separately
client.on('error', (err) => {
  console.error('Client error:', err.message);
});
```
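
When rows are consumed with `for await`, a stream error surfaces as a rejected iteration, so an ordinary `try/catch` works and the stream is destroyed either way. In this sketch a failing generator stands in for a query that errors partway through; `readWithErrorHandling`, `failingRows`, and `demo` are hypothetical names.

```typescript
import { Readable } from "stream";

// Hypothetical helper: count rows, reporting an error instead of throwing
async function readWithErrorHandling(stream: Readable): Promise<{ rows: number; error?: string }> {
  let rows = 0;
  try {
    for await (const _row of stream) rows++;
    return { rows };
  } catch (err) {
    // By now the stream has been destroyed (for a QueryStream, that also closes the cursor)
    return { rows, error: (err as Error).message };
  }
}

// Stand-in for a query that fails after producing two rows
async function* failingRows() {
  yield { id: 1 };
  yield { id: 2 };
  throw new Error("canceling statement due to statement timeout");
}

async function demo() {
  const stream = Readable.from(failingRows());
  const result = await readWithErrorHandling(stream);
  return { ...result, destroyed: stream.destroyed };
}

demo().then((r) => console.log(r));
```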

## Performance Considerations

- **Batch Size**: Larger batches reduce network round trips but use more memory
- **High Water Mark**: Controls internal buffering; tune based on processing speed
- **Row Mode**: Array mode can be slightly faster than object mode for simple data, since rows are not built as objects keyed by column name
- **Cursor Overhead**: Fetching in batches through a cursor costs some throughput compared with loading all results in a single query; that is the price of bounded memory use
- **Network Latency**: Batch size matters more on high-latency connections, because every batch costs at least one round trip
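
A rough model for the batch size versus latency trade-off, assuming each cursor read costs one network round trip and that the stream ends on the first read returning fewer rows than requested (which is how pg-query-stream's `_read` appears to detect the end): streaming N rows in batches of B takes floor(N / B) + 1 reads. `estimateReads` and `latencyCostMs` are illustrative helpers, and the 5 ms round trip is an arbitrary example value.

```typescript
// Hypothetical estimate of cursor reads needed to stream `rows` rows.
// The last read is the one that returns fewer than `batchSize` rows (possibly zero).
function estimateReads(rows: number, batchSize: number): number {
  return Math.floor(rows / batchSize) + 1;
}

// Time spent waiting on round trips alone, ignoring transfer and processing time
function latencyCostMs(rows: number, batchSize: number, roundTripMs: number): number {
  return estimateReads(rows, batchSize) * roundTripMs;
}

for (const batchSize of [100, 1000, 10000]) {
  console.log(batchSize, estimateReads(1_000_000, batchSize), latencyCostMs(1_000_000, batchSize, 5));
}
// 100 10001 50005
// 1000 1001 5005
// 10000 101 505
```

Under this model, moving from batches of 100 to 1,000 rows at a 5 ms round trip cuts about 50 seconds of waiting to about 5, while each batch holds ten times as many rows in memory.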

## Limitations

- **JavaScript Client Only**: Does not work with the native libpq bindings (`pg-native`)
- **Read-Only**: The stream only delivers query results; rows cannot be written or updated through it
- **Single Query**: Each stream handles one query; create new instances for multiple queries
- **Connection Binding**: The client is occupied until the stream ends or is destroyed; other queries on the same client wait until then