Fast Parallel CSV Reader, Better Daft Launcher UX, and Support for Stateful UDFs
A look into October’s Daft-SQL developments, Hive-partitioned reads, Apache Iceberg Community Meetup, and more!
Daft Feature Updates 💡
v0.2 to 0.3 📈
The release of Daft v0.3 marked the first minor version increment in almost 10 months! Thank you to the growing Daft community who have supported us throughout this journey by raising issues, submitting pull requests, and asking questions on our Slack. This release includes several exciting new features and performance improvements, including enhanced data source support, full join type coverage, faster file reads, and streaming execution. To learn more and get a sneak peek at our future roadmap, read our blog post From v0.2 to 0.3: Harder, Better, Faster, Stronger.
Daft-SQL 📄
Last month we introduced Daft-SQL, our newest API that is designed to let users interact with their data using traditional SQL queries within the Daft ecosystem. Whether you’re analyzing large datasets or performing simple queries, Daft-SQL offers a streamlined, SQL-based interface that simplifies your workflow without any loss in speed or efficiency. You can read all about it in Introducing Daft-SQL, where Cory Grinstead describes why and how to use the new API and provides some tips & tricks for easily switching between the DataFrame and SQL APIs.
Cory has added several new features to Daft-SQL since the release, including support for IN (#3086), DISTINCT (#3087), BETWEEN (#3062), and AS (#3066). Here’s an example query utilizing some of these operators:
SELECT
DISTINCT * EXCLUDE(n),
n as num
FROM df
WHERE n BETWEEN 0 AND 100;
Ronnie has also implemented SQL functionality in our Daft Launcher (#13), where it can take a raw SQL string and run it directly on a Ray cluster. This makes interfacing with the Daft-SQL API in a distributed setting way easier for end users. Previously, users would need to create a Python script containing some internal boilerplate and then use the daft submit API to try out their SQL. Now, all of that boilerplate has been abstracted away! Here’s an example:
# Run a SQL query using the default .daft.toml configuration file
daft sql -- "\"SELECT * FROM my_table\""
# Or run the same query using a custom configuration file
daft sql -c my-custom-config.toml -- "\"SELECT * FROM my_table\""
Daft Launcher 🚀
Ronnie has made several other improvements to Daft Launcher as well—he’s been working on adding instance profiles, improving the user flow, and increasing the robustness of data deserialization and serialization (#24).
Instance profiles allow end users to further abstract away machine details. Instead of having to specify an AWS AMI, AWS instance type, etc., users can now just specify template = "light", template = "normal", or template = "gpus" in order to use preconfigured specifications. We know that getting started with Daft should be as simple as possible, and this allows users to not worry about any lower-level details that shouldn’t be of concern to them.
Continuing with the theme of improving the user experience, submitting jobs via the CLI has been simplified. Previously, users needed to invoke a verbose submission command (daft submit --working-dir $WORKING_DIR -- command ...). Now, you can place a list of jobs inside your .daft.toml file and submit those jobs instead (e.g., daft submit my-cool-job, where my-cool-job is a table entry inside .daft.toml).
Errors are also now much more transparent to end users. During config file deserialization, any errors (missing keys, unexpected keys, wrong-type keys, etc.) are immediately caught and printed to the console, alongside the line number and location of the error! This will help end users quickly find out what they’re doing wrong and how to fix it.
Stateful UDFs 🐍
Kevin has added more functionality to Daft’s experimental stateful user-defined functions (UDFs), which allow you to run Python UDFs that maintain state between calls. This is particularly useful for running ML models on Daft, since it means you can initialize your model weights on the GPU once and use them on the entire DataFrame. In addition, with PR #3002 merged in, GPU workflows on Daft have become even easier, with CUDA_VISIBLE_DEVICES automatically set per UDF actor.
Project Swordfish now also supports stateful UDFs (#3127), giving users the ability to take advantage of its improved memory and performance when running stateful UDFs locally.
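The "initialize once, reuse across calls" idea behind stateful UDFs can be illustrated in plain Python. This is only a sketch of the pattern, not Daft's UDF API: the class name ScoreWithModel, the fake "model" (a single multiplier), and the run_over_batches helper are all hypothetical stand-ins for what an engine would do on each worker.

```python
class ScoreWithModel:
    """Hypothetical stand-in for a stateful UDF: expensive setup runs once."""

    init_calls = 0  # counts how many times setup ran, for illustration only

    def __init__(self):
        # In a real ML workload, this is where model weights would be loaded
        # (for example onto a GPU). Here the "model" is just a multiplier.
        ScoreWithModel.init_calls += 1
        self.weight = 2.0

    def __call__(self, batch):
        # Called once per batch of values; reuses the state built in __init__.
        return [self.weight * x for x in batch]


def run_over_batches(udf_cls, batches):
    # Assumption: the engine creates one instance per worker and reuses it
    # for every batch that worker processes.
    udf = udf_cls()
    return [udf(batch) for batch in batches]


results = run_over_batches(ScoreWithModel, [[1, 2], [3], [4, 5, 6]])
```

Three batches are processed, but the setup in __init__ runs only once. That is the property that makes this pattern attractive when setup means loading large model weights.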
Smaller Updates ✏️
We’ve made a couple of smaller updates this month as well—you can learn about them here.
Include File Path as Column
We recently added support for including the file path as a new column when reading from Parquet, CSV, and JSON files (#2953) via the file_path_column: str | None parameter. This works by appending a column of the file path to the Table with the name provided in the file_path_column argument.
This feature is particularly useful for tracking data lineage, debugging data quality issues, and performing analyses based on file organization patterns. For example, if your data is organized by region (region_1/data.parquet, region_2/data.parquet), you can easily aggregate metrics by region using the file path information. The column is automatically populated during the read operation, adding minimal overhead to your data processing pipeline. When the parameter is None (the default), no file path column is added, maintaining backward compatibility with existing code.
Standard Deviation
Ronnie has implemented global standard deviation (#3005), so you can now run a standard deviation aggregation on a numerical column. This is useful whenever you need to know the spread of the data in a numerical column, for example when the values are roughly normally distributed.
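To make "global" concrete, here is a minimal sketch of one common way to compute a standard deviation over data split across partitions: each partition reports a small partial aggregate, and the partials are merged at the end. This is an assumption for illustration, not a description of Daft's implementation. The function names are hypothetical, and the sketch uses the population form (dividing by n), which may differ from what Daft does.

```python
import math
import statistics


def partial_stats(values):
    # Per-partition partial aggregate: (count, sum, sum of squares).
    return (len(values), sum(values), sum(v * v for v in values))


def merge_stddev(partials):
    # Combine the partials, then apply Var(x) = E[x^2] - E[x]^2 (population form).
    n = sum(p[0] for p in partials)
    total = sum(p[1] for p in partials)
    total_sq = sum(p[2] for p in partials)
    mean = total / n
    # Clamp at zero in case rounding produces a tiny negative variance.
    variance = max(total_sq / n - mean * mean, 0.0)
    return math.sqrt(variance)


# Three hypothetical partitions of one numerical column.
partitions = [[2.0, 4.0, 4.0], [4.0, 5.0], [5.0, 7.0, 9.0]]
global_std = merge_stddev([partial_stats(p) for p in partitions])

# Cross-check against the standard library on the concatenated data.
reference = statistics.pstdev([v for p in partitions for v in p])
```

The sum-of-squares formula is simple to merge but can lose precision when values are large and the spread is small. Production engines often use a more numerically stable merge.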
Daft Developments ⚙️
Project Swordfish 🗡️🐟
We’ve made lots of updates to Project Swordfish, our new local streaming execution engine! This month, we’ve been focused on completing feature parity of Swordfish with our existing engine. We implemented concat (#2976), explode (#3077), pivot (#3081), unpivot (#3078), and sample (#3079). Additionally, one of the larger Swordfish feature updates was implementing outer joins (#2860), completing the full set of joins that Daft supports, in addition to inner, left, right, anti, and semi joins.
Colin finished adding streaming writes for Parquet and CSV (#2992) and streaming catalog writes for Iceberg and Delta Lake (#2966), which will make Swordfish a fully end-to-end streaming execution engine. Get ready for much lower memory usage on your ETL workloads 🐟
Parallel Local CSV Reader 🔍
Desmond added a parallel CSV reader to speed up the ingestion of local and uncompressed CSV files (#3055). The main idea behind this reader is to split CSV files into roughly 4 MiB chunks, then use a CSV state machine to adjust the start and end positions of each chunk so that their boundaries align with CSV record boundaries. This allows us to decode these chunks independently in parallel, without having to stitch the results together.
At the same time, we used buffer pools for both file reads and CSV record decoding. Minimizing memory allocations through the buffer pools accounted for most of the performance gains.
To gauge the impact of this approach, consider a simple case of reading a 5 GB CSV file with 10^8 rows and calling .collect() on it. We’re currently seeing a 12x speedup on CSV reads for Project Swordfish.
In this example, the performance of this new CSV reader matches that of DuckDB’s, and it outperforms Spark by 5x!
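The chunk-alignment idea described in this section can be sketched in a few lines of Python. This is a simplified illustration, not the reader's actual implementation: it assumes no quoted field contains a newline (so every newline ends a record), whereas the real reader uses a CSV state machine to handle the general case. The function names are hypothetical, and the thread pool only demonstrates that aligned chunks can be decoded independently; it does not reproduce the buffer pooling or the reported speedups.

```python
import csv
import io
from concurrent.futures import ThreadPoolExecutor


def aligned_chunks(data: bytes, chunk_size: int):
    """Split data into (start, end) ranges that end on record boundaries.

    Simplifying assumption: no quoted field contains a newline, so every
    newline byte ends a record.
    """
    ranges = []
    start = 0
    while start < len(data):
        tentative_end = min(start + chunk_size, len(data))
        # Push the boundary forward to just past the next newline.
        newline = data.find(b"\n", tentative_end - 1)
        end = len(data) if newline == -1 else newline + 1
        ranges.append((start, end))
        start = end
    return ranges


def decode_chunk(chunk: bytes):
    # Each chunk holds only whole records, so it can be parsed on its own.
    return list(csv.reader(io.StringIO(chunk.decode("utf-8"), newline="")))


def parallel_read(data: bytes, chunk_size: int = 4 * 1024 * 1024):
    ranges = aligned_chunks(data, chunk_size)
    with ThreadPoolExecutor() as pool:
        parts = pool.map(decode_chunk, (data[s:e] for s, e in ranges))
        # pool.map preserves chunk order, so rows come back in file order.
        return [row for part in parts for row in part]


sample = b"id,name\n1,alpha\n2,beta\n3,gamma\n"
rows = parallel_read(sample, chunk_size=10)
```

Because every chunk starts and ends on a record boundary, the per-chunk results can simply be concatenated, with no stitching of partial rows across chunks.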
Hive-Partitioned Reads 🗂️
Desmond recently completed support for Hive-partitioned data (#3029). This is a common partitioning strategy where a table is split into multiple files based on each combination of partition key values, and the resulting files are organized in a hierarchy of folders in which each partition key corresponds to one level of nesting. For example, if we partition a table via key1 and key2, data files will be stored in <table_location>/key1=<partition value>/key2=<partition value>/*.{csv,json,parquet}. When combined with filters on partition values, Hive partitioning allows us to speed up queries by skipping folders with partition values that lie outside the queries’ range.
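As a rough illustration of how partition pruning works, the sketch below parses key=value folder names out of Hive-style paths and drops files whose partition values fail a filter before any file is opened. It is plain Python and does not show Daft's API. The helper names, the year and month keys, and the file paths are all hypothetical.

```python
from pathlib import PurePosixPath


def partition_values(path: str) -> dict:
    """Extract key=value folder segments from a Hive-style path."""
    values = {}
    for segment in PurePosixPath(path).parent.parts:
        key, sep, value = segment.partition("=")
        if sep and key:
            values[key] = value
    return values


def prune(paths, predicate):
    """Keep only files whose partition values satisfy the predicate."""
    return [p for p in paths if predicate(partition_values(p))]


# Hypothetical table partitioned by year, then month.
files = [
    "table/year=2023/month=12/part-0.parquet",
    "table/year=2024/month=01/part-0.parquet",
    "table/year=2024/month=02/part-0.parquet",
]

# Equivalent of WHERE year = 2024 AND month >= 2. Partition values arrive
# as strings from the path, so they are converted before comparing.
kept = prune(files, lambda v: int(v["year"]) == 2024 and int(v["month"]) >= 2)
```

Only one of the three files survives the filter, so the other two folders never need to be read.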
Contributor Sync 📺
In October’s Contributor Sync, we took a quick look at major updates to Project Swordfish and Daft-SQL. We also talked about our new parallel local CSV reader, which has already achieved a 5x performance improvement for our non-native executor and an impressive 12x improvement for our native executor.
For October’s Data Topic Deep Dive, Cory walked through the necessary components for building a SQL engine, such as a tokenizer, parser, query planner, and query optimizer, and how we built one on top of Daft’s existing query engine. In the spirit of Halloween, he showcased his Daft-SQL work with a live demo searching for the most haunted place in the US.
Community Events 👥
Ray Summit 🌁
The team had a blast at Ray Summit 2024, learning from the developer audience about their Ray use cases and introducing how to use Daft for distributed DataFrame workloads at scale with Ray, for large-scale model batch inference on distributed GPU clusters, and for optimizing data curation with advanced clustering and deduplication. Our shirts definitely received a lot of love and attention and sparked some great conversations!
Jay Chia, Co-Founder of Daft, demonstrated using Daft in different scenarios for data analytics (using SQL), data engineering (running models and downloading data from URLs), and AI/ML engineering (fast data loading and launching clusters). Watch until the end, where he LIVE open-sourced our Daft Launcher.
Watch: An Overview of CloudKitchens’s Ray-Powered ML Platform
CloudKitchens shared their ML Platform 2.0, calling it their ✨DREAM✨ stack—built on Daft, Ray, poEtry, Argo, and Metaflow. Starting at 18:40, software engineer Ammar Alrashed shared why Daft was their DataFrame API of choice and showcased some of Daft’s highlights, such as distributed job execution, advanced query optimization, being built on Apache Arrow, and most importantly our very active community!
In a quick lightning talk, data scientist Garrett Weaver shared how City Storage Systems (CloudKitchens) transformed their data processing and ETL workflows, moving from traditional Spark clusters to a more unified and efficient ecosystem built on Daft and Ray for end-to-end machine learning pipelines. He went on to give a very honest review, critiquing and highlighting the benefits and drawbacks of both Daft and Apache Spark.
Meetups 🧑🏻‍💻
Upcoming
Apache Iceberg Community Meetup
Monday, November 4 | 5pm - 8pm
We’re partnering with AWS, Snowflake, and the Apache Iceberg Community to co-host the next Bay Area Apache Iceberg Community Meetup on Monday, November 4 in San Francisco! We have a very exciting agenda featuring speakers from Daft, AWS, RisingWave, Netflix, and HANSETAG sharing their experiences and developments with Apache Iceberg — such as implementing Iceberg in a distributed fashion, improvements in Iceberg FileIO, building an Iceberg connector in Rust, Netflix’s journey from Hive to Iceberg, and building a Rust-native modular Iceberg Rest Catalog.
Lessons from Building Iceberg Capabilities in Daft, a Distributed Query Engine
Kevin will be walking through how we adapted PyIceberg for distributed workloads and built features like partitioned writes into Daft, and sharing some challenges and workarounds with using existing Python/Rust Iceberg tooling. Learn what it means for an Iceberg library to provide useful abstractions while giving the query engine proper control over execution, and which API interfaces we propose to enable that.
Past Recordings
Watch: Life After Apache Spark: Why & How We're Building the Daft Query Engine
Back in September, Colin and Desmond presented at the SF Systems Meetup, introducing Apache Spark’s origins, strengths, and limitations as well as sharing the design decisions behind Daft’s local execution model that enable it to outperform Spark.
On October 8, we hosted our very first meetup: Multimodal Data with Modern Tools! A huge thank you to our guest speakers, Stu Stewart and Paul George from Twelve Labs, Chang She from LanceDB, and Ramesh Chandra from Databricks, for sharing their modern solutions to handling complexities related to multimodal data. Videos of the presentations are available at the links below!
Sammy Sidhu, CEO & Co-Founder of Daft, discussed challenges that traditional query engines face when processing multimodal data and revealed how we designed Daft to solve many of these problems in a distributed fashion. Discover why we chose Rust to power our fast, distributed Python query engine and how to unlock new workloads and possibilities for multimodal data by leveraging Daft!
Watch: Distributed Data Tools Should Be Easy to Use: Entreaties from an End User
Twelve Labs’ Stu Stewart, Head of ML/AI Engineering, and Paul George, Senior Staff ML Engineer, discussed their recent work prototyping Ray Serve as a foundation for scaling multimodal AI application inference workloads.
Chang She, CEO and Co-Founder of LanceDB, introduced the main challenges that AI data poses, how the Lance columnar format works, and the value it delivers to AI teams training models or putting applications into production.
Ramesh Chandra, Principal Engineer at Databricks building Unity Catalog and Governance, discussed some of the main benefits of Unity Catalog, which is multimodal, provides interoperability across lakehouse formats like Delta Lake and Iceberg and various compute engines, and comes with built-in governance and security.
That same week, we also hosted From Data to Deployment: Lessons from AI Founders, a panel discussion with AI startup founders on the pivotal role of data in taking their AI models from concept to deployment.
Featuring Sammy Sidhu (Eventual), Pablo Palafox (Happyrobot), Chris Van Dyke (Overview), Peter Fishman (Mozart Data), and our moderator Shruti Gandhi (Array Ventures), the panel covered topics such as challenges with managing data, data infrastructure, data governance, product pricing, and more.
We’re aiming to host monthly meetups and bring together the developer community—subscribe to our Daft Luma calendar for updates! https://lu.ma/daft
Join Our Team 🤝
Our team is growing! At Eventual, we’re dedicated to fostering innovation, collaboration, and growth. We’re looking for passionate individuals with a strong sense of intellectual curiosity who are eager to make an impact and contribute to our mission.
If you thrive in a dynamic environment and want to be part of a supportive community, we’d love to hear from you. To learn more about what we’re building at Daft and how you can help, head to our Careers Page and apply today!
If you haven’t already, join our Distributed Data Community Slack and star our Daft GitHub repo! Engage in technical conversations with our engineers about Daft developments and stay in the know about the latest Daft news and updates.
Join us for the next Daft Monthly Contributor Sync on Thursday, November 21. Add it to your calendar! → https://bit.ly/DaftContributorSync