
#5 PySpark DataFrame Tutorial

In this tutorial you will learn how to create a SparkSession, load and inspect a dataset, check for duplicate and missing values, extract date and time parts, create new columns, group and aggregate data, and turn the results into business insights.

1. Create a SparkSession

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('tutorial').getOrCreate()
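getOrCreate() returns the active session if one already exists, otherwise it starts a new one. The builder also accepts extra settings; the values below are only an illustrative sketch, not something this tutorial requires.

# optional: pass extra settings to the builder (illustrative values)
spark = SparkSession.builder \
    .appName('tutorial') \
    .master('local[*]') \
    .config('spark.sql.shuffle.partitions', '8') \
    .getOrCreate()

# confirm the session is alive
print(spark.version)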

2. Load and show dataset

# read the dataset, treating the first row as the header
df = spark.read.option('header','true').csv('raw.githubusercontent.com/ivywang-ds/PySpark-from-zero-to-hero/main/bike_rental.csv')
# check schema
df.printSchema()

# show data
df.show()
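Note that with only the 'header' option every column is read as a string. If you want Spark to guess numeric and timestamp types for you, you can re-read the file with inferSchema; a sketch (the resulting types depend on the file contents):

# optional: let Spark infer column types instead of reading everything as strings
df = spark.read \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .csv('raw.githubusercontent.com/ivywang-ds/PySpark-from-zero-to-hero/main/bike_rental.csv')

df.printSchema()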

3. Check duplicate values

# check all column names
df.columns

# check for duplicated records across the whole dataset
df_dup = df.groupBy('rental_id',
 'duration',
 'duration_ms',
 'bike_id',
 'bike_model',
 'end_date',
 'end_station_id',
 'end_station_name',
 'start_date',
 'start_station_id',
 'start_station_name',
 'end_station_logical_terminal',
 'start_station_logical_terminal',
 'end_station_priority_id').count().filter('count>1')

df_dup.collect()

# you should see this: an empty list
[]
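Typing every column name by hand is error-prone. An equivalent, shorter check groups on df.columns, and dropDuplicates() would remove any duplicate rows if they did turn up; a sketch:

# equivalent duplicate check without listing every column by hand
df.groupBy(df.columns).count().filter('count > 1').show()

# if duplicates had been found, this would keep one copy of each row
df = df.dropDuplicates()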

4. Check missing values

from pyspark.sql.functions import col, sum

# Check for missing values in all columns
missing_values_count = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Display the results
missing_values_count.collect()

# you should see this: the number of missing values in each column

[Row(rental_id=0, duration=0, duration_ms=0, bike_id=0, bike_model=13432, end_date=0, end_station_id=502, end_station_name=0, start_date=0, start_station_id=0, start_station_name=0, end_station_logical_terminal=47368, start_station_logical_terminal=47368, end_station_priority_id=47368)]
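Once you know which columns are affected, the usual options are dropping or filling. A minimal sketch, assuming you want to drop the three mostly empty columns found above and fill the remaining gaps with placeholders (the fill values are assumptions, not part of the original dataset; df_clean keeps the original df intact for the rest of the tutorial):

# drop the columns that are mostly empty (names taken from the counts above)
df_clean = df.drop('end_station_logical_terminal',
                   'start_station_logical_terminal',
                   'end_station_priority_id')

# fill the remaining gaps with placeholder values (columns are strings here)
df_clean = df_clean.na.fill({'bike_model': 'unknown', 'end_station_id': 'unknown'})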

5. Extract date and time from datetime column

# parse datetime and extract year, month, day of week, hour of day
from pyspark.sql.functions import year, month, dayofweek, hour

# Extract year
df = df.withColumn("year", year("start_date"))

# Extract month
df = df.withColumn("month", month("start_date"))

# Extract day of the week (1 = Sunday, 2 = Monday, ..., 7 = Saturday)
df = df.withColumn("day_of_week", dayofweek("start_date"))

# Extract hour of the day
df = df.withColumn("hour_of_day", hour("start_date"))

# show the related columns with top 10 rows
df.select('year','month','day_of_week','hour_of_day').show(10)

# you should see this
+----+-----+-----------+-----------+
|year|month|day_of_week|hour_of_day|
+----+-----+-----------+-----------+
|2022|   10|          5|         22|
|2022|    9|          3|         21|
|2022|   12|          5|         10|
|2022|   11|          7|         20|
|2023|    1|          7|         18|
|2022|   12|          7|         17|
|2022|   10|          1|         12|
|2022|    7|          7|         17|
|2022|   10|          3|         21|
|2022|   11|          6|         12|
+----+-----+-----------+-----------+
only showing top 10 rows
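In this dataset start_date parses cleanly, but if the extracted columns ever come back as null it usually means the column was read as a plain string in a format Spark cannot cast implicitly. In that case, parse it explicitly with to_timestamp first; the format string below is an assumption about how the dates look, so adjust it to your file.

from pyspark.sql.functions import to_timestamp

# assumption: dates look like '2022-10-21 22:15:00'
df = df.withColumn('start_date', to_timestamp('start_date', 'yyyy-MM-dd HH:mm:ss'))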

6. Create a new column

# add a new column called 'duration_min'

from pyspark.sql.functions import col, round

df = df.withColumn("duration_min",df["duration"]/60)
df = df.withColumn("duration_min", round(col("duration_min"),2))

# check the distribution of duration_min
df.describe(['duration_min']).show()

# you should see this
+-------+------------------+
|summary|      duration_min|
+-------+------------------+
|  count|             47368|
|   mean| 36.81580645161297|
| stddev|228.91379025049127|
|    min|              0.05|
|    max|          24055.77|
+-------+------------------+
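The maximum of roughly 24,056 minutes (more than two weeks) points to a few extreme outliers, which also inflate the standard deviation. approxQuantile gives a more robust picture of the distribution; a sketch:

from pyspark.sql.functions import col

# median, 95th and 99th percentile of rental duration (relative error 0.01)
quantiles = df.approxQuantile('duration_min', [0.5, 0.95, 0.99], 0.01)
print(quantiles)

# optionally look only at rentals shorter than one day
df.filter(col('duration_min') < 24 * 60).describe(['duration_min']).show()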

7. Group, Aggregate, and Filter data

# group by single column
df.groupby('year').count().show()

+----+-----+
|year|count|
+----+-----+
|2023| 3415|
|2022|43953|
+----+-----+

# group by multiple columns
df.groupby(['year','month']).count().show()
+----+-----+-----+
|year|month|count|
+----+-----+-----+
|2022|   10|12038|
|2022|    2|  779|
|2022|    7| 2159|
|2022|   11| 8424|
|2022|    3| 1425|
|2022|    1|  796|
|2022|    5| 1859|
|2022|    6| 2270|
|2022|    9| 4608|
|2022|    4| 1670|
|2022|   12| 5845|
|2023|    1| 3415|
|2022|    8| 2080|
+----+-----+-----+

# with filter 
df.filter(df['year']== '2022').groupby(['year','month']).count().orderBy('month').show()
+----+-----+-----+
|year|month|count|
+----+-----+-----+
|2022|    1|  796|
|2022|    2|  779|
|2022|    3| 1425|
|2022|    4| 1670|
|2022|    5| 1859|
|2022|    6| 2270|
|2022|    7| 2159|
|2022|    8| 2080|
|2022|    9| 4608|
|2022|   10|12038|
|2022|   11| 8424|
|2022|   12| 5845|
+----+-----+-----+
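groupBy is not limited to count(); agg() lets you compute several aggregates in one pass. A sketch that counts rentals and averages the duration per month of 2022, using the duration_min column created earlier:

from pyspark.sql.functions import count, avg, round

df.filter(df['year'] == '2022') \
  .groupby('month') \
  .agg(count('*').alias('rentals'),
       round(avg('duration_min'), 2).alias('avg_duration_min')) \
  .orderBy('month') \
  .show()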

8. Make business insights

Question 1: At which time of day do people prefer to rent a bike?

df.filter(df['year']== '2022').groupby(['hour_of_day']).count().orderBy('hour_of_day').show()

# you should see this
+-----------+-----+
|hour_of_day|count|
+-----------+-----+
|          0| 1015|
|          1|  687|
|          2|  478|
|          3|  310|
|          4|  190|
|          5|  159|
|          6|  153|
|          7|  385|
|          8| 1255|
|          9| 2467|
|         10| 2839|
|         11| 2019|
|         12| 2002|
|         13| 2445|
|         14| 2599|
|         15| 2741|
|         16| 2771|
|         17| 3008|
|         18| 3618|
|         19| 4133|
+-----------+-----+
only showing top 20 rows

The query is correct, but the raw hourly counts are hard to read. We can create a new column, time_of_day, that buckets the hours into 'morning', 'afternoon', 'evening' and 'night'.

from pyspark.sql.functions import when

df = df.withColumn(
    "time_of_day",
    when((col("hour_of_day") >= 6) & (col("hour_of_day") < 12), "morning")
    .when((col("hour_of_day") >= 12) & (col("hour_of_day") < 18), "afternoon")
    .when((col("hour_of_day") >= 18) & (col("hour_of_day") < 24), "evening")
    .otherwise("night")
)

df.filter(df['year'] == '2022').groupby(['time_of_day']).count().orderBy('time_of_day').show()
[Figure: business insight result 1]

From the result we can see that most people rent bikes in the afternoon and evening, and only a few rent bikes at night.

This suggests that scheduling bike maintenance in the early morning and late at night, despite the extra labor cost, could be a strategic way to keep the overall quality of the fleet high.

Question 2: How frequently are the bikes used? Are they rented once or multiple times? Should we invest in more bikes?

# distribution of how many times each bike was rented in 2022
df.filter(df['year'] == '2022') \
  .groupBy('bike_id') \
  .count() \
  .withColumnRenamed('count', 'Bike_rent') \
  .groupBy('Bike_rent').count().orderBy('Bike_rent').show()
[Figure: business insight result 2]

From the result we can see that most bikes have been rented only once or twice, and very few have been rented many times.
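A complementary check is to summarise rentals per bike directly, for example the average and maximum number of rentals a single bike received in 2022; a sketch:

from pyspark.sql.functions import avg, max as spark_max

rentals_per_bike = df.filter(df['year'] == '2022') \
    .groupBy('bike_id') \
    .count()

rentals_per_bike.agg(avg('count').alias('avg_rentals_per_bike'),
                     spark_max('count').alias('max_rentals_per_bike')).show()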

We should encourage people to rent the current bikes instead of investing in more bikes.

9. End session

# close the session and clean up
spark.stop()