Random Forest on GPUs: 2000x Faster than Apache Spark
If you prefer to watch a video demo, click here.
Random forest is a machine learning algorithm trusted by many data scientists for its robustness, accuracy, and scalability. The algorithm trains many decision trees through bootstrap aggregation, then makes predictions by aggregating the outputs of the trees in the forest. Because it is an ensemble, a random forest lends itself to distributed computing: trees can be trained in parallel across processes and machines in a cluster, resulting in significantly faster training than with a single process.
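To make bootstrap aggregation concrete, here is a minimal pure-Python sketch. It is not how Spark or RAPIDS implement random forests: the base learner is a one-split "stump" on a single feature, and the function names and toy data are made up for illustration. Each stump is fit on a bootstrap sample (rows drawn with replacement), and the forest prediction is the average over stumps.

```python
import random
from statistics import mean


def fit_stump(xs, ys):
    """Fit a one-split regression tree on 1-D data.

    Returns (threshold, left_mean, right_mean), minimising squared error.
    """
    best = None
    # Candidate thresholds: every distinct x except the smallest,
    # so both sides of the split are always non-empty.
    for t in sorted(set(xs))[1:]:
        left = [y for x, y in zip(xs, ys) if x < t]
        right = [y for x, y in zip(xs, ys) if x >= t]
        lm, rm = mean(left), mean(right)
        sse = sum((y - lm) ** 2 for y in left) + sum((y - rm) ** 2 for y in right)
        if best is None or sse < best[0]:
            best = (sse, t, lm, rm)
    if best is None:
        # Every x in this sample is identical: fall back to the mean.
        m = mean(ys)
        return (float("inf"), m, m)
    return best[1:]


def predict_stump(stump, x):
    threshold, left_mean, right_mean = stump
    return left_mean if x < threshold else right_mean


def fit_forest(xs, ys, n_trees=25, seed=42):
    """Train n_trees stumps, each on its own bootstrap sample."""
    rng = random.Random(seed)
    n = len(xs)
    forest = []
    for _ in range(n_trees):
        idx = [rng.randrange(n) for _ in range(n)]  # sample rows with replacement
        forest.append(fit_stump([xs[i] for i in idx], [ys[i] for i in idx]))
    return forest


def predict_forest(forest, x):
    """Aggregate by averaging the individual predictions."""
    return mean([predict_stump(stump, x) for stump in forest])


# Toy data: cheap short trips and expensive long trips (made-up numbers).
xs = [1, 2, 3, 4, 10, 11, 12, 13]
ys = [5.0, 5.5, 6.0, 5.5, 20.0, 21.0, 19.5, 20.5]
forest = fit_forest(xs, ys)
low, high = predict_forest(forest, 2), predict_forest(forest, 12)
```

Because each iteration of the loop in `fit_forest` is independent of the others, the trees could be trained on different processes or machines, which is what makes the algorithm a natural fit for a cluster.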
In this article, we explore implementations of distributed random forest training on clusters of CPU machines using Apache Spark and compare that to the performance of training on clusters of GPU machines using RAPIDS and Dask. While GPU computing in the ML world has traditionally been reserved for deep learning applications, RAPIDS is a library that executes data processing and non-deep learning ML workloads on GPUs, leading to immense performance speedups when compared to executing on CPUs. We trained a random forest model using 300 million instances: Spark took 37 minutes on a 20-node CPU cluster, whereas RAPIDS took 1 second on a 20-node GPU cluster. That’s over 2000x faster with GPUs!
Warp speed random forest with GPUs and RAPIDS!
We use the publicly available NYC Taxi dataset and train a random forest regressor that can predict the fare amount of a taxi ride using attributes related to rider pickup. Taxi rides from 2017, 2018, and 2019 were used as the training set, amounting to 300,700,143 instances.
The Spark and RAPIDS code is available in Jupyter notebooks here.
Spark clusters are managed using Amazon EMR, while Dask/RAPIDS clusters are managed using Saturn Cloud.
Both clusters have 20 worker nodes with these AWS instance types:
Spark (CPU) cluster:
- 8 CPU, 64 GB RAM
- On-demand price: $0.504/hour
RAPIDS (GPU) cluster, g4dn.xlarge:
- 4 CPU, 16 GB RAM
- 1 GPU, 16 GB GPU RAM (NVIDIA T4)
- On-demand price: $0.526/hour
Saturn Cloud can also launch Dask clusters with NVIDIA Tesla V100 GPUs, but we chose g4dn.xlarge for this exercise to maintain a similar hourly cost profile to the Spark cluster.
Apache Spark is an open-source big data processing engine built in Scala, with a Python interface that calls down to the Scala/JVM code. It’s a staple in the Hadoop processing ecosystem, built around the MapReduce paradigm, and has interfaces for DataFrames as well as machine learning.
Setting up a Spark cluster is outside of the scope of this article, but once you have a cluster ready, you can run the following inside a Jupyter notebook to initialize Spark:
    import findspark
    findspark.init()

    from pyspark.sql import SparkSession

    spark = (SparkSession
             .builder
             .config('spark.executor.memory', '36g')
             .getOrCreate())
The findspark package detects the location of the Spark install on your system; this may not be required if the Spark packages are discoverable. Several configuration settings need to be tuned to get performant Spark code, and the right values depend on your cluster setup and workflow. In this case, we set spark.executor.memory to ensure we don’t encounter any memory overflow or Java heap errors.
RAPIDS is an open-source Python framework that executes data science code on GPUs instead of CPUs. This results in huge performance gains for data science work, similar to those seen for training deep learning models. RAPIDS has interfaces for DataFrames, ML, graph analysis, and more. RAPIDS uses Dask to handle parallelizing to machines with multiple GPUs, as well as a cluster of machines each with one or more GPUs.
Setting up GPU machines can be a bit tricky, but Saturn Cloud has pre-built images for launching GPU clusters so you get up and running in just a few minutes! To initialize a Dask client pointing to your cluster, you can run the following:
    from dask.distributed import Client
    from dask_saturn import SaturnCluster

    cluster = SaturnCluster()
    client = Client(cluster)
To set up a Dask cluster yourself, refer to this docs page.
The data files are hosted on a public S3 bucket, so we can read the CSVs directly from there. The S3 bucket has all files in the same directory, so we use s3fs to select the files we want:
    import s3fs

    fs = s3fs.S3FileSystem(anon=True)

    # fs.ls() returns paths without the s3:// prefix, so add it back
    files = [f"s3://{x}"
             for x in fs.ls('s3://nyc-tlc/trip data/')
             if 'yellow' in x and ('2019' in x or '2018' in x or '2017' in x)]

    cols = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
            'passenger_count', 'trip_distance', 'RatecodeID',
            'store_and_fwd_flag', 'PULocationID', 'DOLocationID',
            'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount',
            'tolls_amount', 'improvement_surcharge', 'total_amount']
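The selection logic can be tried without touching S3. The sketch below applies the same filter to a small hand-written listing; the helper name `select_yellow_files` and the example keys are hypothetical, chosen only to resemble bucket-relative paths.

```python
def select_yellow_files(keys, years=("2017", "2018", "2019")):
    """Keep yellow-taxi files for the requested years and add the s3:// prefix."""
    return [
        f"s3://{key}"
        for key in keys
        if "yellow" in key and any(year in key for year in years)
    ]


# Hypothetical listing of bucket-relative keys, for illustration only.
listing = [
    "nyc-tlc/trip data/green_tripdata_2019-01.csv",
    "nyc-tlc/trip data/yellow_tripdata_2016-12.csv",
    "nyc-tlc/trip data/yellow_tripdata_2017-01.csv",
    "nyc-tlc/trip data/yellow_tripdata_2019-06.csv",
]
selected = select_yellow_files(listing)
```

Note that this is a plain substring match, so it relies on the year appearing in the file name and nowhere else in the path.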
With Spark, we need to read in each CSV file individually, then combine them together:
    import functools
    from pyspark.sql.types import *
    import pyspark.sql.functions as F
    from pyspark.sql import DataFrame

    # manually specify schema because inferSchema in read.csv is quite slow
    schema = StructType([
        StructField('VendorID', DoubleType()),
        StructField('tpep_pickup_datetime', TimestampType()),
        ...
        # refer to notebook for full schema object
    ])

    def read_csv(path):
        df = spark.read.csv(path,
                            header=True,
                            schema=schema,
                            timestampFormat='yyyy-MM-dd HH:mm:ss',
                            )
        df = df.select(cols)
        return df

    # read each file into its own DataFrame, then union them into one
    dfs = []
    for tf in files:
        df = read_csv(tf)
        dfs.append(df)
    taxi = functools.reduce(DataFrame.unionAll, dfs)
    taxi.count()
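The `functools.reduce` call folds a pairwise union over the list of per-file DataFrames. As a small illustration of the pattern, with plain lists of rows standing in for DataFrames (the `union_all` helper and the sample rows are made up):

```python
import functools


def union_all(left, right):
    """Stand-in for DataFrame.unionAll: append the rows of right to left."""
    return left + right


# One small "frame" per file; each row is (month, amount).
monthly_frames = [
    [("2017-01", 9.5)],
    [("2017-02", 12.0), ("2017-02", 7.25)],
    [("2017-03", 30.0)],
]

# reduce computes union_all(union_all(frames[0], frames[1]), frames[2])
combined = functools.reduce(union_all, monthly_frames)
```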
With Dask+RAPIDS, we can read in all the CSV files in one shot:
    import dask_cudf

    taxi = dask_cudf.read_csv(files,
                              assume_missing=True,
                              parse_dates=[1, 2],
                              usecols=cols,
                              storage_options={'anon': True})
    len(taxi)
We’ll generate a few features based on the pickup time and then cache/persist the DataFrame. In both frameworks, this executes all the CSV loading and preprocessing, and stores the results in RAM (in the RAPIDS case, GPU RAM). The features we will use for training are:
    features = ['pickup_weekday', 'pickup_hour', 'pickup_minute',
                'pickup_week_hour', 'passenger_count', 'VendorID',
                'RatecodeID', 'store_and_fwd_flag', 'PULocationID',
                'DOLocationID']
For Spark, we need to collect the features into a single vector column using VectorAssembler:
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.pipeline import Pipeline

    taxi = taxi.withColumn('pickup_weekday', F.dayofweek(taxi.tpep_pickup_datetime).cast(DoubleType()))
    taxi = taxi.withColumn('pickup_hour', F.hour(taxi.tpep_pickup_datetime).cast(DoubleType()))
    taxi = taxi.withColumn('pickup_minute', F.minute(taxi.tpep_pickup_datetime).cast(DoubleType()))
    taxi = taxi.withColumn('pickup_week_hour', ((taxi.pickup_weekday * 24) + taxi.pickup_hour).cast(DoubleType()))
    taxi = taxi.withColumn('store_and_fwd_flag', F.when(taxi.store_and_fwd_flag == 'Y', 1).otherwise(0))
    taxi = taxi.withColumn('label', taxi.total_amount)
    taxi = taxi.fillna(-1)

    assembler = VectorAssembler(
        inputCols=features,
        outputCol='features',
    )
    pipeline = Pipeline(stages=[assembler])

    assembler_fitted = pipeline.fit(taxi)
    X = assembler_fitted.transform(taxi)

    X.cache()
    X.count()
For RAPIDS, we convert all float values to float32 precision:
    from dask import persist
    from dask.distributed import wait

    taxi['pickup_weekday'] = taxi.tpep_pickup_datetime.dt.weekday
    taxi['pickup_hour'] = taxi.tpep_pickup_datetime.dt.hour
    taxi['pickup_minute'] = taxi.tpep_pickup_datetime.dt.minute
    taxi['pickup_week_hour'] = (taxi.pickup_weekday * 24) + taxi.pickup_hour
    taxi['store_and_fwd_flag'] = (taxi.store_and_fwd_flag == 'Y').astype(float)
    taxi = taxi.fillna(-1)

    X = taxi[features].astype('float32')
    y = taxi['total_amount']

    X, y = persist(X, y)
    _ = wait([X, y])
    len(X)
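One detail worth knowing when comparing the two feature sets: Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), while the pandas-style `.dt.weekday` accessor numbers them from 0 (Monday) to 6 (Sunday). The two frameworks therefore assign different numeric codes to `pickup_weekday` and `pickup_week_hour` for the same ride. The sketch below reproduces both conventions with the standard library only; the function names are ours, not part of either API.

```python
from datetime import datetime


def pandas_style_weekday(ts):
    """Monday=0 ... Sunday=6, as with the .dt.weekday accessor."""
    return ts.weekday()


def spark_style_dayofweek(ts):
    """Sunday=1 ... Saturday=7, as with Spark's dayofweek()."""
    return ts.isoweekday() % 7 + 1


def week_hour(weekday, hour):
    """Hour-of-week feature, computed the same way as in both notebooks."""
    return weekday * 24 + hour


pickup = datetime(2019, 1, 1, 8, 30)  # a Tuesday morning

rapids_week_hour = week_hour(pandas_style_weekday(pickup), pickup.hour)
spark_week_hour = week_hour(spark_style_dayofweek(pickup), pickup.hour)
```

Both encodings carry the same information, so this is a relabeling rather than a bug, but the raw feature values are not interchangeable between the Spark and RAPIDS DataFrames.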
Train random forest!
We initialize and train the random forest in a couple of lines for both packages.
Spark:

    from pyspark.ml.regression import RandomForestRegressor

    rf = RandomForestRegressor(numTrees=100, maxDepth=10, seed=42)
    fitted = rf.fit(X)

RAPIDS:

    from cuml.dask.ensemble import RandomForestRegressor

    rf = RandomForestRegressor(n_estimators=100, max_depth=10, seed=42)
    _ = rf.fit(X, y)
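The cuML documentation describes its Dask random forest as embarrassingly parallel: each worker builds its share of the trees on the data it holds locally. Assuming an even split, 100 trees on 20 workers comes to 5 trees per worker. The helper below is a hypothetical illustration of that arithmetic, not cuML code, and the way it spreads any remainder across the first workers is our own assumption.

```python
def trees_per_worker(n_estimators, n_workers):
    """Split n_estimators across workers as evenly as possible."""
    base, extra = divmod(n_estimators, n_workers)
    # The first `extra` workers take one additional tree (assumed policy).
    return [base + 1 if i < extra else base for i in range(n_workers)]


allocation = trees_per_worker(100, 20)
```

With the settings used in this article, every worker gets the same number of trees, so no GPU sits idle waiting for another to finish a larger share.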
We trained a random forest model on 300,700,143 instances of NYC taxi data on Spark (CPU) and RAPIDS (GPU) clusters. Both clusters had 20 worker nodes and approximately the same hourly price. Here are the results for each portion of the workflow:
|Portion of workflow|Spark (CPU)|RAPIDS (GPU)|
|---|---|---|
|Load/row count|20.6 seconds|25.5 seconds|
|Feature engineering|54.3 seconds|23.1 seconds|
|Random forest|36.9 minutes|1.02 seconds|
That’s 37 minutes with Spark vs. 1 second for RAPIDS!
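The headline figure follows directly from the measured training times, and the same numbers give a rough idea of what a single fit costs on each cluster. The sketch below uses only the timings and on-demand prices quoted in this article; the cost estimate is a simplification that ignores billing minimums, cluster startup, and the load and feature engineering steps.

```python
NODES = 20
SPARK_PRICE_PER_HOUR = 0.504   # per worker, on-demand
RAPIDS_PRICE_PER_HOUR = 0.526  # per worker, on-demand

spark_fit_seconds = 36.9 * 60
rapids_fit_seconds = 1.02

# How many times faster the RAPIDS fit was
speedup = spark_fit_seconds / rapids_fit_seconds


def fit_cost(seconds, price_per_hour, nodes=NODES):
    """Approximate worker cost in dollars for the duration of one fit."""
    return nodes * price_per_hour * seconds / 3600


spark_cost = fit_cost(spark_fit_seconds, SPARK_PRICE_PER_HOUR)
rapids_cost = fit_cost(rapids_fit_seconds, RAPIDS_PRICE_PER_HOUR)

print(f"speedup: {speedup:.0f}x")
print(f"Spark fit cost:  ${spark_cost:.2f}")
print(f"RAPIDS fit cost: ${rapids_cost:.4f}")
```

The ratio works out to roughly 2,170x, consistent with the "over 2000x" figure, and the worker cost of one Spark fit comes to a little over six dollars compared with a fraction of a cent for RAPIDS.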
GPUs for the win! Think about how much faster you can iterate and improve your model when you don’t have to wait over 30 minutes for a single fit. Once you add in hyperparameter tuning or testing different models, each iteration can easily add up to hours or days.
Need to see it to believe it? You can find the notebooks here and run them yourself!
Do you need faster Random Forest?
Yes! You can get going on a Dask/RAPIDS cluster in seconds with Saturn Cloud. Saturn handles all the tooling infrastructure, security, and deployment headaches to get you up and running with RAPIDS right away. Click here to use it for free.