When we set out to build Daft to address our many frustrations with the state of distributed complex data processing, efficient reading of data from cloud storage was at the top of our wishlist.
Modern data teams rely heavily on cloud object storage (such as AWS S3) to back a data catalog format (e.g. Apache Iceberg, Delta Lake or Hudi) for storing and serving their data. These S3-compatible systems and their Python client libraries provide good performance on simple workloads (e.g. downloading entire ~100MB files).
However, real-world data use-cases come in all sorts of shapes and sizes:
Pulling only one column from thousands of Parquet files, each varying in size from 10KB to 200MB
1 million .jpeg files
Deeply nested file hierarchies that make just file listing take many minutes to run
While running our benchmarks and large distributed workloads, we saw that an overwhelming fraction of wall-clock time (>80% in some cases) was actually just spent reading data from cloud storage!
As it turns out, I/O efficiency from cloud storage matters a ton when optimizing these workloads, since real-world data is rarely already sitting conveniently on high-speed local NVMe SSDs.
The unavoidable Step #0 in most workloads:
Fetch as little data as possible, as fast as possible, from cloud storage please!
Prior versions of Daft leveraged the PyArrow library to read data from AWS S3. From Daft 0.2 onwards, we default to using our own highly optimized async Rust code for all data I/O.
In collaboration with our partners at Amazon, our native Rust I/O code is battle-tested daily on petabytes of data in production.
This blogpost covers some quick experiments using the latest version of Daft (getdaft==0.2.7). For ease of demonstration, we run all of the code in this blogpost on an AWS EC2 r5.8xlarge machine, but all of the demonstrated benefits also translate to distributed execution on a Ray cluster!
r5.8xlarge machines have 32 vCPUs / 256GB of memory / 10Gbps of network bandwidth.
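For the distributed runs, switching Daft onto a Ray cluster is a one-line configuration change. Here is a minimal sketch, assuming an existing Ray cluster; the address below is a placeholder for your own cluster endpoint:

import daft

# Run Daft on a Ray cluster instead of the local machine.
# "ray://<head-node-address>:10001" is a placeholder, not a real endpoint.
daft.context.set_runner_ray(address="ray://<head-node-address>:10001")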
Highlight #1: Native Parquet reads
Daft is now 5x faster across the board when reading Parquet files from S3, compared to our previous solution which leveraged PyArrow's Parquet readers.
import daft
# Folder with 22GB of Parquet files
# (about 44GB when decompressed in-memory)
PATH ="s3://daft-public-datasets/tpch-lineitem/100_0/32/"
df = daft.read_parquet(PATH)
When we call df.collect() at this point, Daft reads and decodes all of the Parquet files into memory, and we can observe network throughput peaking at 9Gbps, saturating the machine's network:
(5x Speedup) PyArrow Reader: 148s vs Daft Native Reads: 27s
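To try this yourself, you can wrap the materializing call in a simple wall-clock timer, along the lines of the sketch below (exact numbers will of course vary with your machine, region and network conditions):

import time

start = time.perf_counter()
df.collect()  # downloads and decodes all Parquet files into memory
print(f"collect() took {time.perf_counter() - start:.1f}s")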
We can also select just a single column, an operation that the columnar Parquet file format makes extremely efficient. This now takes a fraction of the time!
df.select("L_ORDERKEY").collect()
(5x Speedup) Daft PyArrow Reader: 18.5s vs Daft Native Reads: 3.8s
Lastly, Daft now also performs incredibly efficient data pruning based on Parquet metadata, allowing us to evaluate filters such as the following without having to pull the actual data!
df.where(df["L_ORDERKEY"] < 100).collect()
(50x Speedup) Daft PyArrow Reader: 131s vs Daft Native Reads: 2.4s
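This works because Parquet files carry min/max statistics for each row group in their footer metadata, so a reader can skip any row group whose statistics cannot possibly satisfy the filter. As a standalone illustration (inspecting the metadata with PyArrow here, not Daft's reader, and using a hypothetical local file path):

import pyarrow.parquet as pq

# "lineitem.parquet" is a hypothetical local file used purely for illustration.
meta = pq.ParquetFile("lineitem.parquet").metadata
for rg in range(meta.num_row_groups):
    stats = meta.row_group(rg).column(0).statistics  # e.g. the L_ORDERKEY column
    print(rg, stats.min, stats.max)
# A filter like L_ORDERKEY < 100 only needs row groups whose min value is below 100;
# everything else can be pruned without downloading the data pages at all.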
Highlight #2: Native CSV/JSON reads
Our CSV and JSON readers have also been rewritten in native Rust. We built our code to efficiently pipeline downloading and deserialization of data, which maximizes resource utilization when reading from a high-latency data source such as AWS S3. This gives us significant speed-ups compared to Daft's previous PyArrow-based solution.
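Conceptually, the goal of the pipelining is to keep the network and the CPU busy at the same time: while one chunk of the file is being deserialized, the next chunks are already in flight. Here is a rough Python analogy of that overlap (a toy sketch with simulated work, not Daft's actual Rust implementation):

import time
from concurrent.futures import ThreadPoolExecutor

def download_chunk(chunk_id):
    time.sleep(0.5)  # stand-in for a high-latency S3 range request
    return f"raw bytes for chunk {chunk_id}"

def decode_chunk(raw):
    time.sleep(0.1)  # stand-in for CPU-bound CSV/JSON deserialization
    return raw.upper()

with ThreadPoolExecutor(max_workers=8) as pool:
    # pool.map keeps several downloads in flight while the main thread decodes,
    # so decoding chunk N overlaps with downloading chunks N+1, N+2, ...
    for raw in pool.map(download_chunk, range(16)):
        decode_chunk(raw)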
This 700MB CSV file is now much faster to read.
df = daft.read_csv(
"s3://daft-public-data/test_fixtures/csv-dev/big.csv"
)
(3x Speedup) Daft PyArrow Reader: 51s vs Daft Native Reads: 18.4s
And this 1.1GB JSON-line-delimited file similarly sees a big speedup:
df = daft.read_json(
"s3://daft-public-data/test_fixtures/json-dev/nyc_taxi.jsonl"
)
(6x Speedup) Daft PyArrow Reader: 172s vs Daft Native Reads: 29.3s
However, we did notice network utilization hovering at only ~2Gbps, indicating that these workloads are likely CPU-bound on deserialization. There is a lot more to improve on here, such as leveraging more SIMD instructions or smarter parallelism strategies, and the team is excited to deliver even more speed-ups in the coming months!
Highlight #3: Parallel small-file S3 downloads
Often when working with S3, we hear that it can be challenging to work with “many small files” (e.g. tens of thousands of small 300KB JPEG image files).
With Daft, we have successfully optimized this workload when executing our .url.download() expression, which is an easy way of downloading data from a column of URLs.
Incredibly, Daft is able to almost saturate the AWS EC2 machine’s network on this workload. Network bandwidth peaks at about 7Gbps when downloading 40,000 small JPEG images from their URLs in the dataframe!
IMAGES_PATH = "s3://daft-public-datasets/open-images/validation-images/**.jpg"
df = daft.from_glob_path(IMAGES_PATH)
df = df.with_column("data", df["path"].url.download())
(25x Speedup) Daft multithreaded fsspec: 630s vs Daft Native Reads: 25s
Highlight #4: The fastest S3 file lister in the wild west
That’s not all: We even gave file listing a little bit of love and a lot more oomph!
Globbing of filepaths is an extremely common operation in workloads that operate just on files in AWS S3, such as ML training or when you need to discover “new” files in a bucket. Daft provides a Unix glob API to allow users to flexibly specify which filepaths should be matched, including support for special glob characters such as *, ?, […] and the recursive wildcard **.
Rebuilding our globbing in Rust nets Daft anywhere between a 5x and 20x improvement in file listing performance, depending on your folder/prefix layout in S3. We think that we may have just built the fastest cloud bucket globber available in Python! Here is an example where we list 10,000 files in a nested folder hierarchy:
df = daft.from_glob_path(
"s3://daft-public-datasets/tpch-lineitem/10k-1mb-csv-files/**/*.csv"
)
(7x+ Speedup) Boto3 `list_objects_v2`: 2.32s vs Daft Globbing: 300ms
Naive file listing scales linearly with the number of items in your bucket, but Daft's file listing algorithm leverages nested folder hierarchies and intelligent parallel listing where possible to achieve logarithmic scaling. This makes a big difference for buckets with hundreds of thousands or millions of files, which can otherwise take minutes just to list!
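To make the idea concrete, here is a rough boto3 sketch of hierarchical, parallel listing (not Daft's Rust implementation, and with response pagination omitted for brevity): each delimiter-scoped request returns the files at one level plus its child “folders”, and the discovered child prefixes can then be listed in parallel rather than paging through the bucket one flat response at a time.

from concurrent.futures import ThreadPoolExecutor
import boto3

s3 = boto3.client("s3")

def list_level(bucket, prefix):
    # One delimiter-scoped request: files at this level + child "folders".
    # (A full implementation would also handle paginated responses.)
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, Delimiter="/")
    files = [obj["Key"] for obj in resp.get("Contents", [])]
    children = [p["Prefix"] for p in resp.get("CommonPrefixes", [])]
    return files, children

def parallel_list(bucket, prefix=""):
    files, frontier = list_level(bucket, prefix)
    with ThreadPoolExecutor(max_workers=32) as pool:
        while frontier:
            # Fan out over all "folders" at the current depth in parallel.
            results = list(pool.map(lambda p: list_level(bucket, p), frontier))
            frontier = []
            for level_files, children in results:
                files.extend(level_files)
                frontier.extend(children)
    return files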
And that’s a wrap!
We put a lot of work into our I/O layer, and are looking forward to delivering even more powerful features in the coming months:
Data Catalog support (Hive, Apache Iceberg, Delta Lake, Hudi etc)
Native data writing support and optimizations
Other file formats (ORC, Avro etc)
Microsoft Azure Blob Storage and Google Cloud Storage are also supported in Daft, but we would love to work with users who need to optimize workloads for these use-cases. Let us know if you'd like to collaborate!
Stay tuned for more updates and join us on GitHub/Slack to chat with us. We’d love to hear your feedback and learn more about your workloads!