Dask offers high-speed, scalable Data Science in Python
If you work with data, whether that’s as a Data Scientist, Analyst, Engineer, or Enthusiast, Pandas is going to be your go-to Python library. Pandas handles just about everything you’d want to do with your data, including loading, filtering, transforming, and general manipulation.
Pandas is remarkably easy to use, serving a very intuitive API with many options for every DataFrame function. Most data that people work with using Pandas is handled in a very similar way. That is, there isn’t much extra effort required for working with different types of data in terms of code usage. Kaggle has run hundreds of Data Science competitions over the years in a pretty diverse variety of domains and Pandas gets used by most of the competitors almost every time. The graph below comes from the Stack Overflow blog and shows how the popularity of Pandas has boomed over the past several years.
Yet handling big data with Pandas still remains a challenge. Pandas wasn’t initially designed for data at large scales. It was originally released back in 2008, quite a long time ago. The challenge of getting Pandas to work with big data really has to do with natural limitations related to both software and hardware. In general, there are three main points to look at:
- Once your data gets too big to fit into memory, you’ll have to get pretty clever with your Pandas tricks to be able to fit it into memory. It’s a lot of extra code and ends up slowing down the computation since it’s not an optimal use of Pandas.
- Even if you do manage to work with the data on limited RAM, the processing itself will be slow. Doing something as simple as computing the average over 10 million rows will be incredibly slow since Pandas will do the computation using one CPU core at a time. And that’s only a simplistic operation.
- Once things get really slow, scaling hardware is usually the next chosen option. Companies will set up large cloud clusters and leverage frameworks like Apache Spark to scale computations across the cluster. This requires extra skills in DevOps and using Apache Spark, not to mention a lot of extra work, time, and money.
The Dask framework attempts to address all of these challenges while adhering to Pandas style and structure.
What is Dask?
Dask is popularly known as a Python parallel computing library. Through its parallel computing features, Dask allows for rapid and efficient scaling of computation. What this does for us is provide an easy way to handle big data in Python with minimal extra effort beyond the regular Pandas workflow.
As an example, suppose we had a DataFrame containing hourly stock information, including prices, for all companies in the S&P500. That’s 24 x 365 x 500 = 4,380,000 rows (4.38 million), a fairly sizable DataFrame. To calculate the mean value of all columns in this DataFrame using Pandas, one would normally just call a built-in Pandas function such as df.mean().
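A minimal sketch of that call, assuming a synthetic DataFrame whose price and volume columns are made up to stand in for the real stock data:

```python
import numpy as np
import pandas as pd

# Hypothetical stand-in for hourly S&P 500 data: 24 hours x 365 days x 500 companies
n_rows = 24 * 365 * 500
rng = np.random.default_rng(0)

df = pd.DataFrame({
    "price": rng.uniform(10, 500, n_rows),             # made-up prices
    "volume": rng.integers(1_000, 1_000_000, n_rows),  # made-up volumes
})

# One built-in call computes the mean of every column
column_means = df.mean()
print(column_means)
```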
That’s all fine and dandy but also pretty darn slow. Calculating the mean requires adding all values in each column of the DataFrame. Doing this for 4.38 Million rows will take an unreasonable amount of time.
Part of the reason it’s so slow is that the .mean() function only uses one CPU core at a time. Yet such an operation can benefit greatly from having multiple cores working on it. If your process is able to use two cores for example, instead of having one core add up all the numbers, you could have one core add half of them and the other core add the other half, which should get you about double the speed. This gets even better at scale, since the speed-up for calculating the mean can be roughly linear: 100 cores ~= 100X speed-up.
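The two-worker idea can be illustrated with a small sketch. It uses Python’s standard ThreadPoolExecutor and NumPy purely as an illustration of splitting a sum in half and combining the partial results; it is not a description of how Dask schedules work internally.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np

values = np.arange(1, 1_000_001, dtype=np.float64)

# Split the data into two halves, one per worker
halves = np.array_split(values, 2)

# Each worker adds up its own half
with ThreadPoolExecutor(max_workers=2) as pool:
    partial_sums = list(pool.map(np.sum, halves))

# Combine the partial sums and divide by the total count
parallel_mean = sum(partial_sums) / len(values)
print(parallel_mean)
```

The same pattern extends to more workers: split into more pieces, sum each piece independently, then combine.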
Most consumer CPUs have multiple cores, at least 4 in the case of an Intel i5 and even hitting 18 for the newer Intel i9. If you need more, you can pretty easily spin up a compute-optimized instance on AWS with 96 virtual CPU cores. GPUs can potentially offer even more speed with thousands of CUDA cores available.
Dask allows you to pick from any of those options at any time, along with the ability to scale them up and down, all without more than a few lines of code changes. We saw before that working with big data in Pandas had three main challenges: fitting the data into memory, processing it fast, and scaling the whole thing. Dask addresses these directly:
- To manage memory consumption, Dask will keep the data stored on disk (your hard drive) and load chunks of the data (smaller parts) as needed for processing. This is similar to how you would do chunking with Pandas, only this time it’s handled entirely by Dask and is optimized. During the processing, the intermediate values generated (if any) are discarded as soon as possible to save on memory, so Pandas is only ever working with as much data as it absolutely needs, no more.
- Dask seamlessly performs parallel processing on a single machine using all available CPU cores. You don’t have to write any custom code to get it to work efficiently. Dask just uses the cores naturally, out of the box.
- Dask scales seamlessly to any number of cores and across any number of machines. It uses all the cores of larger clusters, such as those in the cloud, as if they were in one single machine. It’s not necessary for the instances in the cluster to have the same number of cores or have the same power. Dask will use them all together, applying whatever resources it has available to it to process the DataFrame.
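For comparison, the manual Pandas chunking mentioned in the first point can be sketched as follows. The file name and column are hypothetical, and the CSV is written first only so the example runs on its own.

```python
import os
import tempfile

import numpy as np
import pandas as pd

# Write a small hypothetical CSV so the example is self-contained
path = os.path.join(tempfile.mkdtemp(), "prices.csv")
pd.DataFrame({"price": np.arange(1, 100_001, dtype=float)}).to_csv(path, index=False)

total = 0.0
count = 0

# Read 10,000 rows at a time so only one chunk sits in memory at once
for chunk in pd.read_csv(path, chunksize=10_000):
    total += chunk["price"].sum()
    count += len(chunk)

chunked_mean = total / count
print(chunked_mean)
```

This is the bookkeeping that Dask takes over: splitting the data, processing each piece, and combining the results.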
We’ll now go through a few examples to see how we can actually use Dask with our data — on a multi-core CPU and on a GPU. To get started, we can install Dask via pip.
pip install "dask[complete]"
Dask by Example
Handling Data with Dask DataFrames
For the following examples, I’ll be using an i7–8700k CPU which has 6 cores and 12 threads along with a 1080Ti GPU. We’ll start by generating some random stock data for 5 different companies: Apple, Google, Amazon, Netflix, Microsoft. We’ll generate 1 Million rows, enough to benchmark the speed of Pandas vs Dask.
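The original data-generation code is not reproduced here, so the following is only a sketch of one way to do it. The column names, price range, and the file name stock_data.csv are assumptions.

```python
import numpy as np
import pandas as pd

n_rows = 1_000_000
companies = ["Apple", "Google", "Amazon", "Netflix", "Microsoft"]
rng = np.random.default_rng(42)

# Random company labels and random prices (assumed range, for illustration only)
stocks = pd.DataFrame({
    "company": rng.choice(companies, size=n_rows),
    "price": rng.uniform(50.0, 2000.0, size=n_rows).round(2),
})

# Save to CSV so it can later be loaded from disk (hypothetical file name)
stocks.to_csv("stock_data.csv", index=False)
```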
Now that our data is in a CSV, we can load it directly into a Dask DataFrame. Once loaded, we repartition the data into a more optimal arrangement.
When we load up our data from the CSV, Dask will create a DataFrame that is row-wise partitioned, i.e., rows are grouped by index value. That’s how Dask is able to load the data into memory on-demand and process it super fast: it goes by partition.
The official documentation of Dask recommends that partitions be kept to a size of 100MB or less like we’ve done in the code above. This is done to balance the parallel processing speed gains with the overhead and handling of the partitions.
The Dask documentation also lists several DataFrame operations that are particularly fast when using Dask. Let’s run a few of them.
Dask Acceleration on CPU
Let’s start with something simple, calculating the mean price of the entire 1 million rows. This isn’t listed as one of Dask’s specialties, but it’s still good to see how it performs in different situations. The code below is quite simple:
To measure the run time, I used the time.time() function before and after each operation and then subtracted to get the difference. In the end, the Pandas operation took 2.5 ms while the Dask one took about 1.0 ms. That’s a 2.5X speed-up that we got basically for free.
Things get even better when we benchmark operations that Dask is specially optimized for as specified in the official docs. Doing filtering is a little bit more computationally heavy than the simple mean calculation we did before.
In this case, Pandas took 13 ms while Dask only took 0.7 ms. That’s an 18X speedup! That’s the real power of using all of our cores and efficient memory management.
But we’re not done yet.
Adding DataFrames that have 1 Million rows takes a lot of operations. Yet the operations are quite simple — it’s just a lot of additions. Dask should be very well suited to this kind of thing with its efficient usage of the CPU cores.
The Pandas operation took 665 ms while Dask took only 13 ms. That’s an insane 51X speedup!
These examples showcase the real power of Dask. The code looks exactly like Pandas code, except we have Dask working its speed magic on the backend.
The table below shows a list of operations benchmarking Pandas vs. Dask. Check it out to get an idea of just how much speed Dask can give you on CPU.
Dask Acceleration on GPU
One of the core components of the design of Dask is that it’s seamless. It simply runs whatever Python functions you give it. Dask doesn’t care what kind of compute it’s using, whether CPU or GPU, local or in a cloud cluster. Such a flexible design allows us to easily leverage GPUs with Dask.
Recall that when replacing Pandas DataFrames with Dask, all we do is simply convert the DataFrame type, from Pandas to Dask. Dask takes care of the rest.
We can use the same process when using GPUs with Dask: convert the DataFrames to a format which can leverage GPU acceleration on DataFrames. Luckily, this already exists in Python.
cuDF is a Python-based GPU DataFrame library for loading, joining, aggregating, and filtering data. Just like Dask, it’s almost a mirror copy of the Pandas API, allowing for a very smooth workflow while leveraging GPUs on the backend.
To take advantage of cuDF speedups with Dask, we can convert our Pandas-Dask DataFrame to a cuDF-Dask DataFrame. This will automatically and seamlessly get the data ready to be processed on GPU. We can do this with a single line of code.
Dask will then work the exact same way, with the only difference being that we’re now doing the computation on the GPU rather than the CPU.
That’s it! All of the code from our previous section on CPU acceleration with Dask remains the exact same. The only important thing to note is that cuDF doesn’t yet support all of the operations Pandas does, only a subset. Luckily, that subset contains the most common operations.
The table below shows our new speedup benchmark, this time using Dask with the GPU. Our speed got even better without requiring any extra effort and very little extra code. Note that some operations were omitted since cuDF didn’t support them.
To see more of Dask on GPU, check out the official blog post on GPU Dask Arrays.
Practical Dask Tips
We’ve covered all of the basics of Dask. By following along with the examples above, you’ll find that it’s quite easy to get up and running with Dask in your workflow. Real-world usage and applications are often another big step. Here are a few tips for using Dask effectively in practice.
When using any kind of software designed to increase speed, it’s important to understand exactly how much performance boost you’re getting out of it, and in what way. Dask provides a built-in diagnostics tool to profile its performance. In particular, the diagnostics tool can be used to:
- Display a progress bar on the terminal to keep track of how some longer processes are progressing
- Measure the speed of each Dask task using the Profiler
- Measure the CPU and memory percentage usage with the ResourceProfiler
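A minimal sketch of the first two tools, using ProgressBar and Profiler from dask.diagnostics on a toy computation. The square function and the task graph are made up for illustration. ResourceProfiler is used the same way as a context manager, but it is left out here because it depends on the psutil package.

```python
import dask
from dask.diagnostics import ProgressBar, Profiler


@dask.delayed
def square(x):
    # Toy task standing in for real work
    return x * x


# Build a small lazy task graph: square ten numbers, then sum them
total = dask.delayed(sum)([square(i) for i in range(10)])

# ProgressBar prints progress to the terminal; Profiler records per-task timings
with ProgressBar(), Profiler() as prof:
    result = total.compute(scheduler="threads")

print(result)
print(f"{len(prof.results)} tasks profiled")
```

Each entry in prof.results records a task’s key along with its start and end times, which is enough to spot the slow steps in a longer pipeline.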
For a visual representation of the status of the cluster, Dask provides a web interface. Built using Bokeh, this interface shows running and completed tasks, resource usage, worker status, and overall cluster health.
Handling data that can’t fit into memory
Dask can effectively be used as a tool for datasets that can’t fit into memory. Since Dask partitions your data, when you try to load a dataset from disk that is bigger than your RAM, Dask will just read in the data on an as-needed basis. Thus, if you find that you have a dataset that can’t fit into memory with Pandas, you can use Dask as a direct drop-in replacement.
The most optimal way of loading data with Dask
There are two ways to create Dask DataFrames: directly from data saved on disk, or by converting from Pandas DataFrames. In our examples above, we took the former approach, which is the fastest one. The latter approach works, but it is less efficient since Dask has to rearrange all of the data into the format that it likes, whereas loading from CSV directly into a Dask DataFrame gets the data into that format right away without any extra work.
Selecting the best number of partitions
The selection of the number of partitions for Dask to use is critical for optimal performance. If you select too few or too many partitions, Dask could end up being slower than regular Pandas! The official Best Practices for Dask recommend finding a balance for the number of partitions: small enough that each worker process can fit its task into memory and big enough that Dask can actually take advantage of having multiple cores.
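As a rough starting point, the partition count can be estimated from the data size and a target partition size. The helper below, suggest_npartitions, is hypothetical and not part of Dask; it assumes the 100MB target mentioned earlier and should be treated as a heuristic to tune, not a rule.

```python
import math

import numpy as np
import pandas as pd

TARGET_PARTITION_BYTES = 100 * 1024**2  # assumed target of about 100MB per partition


def suggest_npartitions(total_bytes, target_bytes=TARGET_PARTITION_BYTES):
    """Hypothetical helper: enough partitions to keep each at or under the target."""
    return max(1, math.ceil(total_bytes / target_bytes))


# Example with a synthetic DataFrame's in-memory size
pdf = pd.DataFrame({
    "price": np.zeros(1_000_000),
    "volume": np.zeros(1_000_000),
})
total_bytes = int(pdf.memory_usage(deep=True).sum())

print(suggest_npartitions(total_bytes))  # small data fits in one partition
print(suggest_npartitions(1024**3))      # 1 GiB of data at a 100 MiB target
```

The resulting number could then be passed to repartition(npartitions=...) and adjusted based on what the diagnostics show.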