# SQL and DataFrames

Structured data processing with DataFrames, SQL queries, and a comprehensive library of built-in functions. This is the primary high-level interface for working with structured and semi-structured data in PySpark, providing optimized query execution through the Catalyst optimizer.

## Capabilities

### Spark Session

Unified entry point for DataFrame and Dataset functionality; since Spark 2.0 it replaces the older SQLContext and HiveContext.

```python { .api }
class SparkSession:
    def __init__(self, sparkContext, jsparkSession=None):
        """Create a new SparkSession."""

    builder: Builder
    """Class attribute: a Builder for constructing SparkSession instances (accessed as SparkSession.builder, not called)."""

    def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
        """
        Creates a DataFrame from an RDD, list, pandas DataFrame, numpy array, or PyArrow Table.

        Parameters:
        - data: Input data (RDD, Iterable, pandas DataFrame, numpy array, PyArrow Table)
        - schema (StructType, str, list): Schema definition for the DataFrame
        - samplingRatio (float): Sample ratio for schema inference from an RDD (0.0-1.0)
        - verifySchema (bool): Verify that data conforms to the provided schema

        Returns:
        DataFrame with the specified data and schema
        """

    def sql(self, sqlQuery):
        """
        Returns a DataFrame representing the result of the given query.

        Parameters:
        - sqlQuery (str): SQL query string

        Returns:
        DataFrame with query results
        """

    def table(self, tableName):
        """
        Returns the specified table as a DataFrame.

        Parameters:
        - tableName (str): Table name

        Returns:
        DataFrame representing the table
        """

    def range(self, start, end=None, step=1, numPartitions=None):
        """
        Create a DataFrame with a single pyspark.sql.types.LongType column named id.

        Parameters:
        - start (int): Start value (inclusive)
        - end (int): End value (exclusive)
        - step (int): Step size
        - numPartitions (int): Number of partitions

        Returns:
        DataFrame with an id column
        """

    def stop(self):
        """Stop the underlying SparkContext."""

    @property
    def read(self):
        """
        Returns a DataFrameReader for reading data.

        Returns:
        DataFrameReader
        """

    @property
    def readStream(self):
        """
        Returns a DataStreamReader for reading streaming data.

        Returns:
        DataStreamReader
        """

    @property
    def catalog(self):
        """
        Interface to Spark's catalog of databases, tables, and functions.

        Returns:
        Catalog
        """

    @property
    def udf(self):
        """
        Returns a UDFRegistration for registering user-defined functions.

        Returns:
        UDFRegistration
        """
```
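As a quick orientation, here is a minimal sketch of a typical session lifecycle: build a session, create a DataFrame from local rows, register a temporary view, and query it with SQL. The app name, data values, and DDL schema string are illustrative, not prescribed by the API.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()

# Local rows plus a DDL-style schema string; lists, RDDs, and pandas
# DataFrames are also accepted by createDataFrame.
df = spark.createDataFrame(
    [("Alice", 34), ("Bob", 29)],
    schema="name STRING, age INT",
)

df.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age > 30").show()

spark.stop()
```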
### SparkSession Builder

Builder pattern for creating SparkSession instances with configuration.

```python { .api }
class Builder:
    def appName(self, name):
        """
        Sets a name for the application.

        Parameters:
        - name (str): Application name

        Returns:
        Builder
        """

    def master(self, master):
        """
        Sets the Spark master URL.

        Parameters:
        - master (str): Master URL

        Returns:
        Builder
        """

    def config(self, key=None, value=None, conf=None):
        """
        Sets a config option or a whole SparkConf.

        Parameters:
        - key (str): Configuration key
        - value (str): Configuration value
        - conf (SparkConf): SparkConf object

        Returns:
        Builder
        """

    def enableHiveSupport(self):
        """
        Enables Hive support, including connectivity to a persistent Hive metastore.

        Returns:
        Builder
        """

    def getOrCreate(self):
        """
        Gets an existing SparkSession or, if there is none, creates a new one.

        Returns:
        SparkSession
        """
```
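A minimal sketch of builder chaining; the app name, master URL, and config key below are placeholders, not recommendations.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("my-app")                            # placeholder name
    .master("local[*]")                           # run locally on all cores
    .config("spark.sql.shuffle.partitions", "8")  # example config option
    .getOrCreate()
)
```

Note that getOrCreate() returns any already-active session, so config values set here may not take effect when a session already exists.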
### DataFrame

Distributed collection of data organized into named columns.

```python { .api }
class DataFrame:
    def select(self, *cols):
        """
        Projects a set of expressions and returns a new DataFrame.

        Parameters:
        - cols: Column expressions or names

        Returns:
        DataFrame with selected columns
        """

    def filter(self, condition):
        """
        Filters rows using the given condition.

        Parameters:
        - condition: Filter condition (Column or string)

        Returns:
        Filtered DataFrame
        """

    def where(self, condition):
        """
        Filters rows using the given condition (alias for filter).

        Parameters:
        - condition: Filter condition (Column or string)

        Returns:
        Filtered DataFrame
        """

    def groupBy(self, *cols):
        """
        Groups the DataFrame using the specified columns.

        Parameters:
        - cols: Column names or expressions

        Returns:
        GroupedData for aggregation
        """

    def agg(self, *exprs):
        """
        Aggregate on the entire DataFrame without groups.

        Parameters:
        - exprs: Aggregation expressions

        Returns:
        DataFrame with aggregated results
        """

    def orderBy(self, *cols, **kwargs):
        """
        Sorts the DataFrame by the specified columns.

        Parameters:
        - cols: Column names or expressions
        - ascending (bool or list): Sort ascending vs. descending, per column if a list

        Returns:
        Sorted DataFrame
        """

    def sort(self, *cols, **kwargs):
        """
        Sorts the DataFrame by the specified columns (alias for orderBy).

        Parameters:
        - cols: Column names or expressions
        - ascending (bool or list): Sort ascending vs. descending, per column if a list

        Returns:
        Sorted DataFrame
        """

    def join(self, other, on=None, how=None):
        """
        Joins with another DataFrame.

        Parameters:
        - other (DataFrame): DataFrame to join with
        - on: Join condition (column name(s) or expression)
        - how (str): Join type ("inner", "cross", "outer", "left", "right", "semi", "anti")

        Returns:
        Joined DataFrame
        """

    def union(self, other):
        """
        Returns a new DataFrame containing the union of rows in this and another
        DataFrame. Columns are resolved by position and duplicates are kept, as
        in SQL UNION ALL.

        Parameters:
        - other (DataFrame): Another DataFrame

        Returns:
        Union DataFrame
        """

    def unionByName(self, other, allowMissingColumns=False):
        """
        Returns a new DataFrame containing the union of rows, resolving columns by name.

        Parameters:
        - other (DataFrame): Another DataFrame
        - allowMissingColumns (bool): Allow missing columns (filled with nulls)

        Returns:
        Union DataFrame
        """

    def intersect(self, other):
        """
        Returns a new DataFrame containing only rows present in both DataFrames.

        Parameters:
        - other (DataFrame): Another DataFrame

        Returns:
        Intersection DataFrame
        """

    def subtract(self, other):
        """
        Returns a new DataFrame containing rows in this DataFrame but not in the other.

        Parameters:
        - other (DataFrame): Another DataFrame

        Returns:
        Difference DataFrame
        """

    def distinct(self):
        """
        Returns a new DataFrame with distinct rows.

        Returns:
        DataFrame with distinct rows
        """

    def dropDuplicates(self, subset=None):
        """
        Returns a new DataFrame with duplicate rows removed.

        Parameters:
        - subset (list): Column names to consider when identifying duplicates

        Returns:
        DataFrame without duplicates
        """

    def drop(self, *cols):
        """
        Returns a new DataFrame with the specified columns dropped.

        Parameters:
        - cols: Column names to drop

        Returns:
        DataFrame with columns dropped
        """

    def withColumn(self, colName, col):
        """
        Returns a new DataFrame by adding or replacing a column.

        Parameters:
        - colName (str): Column name
        - col (Column): Column expression

        Returns:
        DataFrame with the new or updated column
        """

    def withColumnRenamed(self, existing, new):
        """
        Returns a new DataFrame with a column renamed.

        Parameters:
        - existing (str): Existing column name
        - new (str): New column name

        Returns:
        DataFrame with the renamed column
        """

    def show(self, n=20, truncate=True, vertical=False):
        """
        Prints the first n rows to the console.

        Parameters:
        - n (int): Number of rows to show
        - truncate (bool or int): Truncate strings (True/False, or max character width)
        - vertical (bool): Print rows vertically instead of horizontally
        """

    def collect(self):
        """
        Returns all records as a list of Row.

        Returns:
        List of Row objects
        """

    def take(self, num):
        """
        Returns the first num rows as a list of Row.

        Parameters:
        - num (int): Number of rows to return

        Returns:
        List of Row objects
        """

    def first(self):
        """
        Returns the first row as a Row.

        Returns:
        First Row
        """

    def head(self, n=None):
        """
        Returns the first n rows, or just the first row if n is None.

        Parameters:
        - n (int): Number of rows to return

        Returns:
        Row or list of Rows
        """

    def count(self):
        """
        Returns the number of rows in this DataFrame.

        Returns:
        Number of rows
        """

    def describe(self, *cols):
        """
        Computes basic statistics for numeric and string columns.

        Parameters:
        - cols: Column names

        Returns:
        DataFrame with statistics
        """

    def summary(self, *statistics):
        """
        Computes the specified statistics for numeric and string columns.

        Parameters:
        - statistics: Statistics to compute (e.g. "count", "mean", "stddev", "min", "25%", "max")

        Returns:
        DataFrame with statistics
        """

    def cache(self):
        """
        Persists this DataFrame with the default storage level.

        Returns:
        This DataFrame
        """

    def persist(self, storageLevel=StorageLevel.MEMORY_AND_DISK):
        """
        Persists this DataFrame with the given storage level.

        Parameters:
        - storageLevel (StorageLevel): Storage level

        Returns:
        This DataFrame
        """

    def unpersist(self, blocking=False):
        """
        Marks the DataFrame as non-persistent.

        Parameters:
        - blocking (bool): Whether to block until complete

        Returns:
        This DataFrame
        """

    def coalesce(self, numPartitions):
        """
        Returns a new DataFrame with exactly numPartitions partitions. Unlike
        repartition, this avoids a full shuffle and can only reduce the
        partition count.

        Parameters:
        - numPartitions (int): Target number of partitions

        Returns:
        Coalesced DataFrame
        """

    def repartition(self, numPartitions, *cols):
        """
        Returns a new DataFrame partitioned by the given expressions.

        Parameters:
        - numPartitions (int): Number of partitions
        - cols: Partitioning expressions

        Returns:
        Repartitioned DataFrame
        """

    def createOrReplaceTempView(self, name):
        """
        Creates or replaces a local temporary view scoped to this SparkSession.

        Parameters:
        - name (str): View name
        """

    def createGlobalTempView(self, name):
        """
        Creates a global temporary view, tied to the system-preserved database global_temp.

        Parameters:
        - name (str): View name
        """

    @property
    def write(self):
        """
        Interface for saving the content of the DataFrame.

        Returns:
        DataFrameWriter
        """

    @property
    def writeStream(self):
        """
        Interface for saving the content of a streaming DataFrame.

        Returns:
        DataStreamWriter
        """

    @property
    def schema(self):
        """
        Returns the schema of this DataFrame.

        Returns:
        StructType representing the schema
        """

    @property
    def columns(self):
        """
        Returns all column names as a list.

        Returns:
        List of column names
        """

    @property
    def dtypes(self):
        """
        Returns all column names and their data types as a list.

        Returns:
        List of (name, type) tuples
        """
```
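To make the transformation style concrete, a short sketch of a typical chain, assuming the `spark` session from the earlier example; the orders data is made up for illustration.

```python
from pyspark.sql import functions as F

orders = spark.createDataFrame(
    [(1, "books", 12.0), (2, "books", 5.0), (3, "games", 30.0)],
    "order_id INT, category STRING, amount DOUBLE",
)

summary = (
    orders
    .filter(F.col("amount") > 1.0)            # keep non-trivial orders
    .groupBy("category")
    .agg(F.sum("amount").alias("total"),
         F.count("*").alias("n_orders"))
    .orderBy(F.col("total").desc())
)
summary.show()
```

Transformations are lazy; nothing executes until an action such as show(), collect(), or count() runs.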
### DataFrame Specialized Functions

Specialized function classes for handling missing data and statistical operations.

```python { .api }
class DataFrameNaFunctions:
    """
    Functionality for working with missing data in DataFrames.
    Accessed via the DataFrame.na property.
    """

    def drop(self, how="any", thresh=None, subset=None):
        """
        Drop rows with null values.

        Parameters:
        - how (str): "any" or "all" - drop rows with any/all null values
        - thresh (int): Minimum number of non-null values required to keep a row
        - subset (list): Column subset to consider when checking for nulls

        Returns:
        DataFrame with null rows dropped
        """

    def fill(self, value, subset=None):
        """
        Fill null values with the specified value.

        Parameters:
        - value: Value to replace nulls with (a dict gives per-column values)
        - subset (list): Column subset to fill

        Returns:
        DataFrame with null values filled
        """

    def replace(self, to_replace, value=None, subset=None):
        """
        Replace specified values in the DataFrame.

        Parameters:
        - to_replace: Value(s) to replace
        - value: Replacement value(s)
        - subset (list): Column subset to apply the replacement to

        Returns:
        DataFrame with values replaced
        """

class DataFrameStatFunctions:
    """
    Functionality for statistical operations on DataFrames.
    Accessed via the DataFrame.stat property.
    """

    def approxQuantile(self, col, probabilities, relativeError):
        """
        Calculate approximate quantiles for numeric columns.

        Parameters:
        - col (str): Column name or list of column names
        - probabilities (list): List of quantile probabilities (0.0 to 1.0)
        - relativeError (float): Relative error tolerance

        Returns:
        List of quantile values
        """

    def corr(self, col1, col2, method="pearson"):
        """
        Calculate the correlation between two columns.

        Parameters:
        - col1 (str): First column name
        - col2 (str): Second column name
        - method (str): Correlation method (currently only "pearson" is supported)

        Returns:
        Correlation coefficient as float
        """

    def cov(self, col1, col2):
        """
        Calculate the covariance between two columns.

        Parameters:
        - col1 (str): First column name
        - col2 (str): Second column name

        Returns:
        Covariance as float
        """

    def crosstab(self, col1, col2):
        """
        Calculate a cross-tabulation between two columns.

        Parameters:
        - col1 (str): First column name
        - col2 (str): Second column name

        Returns:
        DataFrame with cross-tabulation results
        """

    def freqItems(self, cols, support=None):
        """
        Find frequent items for the specified columns.

        Parameters:
        - cols (list): Column names
        - support (float): Minimum support threshold

        Returns:
        DataFrame with frequent items
        """

    def sampleBy(self, col, fractions, seed=None):
        """
        Stratified sampling by column values.

        Parameters:
        - col (str): Column to stratify by
        - fractions (dict): Sampling fractions per stratum
        - seed (int): Random seed

        Returns:
        Sampled DataFrame
        """
```
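A short sketch exercising both accessors, assuming the `spark` session from earlier; the columns and values are invented for illustration.

```python
df = spark.createDataFrame(
    [(25, 50000.0), (None, 62000.0), (41, None)],
    "age INT, income DOUBLE",
)

dropped = df.na.drop(how="any")                  # drop rows with any null
filled = df.na.fill({"age": 0, "income": 0.0})   # per-column fill values

median = filled.stat.approxQuantile("income", [0.5], 0.01)
pearson = filled.stat.corr("age", "income")
```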
### Column Operations

Column expressions for DataFrame transformations.

```python { .api }
class Column:
    def alias(self, *alias, **kwargs):
        """
        Return the column with an alias.

        Parameters:
        - alias: Alias name(s)

        Returns:
        Aliased Column
        """

    def cast(self, dataType):
        """
        Convert the column to a different data type.

        Parameters:
        - dataType: Target data type (DataType instance or type string such as "double")

        Returns:
        Cast Column
        """

    def contains(self, other):
        """
        Check if the column contains the specified value.

        Parameters:
        - other: Value to check

        Returns:
        Boolean Column
        """

    def startswith(self, other):
        """
        Check if the column starts with the specified string.

        Parameters:
        - other (str): String to check

        Returns:
        Boolean Column
        """

    def endswith(self, other):
        """
        Check if the column ends with the specified string.

        Parameters:
        - other (str): String to check

        Returns:
        Boolean Column
        """

    def isNull(self):
        """
        Check if the column is null.

        Returns:
        Boolean Column
        """

    def isNotNull(self):
        """
        Check if the column is not null.

        Returns:
        Boolean Column
        """

    def isin(self, *cols):
        """
        Check if the column's value is in the specified list.

        Parameters:
        - cols: Values to check against

        Returns:
        Boolean Column
        """

    def between(self, lowerBound, upperBound):
        """
        Check if the column is between two values (bounds inclusive).

        Parameters:
        - lowerBound: Lower bound
        - upperBound: Upper bound

        Returns:
        Boolean Column
        """

    def when(self, condition, value):
        """
        Chain an additional condition onto an expression started with
        pyspark.sql.functions.when(), returning value where condition holds.

        Parameters:
        - condition: Condition expression
        - value: Value when the condition is true

        Returns:
        Column with conditional logic
        """

    def otherwise(self, value):
        """
        Set the default value returned for rows not matched by any preceding
        when() condition.

        Parameters:
        - value: Default value

        Returns:
        Column with conditional logic
        """

    def substr(self, startPos, length):
        """
        Return a substring of the column.

        Parameters:
        - startPos (int): Starting position (1-based)
        - length (int): Length of the substring

        Returns:
        Substring Column
        """

    def asc(self):
        """
        Return a sort expression based on ascending order.

        Returns:
        Ascending sort Column
        """

    def desc(self):
        """
        Return a sort expression based on descending order.

        Returns:
        Descending sort Column
        """
```
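A sketch combining several Column expressions in one select, assuming the `spark` session from earlier; the people data is illustrative.

```python
from pyspark.sql import functions as F

people = spark.createDataFrame(
    [("Alice", 34), ("Bob", 29), (None, 41)],
    "name STRING, age INT",
)

out = people.select(
    F.col("name"),
    F.col("age").cast("double").alias("age_d"),
    F.col("age").between(30, 40).alias("thirties"),
    F.when(F.col("age") >= 30, "senior").otherwise("junior").alias("band"),
    F.col("name").isNull().alias("anonymous"),
)
```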
### Window Operations

Window functions for analytical operations over groups of rows.

```python { .api }
class Window:
    """
    Utility functions for defining window specifications in DataFrames.
    """

    unboundedPreceding: int
    """Represents the unbounded preceding frame boundary."""

    unboundedFollowing: int
    """Represents the unbounded following frame boundary."""

    currentRow: int
    """Represents the current row frame boundary."""

    @staticmethod
    def partitionBy(*cols):
        """
        Creates a WindowSpec with the partitioning defined.

        Parameters:
        - cols: Column names or expressions for partitioning

        Returns:
        WindowSpec with partitioning defined
        """

    @staticmethod
    def orderBy(*cols):
        """
        Creates a WindowSpec with the ordering defined.

        Parameters:
        - cols: Column names or expressions for ordering

        Returns:
        WindowSpec with ordering defined
        """

    @staticmethod
    def rowsBetween(start, end):
        """
        Creates a WindowSpec with row-based frame boundaries.

        Parameters:
        - start (int): Start boundary (inclusive)
        - end (int): End boundary (inclusive)

        Returns:
        WindowSpec with frame boundaries defined
        """

    @staticmethod
    def rangeBetween(start, end):
        """
        Creates a WindowSpec with range-based frame boundaries.

        Parameters:
        - start: Start boundary value
        - end: End boundary value

        Returns:
        WindowSpec with range frame boundaries
        """

class WindowSpec:
    """
    Window specification that defines partitioning, ordering, and frame boundaries.
    """

    def partitionBy(self, *cols):
        """
        Defines the partitioning columns for this window.

        Parameters:
        - cols: Column names or expressions

        Returns:
        WindowSpec with updated partitioning
        """

    def orderBy(self, *cols):
        """
        Defines the ordering columns for this window.

        Parameters:
        - cols: Column names or expressions with optional sort direction

        Returns:
        WindowSpec with updated ordering
        """

    def rowsBetween(self, start, end):
        """
        Defines row-based frame boundaries for this window.

        Parameters:
        - start (int): Start row offset
        - end (int): End row offset

        Returns:
        WindowSpec with frame boundaries
        """

    def rangeBetween(self, start, end):
        """
        Defines range-based frame boundaries for this window.

        Parameters:
        - start: Start range value
        - end: End range value

        Returns:
        WindowSpec with range frame boundaries
        """
```
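A sketch of the common top-row-per-group pattern, assuming the `spark` session from earlier and reusing an illustrative orders table.

```python
from pyspark.sql import Window, functions as F

orders = spark.createDataFrame(
    [(1, "books", 12.0), (2, "books", 5.0), (3, "games", 30.0)],
    "order_id INT, category STRING, amount DOUBLE",
)

# Rank orders within each category by descending amount
w = Window.partitionBy("category").orderBy(F.col("amount").desc())

largest = (
    orders
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)   # keep the largest order per category
)
```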
### Data Reading and Writing

Interfaces for reading data from various formats and sources; writing uses the DataFrameWriter returned by the DataFrame.write property.

```python { .api }
class DataFrameReader:
    def format(self, source):
        """
        Specify the input data source format.

        Parameters:
        - source (str): Data source format

        Returns:
        DataFrameReader
        """

    def option(self, key, value):
        """
        Add an input option for the underlying data source.

        Parameters:
        - key (str): Option key
        - value: Option value

        Returns:
        DataFrameReader
        """

    def options(self, **options):
        """
        Add input options for the underlying data source.

        Parameters:
        - options: Keyword arguments of options

        Returns:
        DataFrameReader
        """

    def schema(self, schema):
        """
        Specify the input schema.

        Parameters:
        - schema: Schema definition

        Returns:
        DataFrameReader
        """

    def load(self, path=None, format=None, schema=None, **options):
        """
        Load data from a data source.

        Parameters:
        - path (str): File path
        - format (str): Data source format
        - schema: Schema definition
        - options: Additional options

        Returns:
        DataFrame
        """

    def csv(self, path, schema=None, sep=None, encoding=None, quote=None,
            escape=None, comment=None, header=None, inferSchema=None,
            ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
            nullValue=None, nanValue=None, positiveInf=None, negativeInf=None,
            dateFormat=None, timestampFormat=None, maxColumns=None,
            maxCharsPerColumn=None, maxMalformedLogPerPartition=None,
            mode=None, columnNameOfCorruptRecord=None, multiLine=None,
            charToEscapeQuoteEscaping=None, samplingRatio=None,
            enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
            pathGlobFilter=None, recursiveFileLookup=None,
            modifiedBefore=None, modifiedAfter=None, unescapedQuoteHandling=None):
        """
        Load a CSV file and return the result as a DataFrame.

        Parameters:
        - path (str): CSV file path
        - schema: Schema for the CSV file
        - sep (str): Column separator
        - encoding (str): Character encoding
        - quote (str): Quote character
        - escape (str): Escape character
        - comment (str): Comment character
        - header (bool): Whether the first line is a header
        - inferSchema (bool): Automatically infer column types
        - ignoreLeadingWhiteSpace (bool): Ignore leading whitespace
        - ignoreTrailingWhiteSpace (bool): Ignore trailing whitespace
        - nullValue (str): String representation of a null value
        - nanValue (str): String representation of a NaN value
        - positiveInf (str): String representation of positive infinity
        - negativeInf (str): String representation of negative infinity
        - dateFormat (str): Date format string
        - timestampFormat (str): Timestamp format string
        - maxColumns (int): Maximum number of columns
        - maxCharsPerColumn (int): Maximum characters per column
        - maxMalformedLogPerPartition (int): Maximum malformed records to log per partition
        - mode (str): Parse mode ("PERMISSIVE", "DROPMALFORMED", "FAILFAST")
        - columnNameOfCorruptRecord (str): Column name for corrupt records
        - multiLine (bool): Parse multi-line records
        - charToEscapeQuoteEscaping (str): Character used to escape the quote-escape character
        - samplingRatio (float): Sampling ratio for schema inference
        - enforceSchema (bool): Enforce the specified or inferred schema
        - emptyValue (str): String representation of an empty value
        - locale (str): Locale for parsing
        - lineSep (str): Line separator
        - pathGlobFilter (str): Path glob filter
        - recursiveFileLookup (bool): Recursive file lookup
        - modifiedBefore (str): Only include files modified before this timestamp
        - modifiedAfter (str): Only include files modified after this timestamp
        - unescapedQuoteHandling (str): How to handle unescaped quotes

        Returns:
        DataFrame
        """

    def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
             allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
             mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
             timestampFormat=None, multiLine=None, allowUnquotedControlChars=None,
             lineSep=None, samplingRatio=None, dropFieldIfAllNull=None,
             encoding=None, locale=None, pathGlobFilter=None, recursiveFileLookup=None,
             modifiedBefore=None, modifiedAfter=None):
        """
        Load JSON files and return the result as a DataFrame.

        Parameters:
        - path (str): JSON file path
        - schema: Schema for the JSON data
        - primitivesAsString (bool): Infer primitive types as strings
        - prefersDecimal (bool): Prefer decimal type for numbers
        - allowComments (bool): Allow Java/C++ style comments
        - allowUnquotedFieldNames (bool): Allow unquoted field names
        - allowSingleQuotes (bool): Allow single quotes
        - allowNumericLeadingZero (bool): Allow leading zeros in numbers
        - allowBackslashEscapingAnyCharacter (bool): Allow backslash escaping of any character
        - mode (str): Parse mode
        - columnNameOfCorruptRecord (str): Column name for corrupt records
        - dateFormat (str): Date format string
        - timestampFormat (str): Timestamp format string
        - multiLine (bool): Parse multi-line JSON records
        - allowUnquotedControlChars (bool): Allow unquoted control characters
        - lineSep (str): Line separator
        - samplingRatio (float): Sampling ratio for schema inference
        - dropFieldIfAllNull (bool): Drop fields whose values are all null
        - encoding (str): Character encoding
        - locale (str): Locale for parsing
        - pathGlobFilter (str): Path glob filter
        - recursiveFileLookup (bool): Recursive file lookup
        - modifiedBefore (str): Only include files modified before this timestamp
        - modifiedAfter (str): Only include files modified after this timestamp

        Returns:
        DataFrame
        """

    def parquet(self, *paths, **options):
        """
        Load Parquet files and return the result as a DataFrame.

        Parameters:
        - paths: Parquet file paths
        - options: Additional options

        Returns:
        DataFrame
        """

    def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
             recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
        """
        Load text files and return a DataFrame with a single string column.

        Parameters:
        - paths: Text file paths
        - wholetext (bool): Read each file as a single record
        - lineSep (str): Line separator
        - pathGlobFilter (str): Path glob filter
        - recursiveFileLookup (bool): Recursive file lookup
        - modifiedBefore (str): Only include files modified before this timestamp
        - modifiedAfter (str): Only include files modified after this timestamp

        Returns:
        DataFrame
        """

    def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None,
            modifiedBefore=None, modifiedAfter=None):
        """
        Load ORC files and return the result as a DataFrame.

        Parameters:
        - path (str): ORC file path
        - mergeSchema (bool): Merge schemas from multiple files
        - pathGlobFilter (str): Path glob filter
        - recursiveFileLookup (bool): Recursive file lookup
        - modifiedBefore (str): Only include files modified before this timestamp
        - modifiedAfter (str): Only include files modified after this timestamp

        Returns:
        DataFrame
        """

    def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None,
             numPartitions=None, predicates=None, properties=None):
        """
        Construct a DataFrame representing a database table accessible via JDBC.

        Parameters:
        - url (str): JDBC database URL
        - table (str): Table name
        - column (str): Column used for partitioning
        - lowerBound: Lower bound of the partitioning column
        - upperBound: Upper bound of the partitioning column
        - numPartitions (int): Number of partitions
        - predicates (list): List of expressions defining the partitions
        - properties (dict): JDBC connection properties

        Returns:
        DataFrame
        """
```
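A sketch of both the generic format/option/load path and the csv() shorthand, assuming the `spark` session from earlier; the file path is hypothetical.

```python
# Generic form: format + options + load
df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/data/example.csv")   # hypothetical path
)

# Equivalent shorthand with keyword arguments
df2 = spark.read.csv("/data/example.csv", header=True, inferSchema=True)
```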
### Structured Streaming

Components for real-time data processing with streaming DataFrames.

```python { .api }
class DataStreamReader:
    """
    Interface for reading streaming data from various sources.
    Accessed via the SparkSession.readStream property.
    """

    def format(self, source):
        """
        Specify the data source format.

        Parameters:
        - source (str): Source format ("kafka", "socket", "rate", etc.)

        Returns:
        DataStreamReader with format specified
        """

    def option(self, key, value):
        """
        Add an input option for the streaming source.

        Parameters:
        - key (str): Option name
        - value: Option value

        Returns:
        DataStreamReader with option set
        """

    def schema(self, schema):
        """
        Specify the input schema for streaming data.

        Parameters:
        - schema (StructType): Schema definition

        Returns:
        DataStreamReader with schema specified
        """

    def load(self, path=None):
        """
        Load a streaming DataFrame from the configured source.

        Parameters:
        - path (str): Optional source path

        Returns:
        Streaming DataFrame
        """

class DataStreamWriter:
    """
    Interface for writing streaming data to various sinks.
    Accessed via the DataFrame.writeStream property.
    """

    def format(self, source):
        """
        Specify the output sink format.

        Parameters:
        - source (str): Sink format ("console", "memory", "kafka", etc.)

        Returns:
        DataStreamWriter with format specified
        """

    def outputMode(self, outputMode):
        """
        Specify the output mode for the streaming query.

        Parameters:
        - outputMode (str): "append", "complete", or "update"

        Returns:
        DataStreamWriter with output mode set
        """

    def trigger(self, **kwargs):
        """
        Set the trigger for streaming query execution.

        Parameters:
        - kwargs: Trigger options (processingTime, once, continuous)

        Returns:
        DataStreamWriter with trigger configured
        """

    def start(self, path=None):
        """
        Start the streaming query.

        Parameters:
        - path (str): Optional output path

        Returns:
        StreamingQuery representing the running query
        """

class StreamingQuery:
    """
    Handle to a running streaming query.
    """

    @property
    def id(self):
        """Unique identifier for this query."""

    @property
    def isActive(self):
        """Whether the query is currently active."""

    def awaitTermination(self, timeout=None):
        """
        Wait for the query to terminate.

        Parameters:
        - timeout (int): Maximum time to wait (seconds)

        Returns:
        True if the query terminated, False on timeout
        """

    def stop(self):
        """Stop the streaming query."""

class StreamingQueryManager:
    """
    Manager for streaming queries.
    Accessed via the SparkSession.streams property.
    """

    @property
    def active(self):
        """List of currently active streaming queries."""

    def get(self, id):
        """
        Get a streaming query by ID.

        Parameters:
        - id (str): Query ID

        Returns:
        StreamingQuery, or None if not found
        """

    def awaitAnyTermination(self, timeout=None):
        """
        Wait for any streaming query to terminate.

        Parameters:
        - timeout (int): Maximum time to wait (seconds)

        Returns:
        True if any query terminated, False on timeout
        """
```
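A self-contained sketch using the built-in rate source and console sink (both intended for testing), assuming the `spark` session from earlier, so it runs without external infrastructure.

```python
stream = (
    spark.readStream
    .format("rate")                  # test source: emits (timestamp, value) rows
    .option("rowsPerSecond", 5)
    .load()
)

query = (
    stream.writeStream
    .format("console")
    .outputMode("append")
    .trigger(processingTime="10 seconds")
    .start()
)

query.awaitTermination(30)           # block for up to 30 seconds
query.stop()
```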
### SQL Functions

Comprehensive built-in functions for data processing and transformation, available from the pyspark.sql.functions module.

```python { .api }
# Column creation and references
def col(col_name):
    """
    Return a Column based on the given column name.

    Parameters:
    - col_name (str): Column name

    Returns:
    Column
    """

def column(col_name):
    """
    Return a Column based on the given column name (alias for col).

    Parameters:
    - col_name (str): Column name

    Returns:
    Column
    """

def lit(col_value):
    """
    Create a Column of literal value.

    Parameters:
    - col_value: Literal value

    Returns:
    Column with literal value
    """

def expr(str_expr):
    """
    Parse a SQL expression string into a Column.

    Parameters:
    - str_expr (str): SQL expression string

    Returns:
    Column representing the expression
    """

# Mathematical functions
def abs(col):
    """Return the absolute value of a column."""

def acos(col):
    """Return the arc cosine of a column."""

def asin(col):
    """Return the arc sine of a column."""

def atan(col):
    """Return the arc tangent of a column."""

def cos(col):
    """Return the cosine of a column."""

def sin(col):
    """Return the sine of a column."""

def tan(col):
    """Return the tangent of a column."""

def sqrt(col):
    """Return the square root of a column."""

def exp(col):
    """Return the exponential of a column."""

def log(arg1, arg2=None):
    """Return the natural logarithm of arg1; with two arguments, return the base-arg1 logarithm of arg2."""

def pow(col1, col2):
    """Return col1 raised to the power of col2."""

def round(col, scale=0):
    """Round the given value to scale decimal places."""

def ceil(col):
    """Return the ceiling of a column."""

def floor(col):
    """Return the floor of a column."""

# String functions
def upper(col):
    """Convert a string column to uppercase."""

def lower(col):
    """Convert a string column to lowercase."""

def length(col):
    """Return the length of a string column."""

def trim(col):
    """Trim spaces from both ends of a string column."""

def ltrim(col):
    """Trim spaces from the left end of a string column."""

def rtrim(col):
    """Trim spaces from the right end of a string column."""

def concat(*cols):
    """Concatenate multiple string columns."""

def concat_ws(sep, *cols):
    """Concatenate multiple string columns with a separator."""

def substring(str, pos, len):
    """Return the substring of str starting at pos (1-based) with the specified length."""

def split(str, pattern, limit=-1):
    """Split str around matches of the given pattern."""

def regexp_extract(str, pattern, idx):
    """Extract a specific group matched by pattern from str."""

def regexp_replace(str, pattern, replacement):
    """Replace all substrings that match pattern with replacement."""

# Date and time functions
def current_date():
    """Return the current date as a date column."""

def current_timestamp():
    """Return the current timestamp as a timestamp column."""

def date_add(start, days):
    """Return the date that is days days after start."""

def date_sub(start, days):
    """Return the date that is days days before start."""

def datediff(end, start):
    """Return the number of days from start to end."""

def year(col):
    """Extract the year from a date/timestamp column."""

def month(col):
    """Extract the month from a date/timestamp column."""

def dayofmonth(col):
    """Extract the day of month from a date/timestamp column."""

def hour(col):
    """Extract the hour from a timestamp column."""

def minute(col):
    """Extract the minute from a timestamp column."""

def second(col):
    """Extract the second from a timestamp column."""

def date_format(date, format):
    """Convert a date/timestamp to a string with the given format."""

def to_date(col, format=None):
    """Convert a string column to a date column."""

def to_timestamp(col, format=None):
    """Convert a string column to a timestamp column."""

# Array functions
def array(*cols):
    """Create a new array column."""

def array_contains(col, value):
    """Return true if the array contains the given value."""

def size(col):
    """Return the size of an array or map column."""

def sort_array(col, asc=True):
    """Sort the input array in ascending or descending order."""

def reverse(col):
    """Return a reversed string or array."""

def slice(x, start, length):
    """Return a slice of the array, starting at index start (1-based), with the specified length."""

def array_join(col, delimiter, null_replacement=None):
    """Concatenate array elements using the delimiter."""

def explode(col):
    """Return a new row for each element in the array column."""

def posexplode(col):
    """Return a new row for each element, with its position, in the array column."""

# Map functions
def create_map(*cols):
    """Create a map from key-value pairs."""

def map_keys(col):
    """Return the keys of a map column as an array."""

def map_values(col):
    """Return the values of a map column as an array."""

def map_from_arrays(col1, col2):
    """Create a map from key and value arrays."""

# Aggregate functions
def count(col):
    """Return the number of non-null items in a group (count("*") counts all rows)."""

def countDistinct(col, *cols):
    """Return the number of distinct items in a group."""

def sum(col):
    """Return the sum of values in a group."""

def avg(col):
    """Return the average of values in a group."""

def mean(col):
    """Return the average of values in a group (alias for avg)."""

def max(col):
    """Return the maximum value in a group."""

def min(col):
    """Return the minimum value in a group."""

def first(col, ignorenulls=False):
    """Return the first value in a group."""

def last(col, ignorenulls=False):
    """Return the last value in a group."""

def stddev(col):
    """Return the sample standard deviation of values in a group."""

def stddev_pop(col):
    """Return the population standard deviation of values in a group."""

def variance(col):
    """Return the sample variance of values in a group."""

def var_pop(col):
    """Return the population variance of values in a group."""

def collect_list(col):
    """Return a list of objects with duplicates."""

def collect_set(col):
    """Return a set of objects with duplicate elements eliminated."""

# Window functions
def row_number():
    """Return a sequential number starting at 1 within a window partition."""

def rank():
    """Return the rank of rows within a window partition."""

def dense_rank():
    """Return the dense rank of rows within a window partition."""

def percent_rank():
    """Return the relative rank of rows within a window partition."""

def cume_dist():
    """Return the cumulative distribution of values within a window partition."""

def lag(col, offset=1, default=None):
    """Return the value that is offset rows before the current row."""

def lead(col, offset=1, default=None):
    """Return the value that is offset rows after the current row."""

def ntile(n):
    """Return the ntile group id (1-indexed) within a window partition."""

# Conditional functions
def when(condition, value):
    """Evaluate a condition and return value where it holds; chain .when()/.otherwise() for multiple branches."""

def coalesce(*cols):
    """Return the first non-null value among the given columns."""

def greatest(*cols):
    """Return the greatest value among the given columns."""

def least(*cols):
    """Return the least value among the given columns."""

def isnull(col):
    """Return true if the column is null."""

def isnan(col):
    """Return true if the column is NaN."""

# JSON functions
def from_json(col, schema, options=None):
    """Parse a column containing a JSON string."""

def to_json(col, options=None):
    """Convert a column containing a struct to a JSON string."""

def get_json_object(col, path):
    """Extract a JSON object from a JSON string."""

def json_tuple(col, *fields):
    """Return a tuple of JSON objects based on the given fields."""

# Null handling shorthands. Note: cast, isNull, and isNotNull are Column
# methods (documented above), not standalone functions; the following are
# DataFrame methods that alias the DataFrame.na functions:
def dropna(how="any", thresh=None, subset=None):
    """DataFrame method: return a new DataFrame omitting rows with null values (alias for df.na.drop)."""

def fillna(value, subset=None):
    """DataFrame method: replace null values with the specified value (alias for df.na.fill)."""

def replace(to_replace, value=None, subset=None):
    """DataFrame method: replace values matching to_replace with the corresponding values (alias for df.na.replace)."""
```
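In practice these functions are imported once under an alias; a short sketch mixing string, date, and array functions, assuming the `spark` session from earlier and illustrative input values.

```python
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [("2024-01-15", "a,b,c")],
    "day_str STRING, tags STRING",
)

out = df.select(
    F.to_date("day_str").alias("day"),
    F.year(F.to_date("day_str")).alias("yr"),
    F.split("tags", ",").alias("tag_array"),
)

# explode() turns the array into one row per element
exploded = out.select("day", F.explode("tag_array").alias("tag"))
```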
## Types

```python { .api }
class Row:
    """A row of data in a DataFrame."""

    def __init__(self, *args, **kwargs):
        """Create a row from positional values or named arguments."""

    def asDict(self, recursive=False):
        """
        Return as a dict.

        Parameters:
        - recursive (bool): Turn nested Rows into dicts recursively

        Returns:
        Dict representation of the row
        """

class GroupedData:
    """A set of methods for aggregations on a DataFrame, created by DataFrame.groupBy()."""

    def agg(self, *exprs):
        """Compute aggregates and return the result as a DataFrame."""

    def count(self):
        """Count the number of rows for each group."""

    def mean(self, *cols):
        """Compute the average value for each numeric column for each group."""

    def avg(self, *cols):
        """Compute the average value for each numeric column for each group (alias for mean)."""

    def max(self, *cols):
        """Compute the max value for each numeric column for each group."""

    def min(self, *cols):
        """Compute the min value for each numeric column for each group."""

    def sum(self, *cols):
        """Compute the sum for each numeric column for each group."""

from pyspark.sql.types import *

class DataType:
    """Base class for data types."""

class NullType(DataType):
    """Null data type."""

class StringType(DataType):
    """String data type."""

class BinaryType(DataType):
    """Binary data type."""

class BooleanType(DataType):
    """Boolean data type."""

class DateType(DataType):
    """Date data type."""

class TimestampType(DataType):
    """Timestamp data type."""

class DecimalType(DataType):
    """Decimal data type with precision and scale."""

    def __init__(self, precision=10, scale=0): ...

class DoubleType(DataType):
    """Double precision floating point data type."""

class FloatType(DataType):
    """Single precision floating point data type."""

class ByteType(DataType):
    """Byte integer data type."""

class IntegerType(DataType):
    """32-bit integer data type."""

class LongType(DataType):
    """64-bit integer data type."""

class ShortType(DataType):
    """16-bit integer data type."""

class ArrayType(DataType):
    """Array data type."""

    def __init__(self, elementType, containsNull=True): ...

class MapType(DataType):
    """Map data type."""

    def __init__(self, keyType, valueType, valueContainsNull=True): ...

class StructField:
    """A field in StructType."""

    def __init__(self, name, dataType, nullable=True, metadata=None): ...

class StructType(DataType):
    """Struct data type representing a row."""

    def __init__(self, fields=None): ...

    def add(self, field, data_type=None, nullable=True, metadata=None): ...
```
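A sketch of an explicit schema and a Row round-trip, assuming the `spark` session from earlier; the field names and values are illustrative.

```python
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, ArrayType,
)

schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("age", IntegerType(), nullable=True),
    StructField("tags", ArrayType(StringType()), nullable=True),
])

df = spark.createDataFrame([("Alice", 34, ["admin"])], schema)

row = df.first()
print(row.asDict())   # e.g. {'name': 'Alice', 'age': 34, 'tags': ['admin']}
```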