A quick reference guide to the most commonly used patterns and functions in PySpark SQL

Overview

Using PySpark we can process data from Hadoop HDFS, AWS S3, and many file systems. PySpark also is used to process real-time data using Streaming and Kafka.

All Spark examples provided in this PySpark (Spark with Python) tutorial is basic, simple, and easy to practice for beginners who are enthusiastic to learn PySpark and advance their career in BigData and Machine Learning.

Features of PySpark

    In-memory computation
    Distributed processing using parallelize
    Can be used with many cluster managers (Spark, Yarn, Mesos e.t.c)
    Fault-tolerant
    Immutable
    Lazy evaluation
    Cache & persistence
    Inbuild-optimization when using DataFrames
    Supports ANSI SQL

PySpark Quick Reference

A quick reference guide to the most commonly used patterns and functions in PySpark SQL

Read CSV file into DataFrame with schema and delimited as comma


df = spark.read.option(header='True', inferSchema='True',delimiter=',').csv("/tmp/resources/sales.csv")

Easily reference these as F.func() and T.type()

from pyspark.sql import functions as F, types as T

Common Operation

### Filter on equals condition

df = df.filter(df.is_adult == 'Y')

### Filter on >, <, >=, <= condition

df = df.filter(df.age > 25)

### Sort results

df = df.orderBy(df.age.asc()))  
df = df.orderBy(df.age.desc()))


### Multiple conditions require parentheses around each condition

df = df.filter((df.age > 25) & (df.is_adult == 'Y'))


### Compare against a list of allowed values

df = df.filter(col('age').isin([3, 4, 7]))

Joins

### Left join in another dataset

df = df.join(person_lookup_table, 'person_id', 'left')

### Match on different columns in left & right datasets
df = df.join(other_table, df.id == other_table.person_id, 'left')

### Match on multiple columns
df = df.join(other_table, ['first_name', 'last_name'], 'left')

### Useful for one-liner lookup code joins if you have a bunch
def lookup_and_replace(df1, df2, df1_key, df2_key, df2_value):
    return (
        df1
        .join(df2[[df2_key, df2_value]], df1[df1_key] == df2[df2_key], 'left')
        .withColumn(df1_key, F.coalesce(F.col(df2_value), F.col(df1_key)))
        .drop(df2_key)
        .drop(df2_value)
    )

df = lookup_and_replace(people, pay_codes, id, pay_code_id, pay_code_desc)

Column Operations

# Add a new static column
df = df.withColumn('status', F.lit('PASS'))

# Construct a new dynamic column
df = df.withColumn('full_name', F.when(
    (df.fname.isNotNull() & df.lname.isNotNull()), F.concat(df.fname, df.lname)
).otherwise(F.lit('N/A'))

# Pick which columns to keep, optionally rename some
df = df.select(
    'name',
    'age',
    F.col('dob').alias('date_of_birth'),
)

Casting & Coalescing Null Values & Duplicates

# Cast a column to a different type
df = df.withColumn('price', df.price.cast(T.DoubleType()))

# Replace all nulls with a specific value
df = df.fillna({
    'first_name': 'Sundar',
    'age': 18,
})

# Take the first value that is not null
df = df.withColumn('last_name', F.coalesce(df.last_name, df.surname, F.lit('N/A')))

# Drop duplicate rows in a dataset (distinct)
df = df.dropDuplicates()

# Drop duplicate rows, but consider only specific columns
df = df.dropDuplicates(['name', 'height'])

# Replace empty strings with null (leave out subset keyword arg to replace in all columns)
df = df.replace({"": None}, subset=["name"])

# Convert Python/PySpark/NumPy NaN operator to null
df = df.replace(float("nan"), None)


# Remove columns
df = df.drop('mod_dt', 'mod_username')

# Rename a column
df = df.withColumnRenamed('dob', 'date_of_birth')

# Keep all the columns which also occur in another dataset
df = df.select(*(F.col(c) for c in df2.columns))

# Batch Rename/Clean Columns
for col in df.columns:
    df = df.withColumnRenamed(col, col.lower().replace(' ', '_').replace('-', '_'))

String Operations

String Filters

# Contains - col.contains(string)
df = df.filter(df.name.contains('o'))

# Starts With - col.startswith(string)
df = df.filter(df.name.startswith('Al'))

# Ends With - col.endswith(string)
df = df.filter(df.name.endswith('ice'))

# Is Null - col.isNull()
df = df.filter(df.is_adult.isNull())

# Is Not Null - col.isNotNull()
df = df.filter(df.first_name.isNotNull())

# Like - col.like(string_with_sql_wildcards)
df = df.filter(df.name.like('Al%'))

# Regex Like - col.rlike(regex)
df = df.filter(df.name.rlike('[A-Z]*ice$'))

# Is In List - col.isin(*cols)
df = df.filter(df.name.isin('Bob', 'Mike'))

String Functions

# Substring - col.substr(startPos, length)
df = df.withColumn('short_id', df.id.substr(0, 10))

# Trim - F.trim(col)
df = df.withColumn('name', F.trim(df.name))

# Left Pad - F.lpad(col, len, pad)
# Right Pad - F.rpad(col, len, pad)
df = df.withColumn('id', F.lpad('id', 4, '0'))

# Left Trim - F.ltrim(col)
# Right Trim - F.rtrim(col)
df = df.withColumn('id', F.ltrim('id'))

# Concatenate - F.concat(*cols)
df = df.withColumn('full_name', F.concat('fname', F.lit(' '), 'lname'))

# Concatenate with Separator/Delimiter - F.concat_ws(delimiter, *cols)
df = df.withColumn('full_name', F.concat_ws('-', 'fname', 'lname'))

# Regex Replace - F.regexp_replace(str, pattern, replacement)[source]
df = df.withColumn('id', F.regexp_replace(id, '0F1(.*)', '1F1-$1'))

# Regex Extract - F.regexp_extract(str, pattern, idx)
df = df.withColumn('id', F.regexp_extract(id, '[0-9]*', 0))

Number Operations

# Round - F.round(col, scale=0)
df = df.withColumn('price', F.round('price', 0))

# Floor - F.floor(col)
df = df.withColumn('price', F.floor('price'))

# Ceiling - F.ceil(col)
df = df.withColumn('price', F.ceil('price'))

# Absolute Value - F.abs(col)
df = df.withColumn('price', F.abs('price'))

# X raised to power Y – F.pow(x, y)
df = df.withColumn('exponential_growth', F.pow('x', 'y'))

# Select smallest value out of multiple columns – F.least(*cols)
df = df.withColumn('least', F.least('subtotal', 'total'))

# Select largest value out of multiple columns – F.greatest(*cols)
df = df.withColumn('greatest', F.greatest('subtotal', 'total'))

Date & Timestamp Operations

# Convert a string of known format to a date (excludes time information)
df = df.withColumn('date_of_birth', F.to_date('date_of_birth', 'yyyy-MM-dd'))

# Convert a string of known format to a timestamp (includes time information)
df = df.withColumn('time_of_birth', F.to_timestamp('time_of_birth', 'yyyy-MM-dd HH:mm:ss'))

# Get year from date:       F.year(col)
# Get month from date:      F.month(col)
# Get day from date:        F.dayofmonth(col)
# Get hour from date:       F.hour(col)
# Get minute from date:     F.minute(col)
# Get second from date:     F.second(col)
df = df.filter(F.year('date_of_birth') == F.lit('2017'))

# Add & subtract days
df = df.withColumn('three_days_after', F.date_add('date_of_birth', 3))
df = df.withColumn('three_days_before', F.date_sub('date_of_birth', 3))

# Add & Subtract months
df = df.withColumn('next_month', F.add_month('date_of_birth', 1))

# Get number of days between two dates
df = df.withColumn('days_between', F.datediff('start', 'end'))

# Get number of months between two dates
df = df.withColumn('months_between', F.months_between('start', 'end'))

# Keep only rows where date_of_birth is between 2017-05-10 and 2018-07-21
df = df.filter(
    (F.col('date_of_birth') >= F.lit('2017-05-10')) &
    (F.col('date_of_birth') <= F.lit('2018-07-21'))
)

Array Operations

# Column Array - F.array(*cols)
df = df.withColumn('full_name', F.array('fname', 'lname'))

# Empty Array - F.array(*cols)
df = df.withColumn('empty_array_column', F.array([]))

# Array Size/Length – F.size(col)
df = df.withColumn('array_length', F.size(F.col('my_array')))

Aggregation Operations

# Row Count:                F.count()
# Sum of Rows in Group:     F.sum(*cols)
# Mean of Rows in Group:    F.mean(*cols)
# Max of Rows in Group:     F.max(*cols)
# Min of Rows in Group:     F.min(*cols)
# First Row in Group:       F.alias(*cols)
df = df.groupBy('gender').agg(F.max('age').alias('max_age_by_gender'))

# Collect a Set of all Rows in Group:       F.collect_set(col)
# Collect a List of all Rows in Group:      F.collect_list(col)
df = df.groupBy('age').agg(F.collect_set('name').alias('person_names'))

# Just take the lastest row for each combination (Window Functions)
from pyspark.sql import Window as W

window = W.partitionBy("first_name", "last_name").orderBy(F.desc("date"))
df = df.withColumn("row_number", F.row_number().over(window))
df = df.filter(F.col("row_number") == 1)
df = df.drop("row_number")

Advanced Operations

Repartitioning

# Repartition – df.repartition(num_output_partitions)
df = df.repartition(1)

UDFs (User Defined Functions)

# Multiply each row's age column by two
times_two_udf = F.udf(lambda x: x * 2)
df = df.withColumn('age', times_two_udf(df.age))

# Randomly choose a value to use as a row's name
import random

random_name_udf = F.udf(lambda: random.choice(['Bob', 'Tom', 'Amy', 'Jenna']))
df = df.withColumn('name', random_name_udf())

Window Functions

Window Functions Usage & Syntax PySpark Window Functions description
row_number(): Column Returns a sequential number starting from 1 within a window partition
rank(): Column Returns the rank of rows within a window partition, with gaps.
percent_rank(): Column Returns the percentile rank of rows within a window partition.
dense_rank(): Column Returns the rank of rows within a window partition without any gaps. Where as Rank() returns rank with gaps.
ntile(n: Int): Column Returns the ntile id in a window partition
cume_dist(): Column Returns the cumulative distribution of values within a window partition
lag(e: Column, offset: Int): Column
lag(columnName: String, offset: Int): Column
lag(columnName: String, offset: Int, defaultValue: Any): Column
returns the value that is `offset` rows before the current row, and `null` if there is less than `offset` rows before the current row.
lead(columnName: String, offset: Int): Column
lead(columnName: String, offset: Int): Column
lead(columnName: String, offset: Int, defaultValue: Any): Column
returns the value that is `offset` rows after the current row, and `null` if there is less than `offset` rows after the current row.

row_number Window Function


from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec  = Window.partitionBy("department").orderBy("salary")

df.withColumn("row_number",row_number().over(windowSpec)) \
    .show(truncate=False)
Owner
Sundar Ramamurthy
Sundar Ramamurthy
To-Be is a machine learning challenge on CodaLab Platform about Mortality Prediction

To-Be is a machine learning challenge on CodaLab Platform about Mortality Prediction. The challenge aims to adress the problems of medical imbalanced data classification.

Marwan Mashra 1 Jan 31, 2022
虚拟货币(BTC、ETH)炒币量化系统项目。在一版本的基础上加入了趋势判断

🎉 第二版本 🎉 (现货趋势网格) 介绍 在第一版本的基础上 趋势判断,不在固定点位开单,选择更优的开仓点位 优势: 🎉 简单易上手 安全(不用将api_secret告诉他人) 如何启动 修改app目录下的authorization文件

幸福村的码农 250 Jan 07, 2023
Hypernets: A General Automated Machine Learning framework to simplify the development of End-to-end AutoML toolkits in specific domains.

A General Automated Machine Learning framework to simplify the development of End-to-end AutoML toolkits in specific domains.

DataCanvas 216 Dec 23, 2022
A Python Module That Uses ANN To Predict A Stocks Price And Also Provides Accurate Technical Analysis With Many High Potential Implementations!

Stox A Module to predict the "close price" for the next day and give "technical analysis". It uses a Neural Network and the LSTM algorithm to predict

Stox 31 Dec 16, 2022
A comprehensive set of fairness metrics for datasets and machine learning models, explanations for these metrics, and algorithms to mitigate bias in datasets and models.

AI Fairness 360 (AIF360) The AI Fairness 360 toolkit is an extensible open-source library containg techniques developed by the research community to h

1.9k Jan 06, 2023
A model to predict steering torque fully end-to-end

torque_model The torque model is a spiritual successor to op-smart-torque, which was a project to train a neural network to control a car's steering f

Shane Smiskol 4 Jun 03, 2022
Xeasy-ml is a packaged machine learning framework.

xeasy-ml 1. What is xeasy-ml Xeasy-ml is a packaged machine learning framework. It allows a beginner to quickly build a machine learning model and use

9 Mar 14, 2022
Machine learning that just works, for effortless production applications

Machine learning that just works, for effortless production applications

Elisha Yadgaran 16 Sep 02, 2022
easyNeuron is a simple way to create powerful machine learning models, analyze data and research cutting-edge AI.

easyNeuron is a simple way to create powerful machine learning models, analyze data and research cutting-edge AI.

Neuron AI 5 Jun 18, 2022
Implementation of linesearch Optimization Algorithms in Python

Nonlinear Optimization Algorithms During my time as Scientific Assistant at the Karlsruhe Institute of Technology (Germany) I implemented various Opti

Paul 3 Dec 06, 2022
Massively parallel self-organizing maps: accelerate training on multicore CPUs, GPUs, and clusters

Somoclu Somoclu is a massively parallel implementation of self-organizing maps. It exploits multicore CPUs, it is able to rely on MPI for distributing

Peter Wittek 239 Nov 10, 2022
Microsoft contributing libraries, tools, recipes, sample codes and workshop contents for machine learning & deep learning.

Microsoft contributing libraries, tools, recipes, sample codes and workshop contents for machine learning & deep learning.

Microsoft 366 Jan 03, 2023
Library for machine learning stacking generalization.

stacked_generalization Implemented machine learning *stacking technic[1]* as handy library in Python. Feature weighted linear stacking is also availab

114 Jul 19, 2022
Pydantic based mock data generation

This library offers powerful mock data generation capabilities for pydantic based models. It can also be used with other libraries that use pydantic as a foundation, for example SQLModel, Beanie and

Na'aman Hirschfeld 396 Dec 28, 2022
ml4ir: Machine Learning for Information Retrieval

ml4ir: Machine Learning for Information Retrieval | changelog Quickstart → ml4ir Read the Docs | ml4ir pypi | python ReadMe ml4ir is an open source li

Salesforce 77 Jan 06, 2023
使用数学和计算机知识投机倒把

偷鸡不成项目集锦 坦率地讲,涉及金融市场的好策略如果公开,必然导致使用的人多,最后策略变差。所以这个仓库只收集我目前失败了的案例。 加密货币组合套利 中国体育彩票预测 我赚不上钱的项目,也许可以帮助更有能力的人去赚钱。

Roy 28 Dec 29, 2022
Tutorial for Decision Threshold In Machine Learning.

Decision-Threshold-ML Tutorial for improve skills: 'Decision Threshold In Machine Learning' (from GeeksforGeeks) by Marcus Mariano For more informatio

0 Jan 20, 2022
Gaussian Process Optimization using GPy

End of maintenance for GPyOpt Dear GPyOpt community! We would like to acknowledge the obvious. The core team of GPyOpt has moved on, and over the past

Sheffield Machine Learning Software 847 Dec 19, 2022
Distributed Deep learning with Keras & Spark

Elephas: Distributed Deep Learning with Keras & Spark Elephas is an extension of Keras, which allows you to run distributed deep learning models at sc

Max Pumperla 1.6k Dec 29, 2022
scikit-fem is a lightweight Python 3.7+ library for performing finite element assembly.

scikit-fem is a lightweight Python 3.7+ library for performing finite element assembly. Its main purpose is the transformation of bilinear forms into sparse matrices and linear forms into vectors.

Tom Gustafsson 297 Dec 13, 2022