Skip to content

Conversation

@Tushar7012
Copy link
Contributor

Which issue does this PR close?

Rationale for this change

Currently, nesting asynchronous User-Defined Functions (e.g., my_async_udf(my_async_udf(col))) causes an internal error: "async functions should not be called directly". This happens because AsyncFuncExec and the physical planner treat all async expressions as independent and parallelizable against the initial input batch. When one async UDF depends on the output of another, the dependency chain is invalid, and the inner UDF's output is not available when the outer UDF tries to execute.

This PR updates the planning and execution logic to correctly handle these dependencies by processing async expressions sequentially when necessary and ensuring intermediate results are available for subsequent expressions.

What changes are included in this PR?

  • AsyncMapper::find_and_map: Updated to perform a bottom-up transformation. It now dynamically extends a temporary schema with the output fields of inner async functions, allowing outer functions to be correctly planned against those intermediate columns.
  • AsyncFuncExec::execute: Modified to process async expressions incrementally. The record batch is updated with the results of each async call before being passed to the next one, ensuring that subsequent expressions can access the results of previous ones.
  • AsyncFuncExec::try_new: Updated schema validation logic to mirror the execution path. It now verifies expressions against an incrementally built schema rather than validating all expressions against the static input schema.
  • ProjectionMapping: Logic added to exclude columns created by inner async UDFs from the ProjectionMapping validation, as these columns do not exist in the original input schema.

Are these changes tested?

Yes. I added a new regression test test_nested_async_udf in datafusion/core/tests/user_defined/user_defined_async_scalar_functions.rs.

  • The test defines a mock async UDF.
  • It executes a query with nested calls: SELECT test_async_udf(test_async_udf(prompt)) ....
  • It asserts that the query executes successfully and produces the correct result (previously it panicked).

Are there any user-facing changes?

  • No breaking API changes.
  • Bug Fix: Users can now nest asynchronous UDFs in their SQL queries without encountering internal errors.

Copilot AI review requested due to automatic review settings January 28, 2026 06:18
@github-actions github-actions bot added core Core DataFusion crate catalog Related to the catalog crate datasource Changes to the datasource crate physical-plan Changes to the physical-plan crate labels Jan 28, 2026
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Fixes planning/execution of nested async scalar UDFs so dependent async expressions are evaluated in the correct order and can reference intermediate results.

Changes:

  • Plan async expressions bottom-up and rewrite nested async UDF calls into intermediate columns.
  • Execute async UDFs sequentially per batch, updating the intermediate batch as each async result becomes available.
  • Refactor listing-table file discovery APIs to pass ConfigOptions / RuntimeEnv explicitly and introduce JoinSet-based parallelization for multi-path listings.

Reviewed changes

Copilot reviewed 10 out of 11 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
datafusion/physical-plan/src/async_func.rs Incremental async-field schema validation; sequential per-batch async evaluation; async expr mapping adjustments.
datafusion/core/src/physical_planner.rs Switch to bottom-up async expression mapping via new find_and_map.
datafusion/core/tests/user_defined/user_defined_async_scalar_functions.rs Adds regression test for nested async UDF execution and adjusts assertions.
datafusion/datasource/src/url.rs Refactors ListingTableUrl listing APIs to accept ConfigOptions/RuntimeEnv instead of Session.
datafusion/catalog-listing/src/helpers.rs Updates pruned_partition_list to use new listing API parameters.
datafusion/catalog-listing/src/table.rs Updates calls to new listing APIs; introduces JoinSet-based parallel listing for non-WASM.
datafusion/catalog-listing/src/options.rs Updates schema/statistics inference listing calls to new listing APIs.
datafusion/core/src/datasource/listing/table.rs Updates listing-table config extension to use new listing APIs.
datafusion/core/tests/catalog_listing/pruned_partition_list.rs Updates tests for new pruned_partition_list signature.
datafusion/catalog-listing/Cargo.toml Adds tokio dependency for JoinSet usage.
Cargo.lock Updates lockfile for new dependency edge.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 726 to 740
let mut join_set = JoinSet::new();
let config = ctx.config_options().clone();
let runtime_env = Arc::clone(ctx.runtime_env());
let file_extension = self.options.file_extension.clone();
let partition_cols = self.options.table_partition_cols.clone();
let filters = filters.to_vec();

for table_path in &self.table_paths {
let store = Arc::clone(&store);
let table_path = table_path.clone();
let config = config.clone();
let runtime_env = Arc::clone(&runtime_env);
let file_extension = file_extension.clone();
let partition_cols = partition_cols.clone();
let filters = filters.clone();
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JoinSet branch clones the entire ConfigOptions (let config = ctx.config_options().clone()) and then clones it again for every spawned task. ConfigOptions can be fairly large, so this can add unnecessary allocation/copy overhead.

Consider wrapping the cloned ConfigOptions in an Arc once and cloning the Arc into tasks (passing &*arc_config), or otherwise sharing immutable config across tasks without repeated deep clones.

Copilot uses AI. Check for mistakes.
Comment on lines 245 to 257
let mut output_arrays = Vec::with_capacity(async_exprs_captured.len());
let input_columns = batch.columns().len();

for (i, async_expr) in async_exprs_captured.iter().enumerate() {
// Create a batch with the input columns and the async columns evaluated so far
// We need to construct a schema for this intermediate batch
let current_schema_fields: Vec<_> = schema_captured.fields().iter().take(input_columns + i).cloned().collect();
let current_schema = Arc::new(Schema::new(current_schema_fields));

let mut current_columns = batch.columns().to_vec();
current_columns.extend_from_slice(&output_arrays);

let current_batch = RecordBatch::try_new(current_schema, current_columns)?;
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AsyncFuncExec::execute now rebuilds an intermediate Schema and re-clones the input columns vector on every async expression (current_schema_fields + current_columns). This makes per-batch work grow roughly O(#async_exprs^2) and can be a noticeable regression for projections with multiple async UDFs.

Consider maintaining an incremental current_fields / current_columns (starting from the input batch) and appending each evaluated async column in-place, creating the next intermediate RecordBatch from those incrementally-updated structures. This avoids repeatedly copying the input columns and re-collecting schema fields each iteration.

Copilot uses AI. Check for mistakes.
Comment on lines 91 to 99
if let Some(col) = e.as_any().downcast_ref::<Column>() {

if col.index() >= input_len {
refers_to_new_cols = true;
}
}
Ok(TreeNodeRecursion::Continue)
})?;

Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ProjectionMapping construction scans each expr.func tree to detect references to newly-created async columns, but it never short-circuits once such a column is found. Since this runs for every async expression, it can add unnecessary traversal overhead.

You can return TreeNodeRecursion::Stop once refers_to_new_cols becomes true (or check the flag early in the visitor) to avoid walking the rest of the subtree.

Suggested change
if let Some(col) = e.as_any().downcast_ref::<Column>() {
if col.index() >= input_len {
refers_to_new_cols = true;
}
}
Ok(TreeNodeRecursion::Continue)
})?;
// Short-circuit traversal once we know this expression
// refers to newly created async columns
if refers_to_new_cols {
return Ok(TreeNodeRecursion::Stop);
}
if let Some(col) = e.as_any().downcast_ref::<Column>() {
if col.index() >= input_len {
refers_to_new_cols = true;
return Ok(TreeNodeRecursion::Stop);
}
}
Ok(TreeNodeRecursion::Continue)
})?;

Copilot uses AI. Check for mistakes.
Comment on lines 724 to 755
#[cfg(not(target_arch = "wasm32"))]
let file_list = {
let mut join_set = JoinSet::new();
let config = ctx.config_options().clone();
let runtime_env = Arc::clone(ctx.runtime_env());
let file_extension = self.options.file_extension.clone();
let partition_cols = self.options.table_partition_cols.clone();
let filters = filters.to_vec();

for table_path in &self.table_paths {
let store = Arc::clone(&store);
let table_path = table_path.clone();
let config = config.clone();
let runtime_env = Arc::clone(&runtime_env);
let file_extension = file_extension.clone();
let partition_cols = partition_cols.clone();
let filters = filters.clone();

join_set.spawn(async move {
let stream = pruned_partition_list(
&config,
&runtime_env,
store.as_ref(),
&table_path,
&filters,
&file_extension,
&partition_cols,
)
.await?;
stream.try_collect::<Vec<_>>().await
});
}
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new JoinSet-based implementation spawns one task per table_path with no explicit bound. With many table paths this can oversubscribe the runtime and/or hammer the object store.

Consider bounding the number of concurrent pruned_partition_list tasks (e.g., using a semaphore / buffer_unordered, or reuse an existing concurrency setting) so listing parallelism remains controlled.

Copilot uses AI. Check for mistakes.
Comment on lines 718 to 729
// For non-WASM targets, use parallel execution with JoinSet.
// Note: This implementation collects files into memory per table_path rather than
// streaming lazily. This is a trade-off required because JoinSet tasks need 'static
// lifetime, which prevents returning borrowed streams directly. For most use cases,
// the parallelization benefit outweighs the temporary memory overhead. The WASM
// fallback below preserves streaming behavior for memory-constrained environments.
#[cfg(not(target_arch = "wasm32"))]
let file_list = {
let mut join_set = JoinSet::new();
let config = ctx.config_options().clone();
let runtime_env = Arc::clone(ctx.runtime_env());
let file_extension = self.options.file_extension.clone();
Copy link

Copilot AI Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is described as fixing nested async UDF execution, but it also introduces a fairly broad refactor of listing-table file discovery (API changes in ListingTableUrl::list_* / pruned_partition_list) and changes listing concurrency behavior (JoinSet).

If these changes aren’t strictly required for the async UDF fix, please consider splitting them into a separate PR (or documenting why they’re coupled) to keep the review surface and risk contained.

Copilot uses AI. Check for mistakes.
@Tushar7012 Tushar7012 force-pushed the fix/nested-async-udfs branch from bec3299 to fb35596 Compare January 28, 2026 08:11
@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Jan 28, 2026
/// using only the partition columns.
pub async fn pruned_partition_list<'a>(
ctx: &'a dyn Session,
config: &'a ConfigOptions,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these changes related?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! These changes are not related to the async UDF fix. My branch was accidentally based on top of another feature branch (parallelize-list-files-for-scan) instead of main. I'll rebase this PR to only include the commits relevant to fixing nested async UDF execution. Thanks for pointing this out!

@Tushar7012
Copy link
Contributor Author

Tushar7012 commented Jan 28, 2026

Hi @Jefffrey , all CI checks are passing now. Ready for review when you have a chance. Thanks!

@Jefffrey
Copy link
Contributor

Hi @Jefffrey , all CI checks are passing now. Ready for review when you have a chance. Thanks!

This PR still contains unrelated changes.

@Tushar7012
Copy link
Contributor Author

Hey @Jefffrey all the checks are passed. Ready to review.

Copy link
Contributor

@Jefffrey Jefffrey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @Jefffrey all the checks are passed. Ready to review.

Please read my comment. It has not been addressed.

- Updated AsyncMapper to perform bottom-up mapping with schema extension
- Updated AsyncFuncExec to process expressions incrementally and pass intermediate results
- Updated DefaultPhysicalPlanner to use correct planning logic
- Added regression test case for nested async UDFs
- Remove unused import datafusion::prelude::*

- Remove unused import assert_batches_eq

- Use inline format variable in panic! macro
@Tushar7012 Tushar7012 force-pushed the fix/nested-async-udfs branch from 4eb1826 to d9928f1 Compare January 29, 2026 07:00
@github-actions github-actions bot removed catalog Related to the catalog crate datasource Changes to the datasource crate labels Jan 29, 2026
@Jefffrey Jefffrey self-requested a review January 29, 2026 13:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Nesting async UDF calls causes an internal error

2 participants