
#6 PySpark SQL Tutorial

In this tutorial, you will learn how to derive business insights with SQL statements in PySpark.

Preparation

1. Create a SparkSession

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('tutorial').getOrCreate()
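
If you run Spark on your own machine, you can also set the master explicitly and check that the session is up. A minimal sketch, assuming a local setup (the 'local[*]' master is an assumption, not part of the original tutorial):

from pyspark.sql import SparkSession

# run Spark locally on all available cores (assumption: a local machine)
spark = (SparkSession.builder
         .master('local[*]')
         .appName('tutorial')
         .getOrCreate())

print(spark.version)  # confirm the session is alive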

2. Load and show dataset

# Spark cannot read directly from an HTTPS URL, so fetch the file first
# and read it from the local copy
from pyspark import SparkFiles

url = 'https://raw.githubusercontent.com/ivywang-ds/PySpark-from-zero-to-hero/main/Online_Retail.csv'
spark.sparkContext.addFile(url)

# read dataset with the first row as the header
df = spark.read.option('header', 'true').csv('file://' + SparkFiles.get('Online_Retail.csv'))

# check schema
df.printSchema()

# show data
df.show()
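
With only option('header','true'), every column is read as a string. If you want Quantity and UnitPrice to be numeric from the start, you can let Spark infer the column types; a sketch reusing the local copy downloaded above (inferSchema makes an extra pass over the file):

df = (spark.read
      .option('header', 'true')
      .option('inferSchema', 'true')   # infer numeric types such as Quantity and UnitPrice
      .csv('file://' + SparkFiles.get('Online_Retail.csv')))
df.printSchema()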

3. Add a new column named ‘Sales’

# create a new column named Sales. Sales = Quantity * UnitPrice

from pyspark.sql.functions import col, round
df = df.withColumn('Sales',col('Quantity') * col('UnitPrice'))
df = df.withColumn('Sales', round(col('Sales'),2))
df.show()
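
The two steps can also be written as a single withColumn call:

# equivalent one-step version
df = df.withColumn('Sales', round(col('Quantity') * col('UnitPrice'), 2))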

4. Register the dataset as a temporary SQL view

To execute SQL queries on the DataFrame in Spark, you need to register the dataset as a temporary view in Spark’s SQL context.

# register the DataFrame as a temporary view named 'df'
df.createOrReplaceTempView("df")
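
You can check that the view is registered by listing the temporary tables in the catalog:

# the temporary view 'df' should now appear in the list
print(spark.catalog.listTables())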

Use SQL statements to find insights

Question 1: How many unique orders are there in the dataset?

# how many unique orders are there in the dataset?
spark.sql('''
    SELECT COUNT(DISTINCT InvoiceNo) AS counts_of_distinct_order
    FROM df
    ''').show()
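
For comparison, the same count can be computed with the DataFrame API instead of SQL; a minimal sketch:

from pyspark.sql.functions import countDistinct

df.select(countDistinct('InvoiceNo').alias('counts_of_distinct_order')).show()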

Question 2: Find out the start date and end date of the dataset

# find the min and max of InvoiceDate
df.describe('InvoiceDate').show()
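
Note that describe() treats InvoiceDate as a string, so the min and max are lexicographic and only match the true start and end dates if the column is already in a sortable format. A sketch that parses the column into a timestamp first, assuming an 'M/d/yyyy H:mm' pattern (adjust the pattern to match your file):

from pyspark.sql.functions import to_timestamp, min as min_, max as max_

# the timestamp pattern below is an assumption about how InvoiceDate is stored
dates = df.withColumn('InvoiceTs', to_timestamp(col('InvoiceDate'), 'M/d/yyyy H:mm'))
dates.select(min_('InvoiceTs').alias('start_date'),
             max_('InvoiceTs').alias('end_date')).show()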

Question 3: What is the total sales in the dataset?

# what is the total sales in the dataset?
spark.sql('''
    SELECT SUM(Sales) AS total_sales
    FROM df
    ''').show()
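
Because Sales is a floating-point column, the sum can carry a long decimal tail; you can round it directly in SQL:

spark.sql('''
    SELECT ROUND(SUM(Sales), 2) AS total_sales
    FROM df
    ''').show()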

Question 4: Who are the top 10 best customers?

Hint: who spent the most money?

# the top 10 customers who spent the most
spark.sql('''
          SELECT CustomerID, SUM(Sales) AS sales
          FROM df
          GROUP BY CustomerID
          ORDER BY sales DESC
          LIMIT 10
          ''').show()
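
In this dataset some rows may have no CustomerID, and if so the NULL group can crowd out a real customer in the ranking. A sketch that excludes those rows first (assuming missing IDs appear as NULL after the CSV load):

spark.sql('''
          SELECT CustomerID, SUM(Sales) AS sales
          FROM df
          WHERE CustomerID IS NOT NULL
          GROUP BY CustomerID
          ORDER BY sales DESC
          LIMIT 10
          ''').show()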

Question 5: How many unique customers are there in each country?

spark.sql('''
          SELECT Country, COUNT(DISTINCT CustomerID) AS number_of_customer
          FROM df
          GROUP BY Country
          ORDER BY number_of_customer DESC
          ''').show()

Question 6: How many unique purchases were made by each customer in Germany?

# how many unique purchases were made by each customer in Germany?

spark.sql('''
          SELECT CustomerId, COUNT(DISTINCT InvoiceNo) AS number_of_purchase
          FROM df
          WHERE Country = 'Germany'
          GROUP BY CustomerId
          ORDER BY number_of_purchase DESC
          ''').show()

Question 7: What is the purchase frequency in Germany?

tmp = spark.sql('''
          SELECT CustomerId, COUNT(DISTINCT InvoiceNo) AS number_of_purchase
          FROM df
          WHERE Country = 'Germany'
          GROUP BY CustomerId
          ORDER BY number_of_purchase DESC
          ''')
#tmp.show()

# count how many customers fall into each purchase-frequency bucket
tmp.groupBy('number_of_purchase').count().orderBy('number_of_purchase').show()
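
The same frequency table can also be produced entirely in SQL by nesting the per-customer query; the subquery alias per_customer is just an illustrative name:

spark.sql('''
          SELECT number_of_purchase, COUNT(*) AS number_of_customers
          FROM (
              SELECT CustomerId, COUNT(DISTINCT InvoiceNo) AS number_of_purchase
              FROM df
              WHERE Country = 'Germany'
              GROUP BY CustomerId
          ) per_customer
          GROUP BY number_of_purchase
          ORDER BY number_of_purchase
          ''').show()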

Question 8: What are the sales per customer in each country?

# total sales per customer in each country
# (repeating the aggregate expressions avoids relying on lateral column aliases,
#  which require Spark 3.4+)

spark.sql('''
          SELECT Country,
                 SUM(Sales) AS total_revenue,
                 COUNT(DISTINCT CustomerID) AS no_of_customers,
                 SUM(Sales) / COUNT(DISTINCT CustomerID) AS sales_per_customer
          FROM df
          GROUP BY Country
          ORDER BY sales_per_customer DESC
          ''').show()

Question 9: What is the best-selling product in each country?

spark.sql('''
          SELECT Country, StockCode, Description, counts
          FROM (
              SELECT Country, StockCode, Description, counts,
                     ROW_NUMBER() OVER (PARTITION BY Country ORDER BY counts DESC) AS row_num
              FROM (
                  SELECT Country, StockCode, Description, COUNT(StockCode) AS counts
                  FROM df
                  GROUP BY Country, StockCode, Description
              ) temp
          ) temp2
          WHERE row_num = 1
          ''').show()
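
The query above ranks products by the number of order lines. If best-selling should instead mean the most units sold, the inner aggregation can sum Quantity; a sketch, casting Quantity to an integer in case the CSV was loaded as strings:

spark.sql('''
          SELECT Country, StockCode, Description, total_quantity
          FROM (
              SELECT Country, StockCode, Description, total_quantity,
                     ROW_NUMBER() OVER (PARTITION BY Country ORDER BY total_quantity DESC) AS row_num
              FROM (
                  SELECT Country, StockCode, Description, SUM(CAST(Quantity AS INT)) AS total_quantity
                  FROM df
                  GROUP BY Country, StockCode, Description
              ) temp
          ) temp2
          WHERE row_num = 1
          ''').show()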