fix(core): Correctly handle nested async UDF execution #20039
base: main
Conversation
Pull request overview
Fixes planning/execution of nested async scalar UDFs so dependent async expressions are evaluated in the correct order and can reference intermediate results.
Changes:
- Plan async expressions bottom-up and rewrite nested async UDF calls into intermediate columns.
- Execute async UDFs sequentially per batch, updating the intermediate batch as each async result becomes available.
- Refactor listing-table file discovery APIs to pass `ConfigOptions`/`RuntimeEnv` explicitly and introduce `JoinSet`-based parallelization for multi-path listings.
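The bottom-up rewrite of nested async calls into intermediate columns can be sketched as follows. This is an illustrative stand-in, not DataFusion's actual `Expr` type or naming scheme: a nested call like `f(f(col))` is walked children-first, and each async call is replaced by a reference to a freshly named intermediate column whose expression is evaluated in an earlier step.

```rust
// Hypothetical miniature expression type; DataFusion's real PhysicalExpr
// tree is richer, but the rewrite shape is the same.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    AsyncCall(String, Box<Expr>),
}

// Rewrite inner-most async calls first, emitting one (column, expr) step
// per call. Each step's output column is visible to all later steps.
fn rewrite(expr: &Expr, steps: &mut Vec<(String, Expr)>) -> Expr {
    match expr {
        Expr::Column(_) => expr.clone(),
        Expr::AsyncCall(name, arg) => {
            let new_arg = rewrite(arg, steps); // bottom-up: rewrite the child first
            let out = format!("__async_{}", steps.len()); // hypothetical naming
            steps.push((out.clone(), Expr::AsyncCall(name.clone(), Box::new(new_arg))));
            Expr::Column(out)
        }
    }
}

fn main() {
    // f(f(col)): the inner call must run before the outer one.
    let nested = Expr::AsyncCall(
        "f".to_string(),
        Box::new(Expr::AsyncCall(
            "f".to_string(),
            Box::new(Expr::Column("col".to_string())),
        )),
    );
    let mut steps = Vec::new();
    let top = rewrite(&nested, &mut steps);
    // The inner call becomes __async_0; the outer call reads __async_0.
    println!("{top:?}");
    for (name, e) in &steps {
        println!("{name} = {e:?}");
    }
}
```

Executing the steps in order guarantees the outer UDF's input column already exists when it runs, which is exactly the dependency this PR fixes.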
Reviewed changes
Copilot reviewed 10 out of 11 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| datafusion/physical-plan/src/async_func.rs | Incremental async-field schema validation; sequential per-batch async evaluation; async expr mapping adjustments. |
| datafusion/core/src/physical_planner.rs | Switch to bottom-up async expression mapping via new find_and_map. |
| datafusion/core/tests/user_defined/user_defined_async_scalar_functions.rs | Adds regression test for nested async UDF execution and adjusts assertions. |
| datafusion/datasource/src/url.rs | Refactors ListingTableUrl listing APIs to accept ConfigOptions/RuntimeEnv instead of Session. |
| datafusion/catalog-listing/src/helpers.rs | Updates pruned_partition_list to use new listing API parameters. |
| datafusion/catalog-listing/src/table.rs | Updates calls to new listing APIs; introduces JoinSet-based parallel listing for non-WASM. |
| datafusion/catalog-listing/src/options.rs | Updates schema/statistics inference listing calls to new listing APIs. |
| datafusion/core/src/datasource/listing/table.rs | Updates listing-table config extension to use new listing APIs. |
| datafusion/core/tests/catalog_listing/pruned_partition_list.rs | Updates tests for new pruned_partition_list signature. |
| datafusion/catalog-listing/Cargo.toml | Adds tokio dependency for JoinSet usage. |
| Cargo.lock | Updates lockfile for new dependency edge. |
```rust
let mut join_set = JoinSet::new();
let config = ctx.config_options().clone();
let runtime_env = Arc::clone(ctx.runtime_env());
let file_extension = self.options.file_extension.clone();
let partition_cols = self.options.table_partition_cols.clone();
let filters = filters.to_vec();

for table_path in &self.table_paths {
    let store = Arc::clone(&store);
    let table_path = table_path.clone();
    let config = config.clone();
    let runtime_env = Arc::clone(&runtime_env);
    let file_extension = file_extension.clone();
    let partition_cols = partition_cols.clone();
    let filters = filters.clone();
```
**Copilot AI** commented on Jan 28, 2026:
The `JoinSet` branch clones the entire `ConfigOptions` (`let config = ctx.config_options().clone()`) and then clones it again for every spawned task. `ConfigOptions` can be fairly large, so this can add unnecessary allocation/copy overhead.

Consider wrapping the cloned `ConfigOptions` in an `Arc` once and cloning the `Arc` into tasks (passing `&*arc_config`), or otherwise sharing immutable config across tasks without repeated deep clones.
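A minimal sketch of the suggested pattern, using a hypothetical `Config` stand-in (DataFusion's real `ConfigOptions` has many nested option structs, which is what makes the deep clone costly): pay for one deep clone up front, then hand each task a cheap `Arc` pointer copy.

```rust
use std::sync::Arc;
use std::thread;

// Stand-in for ConfigOptions; the real type holds many nested option structs.
#[derive(Clone)]
struct Config {
    target_partitions: usize,
}

// Each task gets an Arc clone (a reference-count bump), not a deep clone.
fn read_from_tasks(config: Arc<Config>, tasks: usize) -> Vec<usize> {
    let handles: Vec<_> = (0..tasks)
        .map(|_| {
            let config = Arc::clone(&config); // pointer copy, not a deep clone
            thread::spawn(move || config.target_partitions)
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let ctx_config = Config { target_partitions: 8 };
    // One deep clone here; everything afterwards shares it immutably.
    let config = Arc::new(ctx_config.clone());
    let seen = read_from_tasks(config, 4);
    assert!(seen.iter().all(|&n| n == 8));
    println!("all tasks saw the shared config");
}
```

Where an API needs `&ConfigOptions`, the shared value can be passed as `&*arc_config` without any further cloning.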
```rust
let mut output_arrays = Vec::with_capacity(async_exprs_captured.len());
let input_columns = batch.columns().len();

for (i, async_expr) in async_exprs_captured.iter().enumerate() {
    // Create a batch with the input columns and the async columns evaluated so far
    // We need to construct a schema for this intermediate batch
    let current_schema_fields: Vec<_> = schema_captured
        .fields()
        .iter()
        .take(input_columns + i)
        .cloned()
        .collect();
    let current_schema = Arc::new(Schema::new(current_schema_fields));

    let mut current_columns = batch.columns().to_vec();
    current_columns.extend_from_slice(&output_arrays);

    let current_batch = RecordBatch::try_new(current_schema, current_columns)?;
```
**Copilot AI** commented on Jan 28, 2026:
`AsyncFuncExec::execute` now rebuilds an intermediate `Schema` and re-clones the input columns vector on every async expression (`current_schema_fields` + `current_columns`). This makes per-batch work grow roughly O(#async_exprs²) and can be a noticeable regression for projections with multiple async UDFs.

Consider maintaining an incremental `current_fields` / `current_columns` (starting from the input batch) and appending each evaluated async column in place, creating the next intermediate `RecordBatch` from those incrementally updated structures. This avoids repeatedly copying the input columns and re-collecting schema fields each iteration.
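The incremental shape the comment is asking for can be sketched as below. This uses `Arc<Vec<i64>>` as a hypothetical stand-in for Arrow's `ArrayRef` and plain closures in place of real async UDF evaluation; the point is that the column vector grows in place instead of being rebuilt from the input batch on every iteration.

```rust
use std::sync::Arc;

// Stand-in for Arrow's ArrayRef (cheaply clonable column handle).
type ColumnRef = Arc<Vec<i64>>;

// Append each async result to one growing column vector: O(#exprs)
// appends instead of O(#exprs^2) copies of the input columns.
fn evaluate_async_exprs(
    input_columns: Vec<ColumnRef>,
    async_exprs: &[fn(&[ColumnRef]) -> ColumnRef],
) -> Vec<ColumnRef> {
    let mut current_columns = input_columns;
    for expr in async_exprs {
        // Each expression may read any earlier column, including the
        // outputs of previously evaluated async expressions.
        let result = expr(&current_columns);
        current_columns.push(result);
    }
    current_columns
}

fn main() {
    let input = vec![Arc::new(vec![1_i64, 2, 3])];
    // First "async" expr doubles the input column (index 0);
    // second doubles the first expr's output (index 1) — a nested dependency.
    let exprs: [fn(&[ColumnRef]) -> ColumnRef; 2] = [
        |cols| Arc::new(cols[0].iter().map(|v| v * 2).collect()),
        |cols| Arc::new(cols[1].iter().map(|v| v * 2).collect()),
    ];
    let out = evaluate_async_exprs(input, &exprs);
    println!("{:?}", out.last().unwrap()); // prints [4, 8, 12]
}
```

In the real operator, the matching `Field` list would grow the same way, so each intermediate `RecordBatch` is built from already-accumulated state rather than re-collected schema fields.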
```rust
if let Some(col) = e.as_any().downcast_ref::<Column>() {
    if col.index() >= input_len {
        refers_to_new_cols = true;
    }
}
Ok(TreeNodeRecursion::Continue)
})?;
```
**Copilot AI** commented on Jan 28, 2026:
The `ProjectionMapping` construction scans each `expr.func` tree to detect references to newly created async columns, but it never short-circuits once such a column is found. Since this runs for every async expression, it can add unnecessary traversal overhead.

You can return `TreeNodeRecursion::Stop` once `refers_to_new_cols` becomes true (or check the flag early in the visitor) to avoid walking the rest of the subtree.
Suggested change:

```rust
// Short-circuit traversal once we know this expression
// refers to newly created async columns
if refers_to_new_cols {
    return Ok(TreeNodeRecursion::Stop);
}
if let Some(col) = e.as_any().downcast_ref::<Column>() {
    if col.index() >= input_len {
        refers_to_new_cols = true;
        return Ok(TreeNodeRecursion::Stop);
    }
}
Ok(TreeNodeRecursion::Continue)
})?;
```
```rust
#[cfg(not(target_arch = "wasm32"))]
let file_list = {
    let mut join_set = JoinSet::new();
    let config = ctx.config_options().clone();
    let runtime_env = Arc::clone(ctx.runtime_env());
    let file_extension = self.options.file_extension.clone();
    let partition_cols = self.options.table_partition_cols.clone();
    let filters = filters.to_vec();

    for table_path in &self.table_paths {
        let store = Arc::clone(&store);
        let table_path = table_path.clone();
        let config = config.clone();
        let runtime_env = Arc::clone(&runtime_env);
        let file_extension = file_extension.clone();
        let partition_cols = partition_cols.clone();
        let filters = filters.clone();

        join_set.spawn(async move {
            let stream = pruned_partition_list(
                &config,
                &runtime_env,
                store.as_ref(),
                &table_path,
                &filters,
                &file_extension,
                &partition_cols,
            )
            .await?;
            stream.try_collect::<Vec<_>>().await
        });
    }
```
**Copilot AI** commented on Jan 28, 2026:
The new `JoinSet`-based implementation spawns one task per `table_path` with no explicit bound. With many table paths this can oversubscribe the runtime and/or hammer the object store.

Consider bounding the number of concurrent `pruned_partition_list` tasks (e.g., using a semaphore / `buffer_unordered`, or reusing an existing concurrency setting) so listing parallelism remains controlled.
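In the actual tokio code this would more naturally be a `tokio::sync::Semaphore` or `futures::stream::buffer_unordered`; the std-thread sketch below just illustrates the bounding idea with hypothetical names (`list_one_path` stands in for `pruned_partition_list`), processing paths in chunks so no more than `max_concurrency` listings are in flight at once.

```rust
use std::thread;

// Hypothetical stand-in for pruned_partition_list: pretend each table
// path yields two data files.
fn list_one_path(path: &str) -> Vec<String> {
    vec![
        format!("{path}/part-0.parquet"),
        format!("{path}/part-1.parquet"),
    ]
}

// Bound listing parallelism: at most `max_concurrency` workers run
// concurrently, instead of one unbounded task per path.
fn list_all_paths(paths: &[String], max_concurrency: usize) -> Vec<Vec<String>> {
    let mut results = Vec::with_capacity(paths.len());
    for chunk in paths.chunks(max_concurrency.max(1)) {
        let handles: Vec<_> = chunk
            .iter()
            .cloned()
            .map(|p| thread::spawn(move || list_one_path(&p)))
            .collect();
        for h in handles {
            results.push(h.join().expect("listing worker panicked"));
        }
    }
    results
}

fn main() {
    let paths: Vec<String> = (0..10).map(|i| format!("s3://bucket/table/{i}")).collect();
    let files = list_all_paths(&paths, 4); // never more than 4 listings at once
    assert_eq!(files.len(), 10);
    println!("{}", files[0][0]);
}
```

A semaphore-based version would keep exactly `max_concurrency` permits in flight rather than waiting for each chunk to drain, which is why it is usually preferred in async code.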
```rust
// For non-WASM targets, use parallel execution with JoinSet.
// Note: This implementation collects files into memory per table_path rather than
// streaming lazily. This is a trade-off required because JoinSet tasks need 'static
// lifetime, which prevents returning borrowed streams directly. For most use cases,
// the parallelization benefit outweighs the temporary memory overhead. The WASM
// fallback below preserves streaming behavior for memory-constrained environments.
#[cfg(not(target_arch = "wasm32"))]
let file_list = {
    let mut join_set = JoinSet::new();
    let config = ctx.config_options().clone();
    let runtime_env = Arc::clone(ctx.runtime_env());
    let file_extension = self.options.file_extension.clone();
```
**Copilot AI** commented on Jan 28, 2026:
This PR is described as fixing nested async UDF execution, but it also introduces a fairly broad refactor of listing-table file discovery (API changes in `ListingTableUrl::list_*` / `pruned_partition_list`) and changes listing concurrency behavior (`JoinSet`).

If these changes aren't strictly required for the async UDF fix, please consider splitting them into a separate PR (or documenting why they're coupled) to keep the review surface and risk contained.
Force-pushed `bec3299` to `fb35596`.
```rust
/// using only the partition columns.
pub async fn pruned_partition_list<'a>(
    ctx: &'a dyn Session,
    config: &'a ConfigOptions,
```
Are these changes related?
Good catch! These changes are not related to the async UDF fix. My branch was accidentally based on top of another feature branch (parallelize-list-files-for-scan) instead of main. I'll rebase this PR to only include the commits relevant to fixing nested async UDF execution. Thanks for pointing this out!
Hi @Jefffrey, all CI checks are passing now. Ready for review when you have a chance. Thanks!
This PR still contains unrelated changes.
Hey @Jefffrey, all the checks have passed. Ready for review.
**Jefffrey** left a comment:
> Hey @Jefffrey all the checks are passed. Ready to review.
Please read my comment. It has not been addressed.
- Updated `AsyncMapper` to perform bottom-up mapping with schema extension
- Updated `AsyncFuncExec` to process expressions incrementally and pass intermediate results
- Updated `DefaultPhysicalPlanner` to use correct planning logic
- Added regression test case for nested async UDFs
- Remove unused import `datafusion::prelude::*`
- Remove unused import `assert_batches_eq`
- Use inline format variable in `panic!` macro
Force-pushed `4eb1826` to `d9928f1`.
Which issue does this PR close?
Rationale for this change
Currently, nesting asynchronous User-Defined Functions (e.g., `my_async_udf(my_async_udf(col))`) causes an internal error: `"async functions should not be called directly"`. This happens because `AsyncFuncExec` and the physical planner treat all async expressions as independent and parallelizable against the initial input batch. When one async UDF depends on the output of another, the dependency chain is invalid, and the inner UDF's output is not available when the outer UDF tries to execute.

This PR updates the planning and execution logic to correctly handle these dependencies by processing async expressions sequentially when necessary and ensuring intermediate results are available for subsequent expressions.
What changes are included in this PR?
- `AsyncMapper::find_and_map`: Updated to perform a bottom-up transformation. It now dynamically extends a temporary schema with the output fields of inner async functions, allowing outer functions to be correctly planned against those intermediate columns.
- `AsyncFuncExec::execute`: Modified to process async expressions incrementally. The record batch is updated with the results of each async call before being passed to the next one, ensuring that subsequent expressions can access the results of previous ones.
- `AsyncFuncExec::try_new`: Updated schema validation logic to mirror the execution path. It now verifies expressions against an incrementally built schema rather than validating all expressions against the static input schema.
- `ProjectionMapping`: Logic added to exclude columns created by inner async UDFs from the `ProjectionMapping` validation, as these columns do not exist in the original input schema.

Are these changes tested?
Yes. I added a new regression test `test_nested_async_udf` in `datafusion/core/tests/user_defined/user_defined_async_scalar_functions.rs`. It runs a query of the form `SELECT test_async_udf(test_async_udf(prompt)) ...`.

Are there any user-facing changes?