# Historical Data Streams (Germany)

Incremental streams that provide historical COVID-19 data for Germany, synchronized with a date-based cursor. Each stream tracks the latest `date` it has synced and, on later runs, emits only records newer than that date.

## Capabilities

### Historical Cases Data

Historical COVID-19 case counts for Germany, with incremental synchronization.

```python { .api }
class GermanyHistoryCases(IncrementalRkiCovidStream):
    """
    Historical COVID-19 cases data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/cases/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical daily cases data with incremental sync support
    based on the date field. The days parameter is calculated from start_date.
    """

    primary_key = None

    def __init__(self, config, **kwargs):
        """
        Initialize with a configuration containing start_date.

        Parameters:
        - config: dict containing 'start_date' in YYYY-MM-DD format
        """

    @property
    def cursor_field(self) -> str:
        """Returns 'date', the field used for incremental sync."""

    @property
    def source_defined_cursor(self) -> bool:
        """Returns False: the cursor is managed by the connector, not the API."""

    def date_to_int(self, start_date) -> int:
        """
        Convert start_date to the days parameter for the API.

        Calculates the difference in days between start_date and the current date.
        Returns a minimum of 1 if the date is in the future.
        """

    def get_updated_state(self, current_stream_state, latest_record):
        """
        Update the stream state with the latest record date.

        Compares cursor field values and returns a state holding
        the maximum (most recent) date value.
        """

    def read_records(self, stream_state=None, **kwargs):
        """
        Read records with incremental filtering.

        Filters records to return only those with dates
        newer than the current stream state cursor.
        """

    def path(self, stream_state=None, stream_slice=None, next_page_token=None) -> str:
        """Returns the path with the calculated days: 'germany/history/cases/{days}'."""
```
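
The days calculation described by `date_to_int` and `path` can be sketched as two standalone functions. This is a minimal sketch, not the connector's code: `days_since` and `history_path` are hypothetical helper names, and the `today` parameter is an addition so the result can be checked against a fixed reference date.

```python
from datetime import date, datetime
from typing import Optional


def days_since(start_date: str, today: Optional[date] = None) -> int:
    """Hypothetical helper mirroring date_to_int: days from start_date to today, at least 1."""
    today = today or date.today()
    start = datetime.strptime(start_date, "%Y-%m-%d").date()
    # A start_date today or in the future gives 0 or fewer days; clamp to the minimum of 1
    return max((today - start).days, 1)


def history_path(metric: str, start_date: str, today: Optional[date] = None) -> str:
    """Hypothetical helper mirroring path(): 'germany/history/{metric}/{days}'."""
    return f"germany/history/{metric}/{days_since(start_date, today)}"


ref = date(2023, 1, 31)
print(days_since("2023-01-01", today=ref))  # 30
print(days_since("2024-01-01", today=ref))  # 1
print(history_path("cases", "2023-01-01", today=ref))  # germany/history/cases/30
```

Because the window is always measured from `start_date`, the request covers the same starting point on every run; only the end of the window moves forward with the current date.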

### Historical Incidence Data

Historical COVID-19 incidence rates for Germany.

```python { .api }
class GermanHistoryIncidence(IncrementalRkiCovidStream):
    """
    Historical COVID-19 incidence data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/incidence/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical 7-day incidence rates per 100,000 population
    with incremental synchronization capability.
    """

    primary_key = None
    cursor_field = "date"
    source_defined_cursor = False
```
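
The stream returns incidence values already computed by the API. To help interpret them, the sketch below computes the conventional 7-day incidence: the sum of new cases over the last seven days, divided by the population, times 100,000. `seven_day_incidence` is a hypothetical helper and the figures are invented; it is not how the connector or the API derives its values.

```python
from typing import List


def seven_day_incidence(daily_cases: List[int], population: int) -> float:
    """New cases over the last 7 days per 100,000 inhabitants (conventional definition)."""
    if len(daily_cases) < 7:
        raise ValueError("need at least 7 days of case counts")
    if population <= 0:
        raise ValueError("population must be positive")
    # Multiply before dividing to keep round inputs exact in floating point
    return sum(daily_cases[-7:]) * 100_000 / population


# Invented figures: 10 new cases per day for a week in a population of 1,000,000
print(seven_day_incidence([10] * 7, 1_000_000))
```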

### Historical Deaths Data

Historical COVID-19 deaths data for Germany.

```python { .api }
class GermanHistoryDeaths(IncrementalRkiCovidStream):
    """
    Historical COVID-19 deaths data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/deaths/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical daily deaths data with incremental sync
    for tracking mortality trends over time.
    """

    primary_key = None
    cursor_field = "date"
    source_defined_cursor = False
```

### Historical Recovered Data

Historical COVID-19 recovery data for Germany.

```python { .api }
class GermanHistoryRecovered(IncrementalRkiCovidStream):
    """
    Historical COVID-19 recovered cases data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/recovered/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical daily recovery data with incremental sync
    for tracking recovery trends and calculating active cases.
    """

    primary_key = None
    cursor_field = "date"
    source_defined_cursor = False
```

### Historical Frozen Incidence Data

Historical COVID-19 frozen incidence data for Germany.

```python { .api }
class GermanHistoryFrozenIncidence(IncrementalRkiCovidStream):
    """
    Historical COVID-19 frozen incidence data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/frozen-incidence/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical frozen incidence rates: incidence values fixed as
    originally reported for each date rather than recalculated as late
    reports arrive, which supports consistent reporting and trend analysis.
    """

    primary_key = None
    cursor_field = "date"
    source_defined_cursor = False

    def parse_response(self, response, **kwargs):
        """
        Parse the frozen incidence response.

        Extracts history data from the nested response structure:
        response.json().get("data").get("history")
        """
```
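
The nesting handled by `parse_response` can be illustrated with plain dictionaries. This sketch takes the `data.history` location from the docstring above; the assumption that the other history streams return their records directly under `data` is mine, the helper names are hypothetical, and both sample payloads are invented.

```python
from typing import Any, Dict, List


def extract_frozen_history(payload: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Hypothetical helper: pull records from the nested data.history structure."""
    # Guard each level so a missing key yields an empty list instead of an AttributeError
    data = payload.get("data") or {}
    return data.get("history") or []


def extract_history(payload: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Hypothetical helper for streams assumed to return records directly under data."""
    return payload.get("data") or []


# Invented sample payloads, shaped only to show the nesting difference
frozen_payload = {"data": {"history": [{"date": "2023-01-02", "weekIncidence": 1.0}]}}
cases_payload = {"data": [{"date": "2023-01-02", "cases": 5}]}

print(extract_frozen_history(frozen_payload))
print(extract_history(cases_payload))
```

The guarded lookups are a design choice of the sketch: the chained `.get("data").get("history")` in the docstring raises `AttributeError` when `data` is absent, whereas these helpers return an empty list.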

### Historical Hospitalization Data

Historical COVID-19 hospitalization data for Germany.

```python { .api }
class GermanHistoryHospitalization(IncrementalRkiCovidStream):
    """
    Historical COVID-19 hospitalization data for Germany.

    API Endpoint: https://api.corona-zahlen.org/germany/history/hospitalization/:days
    Sync Mode: Incremental
    Cursor Field: date
    Primary Key: None

    Provides historical hospitalization metrics, including
    new admissions and ICU utilization, with incremental sync.
    """

    primary_key = None
    cursor_field = "date"
    source_defined_cursor = False
```
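
The six streams documented above share one endpoint pattern, differing only in the metric segment. The lookup below uses the class names and URL templates from the docstrings; the `HISTORY_ENDPOINTS` mapping and `endpoint_url` helper are hypothetical and not part of the connector.

```python
from typing import Dict

BASE_URL = "https://api.corona-zahlen.org/"

# Hypothetical mapping from stream class name to the metric segment of its endpoint
HISTORY_ENDPOINTS: Dict[str, str] = {
    "GermanyHistoryCases": "cases",
    "GermanHistoryIncidence": "incidence",
    "GermanHistoryDeaths": "deaths",
    "GermanHistoryRecovered": "recovered",
    "GermanHistoryFrozenIncidence": "frozen-incidence",
    "GermanHistoryHospitalization": "hospitalization",
}


def endpoint_url(stream_name: str, days: int) -> str:
    """Fill the ':days' placeholder of a stream's documented endpoint."""
    if days < 1:
        # Matches the documented minimum of 1 for the days parameter
        raise ValueError("days must be at least 1")
    return f"{BASE_URL}germany/history/{HISTORY_ENDPOINTS[stream_name]}/{days}"


for name in HISTORY_ENDPOINTS:
    print(name, endpoint_url(name, 30))
```

Note that the class names mix the `Germany` and `German` prefixes exactly as documented, so any lookup by name must use them verbatim.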

## Base Incremental Stream Class

All historical Germany streams inherit from `IncrementalRkiCovidStream`.

```python { .api }
class IncrementalRkiCovidStream(RkiCovidStream, ABC):
    """
    Base class for incremental RKI COVID streams.

    Extends RkiCovidStream with incremental sync capabilities:
    - Cursor field management
    - Stream state tracking
    - Incremental record filtering
    - State checkpoint handling
    """

    state_checkpoint_interval = None

    @property
    def cursor_field(self) -> str:
        """
        Abstract property defining the cursor field name.

        Must be implemented by subclasses to specify which
        field is used for incremental synchronization.
        """

    def get_updated_state(self, current_stream_state, latest_record):
        """
        Abstract method for updating stream state.

        Must be implemented by subclasses to define how
        the stream state is updated with new records.
        """
```
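
The state-update contract can be sketched without the Airbyte CDK. `DateCursorState` below is a hypothetical stand-in, not `IncrementalRkiCovidStream`; it follows the `GermanyHistoryCases` docstring by keeping the maximum `date` seen. Falling back to `start_date` when no state exists yet is an assumption of this sketch. It relies on YYYY-MM-DD strings sorting in chronological order, so plain string comparison selects the later date.

```python
from typing import Any, Mapping, MutableMapping, Optional


class DateCursorState:
    """Hypothetical stand-in showing how a 'date' cursor state can be advanced."""

    cursor_field = "date"

    def __init__(self, start_date: str):
        self.start_date = start_date

    def get_updated_state(
        self,
        current_stream_state: Optional[MutableMapping[str, Any]],
        latest_record: Mapping[str, Any],
    ) -> Mapping[str, Any]:
        # Assumption: with no saved state, start from the configured start_date
        current = (current_stream_state or {}).get(self.cursor_field, self.start_date)
        latest = latest_record.get(self.cursor_field, "")
        # YYYY-MM-DD strings compare lexicographically in chronological order
        return {self.cursor_field: max(current, latest)}


tracker = DateCursorState(start_date="2023-01-01")
state = tracker.get_updated_state(None, {"date": "2023-01-05"})
state = tracker.get_updated_state(state, {"date": "2023-01-03"})  # older record: state unchanged
print(state)  # {'date': '2023-01-05'}
```

Taking the maximum rather than the last record's date keeps the state correct even if the API returns records out of order.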

## Usage Examples

### Incremental Sync Setup

```python
from source_rki_covid import SourceRkiCovid

source = SourceRkiCovid()
config = {"start_date": "2023-01-01"}

# Get the historical Germany streams (state-level history streams are excluded)
streams = source.streams(config)
historical_streams = [
    stream for stream in streams
    if "History" in stream.__class__.__name__
    and "States" not in stream.__class__.__name__
]

print(f"Historical Germany streams: {len(historical_streams)}")  # 6 streams
```

### Reading Historical Data

```python
from airbyte_cdk.models import SyncMode
from source_rki_covid.source import GermanyHistoryCases

config = {"start_date": "2023-01-01"}

# Initial sync: no stream state is passed
cases_stream = GermanyHistoryCases(config=config)

print(f"Cursor field: {cases_stream.cursor_field}")  # 'date'

# Read all records from start_date; the CDK's read_records requires a sync_mode
for record in cases_stream.read_records(sync_mode=SyncMode.incremental):
    print(f"Date: {record['date']}, Cases: {record.get('cases', 0)}")
```

### Incremental Updates

```python
from airbyte_cdk.models import SyncMode

# Example with an existing stream state (reuses cases_stream from the previous example)
current_state = {"date": "2023-06-15"}

# Read only records dated after the state date
for record in cases_stream.read_records(sync_mode=SyncMode.incremental, stream_state=current_state):
    print(f"New record - Date: {record['date']}")

    # Advance the state to the most recent date seen so far
    current_state = cases_stream.get_updated_state(current_state, record)
```

### Date Range Calculation

```python
from source_rki_covid.source import GermanyHistoryCases

# Understanding the date_to_int method
cases_stream = GermanyHistoryCases(config={"start_date": "2023-01-01"})

# Calculate the days parameter for the API call
days = cases_stream.date_to_int("2023-01-01")
print(f"Days parameter: {days}")

# path() embeds that value: germany/history/cases/{days}
path = cases_stream.path()
print(f"API path: {path}")
```

## Data Structure

Historical streams return time-series data. Depending on the stream, records contain fields such as:

- **date**: Date string in YYYY-MM-DD format (cursor field)
- **cases**, **deaths**, or **recovered**: Daily count for the stream's metric
- **incidence**: 7-day incidence rate per 100,000 population
- **weekIncidence**: Weekly incidence calculations
- **delta**: Day-over-day changes in metrics
- **meta**: Metadata including last update timestamps


## Incremental Sync Behavior

1. **Initial Sync**: Fetches all data from start_date to the current date.
2. **State Management**: Tracks the latest date processed.
3. **Incremental Updates**: Emits only records newer than the last state date.
4. **API Efficiency**: The days parameter, calculated from start_date, bounds the API response to the window from start_date to today; records at or before the saved state date are then filtered out by the connector.
5. **Data Consistency**: Avoids duplicate records across syncs, because only records dated strictly after the saved state are emitted.
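
The five steps above can be simulated end to end with an in-memory list standing in for the API. Everything here is hypothetical (`fake_api`, `run_sync`, the sample dates); the sketch only illustrates the documented behavior: the request window comes from start_date, records at or before the saved cursor are dropped client-side, and the cursor advances to the newest date emitted.

```python
from typing import Dict, List, Optional, Tuple

Record = Dict[str, str]


def fake_api(start_date: str, available: List[Record]) -> List[Record]:
    """Stands in for the API: returns every record from start_date onward."""
    return [r for r in available if r["date"] >= start_date]


def run_sync(
    start_date: str, available: List[Record], state: Optional[Dict[str, str]]
) -> Tuple[List[Record], Dict[str, str]]:
    """One sync run: fetch the start_date window, filter by cursor, advance state."""
    records = fake_api(start_date, available)
    if state:
        # Incremental update: keep only records strictly newer than the cursor
        records = [r for r in records if r["date"] > state["date"]]
    new_state = dict(state) if state else {"date": start_date}
    for r in records:
        new_state["date"] = max(new_state["date"], r["date"])
    return records, new_state


data = [{"date": "2023-01-01"}, {"date": "2023-01-02"}]
first, state = run_sync("2023-01-01", data, None)  # initial sync: both records

data.append({"date": "2023-01-03"})  # a new day is published
second, state = run_sync("2023-01-01", data, state)  # only the new record
print(len(first), len(second), state)  # 2 1 {'date': '2023-01-03'}
```

A third run with no new data returns no records and leaves the state unchanged, which is what prevents duplicates across syncs.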