A guide to help you efficiently transfer big data between services
As data arrives in ever greater volume from a growing number of ingestion sources, there’s a real need for efficient mechanisms that deliver it reliably and in a timely manner across applications and services for data analytics, machine learning, and general persistence.
When we move beyond large data sets (which are usually manageable without specialized tools), we enter the realm of big data, where size itself becomes the problem. Without sound design principles and tools, the data becomes slow and challenging to work with. In some cases, it becomes impossible to read or write with limited hardware, and the problem gets worse as the data grows.
Dealing with big data is a common problem for software developers and data scientists. It has paved the way for the tech we are accustomed to today, such as Cassandra, Hadoop, Apache Storm, NoSQL databases and Apache Spark, which now have become the de facto standards of how we should handle big data.
Having the means to move data in and out in an efficient and fast way has become vital to the success of critical decision making. In this article, we take a deep dive into principles and technology to keep in mind when you design and implement systems that need to handle big data transfer. At the end of this article, you will be well versed with techniques you can use to transfer big data from one location to another and build out truly scalable solutions.
Understand the fundamentals first
The core principles to keep in mind when performing big data transfers with Python are to reduce resource utilization (memory, disk I/O, and network transfer) and to use the resources you do have efficiently through design patterns and tools, so that data moves efficiently from point A to point N, where N can be one or more destinations.
“The essential problem of dealing with big data is a resource issue.”
Before you start to build any data processor, you need to know, or at least anticipate, the data volume. Having foresight into how much the data may grow in the future will enable you to pick the right tools and techniques to apply. Things you need to grasp and be aware of:
- It is time-consuming to process large datasets from end to end. It’s crucial to have checkpoints that allow restarting from the last successful checkpoint, as starting from the beginning tends to be more expensive.
- A function that works fine with small datasets may not work efficiently with big data. Big data processing requires a different mindset. Assumptions should be verified early on.
- Parallel processing works best for large datasets, while the ROI of parallelism diminishes with smaller datasets. Hence, it’s always good to strike a balance: calculate the expected gain and create a plan before attempting parallelism.
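The checkpoint idea from the list above can be sketched as follows. This is a minimal illustration and not a production design: the function name, the JSON checkpoint file, and its `next_index` field are all assumptions made for the example.

```python
import json
import os


def process_with_checkpoint(items, handle, checkpoint_path):
    """Process items in order, resuming after the last recorded position.

    Returns the number of items handled during this call.
    """
    start = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as fh:
            start = json.load(fh)["next_index"]

    for index in range(start, len(items)):
        handle(items[index])
        # Write to a temporary file and rename it, so a crash never
        # leaves a half-written checkpoint behind.
        tmp_path = checkpoint_path + ".tmp"
        with open(tmp_path, "w") as fh:
            json.dump({"next_index": index + 1}, fh)
        os.replace(tmp_path, checkpoint_path)
    return len(items) - start
```

If `handle` raises halfway through, calling the function again with the same checkpoint path picks up at the item that failed instead of starting from the beginning. Writing a checkpoint after every item is the simplest choice; with real workloads you would more likely checkpoint once per batch.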
Reduce your data volume earlier
When working with big data, it’s crucial to reduce the data size early in the process. This is to ensure data can be processed in a performant way. This reduction process is iterative; there is no silver bullet and no one-size-fits-all solution to the big data problem. It’s a combination of balancing resources and techniques and fully taking advantage of the hardware you have.
- Choosing the correct data type for the data you ingest is important. For example, use the smallest integer type that fits your values, an unsigned integer where values are never negative, and an integer rather than text where the data allows it.
- Apply data aggregation to reduce data volume while preserving the granularity you need, and use ETL to transform the data, keeping only what you need. Little is sometimes not enough, and too much becomes noise. Having an intimate understanding of your data can also help you correctly reduce the number of fields and data points required.
- Choose compression over raw data whenever possible to minimize data footprint, while frequently leveraging complex data structures to reduce data duplication.
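To make the effect of data types and compression concrete, here is a small standard-library sketch. The sample data is made up for the illustration; the point is only the relative sizes of the encodings, which will differ for your own data.

```python
import array
import gzip
import json

# Hypothetical sample: 10,000 (user_id, rating) pairs with small values.
user_ids = list(range(10_000))
ratings = [user_id % 5 + 1 for user_id in user_ids]

# Text encoding spells every number out as characters.
as_text = json.dumps(list(zip(user_ids, ratings))).encode("utf-8")

# Typed encoding: the ids fit an unsigned 16-bit integer ("H") and
# the ratings fit an unsigned 8-bit integer ("B").
as_typed = array.array("H", user_ids).tobytes() + array.array("B", ratings).tobytes()

# Compression shrinks either form further before it goes over the wire.
text_gz = gzip.compress(as_text)
typed_gz = gzip.compress(as_typed)

for label, payload in [("text", as_text), ("typed", as_typed),
                       ("text+gzip", text_gz), ("typed+gzip", typed_gz)]:
    print(f"{label:>10}: {len(payload):>7} bytes")
```

The same reasoning applies to column types in pandas or in a database schema: the narrower the type that still fits your values, the less you have to store and transfer.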
Partition data as often as possible
When your data is larger than memory, partition it into smaller, more manageable parts. This is a crucial principle you need to take note of and remember when designing your big data transfer solutions.
Partitioning enables faster parallelism as the data volume grows. The size of each partition depends on how you partition your data, i.e. which data points you use as the partition key, such as a hash of the user ID or a date. Whichever key you choose, the partitions should be evenly distributed, so that each one takes roughly the same amount of time to process.
While partitioning is great, this should not be viewed as a constant that does not change. Changing the partition strategy at different stages of processing should be considered to improve performance.
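Partitioning by a hash of the user ID could look like the sketch below. The record layout and the `user_id` field name are assumptions for the example. A cryptographic hash is used only because it is stable across processes and machines, unlike Python's built-in `hash()` for strings.

```python
import hashlib
from collections import defaultdict


def partition_for(key, num_partitions):
    """Map a key to a stable partition number in [0, num_partitions)."""
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


def partition_records(records, num_partitions, key_field="user_id"):
    """Group records by the partition their key hashes to."""
    partitions = defaultdict(list)
    for record in records:
        partitions[partition_for(record[key_field], num_partitions)].append(record)
    return partitions


records = [{"user_id": n, "rating": n % 5} for n in range(1000)]
for number, rows in sorted(partition_records(records, 4).items()):
    print(f"partition {number}: {len(rows)} records")
```

Hashing spreads keys roughly evenly even when the IDs themselves are clustered, which is what keeps processing time per partition similar. Partitioning by date is simpler to reason about but can produce skewed partitions when traffic varies from day to day.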
Shared mount volumes for sharing data
One of the interesting things you can do when you work with solutions that should be scaled is to work with mounted external volumes. Mount volumes can come in various flavors. If you are working with AWS, shared EFS volumes can be attached to multiple instances, giving you the ability to share data between multiple EC2 instances while having your data replicated between multiple Availability Zones. EFS is optimized for big data reads and writes. Most cloud solution providers have something similar.
Alternatively, you can opt for using s3fs, a FUSE filesystem that allows you to mount an Amazon S3 bucket as a local filesystem. s3fs stores files natively in S3, which can be accessed by other services. The maximum size of objects that s3fs can handle depends on Amazon S3.
If you intend to use s3fs, there are some important considerations to take note of. You can mount volumes quickly across various services, even outside of your AWS environment, provided you have sufficient permissions. This gives you a Dropbox-style sync across applications, limited only by your bandwidth. While s3fs has some benefits, it also has some limitations inherited from the S3 service, such as:
- 5GB max for each PUT. If you want to upload larger objects, up to S3’s 5TB object limit, you’ll need to turn on multipart upload.
- Due to S3’s eventual consistency design, file creation can occasionally fail. To keep this from becoming a pain point, your application must either tolerate these failures or compensate for them by retrying creates or reads.
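Compensating for occasional failures by retrying can be sketched with a small helper like the one below. The helper and its parameters are illustrative and not part of s3fs. It assumes failures on the mounted filesystem surface as `OSError`, which is how failed file operations are normally reported in Python.

```python
import random
import time


def retry(operation, attempts=5, base_delay=0.5, retry_on=(OSError,), sleep=time.sleep):
    """Call operation() until it succeeds or the attempts run out."""
    for attempt in range(attempts):
        try:
            return operation()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts, let the caller see the error
            # Exponential backoff with jitter, so many clients
            # do not all retry at the same moment.
            delay = base_delay * (2 ** attempt)
            sleep(delay + random.uniform(0, delay / 2))
```

A write to a mounted bucket could then be wrapped as `retry(lambda: shutil.copy("ratings.csv", "/mnt/bucket/ratings.csv"))`, where both paths are placeholders.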
Leverage multipart uploads often
One of the fastest ways to upload and share large data files is via multipart uploads, which give you the ability to split an object into multiple data blocks called parts and upload them separately.
Smaller datasets mean faster data transfer cycles, which makes sense when you are looking to use Python to perform Big data transfer to one or more services in your cluster.
Python provides a few libraries you can use to make it easy for you to perform multipart uploads, such as the Requests library, a simple library for networking in Python with rich features.
Now you might be thinking: if you are working with big data, how do multipart uploads help? The key is to combine chunking with multipart uploads. Big data can span gigabytes and terabytes, and sending that much as a single request wouldn’t make sense from a networking point of view.
There are some criteria you should keep in mind when deciding to use multipart for Big data transfer. Only use multipart when there is:
- Poor network connectivity: In the event of failure, multipart uploads allow you to re-upload data from the point of failure rather than from the start.
- Resumable upload required: An upload in progress can be paused and resumed at a later time. If you are using AWS S3, boto3 provides APIs you can use to build a resume feature into your logic.
- Upload concurrency: When data is too large for a single request, you can take advantage of your network bandwidth by uploading parts concurrently.
Below is an example of how you would achieve a threaded multipart upload using AWS s3 for large data, which you can push to your data science services such as your Saturn Cloud Jupyter Notebooks.
To run the below code snippets, ensure you have the below prerequisites installed into your Python virtual environment or Python global packages if you are using anaconda’s conda. You can do either.
$ pip install boto3 python-magic
The python-magic library requires some additional dependencies for Mac & Windows users. Please refer to its GitHub page for install instructions. Also ensure that your AWS credentials grant access to upload data to an S3 bucket called saturncloud-ml-latest.
Multipart AWS S3 upload with threaded concurrency for performance improvements:
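The original listing is not reproduced here, so the following is a minimal sketch of a threaded multipart upload rather than the author's exact code. It assumes you pass in an S3 client created with `boto3.client("s3")`; the function names and the 8 MiB part size are choices made for the example. The client methods used (`create_multipart_upload`, `upload_part`, `complete_multipart_upload`, `abort_multipart_upload`) are the standard boto3 S3 client calls.

```python
import os
from concurrent.futures import ThreadPoolExecutor

# S3 requires at least 5 MiB for every part except the last one.
PART_SIZE = 8 * 1024 * 1024


def plan_parts(file_size, part_size=PART_SIZE):
    """Return (part_number, offset, length) tuples covering the file."""
    return [
        (number, offset, min(part_size, file_size - offset))
        for number, offset in enumerate(range(0, file_size, part_size), start=1)
    ]


def multipart_upload(s3, path, bucket, key, part_size=PART_SIZE, workers=4):
    """Upload a non-empty file to S3 in parts, several parts at a time."""
    upload_id = s3.create_multipart_upload(Bucket=bucket, Key=key)["UploadId"]

    def upload_part(part):
        number, offset, length = part
        with open(path, "rb") as fh:  # each thread uses its own file handle
            fh.seek(offset)
            body = fh.read(length)
        response = s3.upload_part(
            Bucket=bucket, Key=key, PartNumber=number, UploadId=upload_id, Body=body
        )
        return {"PartNumber": number, "ETag": response["ETag"]}

    try:
        with ThreadPoolExecutor(max_workers=workers) as pool:
            # map() keeps the input order, so parts stay sorted by number.
            parts = list(pool.map(upload_part, plan_parts(os.path.getsize(path), part_size)))
        return s3.complete_multipart_upload(
            Bucket=bucket, Key=key, UploadId=upload_id, MultipartUpload={"Parts": parts}
        )
    except Exception:
        # Abort so the parts already uploaded are not left behind in S3.
        s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        raise
```

A call would look like `multipart_upload(boto3.client("s3"), "ratings.csv", "saturncloud-ml-latest", "ratings.csv")`. If you do not need control over individual parts, boto3's `upload_file` combined with a `TransferConfig` performs threaded multipart uploads for you.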
When tools are not enough, managed services make sense.
A Case for AWS Kinesis
When you work with Saturn Cloud hosted on your AWS environment, you have at your disposal a wide range of tools that you can leverage to streamline how you pass big data around your applications.
One of the essential tools you need to familiarize yourself with is Amazon Kinesis, a managed streaming service similar in spirit to Apache Kafka that lets you scale stream throughput by adding shards. Kinesis Data Streams are made up of shards and, combined with Kinesis Data Firehose, can ingest, process, and persist streaming data in real time into a range of supported destinations such as S3, Redshift, Amazon Elasticsearch, and Splunk.
Kinesis is commonly used due to its ease of use and low overhead, alongside its competitive pricing. While other tools like Apache Kafka can be self-hosted rather than managed, the pricing and the managed nature are the differentiators between Kinesis Streams and Kafka.
Like any managed service, Amazon Kinesis has some limitations. You should be familiar with these, as well as how to work within them through scaling and throttling. It is wise to use the AWS-provided producers, consumers, and tools, which already apply these best practices.
We can reduce our costs by using compression and bundling records into one insertion, as opposed to sequentially appending records to the stream. You can use the Kinesis Aggregation/Deaggregation Module for Python, which provides the ability to do in-memory aggregation and deaggregation of standard Kinesis user records using the Kinesis Aggregated Record Format to allow for more efficient transmission of records.
Let’s create our first piece of code that publishes records to an AWS stream. Ensure you have access to the AWS Console and that the necessary access keys are granted to a user with programmatic access, so that your code can call AWS. Navigate to Kinesis and create a stream with the default configurations. Take note that Amazon Kinesis Data Streams are currently not part of the AWS Free Tier program, so expect some minor charges when running the code.
Ensure you have the necessary packages installed for Python ≥ 3.
$ pip install boto3
AWS Kinesis producer:
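The original listing is not reproduced here, so what follows is a minimal producer sketch under stated assumptions: you pass in a client created with `boto3.client("kinesis")`, events are JSON-serializable dictionaries, and the `user_id` field used as the partition key is hypothetical. `put_records` accepts at most 500 records per call, which is why the events are sent in batches.

```python
import json


def batches(items, size):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def publish(kinesis, stream_name, events, key_field="user_id", batch_size=500):
    """Send events with PutRecords; return how many records were rejected."""
    failed = 0
    for batch in batches(events, batch_size):
        records = [
            {
                "Data": json.dumps(event).encode("utf-8"),
                # Records with the same partition key land on the same shard.
                "PartitionKey": str(event[key_field]),
            }
            for event in batch
        ]
        response = kinesis.put_records(StreamName=stream_name, Records=records)
        failed += response["FailedRecordCount"]
    return failed
```

A non-zero return value means some records were throttled or rejected. A fuller producer would inspect the per-record results in the response and resend only the failed records.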
In order to read data from the Kinesis stream, you need to create a consumer. Kinesis streams are based on the producer-consumer design pattern. The producer generates the data and inserts it into the stream, and the consumer reads the stream and performs an action with the data. This makes sense when you wish to broadcast your big data to multiple services.
AWS Kinesis consumer:
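The original listing is not reproduced here. As a minimal sketch, a consumer could read each shard from its oldest available record until it has caught up. It assumes a client created with `boto3.client("kinesis")` and JSON-encoded records; the function names are made up for the example.

```python
import json


def read_shard(kinesis, stream_name, shard_id, limit=1000):
    """Yield decoded events from one shard until the reader has caught up."""
    iterator = kinesis.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",  # start at the oldest record
    )["ShardIterator"]
    while iterator:
        response = kinesis.get_records(ShardIterator=iterator, Limit=limit)
        for record in response["Records"]:
            yield json.loads(record["Data"])
        if response.get("MillisBehindLatest", 0) == 0:
            break  # caught up with the tip of the shard
        iterator = response.get("NextShardIterator")


def read_stream(kinesis, stream_name):
    """Yield events from every shard of the stream, one shard after another."""
    for shard in kinesis.list_shards(StreamName=stream_name)["Shards"]:
        yield from read_shard(kinesis, stream_name, shard["ShardId"])
```

This sketch reads shards sequentially and stops once it is caught up. A long-running consumer would keep polling with a short pause between calls, track its position per shard, and follow the pagination of `list_shards` for streams with many shards.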
Pandas can be used for large data
Pandas is one of the most popular python data science packages that allows you to process and manipulate large datasets. Though it’s more famous for number crunching and performing statistics, math, and data manipulation, this gem of a library can be used to partition large datasets so that data can be easily transmitted between applications, with the added advantage to write files directly to data stores.
Pandas is very efficient with small data (usually from 100MB up to 1GB), and performance is rarely a concern. Things start to change once you go above 1GB, and how much memory you have on your system matters. Naturally, the last thing you should be doing is loading Gigabytes of data into memory, which may eventually cause your python kernel to crash.
We all know about distributed processing frameworks like Hadoop and Spark, which handle big data by parallelizing across multiple worker nodes in a cluster. There are times when it’s not necessary to use such tools, as they introduce more moving parts into your applications. Pandas provides the ability to chunk large data sets, which may alleviate the need for big data tools.
Within pandas, when reading JSON or CSV files you have the ability to chunk your data so as to minimize memory utilization. Chunking, a fundamental big data concept, is necessary to transfer large data between services. Large files generally should not be transferred over the wire in one piece, as they are inherently susceptible to network interruptions, which would force your application to retry the entire upload from scratch in the event of failure.
The “chunksize” parameter in pandas denotes the number of rows to be read into a dataframe at any single time in order to fit into the local memory.
The large data set used in the example below is extracted from GroupLens’ ratings data. The datasets are based on the MovieLens website and are used in our coding example for educational purposes. They showcase how you can manipulate large datasets with chunking in pandas. The data consists of over 22 million records. It’s more efficient to process the data set in chunks, using an iterator object. Ensure you have the necessary packages for the example code below:
$ pip install pandas tqdm memory-profiler
The code below illustrates how chunking can be achieved on a CSV file.
Pandas chunking illustrated:
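The original listing is not reproduced here, so this is a minimal sketch of the technique. To stay self-contained it reads a tiny in-memory CSV that stands in for the ratings file; the column names mirror the MovieLens ratings layout, but the rows are made up. In practice you would pass a file path and a much larger `chunksize`.

```python
import io

import pandas as pd

# Stand-in for a large ratings file; in practice pass a file path instead.
csv_data = io.StringIO(
    "userId,movieId,rating\n"
    "1,10,4.0\n"
    "1,20,3.5\n"
    "2,10,5.0\n"
    "2,30,2.0\n"
    "3,20,4.5\n"
)

# With chunksize set, read_csv returns an iterator instead of a DataFrame.
df_chunk = pd.read_csv(csv_data, chunksize=2)

rows = 0
rating_sum = 0.0
chunk_sizes = []
for chunk in df_chunk:  # each chunk is a DataFrame with at most 2 rows
    rows += len(chunk)
    rating_sum += chunk["rating"].sum()
    chunk_sizes.append(len(chunk))

mean_rating = rating_sum / rows
print(f"{rows} rows in {len(chunk_sizes)} chunks, mean rating {mean_rating:.2f}")
```

Only one chunk is held in memory at a time, so aggregates have to be accumulated across chunks, as the running sum and row count do here. Inside the loop you could just as well write each chunk to its own file or hand it to another service.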
Calling read_csv with a chunksize results in a TextFileReader object for iteration. df_chunk is not a dataframe but an object for further operation in the next step.
One of the things you want to take note of is that chunk size should be neither too small nor too large. If it is too small, the IO overhead will be high. If it’s too large, you risk maxing out your memory, which may crash your application. Chunking in pandas is not limited to CSV files. You can apply the same principles with JSON files too.
df_partition = pd.read_json("ml-latest/ratings.json", orient="records", lines=True, chunksize=1000)
In the above example, we saw how we can use chunking and write content to a file. We can just as easily take the same code and route data into a message queue service such as AWS SQS. This could in turn trigger a serverless Lambda handler that persists your data into an alternative store. Alternatively, you can drop your files directly into S3, which gives us the benefit of triggering AWS Lambda functions for further processing.
When working with queues, you should consider designing your system around the concept of eventual consistency. This means your message will eventually get there and be processed, but maybe not when, or even in the order, you expect.
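Routing rows into a queue could be sketched as below. It assumes a client created with `boto3.client("sqs")` and an existing queue URL. The function name and the `seq` field are illustrative. `send_message_batch` accepts at most 10 entries per call.

```python
import json


def send_rows(sqs, queue_url, rows, batch_size=10):
    """Send rows to SQS in batches; return the ids of entries that failed."""
    failed_ids = []
    for start in range(0, len(rows), batch_size):
        entries = [
            {
                "Id": str(start + offset),
                # The sequence number lets a consumer detect duplicates and
                # restore order, since delivery order is not guaranteed.
                "MessageBody": json.dumps({"seq": start + offset, "row": row}),
            }
            for offset, row in enumerate(rows[start:start + batch_size])
        ]
        response = sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)
        failed_ids.extend(item["Id"] for item in response.get("Failed", []))
    return failed_ids
```

Each chunk produced by pandas can be converted with `chunk.to_dict("records")` and passed in as `rows`. Anything returned in the failed list should be retried by the caller.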
When Pandas is not good enough, leverage Dask for big data.
Dask is a lightweight distributed data science library built with the same concept as Apache Spark. Dask allows you to take your common data science packages such as NumPy and pandas, mix in larger-than-memory work, distribute it, and scale it across one or more nodes, in essence forming a distributed network capable of performing parallel computations. Virtually any Python function can be distributed across multiple nodes. This gives you the ability to potentially process petabytes of data in an efficient manner.
What you might not know is you can take advantage of the distributed nature of Dask to help chunk out big data in a way that is easy to distribute between your application services. There is nothing preventing you from distributing any python functions such as the functions you use for performing big data transfer, as illustrated in the below design.
In the code example below, we can see how to split large CSV files into smaller, more manageable partitions that we can then transmit over the wire to other services. The tqdm library is used to display progress while downloading the sample data.
Ensure you have the necessary packages installed.
$ pip install tqdm memory-profiler dask[complete]
Dask distributed file chunking:
In the above example, a local cluster is being spun up on your local machine (i.e. the one running the Python interpreter). The scheduler of this cluster listens on the Dask default port 8786, while the Dask dashboard, which runs on a Bokeh server, is served on port 8787.
Each worker is a process because the cluster is created with processes=True.
If we are running on Saturn Cloud-managed Dask, we don’t have to worry about cores and the number of CPUs as that will be managed automatically for us.
We can make things more dynamic by adding a simple calculation. If you’re running on non-managed servers or your local machine, apply a simple calculation as shown below:
n_workers=int(0.9 * mp.cpu_count())
This would be 90% of the number of CPU cores, rounded down. So if you have a 4-core laptop, 3 worker processes will be spun up, leaving 1 core for your IDE, Jupyter notebooks, or any other background processes running on your machine for the duration of your testing.
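One edge case is worth guarding against: on a single-core machine the same calculation rounds down to zero workers. A small helper, hypothetical and shown only to make the arithmetic explicit, could look like this:

```python
import multiprocessing as mp


def worker_count(cpu_count=None, fraction=0.9):
    """Return the share of CPU cores to use for workers, but at least one."""
    cpus = mp.cpu_count() if cpu_count is None else cpu_count
    return max(1, int(fraction * cpus))


print(worker_count(4))  # 3 workers on a 4-core machine
print(worker_count(1))  # 1 worker, where the bare formula would give 0
```

The result can be passed straight to the cluster as `n_workers=worker_count()`.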
If you’re running on a machine with limited resources, you might get the below exception:
Worker exceeded 95% memory budget. Restarting
When faced with such an error, there are a couple of things you can do. Adjust your configuration, e.g. increase the memory on workers, or adjust your code to use smaller partitions. The npartitions property is the number of pandas dataframes that compose a single Dask dataframe. This affects performance in two main ways.
If you don’t have enough partitions, then you may not be able to use all of your cores effectively. For example, if your dask.dataframe has only one partition, then only one core can operate at a time. If you have too many partitions, then the scheduler may incur a lot of overhead deciding where to compute each task.
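As of Dask 2.x.x, you may call df.repartition(partition_size="100MB"), where df is your Dask dataframe and the target size is only an example. This method performs an object-considerate breakdown of the partition size. It will join smaller partitions or split partitions that have grown too large.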
If you’re using Dask in a cluster, you must always remember that each worker has no idea about your data unless you put it where the worker can find it, which you can achieve using one of these methods:
- Replicate the file by copying it to every worker — this might not make sense if you are running in a serverless environment or are limited by disk space.
- Add your data to a shared volume or networked filesystem mount, such as a mounted HDFS. The implementation details go beyond the scope of this article, but this should give you some insights.
- Add your data to object storage such as Amazon S3, which Dask supports via s3fs.
- If your data is relatively small enough and not larger than memory, you may opt to use local memory and distribute data amongst your cluster, which makes sense for local testing.
This is to ensure that your data can be processed in a distributed fashion and relayed correctly to those target microservices or services.
To fully take advantage of Dask, it is recommended to use a cloud-based Dask implementation, so you don’t have to worry about scaling from single to multiple machines, memory, and compute, and can fully derive the benefits of processing big data within your Python applications.
As your data increases in size, it is important to take note of some best practices when using Dask, such as always starting small and optimizing from that point forward. For additional tips on dask best practices, you may refer to dask best practices.
Work with better file formats
We have discussed how to process big data, and we also touched on points about network speeds and disk IO. Another big hurdle when it comes to working with big data is the choice of file formats. Every file format has its benefits and disadvantages, which can mean the difference between scalable Python solutions and solutions that only work up to your first 20GB before grinding to a halt.
This is something we all need to keep in mind when we design systems that handle big data. A tremendous pain point, and at times a bottleneck, for applications that handle big data is the time it takes to find relevant data and the time it takes to write the data to another target location.
Choosing the wrong file format can be costly down the road. While storage has gotten relatively cheap over the years, storing data in the wrong file type adds overhead in data extraction and processing.
Choosing a good file type, one with robust, complex data structures optimized for speed, can bring significant benefits such as faster read times, better write times, easier splitting, out-of-the-box compression support, and schema evolution support.
Below are some file formats you may want to start incorporating into your python applications to aid in big data processing and data transfer between services.
Hierarchical Data Format (HDF) is a set of file formats (HDF4, HDF5), data models, and a library designed to handle and manage extremely large amounts of data and complex data collections. You can work with this file type from Python through the h5py package, which provides a Pythonic interface.
Parquet is an open-source file format that stores nested data structures in a flat columnar format. Compared to traditional approaches where data is stored row by row, it’s very efficient in terms of storage and performance, with the ability to read only the columns you query, which directly minimizes IO operations.
Avro is a row-based storage format for Hadoop which is used as a serialization platform. An Avro file stores its schema in JSON format, making it easy to read and interpret by any program. The data itself is stored in a binary format, making it compact and efficient.
These should give you some ideas of the file formats to look into when managing big data with Python, while applying the principles we have discussed to transfer the data to your services. At times, JSON and CSV just don’t cut it when data becomes too big.
Designing applications that work on big data while still being performant is a challenging undertaking. The principles and design approaches outlined in this article will give you the necessary knowledge you can start applying within your python applications.
As with most things in big data, your first design might not deliver massive performance improvements due to the limitations of hardware, network, design choices, or storage capacity. It’s essential to take an iterative approach to optimizing systems that work with big data.
As our need for insights through machine learning grows, adhering to best principles is necessary to build out scalable solutions. There are many more tools you can use to aid in big data transfer — most, if not all, share the same principles discussed.
Guest Post: Tim Mugayi