0
# CQLEngine ORM
1
2
Object-relational mapping system with Django-inspired model definitions, query operations, and schema management. CQLEngine provides a high-level interface for working with Cassandra data through Python objects.
3
4
## Capabilities
5
6
### Connection Setup
7
8
Connection management and configuration for CQLEngine operations.
9
10
```python { .api }
11
def setup(hosts, default_keyspace, consistency=ConsistencyLevel.ONE, lazy_connect=False, retry_connect=False, **kwargs):
12
"""
13
Setup CQLEngine connection to Cassandra cluster.
14
15
Parameters:
16
- hosts (list): List of host addresses
17
- default_keyspace (str): Default keyspace name (required)
18
- consistency (int): Default consistency level (default: ConsistencyLevel.ONE)
19
- lazy_connect (bool): Whether to connect lazily
20
- retry_connect (bool): Whether to retry failed connections
21
- kwargs: Additional cluster configuration options
22
23
Note: Must be called before using any CQLEngine models.
24
"""
25
26
def execute(query, parameters=None, consistency_level=None, timeout=None):
27
"""
28
Execute a CQL query directly.
29
30
Parameters:
31
- query (str): CQL query string
32
- parameters (list or dict): Query parameters
33
- consistency_level (int): Consistency level for this query
34
- timeout (float): Query timeout in seconds
35
36
Returns:
37
ResultSet: Query execution results
38
"""
39
40
def get_session():
41
"""
42
Get the current CQLEngine session.
43
44
Returns:
45
Session: Active session object
46
"""
47
48
def get_cluster():
49
"""
50
Get the current CQLEngine cluster.
51
52
Returns:
53
Cluster: Active cluster object
54
"""
55
```
56
57
### Model Definition
58
59
Base model class and column types for defining Cassandra table models.
60
61
```python { .api }
62
class Model:
63
"""Base class for CQLEngine models."""
64
65
def __init__(self, **kwargs):
66
"""
67
Initialize model instance with field values.
68
69
Parameters:
70
- kwargs: Field name/value pairs
71
"""
72
73
def save(self):
74
"""
75
Save this model instance to Cassandra.
76
77
Returns:
78
Model: The saved model instance
79
"""
80
81
def delete(self):
82
"""
83
Delete this model instance from Cassandra.
84
"""
85
86
def update(self, **kwargs):
87
"""
88
Update specific fields of this model instance.
89
90
Parameters:
91
- kwargs: Field name/value pairs to update
92
93
Returns:
94
Model: The updated model instance
95
"""
96
97
@classmethod
98
def create(cls, **kwargs):
99
"""
100
Create and save a new model instance.
101
102
Parameters:
103
- kwargs: Field name/value pairs
104
105
Returns:
106
Model: The created model instance
107
"""
108
109
@classmethod
110
def objects(cls):
111
"""
112
Get a QuerySet for this model.
113
114
Returns:
115
ModelQuerySet: QuerySet for querying model instances
116
"""
117
118
@classmethod
119
def all(cls):
120
"""
121
Get all instances of this model.
122
123
Returns:
124
ModelQuerySet: QuerySet containing all instances
125
"""
126
127
@classmethod
128
def get(cls, **kwargs):
129
"""
130
Get a single model instance by primary key.
131
132
Parameters:
133
- kwargs: Primary key field values
134
135
Returns:
136
Model: The matching model instance
137
138
Raises:
139
- DoesNotExist: No matching instance found
140
- MultipleObjectsReturned: Multiple instances found
141
"""
142
143
@classmethod
144
def filter(cls, **kwargs):
145
"""
146
Filter model instances by field values.
147
148
Parameters:
149
- kwargs: Field filter conditions
150
151
Returns:
152
ModelQuerySet: Filtered QuerySet
153
"""
154
155
def validate(self):
156
"""
157
Validate this model instance.
158
159
Raises:
160
- ValidationError: If validation fails
161
"""
162
163
def __table_name__(self):
164
"""
165
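Cassandra table name for this model. In model subclasses this is set as a class attribute (e.g. `__table_name__ = 'users'`), not called as a method.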
166
167
Returns:
168
str: Table name
169
"""
170
171
class Meta:
172
"""Metaclass for model configuration."""
173
pass
174
```
175
176
### Column Types
177
178
Column type definitions for model fields with validation and serialization.
179
180
```python { .api }
181
class Column:
182
"""Base class for all column types."""
183
184
def __init__(self, primary_key=False, partition_key=False, clustering_order=None,
185
required=True, default=None, db_field=None, index=False,
186
static=False, discriminator_column=False):
187
"""
188
Base column initialization.
189
190
Parameters:
191
- primary_key (bool): Whether this is a primary key column
192
- partition_key (bool): Whether this is part of the partition key
193
- clustering_order (str): Clustering order ('ASC' or 'DESC')
194
- required (bool): Whether the field is required
195
- default: Default value or callable
196
- db_field (str): Custom database column name
197
- index (bool): Whether to create a secondary index
198
- static (bool): Whether this is a static column
199
- discriminator_column (bool): Whether this is a discriminator column
200
"""
201
202
# Numeric Column Types
203
class Integer(Column):
204
"""32-bit signed integer column."""
205
206
class BigInt(Column):
207
"""64-bit signed integer column."""
208
209
class SmallInt(Column):
210
"""16-bit signed integer column."""
211
212
class TinyInt(Column):
213
"""8-bit signed integer column."""
214
215
class VarInt(Column):
216
"""Variable precision integer column."""
217
218
class Counter(Column):
219
"""Counter column (special Cassandra counter type)."""
220
221
class Float(Column):
222
"""32-bit floating point column."""
223
224
class Double(Column):
225
"""64-bit floating point column."""
226
227
class Decimal(Column):
228
"""High-precision decimal column."""
229
230
# Text Column Types
231
class Text(Column):
232
"""Text/varchar column."""
233
234
class Ascii(Column):
235
"""ASCII-only text column."""
236
237
Varchar = Text # Alias for Text
238
239
# Binary and Network Types
240
class Blob(Column):
241
"""Binary data column."""
242
243
class Inet(Column):
244
"""IP address column."""
245
246
# Temporal Column Types
247
class DateTime(Column):
248
"""Timestamp column with datetime values."""
249
250
class Date(Column):
251
"""Date column (date only, no time)."""
252
253
class Time(Column):
254
"""Time column (time of day only)."""
255
256
class UUID(Column):
257
"""UUID column."""
258
259
class TimeUUID(Column):
260
"""Time-based UUID column (version 1)."""
261
262
# Boolean Type
263
class Boolean(Column):
264
"""Boolean column."""
265
266
# Collection Column Types
267
class List(Column):
268
"""List collection column."""
269
270
def __init__(self, value_type, **kwargs):
271
"""
272
Parameters:
273
- value_type (Column): Type of list elements
274
- kwargs: Additional column options
275
"""
276
277
class Set(Column):
278
"""Set collection column."""
279
280
def __init__(self, value_type, **kwargs):
281
"""
282
Parameters:
283
- value_type (Column): Type of set elements
284
- kwargs: Additional column options
285
"""
286
287
class Map(Column):
288
"""Map collection column."""
289
290
def __init__(self, key_type, value_type, **kwargs):
291
"""
292
Parameters:
293
- key_type (Column): Type of map keys
294
- value_type (Column): Type of map values
295
- kwargs: Additional column options
296
"""
297
298
# Complex Types
299
class UserDefinedType(Column):
300
"""User-defined type column."""
301
302
def __init__(self, user_type, **kwargs):
303
"""
304
Parameters:
305
- user_type (UserType): User-defined type class
306
- kwargs: Additional column options
307
"""
308
```
309
310
### Query Operations
311
312
QuerySet classes for building and executing queries on models.
313
314
```python { .api }
315
class QuerySet:
316
"""Base QuerySet for model queries."""
317
318
def filter(self, **kwargs):
319
"""
320
Filter results by field values.
321
322
Parameters:
323
- kwargs: Field filter conditions
324
325
Returns:
326
QuerySet: Filtered QuerySet
327
"""
328
329
def limit(self, count):
330
"""
331
Limit the number of results.
332
333
Parameters:
334
- count (int): Maximum number of results
335
336
Returns:
337
QuerySet: Limited QuerySet
338
"""
339
340
def allow_filtering(self):
341
"""
342
Allow server-side filtering (ALLOW FILTERING).
343
344
Returns:
345
QuerySet: QuerySet with filtering enabled
346
"""
347
348
def consistency(self, consistency_level):
349
"""
350
Set consistency level for this query.
351
352
Parameters:
353
- consistency_level (int): Consistency level
354
355
Returns:
356
QuerySet: QuerySet with specified consistency
357
"""
358
359
def timeout(self, timeout):
360
"""
361
Set query timeout.
362
363
Parameters:
364
- timeout (float): Timeout in seconds
365
366
Returns:
367
QuerySet: QuerySet with specified timeout
368
"""
369
370
def count(self):
371
"""
372
Get the count of matching records.
373
374
Returns:
375
int: Number of matching records
376
"""
377
378
def get(self):
379
"""
380
Get a single result.
381
382
Returns:
383
Model: Single model instance
384
385
Raises:
386
- DoesNotExist: No results found
387
- MultipleObjectsReturned: Multiple results found
388
"""
389
390
def first(self):
391
"""
392
Get the first result.
393
394
Returns:
395
Model: First model instance or None
396
"""
397
398
def all(self):
399
"""
400
Get all results.
401
402
Returns:
403
QuerySet: QuerySet matching all records (rows are fetched when it is iterated)
404
"""
405
406
def __iter__(self):
407
"""Iterate over results."""
408
409
def __len__(self):
410
"""Get the number of results."""
411
412
class ModelQuerySet(QuerySet):
413
"""QuerySet specific to model classes."""
414
415
def update(self, **kwargs):
416
"""
417
Update all matching records.
418
419
Parameters:
420
- kwargs: Field values to update
421
422
Returns:
423
None: Cassandra does not report the number of updated records
424
"""
425
426
def delete(self):
427
"""
428
Delete all matching records.
429
430
Returns:
431
None: Cassandra does not report the number of deleted records
432
"""
433
434
def values_list(self, *fields, flat=False):
435
"""
436
Return specific field values instead of model instances.
437
438
Parameters:
439
- fields: Field names to include
440
- flat (bool): Return flat list for single field
441
442
Returns:
443
list: List of field value tuples or single values
444
"""
445
446
def only(self, *fields):
447
"""
448
Only fetch specified fields.
449
450
Parameters:
451
- fields: Field names to fetch
452
453
Returns:
454
ModelQuerySet: QuerySet with limited fields
455
"""
456
457
class BatchQuery:
458
"""Batch multiple operations together."""
459
460
def __init__(self, batch_type=None, consistency=None, execute_on_exception=False):
461
"""
462
Initialize batch query.
463
464
Parameters:
465
- batch_type (int): Type of batch (LOGGED, UNLOGGED, COUNTER)
466
- consistency (int): Consistency level for the batch
467
- execute_on_exception (bool): Execute batch even if exception occurs
468
"""
469
470
def save(self, model_instance):
471
"""
472
Add model save to the batch.
473
474
Parameters:
475
- model_instance (Model): Model instance to save
476
"""
477
478
def create(self, model_class, **kwargs):
479
"""
480
Add model creation to the batch.
481
482
Parameters:
483
- model_class (Model): Model class to create
484
- kwargs: Field values for new instance
485
"""
486
487
def update(self, model_instance, **kwargs):
488
"""
489
Add model update to the batch.
490
491
Parameters:
492
- model_instance (Model): Model instance to update
493
- kwargs: Field values to update
494
"""
495
496
def delete(self, model_instance):
497
"""
498
Add model deletion to the batch.
499
500
Parameters:
501
- model_instance (Model): Model instance to delete
502
"""
503
504
def execute(self):
505
"""Execute all operations in the batch."""
506
507
def __enter__(self):
508
"""Context manager entry."""
509
510
def __exit__(self, exc_type, exc_val, exc_tb):
511
"""Context manager exit."""
512
```
513
514
### Schema Management
515
516
Functions for managing keyspaces, tables, and types.
517
518
```python { .api }
519
def sync_table(model, keyspaces=None):
520
"""
521
Create or update table schema for a model.
522
523
Parameters:
524
- model (Model): Model class to sync
525
- keyspaces (list): Keyspaces to sync in (default: all configured)
526
"""
527
528
def sync_type(keyspace, user_type_model):
529
"""
530
Create or update user-defined type.
531
532
Parameters:
533
- keyspace (str): Keyspace name
534
- user_type_model (UserType): User type model to sync
535
"""
536
537
def create_keyspace_simple(keyspace_name, replication_factor):
538
"""
539
Create keyspace with SimpleStrategy.
540
541
Parameters:
542
- keyspace_name (str): Name of keyspace to create
543
- replication_factor (int): Replication factor
544
"""
545
546
def create_keyspace_network_topology(keyspace_name, dc_replication_map):
547
"""
548
Create keyspace with NetworkTopologyStrategy.
549
550
Parameters:
551
- keyspace_name (str): Name of keyspace to create
552
- dc_replication_map (dict): Replication factors by datacenter
553
"""
554
555
def drop_keyspace(keyspace_name):
556
"""
557
Drop a keyspace.
558
559
Parameters:
560
- keyspace_name (str): Name of keyspace to drop
561
"""
562
563
def drop_table(model):
564
"""
565
Drop table for a model.
566
567
Parameters:
568
- model (Model): Model class whose table to drop
569
"""
570
```
571
572
### User-Defined Types
573
574
Support for Cassandra user-defined types in models.
575
576
```python { .api }
577
class UserType:
578
"""Base class for user-defined types."""
579
580
def __init__(self, **kwargs):
581
"""
582
Initialize UDT instance.
583
584
Parameters:
585
- kwargs: Field name/value pairs
586
"""
587
588
@classmethod
589
def create(cls, **kwargs):
590
"""
591
Create UDT instance.
592
593
Parameters:
594
- kwargs: Field values
595
596
Returns:
597
UserType: Created UDT instance
598
"""
599
600
def validate(self):
601
"""
602
Validate UDT instance.
603
604
Raises:
605
- ValidationError: If validation fails
606
"""
607
```
608
609
### Query Functions
610
611
CQL functions for use in queries.
612
613
```python { .api }
614
def Token(*columns):
615
"""
616
Generate token function for partition key columns.
617
618
Parameters:
619
- columns: Column values for token generation
620
621
Returns:
622
Function object for use in queries
623
"""
624
625
def MinTimeUUID(timestamp):
626
"""
627
Generate minimum TimeUUID for a timestamp.
628
629
Parameters:
630
- timestamp (datetime): Timestamp for UUID generation
631
632
Returns:
633
Function object for use in queries
634
"""
635
636
def MaxTimeUUID(timestamp):
637
"""
638
Generate maximum TimeUUID for a timestamp.
639
640
Parameters:
641
- timestamp (datetime): Timestamp for UUID generation
642
643
Returns:
644
Function object for use in queries
645
"""
646
```
647
648
### Exception Classes
649
650
CQLEngine-specific exception classes.
651
652
```python { .api }
653
class CQLEngineException(Exception):
654
"""Base exception for CQLEngine operations."""
655
656
class ValidationError(CQLEngineException):
657
"""Field validation error."""
658
659
class DoesNotExist(CQLEngineException):
660
"""Model instance does not exist."""
661
662
class MultipleObjectsReturned(CQLEngineException):
663
"""Multiple instances returned when one expected."""
664
665
class LWTException(CQLEngineException):
666
"""Lightweight transaction (conditional update) failed."""
667
```
668
669
## Usage Examples
670
671
### Model Definition and Setup
672
673
```python
674
from cassandra import ConsistencyLevel
from cassandra.cqlengine import connection, management
675
from cassandra.cqlengine.models import Model
676
from cassandra.cqlengine.columns import *
677
import uuid
from datetime import datetime
678
679
# Setup connection
680
connection.setup(['127.0.0.1'], default_keyspace='blog', consistency=ConsistencyLevel.ONE)
681
682
# Create keyspace
683
management.create_keyspace_simple('blog', 1)
684
685
# Define models
686
class User(Model):
687
__keyspace__ = 'blog'
688
__table_name__ = 'users'
689
690
id = UUID(primary_key=True, default=uuid.uuid4)
691
username = Text(required=True, index=True)
692
email = Text(required=True)
693
created_at = DateTime(default=datetime.utcnow)
694
is_active = Boolean(default=True)
695
profile = UserDefinedType(user_type=UserProfile)  # UserProfile is a UserType subclass; see "User-Defined Types" below
696
697
class Meta:
698
get_pk_field = 'id'
699
700
class Post(Model):
701
__keyspace__ = 'blog'
702
703
id = UUID(partition_key=True, default=uuid.uuid4)
704
created_at = DateTime(primary_key=True, clustering_order='DESC')
705
author_id = UUID(required=True, index=True)
706
title = Text(required=True)
707
content = Text(required=True)
708
tags = Set(Text)
709
view_count = Integer(default=0)  # Counter columns cannot be mixed with regular data columns in one table
710
711
# cqlengine takes the table name from the __table_name__ class attribute
711
__table_name__ = 'posts'
713
714
# Sync table schemas
715
management.sync_table(User)
716
management.sync_table(Post)
717
```
718
719
### Basic CRUD Operations
720
721
```python
722
from datetime import datetime
723
724
# Create users
725
user1 = User.create(
726
username='alice',
727
email='alice@example.com'
728
)
729
730
user2 = User(
731
username='bob',
732
email='bob@example.com',
733
created_at=datetime.utcnow()
734
)
735
user2.save()
736
737
# Read users
738
alice = User.get(username='alice')
739
print(f"User: {alice.username} ({alice.email})")
740
741
# Update user
742
alice.email = 'alice.smith@example.com'
743
alice.save()
744
745
# Or update specific fields
746
alice.update(is_active=False)
747
748
# Delete user
749
inactive_users = User.filter(is_active=False)
750
for user in inactive_users:
751
user.delete()
752
```
753
754
### Advanced Querying
755
756
```python
757
# Complex queries
758
recent_posts = Post.filter(
759
created_at__gte=datetime.utcnow() - timedelta(days=7)
760
).limit(10)
761
762
for post in recent_posts:
763
print(f"Post: {post.title} by {post.author_id}")
764
765
# Query with filtering
766
tagged_posts = Post.filter(
767
tags__contains='python'
768
).allow_filtering().all()
769
770
# Query with specific consistency
771
important_posts = Post.objects().consistency(ConsistencyLevel.QUORUM).filter(
772
author_id=user1.id
773
)
774
775
# Count queries
776
total_posts = Post.objects().count()
777
alice_posts = Post.filter(author_id=alice.id).count()
778
779
# Field selection
780
usernames = User.objects().values_list('username', flat=True)
781
user_info = User.objects().values_list('username', 'email')
782
783
# Token-based queries (for pagination)
784
from cassandra.cqlengine.functions import Token
785
786
posts_page = Post.filter(
787
pk__token__gt=Token(last_post_id)
788
).limit(20)
789
```
790
791
### Working with Collections
792
793
```python
794
# Create post with tags
795
post = Post.create(
796
author_id=alice.id,
797
title='Python and Cassandra',
798
content='How to use CQLEngine...',
799
tags={'python', 'cassandra', 'database'}
800
)
801
802
# Update collections
803
post.tags.add('tutorial')
804
post.tags.remove('database')
805
post.save()
806
807
# Query by collection values
808
python_posts = Post.filter(tags__contains='python').allow_filtering()
809
```
810
811
### Batch Operations
812
813
```python
814
from cassandra.cqlengine.query import BatchQuery
815
816
# Create multiple records in a batch
817
with BatchQuery() as batch:
818
batch.create(Post,
819
author_id=alice.id,
820
title='First Post',
821
content='Hello world!')
822
batch.create(Post,
823
author_id=alice.id,
824
title='Second Post',
825
content='More content...')
826
827
# Update multiple records in a batch
828
with BatchQuery() as batch:
829
for post in Post.filter(author_id=alice.id):
830
batch.update(post, view_count=post.view_count + 1)
831
832
# Mixed operations in batch
833
with BatchQuery() as batch:
834
batch.save(new_user)
835
batch.update(existing_user, email='new@example.com')
836
batch.delete(old_user)
837
```
838
839
### User-Defined Types
840
841
```python
842
from cassandra.cqlengine.usertype import UserType
843
844
# Define UDT
845
class UserProfile(UserType):
846
__keyspace__ = 'blog'
847
__type_name__ = 'user_profile'
848
849
bio = Text()
850
website = Text()
851
location = Text()
852
birth_date = Date()
853
854
# Sync UDT
855
management.sync_type('blog', UserProfile)
856
857
# Use UDT in model
858
class User(Model):
859
__keyspace__ = 'blog'
860
861
id = UUID(primary_key=True, default=uuid.uuid4)
862
username = Text(required=True)
863
profile = UserDefinedType(user_type=UserProfile)
864
865
# Create user with UDT
866
profile = UserProfile(
867
bio='Software developer',
868
website='https://alice.dev',
869
location='San Francisco, CA'
870
)
871
872
user = User.create(
873
username='alice',
874
profile=profile
875
)
876
877
# Access UDT fields
878
print(f"Bio: {user.profile.bio}")
879
print(f"Location: {user.profile.location}")
880
```
881
882
### Time-Based Queries
883
884
```python
885
from cassandra.cqlengine.functions import MinTimeUUID, MaxTimeUUID
886
from datetime import datetime, timedelta
887
888
# Query posts from a specific time range
889
start_time = datetime.utcnow() - timedelta(days=1)
890
end_time = datetime.utcnow()
891
892
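# MinTimeUUID/MaxTimeUUID only apply to TimeUUID columns (this assumes created_at is declared as TimeUUID rather than DateTime)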
893
posts_today = Post.filter(
894
created_at__gte=MinTimeUUID(start_time),
895
created_at__lte=MaxTimeUUID(end_time)
896
)
897
898
# Time-based pagination
899
last_week = datetime.utcnow() - timedelta(days=7)
900
posts_page = Post.filter(
901
created_at__gte=MinTimeUUID(last_week)
902
).limit(50)
903
904
for post in posts_page:
905
print(f"Posted at: {post.created_at}")
906
```
907
908
### Error Handling
909
910
```python
911
from cassandra.cqlengine import ValidationError
from cassandra.cqlengine.query import DoesNotExist, MultipleObjectsReturned
912
913
# Handle missing records
914
try:
915
user = User.get(username='nonexistent')
916
except DoesNotExist:
917
print("User not found")
918
919
# Handle multiple results
920
try:
921
user = User.get(is_active=True) # Multiple users might be active
922
except MultipleObjectsReturned:
923
print("Multiple active users found")
924
users = User.filter(is_active=True)
925
user = users.first()
926
927
# Handle validation errors
928
try:
929
invalid_user = User(username='', email='invalid-email')
930
invalid_user.validate()
931
except ValidationError as e:
932
print(f"Validation failed: {e}")
933
934
# Custom validation
935
class Post(Model):
936
title = Text(required=True)
937
content = Text(required=True)
938
939
def validate(self):
940
super().validate()
941
if len(self.title) < 3:
942
raise ValidationError("Title must be at least 3 characters")
943
if len(self.content) < 10:
944
raise ValidationError("Content must be at least 10 characters")
945
```
946
947
### Schema Migration
948
949
```python
950
# Add new column to existing model
951
class User(Model):
952
__keyspace__ = 'blog'
953
954
id = UUID(primary_key=True)
955
username = Text(required=True)
956
email = Text(required=True)
957
created_at = DateTime(default=datetime.utcnow)
958
is_active = Boolean(default=True)
959
last_login = DateTime() # New column
960
login_count = Integer(default=0)  # New column (a Counter would need its own counter-only table)
961
962
# Sync changes
963
management.sync_table(User)
964
965
# The existing data will remain, new columns will be null for existing rows
966
967
# Populate new columns for existing users
968
for user in User.objects().all():
969
if user.last_login is None:
970
user.update(last_login=user.created_at, login_count=0)
971
```