Adversarial file reading: from 10,000 small CSVs to massive Parquet files
How Daft optimizes the reading of real-world data which is often a mix of "many small files" and "few large files"
TL;DR
When reading large Parquet files, Daft can automatically split the file read across multiple processes and machines. By parallelizing the read, Daft can maximize network bandwidth and CPU utilization while keeping partition sizes manageable!
On the other hand, when reading many small files, Daft will automatically merge file reads, reducing the total number of partitions and performing bulk file reads, which also improves performance. Bulk reads allow Daft to interleave compute and IO not just within a single file but also across files. For high-latency object storage such as AWS S3 or Google Cloud Storage (GCS), this can increase throughput by up to 10 times.
Read on to learn how we do this and why this is important!
Background
Real world data is messy, but Daft has to handle it all!
Query engines differ from databases in one key area: they don’t manage storage!
That is to say, the Daft query engine has no control over the format or sizes of the files that it is instructed to read. As a result, Daft needs to be able to effectively handle adversarial cases such as:
10,000 small 10KB Parquet files
1 large 1TB Parquet file
A mix of the above situations, potentially with mixed file formats
This is a hard problem, especially for distributed query engines such as Daft and Spark, where the data from these files needs to be partitioned and sent to many different machines for processing.
Viewed from an algorithmic perspective, this is a Bin Packing problem where each bin is a partition. Our variation of the problem has a few interesting properties to consider:
Minimizing the number of partitions: Downstream distributed data shuffles (done during sorts, groupby’s, etc) are generally O(N^2), where N is the number of partitions, so reducing the number of partitions improves the efficiency of shuffles quadratically! Additionally, creating each partition incurs some fixed cost from things such as reading file metadata, job scheduling, and establishing network connections.
Sufficient partitioning to leverage available resources: However, we also want to have enough partitions to allow for higher parallelism over the cluster, such that network bandwidth and CPU utilization are maximized.
Preventing partitions from being too large: Each partition should also not exceed the memory limits of the worker, as this could lead to Out-Of-Memory (OOM) issues when trying to process giant partitions!
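To make the quadratic shuffle cost concrete, here is a small back-of-the-envelope sketch. It assumes a simple all-to-all shuffle model in which every input partition sends one piece to every output partition; the function name shuffle_transfers is hypothetical and is not part of Daft.

```python
def shuffle_transfers(num_partitions: int) -> int:
    """Pieces of data exchanged in an all-to-all shuffle.

    Assumption: each of the N input partitions sends one piece to each
    of the N output partitions, giving N * N transfers in total.
    """
    return num_partitions * num_partitions


# 10,000 tiny files read as one partition each.
unmerged = shuffle_transfers(10_000)

# The same data merged into 100 partitions.
merged = shuffle_transfers(100)

print(f"unmerged: {unmerged:,} transfers")
print(f"merged:   {merged:,} transfers")
print(f"reduction: {unmerged // merged:,}x")
```

Under this model, cutting the partition count by 100x cuts the number of shuffle transfers by 10,000x, which is why merging small files pays off well beyond the read itself.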
Parquet File Format
As a recap, in addition to being a columnar file format, Parquet also has the ability to split data in a file into RowGroups, which are chunks of contiguous rows. Each RowGroup can be read independently, as long as you incur the overhead of reading the file’s FileMetadata first upon each read.
RowGroups are typically around 128MB in size, although this can be configured by the application that writes the Parquet file. Thus, a big 1TB Parquet file might actually have about 7,800 RowGroups! This means that there is a big opportunity here for Daft to read this file in parallel extremely efficiently.
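The RowGroup estimate above is simple division. The sketch below reproduces it, assuming uniformly sized RowGroups; estimate_row_groups is a hypothetical helper, not a Daft or Parquet API. The exact count depends on whether "TB" and "MB" are read as decimal or binary units.

```python
def estimate_row_groups(file_size_bytes: int, row_group_size_bytes: int) -> int:
    """Estimate the RowGroup count, assuming uniformly sized RowGroups."""
    # Integer ceiling division: a partially filled last RowGroup still counts.
    return -(-file_size_bytes // row_group_size_bytes)


# Decimal units: 1 TB = 10^12 bytes, 128 MB = 128 * 10^6 bytes.
decimal_estimate = estimate_row_groups(10**12, 128 * 10**6)

# Binary units: 1 TiB = 2^40 bytes, 128 MiB = 128 * 2^20 bytes.
binary_estimate = estimate_row_groups(2**40, 128 * 2**20)

print(decimal_estimate, binary_estimate)
```

Either way, a single 1TB file exposes thousands of independently readable chunks.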
We previously did a deep-dive into the Parquet format, where you can learn more about its details!
Solution: Daft ScanTasks
Daft packs the work of reading files into ScanTasks, which encapsulate the “scanning” of a batch of data. Each ScanTask turns external data into one partition in Daft. For Parquet files, the smallest granularity of a batch of data is simply the Parquet RowGroup!
During the construction of a query’s physical plan, we perform a two-pass algorithm to optimize the reads of file lists produced by storage systems such as Iceberg, DeltaLake, and S3 file listing. This algorithm is also configurable using the Daft configuration parameters:
import daft

daft.set_execution_config(
    scan_tasks_min_size_bytes=96*1024*1024,   # 96MB
    scan_tasks_max_size_bytes=384*1024*1024,  # 384MB
    parquet_split_row_groups_max_files=10,
)
Splitting Files
daft_scan::scan_task_iters::split_by_row_groups
First, we split files to handle the case where the files being read are too large.
We split Parquet files that are over scan_tasks_max_size_bytes in size by RowGroup into multiple ScanTasks. In order to ensure that the fixed cost of running another ScanTask doesn’t outweigh the benefits of increased parallelism, we combine RowGroups into each ScanTask until the size of the ScanTask is at least scan_tasks_min_size_bytes.
Note: To prevent ScanTask creation from being bottlenecked by metadata reads, we only do Parquet file splitting when the number of files is less than parquet_split_row_groups_max_files.
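The splitting pass described above can be sketched in a few lines of Python. This is a simplified illustration and not Daft's actual implementation (which lives in daft_scan::scan_task_iters::split_by_row_groups): it only looks at RowGroup sizes, and the function name, signature, and return shape are assumptions made for the example. The thresholds reuse the values from the configuration shown earlier.

```python
from typing import List

MB = 1024 * 1024
MIN_SIZE = 96 * MB   # scan_tasks_min_size_bytes from the config above
MAX_SIZE = 384 * MB  # scan_tasks_max_size_bytes from the config above


def split_by_row_groups(
    row_group_sizes: List[int],
    min_size: int = MIN_SIZE,
    max_size: int = MAX_SIZE,
) -> List[List[int]]:
    """Group the RowGroup indices of one file into ScanTasks (hypothetical helper)."""
    if sum(row_group_sizes) <= max_size:
        # The file is not over the maximum size: keep it as a single ScanTask.
        return [list(range(len(row_group_sizes)))]

    tasks: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for index, size in enumerate(row_group_sizes):
        current.append(index)
        current_size += size
        # Close the ScanTask once it is large enough to justify its fixed cost.
        if current_size >= min_size:
            tasks.append(current)
            current, current_size = [], 0
    if current:
        # Leftover RowGroups form a final, smaller ScanTask.
        tasks.append(current)
    return tasks


# A 640MB file with ten 64MB RowGroups becomes five ScanTasks of two RowGroups each.
print(split_by_row_groups([64 * MB] * 10))
```

In this sketch a file with 128MB RowGroups ends up with one RowGroup per ScanTask, since each RowGroup alone already clears the minimum size.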
As you can see from the comparison below, splitting one Parquet file into many ScanTasks yields much better CPU and memory utilization across a cluster.
The result is a potential speed-up of 2x or more in file reads and downstream tasks! Exactly how we measured this is described in the Pull Request that made this change.
Merging Files
daft_scan::scan_task_iters::merge_by_sizes
Next, we merge ScanTasks to handle the case of having many small files.
When ScanTasks are smaller than scan_tasks_min_size_bytes, we iteratively merge them into larger ScanTasks. We keep merging until the merged ScanTask passes the scan_tasks_min_size_bytes threshold, or until adding another ScanTask would make it larger than scan_tasks_max_size_bytes.
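The merging pass can be sketched the same way. Again, this is a simplified greedy illustration rather than Daft's actual implementation (daft_scan::scan_task_iters::merge_by_sizes): it treats each ScanTask as just a size in bytes and ignores anything else that might affect whether two tasks can be merged. The function name and signature are assumptions made for the example.

```python
from typing import List

MB = 1024 * 1024
KB = 1024


def merge_by_sizes(
    task_sizes: List[int],
    min_size: int = 96 * MB,   # scan_tasks_min_size_bytes
    max_size: int = 384 * MB,  # scan_tasks_max_size_bytes
) -> List[List[int]]:
    """Greedily merge consecutive ScanTasks, given only their sizes (hypothetical helper)."""
    merged: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for size in task_sizes:
        # Stop merging if adding this task would exceed the maximum size.
        if current and current_size + size > max_size:
            merged.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
        # Stop merging once the accumulated task passes the minimum size.
        if current_size >= min_size:
            merged.append(current)
            current, current_size = [], 0
    if current:
        # Whatever is left over becomes a final, smaller ScanTask.
        merged.append(current)
    return merged


# 10,000 files of 10KB each collapse into just two ScanTasks.
result = merge_by_sizes([10 * KB] * 10_000)
print(len(result), [len(group) for group in result])
```

With the thresholds above, the 10,000 small files from the introduction turn into two partitions instead of 10,000, while files that are already large pass through as their own ScanTasks.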
Check out our talk at PyData Global 2023 if you’d like to see an in-depth demo of Daft’s I/O capabilities, including the use of Daft to read 10,000 CSV files in one minute!
Conclusion
By splitting or combining files into ScanTasks, Daft is able to control the size of its partitions in order to execute queries faster and more efficiently. This is done using a two-pass algorithm, and takes advantage of the RowGroups feature that is part of the Parquet file format!
Roadmap
Of course, this algorithm isn’t perfectly optimal, and follow-on work can be done to further optimize this workload. Some future work includes:
Taking into account the CPU and memory capacities of the workers in the cluster to automatically determine better configuration values
Performing splitting and parallel reads for other formats such as CSV and JSON
If this sounds interesting to you, join our Slack!