Enable Swordfish for Local Execution, Tracing for Daft Distributed Queries, and Pre-Shuffle Merge Strategy
The latest November updates: visualizing distributed Daft queries, progress on the TPC-H and TPC-DS benchmarks, and a spotlight on our open source contributors!
Daft Feature Updates 💡
Project Swordfish 🗡️🐟
You can now enable Swordfish as the top-level runner with daft.context.set_runner_native() (in code) or DAFT_RUNNER=NATIVE (via environment variable)! The new local execution engine will become the default in version 0.4.0, featuring a streaming-first design to support datasets larger than available RAM and morsel-driven parallelism for optimal resource utilization. We’ve already seen enhanced single-node performance and improved memory management and resource utilization.
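As a minimal sketch of the two options above, the helper below either sets the environment variable or calls daft.context.set_runner_native(). The helper name enable_swordfish is hypothetical, and the sketch assumes the environment variable should be in place before Daft is first imported.

```python
import os


def enable_swordfish(use_env_var: bool = True) -> None:
    """Select the Swordfish (native) runner. `enable_swordfish` is a hypothetical helper."""
    if use_env_var:
        # Environment-variable route: assumed to be read when Daft initializes,
        # so set it before the first `import daft`.
        os.environ["DAFT_RUNNER"] = "NATIVE"
    else:
        # In-code route: requires Daft to be installed.
        import daft

        daft.context.set_runner_native()


enable_swordfish(use_env_var=True)
print(os.environ["DAFT_RUNNER"])
```

Pick one route per process; the in-code call is the more explicit choice inside notebooks and scripts, while the environment variable suits deployment configuration.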
Daft-SQL 📄
More SQL support has been added! Daft now supports CROSS JOIN (#3110), common table expressions (CTEs) (#3137), CONCAT and STDDEV (#3153), EXTRACT (#3188), READ_CSV (#3255), UNION/UNION ALL and INTERSECT (#3274), and HAVING (#3364).
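To give a feel for the new syntax, here is a small sketch that combines a CTE with HAVING. The table name orders, its columns, and the helper run_query are made up for illustration, and the sketch assumes the daft.sql entry point with a SQLCatalog as found in the 0.3.x releases; check the SQL docs for your installed version.

```python
QUERY = """
WITH large_orders AS (
    SELECT category, amount
    FROM orders
    WHERE amount > 1
)
SELECT category, SUM(amount) AS total
FROM large_orders
GROUP BY category
HAVING SUM(amount) > 10
"""


def run_query():
    """Run QUERY against a tiny in-memory table (requires Daft to be installed)."""
    import daft
    from daft.sql import SQLCatalog  # assumed import path for the 0.3.x API

    orders = daft.from_pydict(
        {"category": ["a", "a", "b", "b"], "amount": [2, 20, 3, 4]}
    )
    # Register the DataFrame under the table name that QUERY refers to.
    return daft.sql(QUERY, catalog=SQLCatalog({"orders": orders})).collect()
```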
In the last month, we’ve made a big push to benchmark TPC-H and TPC-DS SQL queries to showcase Daft’s ability to efficiently handle complex queries and large-scale data processing. Completing these benchmarks will allow us to demonstrate Daft’s performance across diverse SQL workloads, highlighting its scalability and optimization for real-world enterprise data environments. So far, we have full SQL support for 12 of 22 TPC-H queries and 22 of 99 TPC-DS queries!
Daft on Ray 🚀
We built a trace tool for profiling Daft-on-Ray queries (#3113) that helps visualize Ray tasks to understand how they’re executed across Ray’s distributed cluster and identify bottlenecks, task latencies, and job failures. The tool is turned off by default, but it can be activated by setting DAFT_ENABLE_RAY_TRACING=1.
The RayRunner dispatch loop view shows what the while loop is doing and reports overall Ray metrics such as the number of tasks in flight and the number of cores available. The Ray Task Execution view shows the Ray tasks that were scheduled, when they were completed, and their ResourceRequests. You can see both of those here:
A Stages view maps each stage to the tasks it launched, alongside related views for each node and its worker processes, which are updated at the end of every “wave” by polling a metrics actor.
Setting DAFT_ENABLE_RAY_TRACING=1 when submitting jobs to a Ray cluster also enables producing custom Daft logs (#3406).
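One way to pass the variable along with a submitted job is through Ray's runtime_env, sketched below. This is an assumption about how you might wire it up rather than a documented Daft workflow; the helper names, the entrypoint, and the cluster address are placeholders.

```python
def tracing_runtime_env() -> dict:
    """Build a Ray runtime_env that sets DAFT_ENABLE_RAY_TRACING for the job."""
    return {"env_vars": {"DAFT_ENABLE_RAY_TRACING": "1"}}


def submit_with_tracing(entrypoint: str, address: str) -> str:
    """Submit a job with tracing enabled (requires Ray to be installed).

    `entrypoint` (e.g. a "python my_script.py" command) and `address`
    (the Ray dashboard URL) are supplied by the caller.
    """
    from ray.job_submission import JobSubmissionClient

    client = JobSubmissionClient(address)
    # submit_job returns the submission ID of the new job.
    return client.submit_job(entrypoint=entrypoint, runtime_env=tracing_runtime_env())


print(tracing_runtime_env())
```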
In addition, we added a mechanism for Daft to automatically detect when it is running inside a Ray Job by checking the $RAY_JOB_ID environment variable (#3148). In that case, Daft defaults to using the RayRunner, eliminating the need for users to manually call daft.context.set_runner_ray(). Previously, Daft could not recognize Ray Jobs unless an explicit Ray command or ray.init() was invoked, causing ray.is_initialized() to return False. This led to issues with using daft-launcher, requiring users to manually set the runner, which was error-prone and unintuitive.
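The detection logic described above can be pictured roughly as follows. This is an illustrative sketch only, not Daft's actual implementation: the function choose_runner, its parameters, and the "local" fallback label are all hypothetical.

```python
import os
from typing import Mapping, Optional


def choose_runner(
    explicit: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
    default: str = "local",
) -> str:
    """Pick a runner name: explicit choice first, then Ray Job detection, then a default."""
    if explicit is not None:
        # A runner set by the user always wins.
        return explicit
    if env.get("RAY_JOB_ID"):
        # Ray sets RAY_JOB_ID for processes launched as part of a Ray Job,
        # so its presence signals that the Ray runner is the sensible default.
        return "ray"
    return default


print(choose_runner(env={"RAY_JOB_ID": "02000000"}))
print(choose_runner(env={}))
```

Checking an environment variable works even before ray.init() has been called, which is the gap the older ray.is_initialized() check could not cover.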
Daft Developments ⚙️
Project Swordfish 🗡️🐟
As mentioned earlier, you can now enable Swordfish as the top-level runner with daft.context.set_runner_native() (in code) or DAFT_RUNNER=NATIVE (via environment variable)! #3178 refactors Swordfish by:
Buffering scan tasks based on num_parallel_tasks, considering push-down limits
Adding is_err checks to handle dropped senders in while receiver.recv.await -> sender.send loops, ensuring proper handling when consumers finish receiving data early, such as during Limit operations or iter(df)
We also implemented a single-threaded monotonically increasing ID sink with max_concurrency = 1 (#3180). The sink keeps a running count of morsels’ lengths, which simplifies the implementation. Because the operation is memory-bound, parallel threads would be bottlenecked by memory bandwidth anyway, so multithreading is unnecessary.
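The running-count idea can be sketched in a few lines of Python. This is a simplified illustration, not the Rust sink from #3180: morsels are modeled as plain lists, and the function name add_monotonic_ids is hypothetical.

```python
from typing import Iterable, Iterator, List, Tuple


def add_monotonic_ids(
    morsels: Iterable[List[object]],
) -> Iterator[List[Tuple[int, object]]]:
    """Attach increasing IDs to rows, processing one morsel at a time."""
    next_id = 0  # running count of rows seen in earlier morsels
    for morsel in morsels:
        # IDs for this morsel start where the previous morsel stopped.
        yield [(next_id + i, row) for i, row in enumerate(morsel)]
        next_id += len(morsel)


result = list(add_monotonic_ids([["a", "b"], ["c"], ["d", "e", "f"]]))
ids = [row_id for morsel in result for row_id, _ in morsel]
print(ids)
```

Because each morsel's starting ID depends only on the count so far, a single consumer needs no coordination or locking, which is what makes the single-threaded design simple.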
Lastly, we enabled an experimental pre-shuffle merge strategy to improve shuffle efficiency (#3191). Instead of fully materializing map-reduce tasks, it merges P map partitions before the fanout stage, and merges are dynamically capped at 1 GB. This reduces the number of intermediate objects from M * N to M / P * N and lowers the overall memory overhead.
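To make the M * N versus M / P * N arithmetic concrete, here is a small calculation. The value P = 10 is an illustrative assumption (roughly what a 1 GB cap would allow with 100 MB partitions), not a figure from the PR, and the helper name is hypothetical.

```python
import math


def intermediate_objects(m: int, n: int, p: int = 1) -> int:
    """Count intermediate shuffle objects for m map partitions and n reduce partitions.

    With p map partitions merged before fanout, only ceil(m / p) merged
    partitions fan out to the n reducers.
    """
    return math.ceil(m / p) * n


without_merge = intermediate_objects(1000, 1000)      # M * N
with_merge = intermediate_objects(1000, 1000, p=10)   # (M / P) * N
print(without_merge, with_merge)
```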
Tests so far show significant performance gains:
1000x1000 shuffle (100 MB partitions): Runtime decreased from 3:30 to 2:10 and peak memory usage from 5.5 GB to 3.1 GB
2000x2000 shuffle (100 MB partitions): Runtime decreased from 15:51 to 9:23 and peak memory usage from 16 GB to 5 GB
Smaller Updates ✏️
We also made several smaller updates in November, including:
Adding a DaftCatalog API to help cement Daft’s catalog data access patterns (#3036).
Removing the Int128 type and refactoring Decimal128 (#3143) to be backed by a DataArray instead of a LogicalArray. This PR also implemented math operations and comparison operations for Decimal types.
Adding support for aggregation expressions that use multiple AggExprs (#3296), enabling expressions such as sum("a") + sum("b") or mean("a")/100 in aggregations.
Enabling functionality for converting SQL queries with correlated subqueries into LogicalPlans (#3304). Although it does not yet add the ability to execute queries with correlated subqueries, this is the last large piece of support we needed on the SQL side for TPC-H queries.
Thank You Contributors 🙌
A huge thank you to this month’s open source contributors—your contributions are incredibly valuable!
@sunaysanghani contributed a section to the Daft documentation about temporal expressions (#2487), covering basic temporal operations, temporal component extractions, time zone operations, and temporal truncations. Read it here.
@conradsoon contributed a fix to ensure that an error is thrown for invalid ** usage outside of folder segments (#3100). The main issue stemmed from the permissive behavior of the globset crate, which allows patterns like /tmp/**.csv to be built without raising an error.
@advancedxy added support for INTERSECT in the DataFrame and SQL APIs (#3134), which leverages support for null-safe equal in joins (#3161) and the SQL equivalent (#3166). #3161 enabled the logical plan join’s on condition to be null-safe aware, enabling translation of null-safe equal joins into physical plans. It also ensures that optimization rules, such as eliminating cross joins and pushing down filters, remain functional. Additionally, modifications were made to physical join operations to support null-safe equal joins, including hash and broadcast joins, though sort-merge joins (SMJs) are not supported yet. Glue code was added on the Python side to ensure that the entire pipeline works seamlessly, along with unit tests to validate functionality.
@austin362667 added support for INTERVAL in SQL (#3146), enabling many TPC-DS workloads.
@sagiahrac added support for more compact index data types in FixedShapeSparseTensors (#3149). Since the indices are limited by the total number of elements defined by the tensor’s shape, they can remain within this range. This optimization reduces memory usage without sacrificing the tensor’s functionality.
@itzhakstern contributed READ_CSV (#3255), which adds functionality to read from a CSV file in a SQL query.
@willvo2004 helped update several incomplete SQL documentation pages (#3298), for AzureConfig, GCSConfig, HTTPConfig, count_matches, hash, minhash, normalize, tokenize_decode, and tokenize_encode.
@ConeyLiu adjusted the REGEXP_REPLACE expression (#3306) such that it sets REGEXP to TRUE. He also added support for floor division of integers (#3064). Previously, you had to perform true division, floor it, then cast from a float to an int, which may raise precision issues.
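The precision issue that motivated integer floor division (#3064) is easy to reproduce in plain Python. The snippet below is a language-level illustration rather than Daft expression code: routing a large integer through a float loses the low bit, while floor division stays exact.

```python
import math

a, b = 2**53 + 1, 1

# Old workaround: true division, floor, then cast back to an int.
# 2**53 + 1 cannot be represented as a 64-bit float, so it is rounded first.
via_float = int(math.floor(a / b))

# Integer floor division never leaves integer arithmetic.
via_floor_div = a // b

print(via_float, via_floor_div)
```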
Contributor Sync 📺
In November’s Contributor Sync we reviewed some key updates, such as making Swordfish the default runner for development, adding even more support for SQL functions, and introducing a new DaftCatalog API to cement Daft’s catalog data access patterns and allow for registering PyIceberg catalogs and reading their tables.
For November’s Data Topic Deep Dive, Jay walked through the trace tool for profiling Daft-on-Ray queries. This tool gives insights into what’s happening under the hood in a query and provides metrics such as resource utilization, number of tasks in flight, number of cores available, etc.
Community Events 👥
On November 4, we co-hosted the Apache Iceberg Community Meetup with AWS and Snowflake, drawing 100+ attendees from over 200 registrations—a packed house with great conversations and three rounds of pizza! It was great to see so many Iceberg developers and enthusiasts gathering in one space to share learnings, best practices, and wishlist items for the Iceberg project. Thank you to our speakers for wonderfully informative presentations on their experiences using and developing with Apache Iceberg.
Watch: Lessons from Building Iceberg Capabilities in Daft, a Distributed Query Engine
Watch Kevin’s talk where he shared his experiences with building distributed Iceberg operations in Daft, such as adapting PyIceberg for distributed workloads and implementing features like partitioned writes in Daft. He also detailed challenges and workarounds we implemented when using existing Python and Rust Iceberg tooling and proposed API interfaces (such as building a single low-level API for core Iceberg logic) to enhance Iceberg abstractions for query engines.
We aim to host monthly meetups to bring together the developer community—subscribe to our Daft Luma calendar for updates: https://lu.ma/daft
If you’d like to co-host or speak at one of our meetups in San Francisco, please reach out to us on LinkedIn or Slack!
Join Our Team 🤝
Our team is growing! At Eventual, we’re dedicated to fostering innovation, collaboration, and growth. We’re looking for passionate individuals with a strong sense of intellectual curiosity who are eager to make an impact and contribute to our mission.
If you thrive in a dynamic environment and want to be part of a supportive community, we’d love to hear from you. To learn more about what we’re building at Daft and how you can help, head to our Careers Page and apply today!
If you haven’t already, join our Distributed Data Community Slack and star our Daft GitHub repo! Engage in technical conversations with our engineers about Daft developments and stay in the know about the latest Daft news and updates.
Join us in the new year for the next Daft Monthly Contributor Sync on Thursday, January 30, 2025! Be sure to add it to your calendar → https://bit.ly/DaftContributorSync