# Historical Data Streams (Germany)

Incremental streams that provide historical COVID-19 data for Germany, synchronized with a date-based cursor. Each stream tracks the latest `date` it has synced and, on later runs, emits only records newer than that date.

## Capabilities

### Historical Cases Data

Historical COVID-19 cases data for Germany with incremental synchronization.

```python { .api }
class GermanyHistoryCases(IncrementalRkiCovidStream):
    """
    Historical COVID-19 cases data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/cases/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical daily cases data with incremental sync support
    based on the date field. The days parameter is calculated from start_date.
    """

    primary_key = None

    def __init__(self, config, **kwargs):
        """
        Initialize with configuration containing start_date.

        Parameters:
        - config: dict containing 'start_date' in YYYY-MM-DD format
        """

    @property
    def cursor_field(self) -> str:
        """Returns 'date', the field used for incremental sync."""

    @property
    def source_defined_cursor(self) -> bool:
        """Returns False: the cursor is managed by the connector, not the API."""

    def date_to_int(self, start_date) -> int:
        """
        Convert start_date to the days parameter for the API.

        Calculates the difference between start_date and the current date.
        Returns a minimum of 1 if the date is in the future.
        """

    def get_updated_state(self, current_stream_state, latest_record):
        """
        Update the stream state with the latest record date.

        Compares cursor field values and returns a state holding
        the maximum (most recent) date value.
        """

    def read_records(self, stream_state=None, **kwargs):
        """
        Read records with incremental filtering.

        Filters records to return only those with dates
        newer than the current stream state cursor.
        """

    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns the path with the calculated days: 'germany/history/cases/{days}'"""
```
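
The `date_to_int` and `path` docstrings above describe how the `:days` segment of the endpoint is derived from `start_date`. The following is a minimal standalone sketch of that calculation, assuming whole days are counted from `start_date` to now and clamped to a minimum of 1; `days_since` and `history_path` are hypothetical helpers, not part of the connector.

```python
from datetime import datetime
from typing import Optional


def days_since(start_date: str, now: Optional[datetime] = None) -> int:
    """Hypothetical helper: whole days from start_date (YYYY-MM-DD) to now, at least 1."""
    now = now or datetime.now()
    diff = now - datetime.strptime(start_date, "%Y-%m-%d")
    # A start_date today or in the future gives diff.days <= 0, so clamp to 1
    return max(diff.days, 1)


def history_path(metric: str, start_date: str, now: Optional[datetime] = None) -> str:
    """Hypothetical helper: build a path such as 'germany/history/cases/{days}'."""
    return f"germany/history/{metric}/{days_since(start_date, now)}"


# Pin "now" so the output is reproducible
fixed_now = datetime(2023, 1, 31, 12, 0)
print(days_since("2023-01-01", fixed_now))             # 30
print(days_since("2024-01-01", fixed_now))             # 1
print(history_path("cases", "2023-01-01", fixed_now))  # germany/history/cases/30
```

The connector computes against the current date, so its value changes daily; passing `now` here only makes the sketch reproducible.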

### Historical Incidence Data

Historical COVID-19 incidence rates for Germany.

```python { .api }
class GermanHistoryIncidence(IncrementalRkiCovidStream):
    """
    Historical COVID-19 incidence data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/incidence/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical 7-day incidence rates per 100,000 population
    with incremental synchronization capability.
    """

    primary_key = None
    cursor_field = "date"
    source_defined_cursor = False
```
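
`GermanHistoryIncidence` reports the 7-day incidence per 100,000 population, which is conventionally the number of new cases over the last seven days divided by the population, multiplied by 100,000. This sketch computes that figure from a list of daily case counts, for example to sanity-check values against the cases stream; `seven_day_incidence` is a hypothetical helper, the population is an input you supply, and nothing here is taken from the connector or the API.

```python
from typing import List


def seven_day_incidence(daily_cases: List[int], population: int) -> List[float]:
    """Hypothetical helper: 7-day incidence per 100,000 for each day with a full 7-day window."""
    if population <= 0:
        raise ValueError("population must be positive")
    result = []
    for end in range(6, len(daily_cases)):
        # Sum of new cases over the 7 days ending at index `end`
        window_total = sum(daily_cases[end - 6 : end + 1])
        # Multiply before dividing to avoid needless floating-point rounding
        result.append(window_total * 100_000 / population)
    return result


# Illustrative numbers only: 100 new cases per day in a population of 1,000,000
print(seven_day_incidence([100] * 7, 1_000_000))  # [70.0]
```

Days without a full seven-day history are skipped, so the output is six entries shorter than the input.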

### Historical Deaths Data

Historical COVID-19 deaths data for Germany.

```python { .api }
class GermanHistoryDeaths(IncrementalRkiCovidStream):
    """
    Historical COVID-19 deaths data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/deaths/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical daily deaths data with incremental sync
    for tracking mortality trends over time.
    """

    primary_key = None
    cursor_field = "date"
    source_defined_cursor = False
```

### Historical Recovered Data

Historical COVID-19 recovery data for Germany.

```python { .api }
class GermanHistoryRecovered(IncrementalRkiCovidStream):
    """
    Historical COVID-19 recovered cases data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/recovered/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical daily recovery data with incremental sync
    for tracking recovery trends and calculating active cases.
    """

    primary_key = None
    cursor_field = "date"
    source_defined_cursor = False
```
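
The recovered stream's docstring mentions calculating active cases. One common definition is cumulative cases minus cumulative deaths minus cumulative recovered. This sketch joins daily records from the cases, deaths, and recovered streams on `date` and keeps a running total; `active_cases` is hypothetical, and it assumes each record carries `date` plus a count field named after its metric (`cases`, `deaths`, `recovered`).

```python
from itertools import accumulate
from typing import Dict, List, Mapping


def active_cases(
    cases: List[Mapping], deaths: List[Mapping], recovered: List[Mapping]
) -> Dict[str, int]:
    """Hypothetical helper: running active-case count per date from daily records."""
    # Index each metric by date; a date missing from one stream counts as 0 for it
    cases_by_date = {r["date"]: r["cases"] for r in cases}
    deaths_by_date = {r["date"]: r["deaths"] for r in deaths}
    recovered_by_date = {r["date"]: r["recovered"] for r in recovered}
    dates = sorted(set(cases_by_date) | set(deaths_by_date) | set(recovered_by_date))
    # Daily net change = new cases - new deaths - new recoveries
    net = [
        cases_by_date.get(d, 0) - deaths_by_date.get(d, 0) - recovered_by_date.get(d, 0)
        for d in dates
    ]
    # A running sum of the net change gives the active count on each date
    return dict(zip(dates, accumulate(net)))


# Illustrative records only
cases = [{"date": "2023-01-01", "cases": 10}, {"date": "2023-01-02", "cases": 5}]
deaths = [{"date": "2023-01-02", "deaths": 1}]
recovered = [{"date": "2023-01-02", "recovered": 4}]
print(active_cases(cases, deaths, recovered))  # {'2023-01-01': 10, '2023-01-02': 10}
```

Because the running total starts at the first synced date, the result is relative to `start_date` rather than an absolute count of active cases in Germany.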

### Historical Frozen Incidence Data

Historical COVID-19 frozen incidence data for Germany.

```python { .api }
class GermanHistoryFrozenIncidence(IncrementalRkiCovidStream):
    """
    Historical COVID-19 frozen incidence data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/frozen-incidence/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical frozen incidence rates: incidence values
    that are frozen at specific points in time for consistent
    reporting and trend analysis.
    """

    primary_key = None
    cursor_field = "date"
    source_defined_cursor = False

    def parse_response(self, response, **kwargs):
        """
        Parse the frozen incidence response.

        Extracts history data from the nested response structure:
        response.json().get("data").get("history")
        """
```
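
`GermanHistoryFrozenIncidence` reads its records from one level deeper than the other history streams, at `data.history`. A defensive sketch of that extraction follows; it takes an already-decoded JSON dict instead of a `requests.Response` so it runs offline, and both `extract_history` and the sample payload are hypothetical, shaped only after the nesting described above.

```python
from typing import Any, List, Mapping


def extract_history(payload: Mapping[str, Any]) -> List[Mapping[str, Any]]:
    """Hypothetical helper mirroring payload.get("data").get("history") without crashing on gaps."""
    # Unlike a bare .get("data").get("history"), this returns [] when "data" is missing or null
    data = payload.get("data") or {}
    return data.get("history") or []


# Hypothetical payload shaped as data -> history -> list of records
sample = {"data": {"history": [{"date": "2023-01-01", "weekIncidence": 12.5}]}}
for record in extract_history(sample):
    print(record["date"], record["weekIncidence"])
```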

### Historical Hospitalization Data

Historical COVID-19 hospitalization data for Germany.

```python { .api }
class GermanHistoryHospitalization(IncrementalRkiCovidStream):
    """
    Historical COVID-19 hospitalization data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/hospitalization/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical hospitalization metrics, including
    new admissions and ICU utilization, with incremental sync.
    """

    primary_key = None
    cursor_field = "date"
    source_defined_cursor = False
```

## Base Incremental Stream Class

All historical Germany streams inherit from `IncrementalRkiCovidStream`, which extends `RkiCovidStream` with incremental sync support.

```python { .api }
class IncrementalRkiCovidStream(RkiCovidStream, ABC):
    """
    Base class for incremental RKI COVID streams.

    Extends RkiCovidStream with incremental sync capabilities:
    - Cursor field management
    - Stream state tracking
    - Incremental record filtering
    - State checkpoint handling
    """

    state_checkpoint_interval = None

    @property
    def cursor_field(self) -> str:
        """
        Abstract property defining the cursor field name.

        Must be implemented by subclasses to specify which
        field is used for incremental synchronization.
        """

    def get_updated_state(self, current_stream_state, latest_record):
        """
        Abstract method for updating stream state.

        Must be implemented by subclasses to define how
        the stream state is updated with new records.
        """
```
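
The base class leaves `cursor_field` and `get_updated_state` to subclasses. The sketch below makes that contract explicit with `abc.abstractmethod` and adds a subclass that keeps the most recent `date` seen, in the spirit of `GermanyHistoryCases.get_updated_state`. It is a simplified stand-in that does not import the Airbyte CDK: `IncrementalStreamContract`, `DateCursorStream`, and seeding the state from `start_date` are assumptions for illustration, and the connector's own base class may not enforce the contract with `abc`.

```python
from abc import ABC, abstractmethod
from typing import Any, Mapping, Optional


class IncrementalStreamContract(ABC):
    """Hypothetical stand-in for the IncrementalRkiCovidStream contract (no Airbyte CDK needed)."""

    state_checkpoint_interval = None

    @property
    @abstractmethod
    def cursor_field(self) -> str:
        """Name of the field used for incremental sync."""

    @abstractmethod
    def get_updated_state(
        self, current_stream_state: Optional[Mapping[str, Any]], latest_record: Mapping[str, Any]
    ) -> Mapping[str, Any]:
        """Return the new state after seeing latest_record."""


class DateCursorStream(IncrementalStreamContract):
    """Hypothetical subclass that keeps the most recent 'date', seeded from start_date."""

    def __init__(self, start_date: str):
        self.start_date = start_date

    @property
    def cursor_field(self) -> str:
        return "date"

    def get_updated_state(self, current_stream_state, latest_record):
        current = (current_stream_state or {}).get(self.cursor_field, self.start_date)
        latest = latest_record.get(self.cursor_field, "")
        # YYYY-MM-DD strings compare chronologically as plain text
        return {self.cursor_field: max(current, latest)}


stream = DateCursorStream(start_date="2023-01-01")
state = {}
for record in [{"date": "2023-01-03"}, {"date": "2023-01-02"}]:
    state = stream.get_updated_state(state, record)
print(state)  # {'date': '2023-01-03'}
```

An out-of-order record cannot move the state backwards, because the update always keeps the larger of the two dates.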

## Usage Examples

### Incremental Sync Setup

```python
from source_rki_covid import SourceRkiCovid

source = SourceRkiCovid()
config = {"start_date": "2023-01-01"}

# Get historical streams, excluding the per-state ("States...") ones
streams = source.streams(config)
historical_streams = [
    stream for stream in streams
    if "History" in stream.__class__.__name__
    and "States" not in stream.__class__.__name__
]

print(f"Historical Germany streams: {len(historical_streams)}")  # 6 streams
```

### Reading Historical Data

```python
from airbyte_cdk.models import SyncMode
from source_rki_covid.source import GermanyHistoryCases

# Example with initial sync (no stream state)
config = {"start_date": "2023-01-01"}
cases_stream = GermanyHistoryCases(config=config)

print(f"Cursor field: {cases_stream.cursor_field}")  # 'date'

# Read all records from start_date; the CDK's read_records requires sync_mode
for record in cases_stream.read_records(sync_mode=SyncMode.incremental):
    print(f"Date: {record['date']}, Cases: {record.get('cases', 0)}")
```

### Incremental Updates

```python
from airbyte_cdk.models import SyncMode
from source_rki_covid.source import GermanyHistoryCases

# Example with existing stream state
cases_stream = GermanyHistoryCases(config={"start_date": "2023-01-01"})
current_state = {"date": "2023-06-15"}

# Read only new records after the state date
for record in cases_stream.read_records(sync_mode=SyncMode.incremental, stream_state=current_state):
    print(f"New record - Date: {record['date']}")

    # Update state with each record so it always holds the latest date seen
    current_state = cases_stream.get_updated_state(current_state, record)
```

### Date Range Calculation

```python
from source_rki_covid.source import GermanyHistoryCases

# Understanding the date_to_int method
cases_stream = GermanyHistoryCases(config={"start_date": "2023-01-01"})

# Calculate the days parameter for the API call
days = cases_stream.date_to_int("2023-01-01")
print(f"Days parameter: {days}")

# This creates the API path: germany/history/cases/{days}
path = cases_stream.path()
print(f"API path: {path}")
```

## Data Structure

Historical streams return time-series data. Depending on the stream, records contain fields such as:

- **date**: Date string in YYYY-MM-DD format (cursor field)
- **cases/deaths/recovered**: Daily counts for the specific metric
- **incidence**: 7-day incidence rate per 100,000 population
- **weekIncidence**: Weekly incidence calculations
- **delta**: Day-over-day changes in metrics
- **meta**: Metadata including last update timestamps

## Incremental Sync Behavior

1. **Initial Sync**: Fetches all data from `start_date` to the current date
2. **State Management**: Tracks the latest date processed, keeping the maximum `date` value seen so far
3. **Incremental Updates**: Emits only records newer than the last state date; `read_records` applies this filter to the API response
4. **API Efficiency**: Uses the `days` parameter calculated from `start_date` to limit the API response to the configured window
5. **Data Consistency**: Ensures no duplicate records through date-based filtering
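
The five steps above can be simulated offline across two sync runs. In this sketch, `fetch_window` stands in for the API call bounded by the `days` parameter, `read_incremental` applies the strictly-newer filter that `read_records` describes, and `sync` advances the state to the latest date emitted. All three functions and the sample dates are hypothetical.

```python
from typing import Dict, List, Mapping, Optional, Tuple

Record = Mapping[str, str]


def fetch_window(days: List[str]) -> List[Record]:
    """Hypothetical stand-in for the HTTP call: returns every record in the start_date window."""
    return [{"date": d} for d in days]


def read_incremental(records: List[Record], state: Optional[Mapping[str, str]]) -> List[Record]:
    """Emit every record on the first sync, otherwise only those strictly newer than the state."""
    if not state:
        return list(records)
    return [r for r in records if r["date"] > state["date"]]


def sync(days: List[str], state: Optional[Dict[str, str]]) -> Tuple[List[Record], Dict[str, str]]:
    """One sync run: fetch the window, filter it, then advance the state."""
    emitted = read_incremental(fetch_window(days), state)
    new_state = dict(state or {})
    for record in emitted:
        # Keep the most recent date seen so far
        new_state["date"] = max(new_state.get("date", ""), record["date"])
    return emitted, new_state


# Hypothetical dates: the second run sees one extra day in the window
first, state = sync(["2023-01-01", "2023-01-02", "2023-01-03"], None)
second, state = sync(["2023-01-01", "2023-01-02", "2023-01-03", "2023-01-04"], state)
print(len(first), [r["date"] for r in second], state)  # 3 ['2023-01-04'] {'date': '2023-01-04'}
```

The second run still fetches the whole window, which mirrors step 4: the API request is bounded by `start_date`, while deduplication against earlier runs happens client-side.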