If you have some basic knowledge of data analysis with Python Pandas, are curious about PySpark, and don't know where to start, tag along.
Python Pandas encouraged us to leave Excel tables behind and to look at data from a coder's perspective instead. Data sets became bigger and bigger and turned from databases into data files and data lakes. Some smart minds at Apache blessed us with the Scala-based framework Spark to process these larger amounts of data in a reasonable time. Since Python is the go-to language for data science nowadays, a Python API soon became available, called PySpark.
For a while now I have been trying to conquer this Spark interface with its non-Pythonic syntax that everybody in the big data world praises. It took me a few attempts and it is still a work in progress. In this post, however, I want to show you, if you are also starting to learn PySpark, how to replicate the same analysis you would otherwise do with Pandas.
The data analysis example we are going to look at can be found in the book "Python for Data Analysis" by Wes McKinney. The aim of that analysis is to find the top-ranked movies in the MovieLens 1M data set, which is acquired and maintained by the GroupLens Research project at the University of Minnesota.
As a coding framework I used Kaggle, since it comes with the convenience of notebooks that have the basic data science modules installed and are ready to go with two clicks.
You can also find the complete analysis with the PySpark code in this Kaggle notebook and the Pandas code in this one. We won't replicate the whole analysis here, but instead focus on the syntax differences when handling Pandas and PySpark dataframes. I will always show the Pandas code first, followed by the PySpark equivalent.
The basic functions that we need for this analysis cover reading, combining, selecting, filtering, grouping, pivoting and sorting data.
Pandas has different read functions, which make it easy to import data depending on the file type the data is stored in.
# Pandas
pd.read_table(path_to_file, sep='::', header=None, names=column_names, engine='python')
With PySpark we first need to create a SparkSession as the entry point to Spark's functionality. Note that the indentation might not match when you copy-paste this into your code.
# PySpark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()  #--> entry point (SparkSession)
(spark.read                       #--> what do you want to do
 .format("csv")                   #--> which format is the file in
 .option("delimiter", "::")       #--> specify the delimiter that's used
 .option("inferSchema", "true")   #--> otherwise every value is read as a string
 .load(path_to_file))             #--> file path
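One difference from the Pandas call is that names=column_names sets the column names during the read, while the file loaded by Spark comes back with generic names like _c0, _c1. A minimal sketch of attaching the same names afterwards with toDF(), assuming the column_names list from the Pandas example:
# PySpark
users = (spark.read
         .format("csv")
         .option("delimiter", "::")
         .option("inferSchema", "true")
         .load(path_to_file)
         .toDF(*column_names))  # replaces the generic _c0, _c1, ... names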
The default function for combining datasets in Pandas is merge(), which combines datasets on one or multiple columns. Even the Pandas join() function uses merge with an index column under the hood. In this code snippet, ratings, users and movies are Pandas dataframes that share columns with the same names, so we can simply call the merge function. In the case of a name mismatch, we would need to specify the key columns explicitly or use join() instead (see the sketch after the PySpark snippet below).
# Pandas
data = pd.merge(pd.merge(ratings, users), movies)
In PySpark there is no merge function; the default here is join() on a selection of columns. Otherwise it looks fairly similar.
# PySpark
data = ratings.join(users, ["user_id"]).join(movies, ["movie_id"])
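As a side note on the name-mismatch case mentioned above: both libraries can also join on keys that are named differently, here with made-up column names purely for illustration.
# Pandas
data = pd.merge(ratings, users, left_on='user_id', right_on='id')
# PySpark
data = ratings.join(users, ratings.user_id == users.id)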
Now that we have our data in a form that we can work with, we can take a look at how to actually extract the content.
Pandas dataframes are built on top of NumPy arrays, so elements can be accessed using square brackets. If we want to see the first 5 elements of a table, we would do it this way:
# Pandas
users[:5]
In Spark, dataframes are objects and therefore we need to use functions. For showing the first 5 rows we can use show(), which derives from the underlying SQL syntax used by Spark dataframes. The show() function is just a way to display the dataset; if you would like to actually work with a selection of entries, you need take().
# PySpark
users.show(5) # --> print result
users.take(5) # --> list of Row() objects
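Since take() gives back a plain Python list of Row objects rather than a dataframe, limit() is what I would reach for when a smaller dataframe is needed. A small sketch, assuming the user_id column from the join above:
# PySpark
first_rows = users.take(5)       # list of Row objects
print(first_rows[0]['user_id'])  # Row fields can be accessed like dict keys
small_users = users.limit(5)     # a new dataframe with only the first 5 rows
small_users.show()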
In Pandas, we can filter in different ways. One of the functions we can use is loc, which is a label-based filter to access rows or columns. In the data analysis example, we want to keep all rows of a dataframe whose title occurs in a list of titles.
# Pandas
mean_ratings = mean_ratings.loc[active_titles]
PySpark dataframes don't support loc, so instead we need to use the filter function. An easy way to handle columns in PySpark dataframes is the col() function. Calling it with a column name returns the respective column from the dataframe.
# PySpark
from pyspark.sql.functions import col
mean_ratings = mean_ratings.filter(col('title').isin(active_titles))
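For context, active_titles is a list of titles that received enough ratings. A sketch of how it could be built on both sides, where the threshold of 250 ratings follows the book's example and is an assumption here:
# Pandas
ratings_by_title = data.groupby('title').size()
active_titles = ratings_by_title.index[ratings_by_title >= 250]
# PySpark
active_titles = [row['title'] for row in
                 data.groupBy('title').count()
                     .filter(col('count') >= 250)
                     .collect()]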
Grouping looks very similar in Pandas and PySpark: groupby() in Pandas, groupBy() in PySpark. Pandas dataframes have a lot of aggregation functions implemented for their group-by objects, like median, mean, sum, variance etc. For PySpark dataframes, it is necessary to import them, for example from pyspark.sql.functions, like the mean() and the standard deviation stddev().
# Pandas
ratings_by_title = data.groupby('title')['rating'].std()
# PySpark
from pyspark.sql.functions import col, mean, stddev
ratings_by_title = data.groupBy('title').agg(stddev(col('rating')).alias('std'))
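If several aggregates are needed at once, both libraries accept a list of them; a minimal sketch:
# Pandas
stats_by_title = data.groupby('title')['rating'].agg(['mean', 'std'])
# PySpark
stats_by_title = data.groupBy('title').agg(mean(col('rating')).alias('mean'),
                                           stddev(col('rating')).alias('std'))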
Pivot tables are commonly used in data analysis. Pandas dataframes arrange and aggregate everything in one function, pivot_table().
# Pandas
mean_ratings = data.pivot_table('rating',
                                index='title',
                                columns='gender',
                                aggfunc='mean')
To replicate the same outcome with PySpark dataframes, we need to chain the grouping, pivot and aggregation functions.
# PySpark
mean_ratings_pivot = (data
                      .groupBy('title')
                      .pivot('gender')
                      .agg(mean('rating')))
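In the book's analysis the pivoted table is then used to compare the ratings between genders. Just as an illustration of working with the pivoted columns (the 'F' and 'M' columns come from the values in the gender column), a rating difference could be added like this:
# Pandas
mean_ratings['diff'] = mean_ratings['M'] - mean_ratings['F']
# PySpark
mean_ratings_pivot = mean_ratings_pivot.withColumn('diff', col('M') - col('F'))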
To sort the values in a column in ascending or descending order, we can call the sort_values() function on Pandas dataframes. Remember that the default sorting order is ascending.
# Pandas
top_female_ratings = mean_ratings.sort_values(by='F',
                                              ascending=False)
In PySpark there is a similar function called sort(), but we have to pass the column we want to sort by as an input. Alternatively, we could use the function orderBy() and would receive the same result.
# PySpark
top_female_ratings = mean_ratings.sort(mean_ratings.F.desc())
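The orderBy() alternative mentioned above looks like this and, together with the col() helper from earlier, gives the same result:
# PySpark
top_female_ratings = mean_ratings.orderBy(col('F').desc())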
Over time the syntax of Pandas and PySpark will change. Maybe we are lucky and they will become more Pythonic. There are also modules that provide a Pandas API for PySpark, such as Koalas. If its performance can compete with PySpark, we might see more of it in the future.
I hope you found some helpful tips and please let me know your insights from learning Spark, both syntax and functionality!
All the best,
Christine