Conversation

@quasiben
Member

No description provided.

@quasiben quasiben requested review from a team as code owners November 12, 2025 15:28
@quasiben quasiben changed the title [WIP] Q17 Q17 Dec 6, 2025
@quasiben quasiben added improvement Improves an existing functionality non-breaking Introduces a non-breaking change labels Dec 6, 2025
);
}

rapidsmpf::streaming::Node filter_part(
Contributor

I think that the filter can be done in the read_parquet with something similar to https://github.com/rapidsai/rapidsmpf/pull/710/changes#diff-9743b2e766c061cb2f29e446eb28ac8761389f601474ecfd11cac9971deb81f7R262-R343. I can try implementing that if you'd like, since I just did it for query 4.

Contributor

Done in ed723dc

Comment on lines 178 to 184
// Select specific columns from the input table
rapidsmpf::streaming::Node select_columns(
    std::shared_ptr<rapidsmpf::streaming::Context> ctx,
    std::shared_ptr<rapidsmpf::streaming::Channel> ch_in,
    std::shared_ptr<rapidsmpf::streaming::Channel> ch_out,
    std::vector<cudf::size_type> indices
) {
Contributor

Maybe this column selection should be a part of post-processing in the read_parquet node. Having a node to do this immediately after a read_parquet seems a bit redundant.

Contributor

Maybe we could apply the selection here

cudf::io::read_parquet(options, stream, ctx->br()->device_mr()).tbl, stream
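
For illustration, the projection could be pushed into the reader options themselves. A minimal sketch only; the column names and the path variable here are placeholders, not taken from this PR:

auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info{path})
                   // read only the columns the rest of the pipeline needs
                   .columns({"l_partkey", "l_quantity", "l_extendedprice"})
                   .build();
auto tbl = cudf::io::read_parquet(options, stream, ctx->br()->device_mr()).tbl;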

Contributor

Then we might not need to do the column copy in L204

Contributor

I was wondering about this too, but I wasn't sure whether cudf had a built-in way of saying "read these columns for the filter, but don't include them in the output". If it doesn't have a way of doing that, then I agree doing it after the read but before the return is probably worth doing.

Contributor

@TomAugspurger I think the filter is for rows. It's more of a post-processing step in the read_parquet node AFAIU.

Contributor

Yep it's a row filter. But in some cases (like this one) we only need a column for the filter. If cudf doesn't have a way to include a column for the purpose of filtering, but exclude it from the result table, then we can update our wrapper to perform that selection.

Contributor

You can do it in the read_parquet by using column_name_reference in the filter rather than column_reference.
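
For reference, a minimal sketch of what that could look like; the predicate, literal value, and path are illustrative rather than the exact code in this PR:

// Reference the filter column by name; it does not have to be part of the
// selected output columns.
auto brand     = cudf::ast::column_name_reference("p_brand");
auto brand_val = cudf::string_scalar("Brand#23");
auto brand_lit = cudf::ast::literal(brand_val);
auto pred      = cudf::ast::operation(cudf::ast::ast_operator::EQUAL, brand, brand_lit);

auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info{path})
                   .columns({"p_partkey"})  // filter column excluded from the result
                   .filter(pred)
                   .build();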

Comment on lines 248 to 257
std::vector<std::unique_ptr<cudf::groupby_aggregation>> sum_aggs;
sum_aggs.push_back(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
std::vector<std::unique_ptr<cudf::groupby_aggregation>> count_aggs;
count_aggs.push_back(cudf::make_count_aggregation<cudf::groupby_aggregation>());
requests.push_back(
    cudf::groupby::aggregation_request(table.column(1), std::move(sum_aggs))
);
requests.push_back(
    cudf::groupby::aggregation_request(table.column(1), std::move(count_aggs))
);
Contributor

I think this could be simplified as follows, since both sum and count are happening on col 1.

Suggested change

std::vector<std::unique_ptr<cudf::groupby_aggregation>> aggs;
aggs.push_back(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
aggs.push_back(cudf::make_count_aggregation<cudf::groupby_aggregation>());
requests.push_back(
    cudf::groupby::aggregation_request(table.column(1), std::move(aggs))
);

Contributor

Then results will have both sum and count in a single vector.
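
In other words (a sketch reusing the names from the snippet above):

// The single request carries both aggregations, so the outputs come back
// together, in the order the aggregations were added:
auto sum_col   = std::move(results[0].results[0]);  // sum of column 1
auto count_col = std::move(results[0].results[1]);  // count of column 1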

Comment on lines 262 to 265
std::vector<std::unique_ptr<cudf::column>> result;
result.push_back(std::move(keys->release()[0]));
result.push_back(std::move(results[0].results[0])); // sum
result.push_back(std::move(results[1].results[0])); // count
Contributor

in Q09 we push back to the vector given by keys->release()

        auto result = keys->release();
        for (auto&& r : results) {
            std::ranges::move(r.results, std::back_inserter(result));
        }

I think this is a neater way

auto chunk_stream = chunk.stream();
auto table = chunk.table_view();

if (!table.is_empty() && table.num_columns() >= 4) {
Contributor

can there be a table.num_columns() != 4 scenario?

Comment on lines 344 to 355
auto sum_scalar = cudf::make_numeric_scalar(
    cudf::data_type(cudf::type_id::FLOAT64), chunk_stream, ctx->br()->device_mr()
);
static_cast<cudf::numeric_scalar<double>*>(sum_scalar.get())
    ->set_value(local_sum, chunk_stream);

std::vector<std::unique_ptr<cudf::column>> result_cols;
result_cols.push_back(
    cudf::make_column_from_scalar(
        *sum_scalar, 1, chunk_stream, ctx->br()->device_mr()
    )
);
Contributor

I think we could use an rmm::device_uvector here. This will avoid an extra allocation, IINM.

Suggested change

rmm::device_uvector<double> vec(1, chunk_stream, ctx->br()->device_mr());
vec.set_element_async(0, local_sum, chunk_stream);
std::vector<std::unique_ptr<cudf::column>> result_cols{
    std::make_unique<cudf::column>(std::move(vec), rmm::device_buffer{}, 0)
};

Comment on lines 455 to 466
auto sum_val =
    static_cast<cudf::numeric_scalar<double>*>(
        cudf::reduce(
            local_result->view().column(0),
            *cudf::make_sum_aggregation<cudf::reduce_aggregation>(),
            cudf::data_type(cudf::type_id::FLOAT64),
            chunk_stream,
            ctx->br()->device_mr()
        )
            .get()
    )
        ->value(chunk_stream);
Contributor

This is the same as local_sum in the single-rank case, isn't it? Seems a bit redundant.

Contributor

Also, I think we can remove the if (local_result) {...} else {...} branch and simply use the local_sum value here.

Comment on lines 468 to 482
auto avg_yearly_val = sum_val / 7.0;
auto avg_yearly_scalar = cudf::make_numeric_scalar(
    cudf::data_type(cudf::type_id::FLOAT64),
    chunk_stream,
    ctx->br()->device_mr()
);
static_cast<cudf::numeric_scalar<double>*>(avg_yearly_scalar.get())
    ->set_value(avg_yearly_val, chunk_stream);

std::vector<std::unique_ptr<cudf::column>> result_cols;
result_cols.push_back(
    cudf::make_column_from_scalar(
        *avg_yearly_scalar, 1, chunk_stream, ctx->br()->device_mr()
    )
);
Contributor

maybe we need a util to create a column from a single value of type T
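
Something along these lines, perhaps. A sketch only; scalar_column is a hypothetical name, not an existing rapidsmpf or cudf utility:

#include <memory>

#include <cudf/column/column.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/resource_ref.hpp>

// Hypothetical helper: build a one-row column directly from a host value,
// skipping the intermediate cudf scalar. set_element synchronizes the stream,
// which keeps the lifetime of the host value simple.
template <typename T>
std::unique_ptr<cudf::column> scalar_column(
    T value, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr
) {
    rmm::device_uvector<T> vec(1, stream, mr);
    vec.set_element(0, value, stream);
    return std::make_unique<cudf::column>(std::move(vec), rmm::device_buffer{}, 0);
}

With something like that, the blocks here and at lines 344 to 355 would reduce to a single result_cols.push_back(scalar_column(avg_yearly_val, chunk_stream, ctx->br()->device_mr())).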

Comment on lines 401 to 414
auto avg_yearly_scalar = cudf::make_numeric_scalar(
    cudf::data_type(cudf::type_id::FLOAT64),
    chunk_stream,
    ctx->br()->device_mr()
);
static_cast<cudf::numeric_scalar<double>*>(avg_yearly_scalar.get())
    ->set_value(avg_yearly_val, chunk_stream);

std::vector<std::unique_ptr<cudf::column>> result_cols;
result_cols.push_back(
    cudf::make_column_from_scalar(
        *avg_yearly_scalar, 1, chunk_stream, ctx->br()->device_mr()
    )
);
Contributor

move to a separate util

Comment on lines 872 to 877
[](
    std::shared_ptr<rapidsmpf::streaming::Context> ctx,
    std::shared_ptr<rapidsmpf::streaming::Channel> ch_in,
    std::shared_ptr<rapidsmpf::streaming::Channel> ch_out,
    rapidsmpf::OpID tag
) -> rapidsmpf::streaming::Node {
Contributor

let's move this to a separate method

nirandaperera and others added 4 commits December 9, 2025 23:48
@TomAugspurger
Contributor

TomAugspurger commented Dec 11, 2025

@nirandaperera I pushed a couple of changes:

  1. 0c55117 has Lawrence's suggestion to use column_name_reference in the read_parquet filter. This lets us exclude those filter columns from the result table, so we can remove the projection node after the read_parquet
  2. f513b47 fixes some compilation errors. You might want to take a close look.

I noticed that we're getting a different (presumably incorrect) result now. I'm not sure how long it's been like this. Running this script:

/*

# rapidsmpf
./cpp/build/benchmarks/ndsh/q17 --input-directory /datasets/toaugspurger/tpch-rs/scale-10 --output-file /tmp/q17.parquet

# duckdb
duckdb < q17.sql

# validate
diff -y (uvx parquet-tools show /tmp/q17.parquet | psub) (uvx parquet-tools show duckdb-q17.parquet | psub)
*/
COPY (
    select
        round(sum(l_extendedprice) / 7.0, 2) as avg_yearly
    from
        read_parquet('/datasets/toaugspurger/tpch-rs/scale-10/lineitem/*.parquet') as lineitem,
        read_parquet('/datasets/toaugspurger/tpch-rs/scale-10/part/*.parquet') as part
    where
        p_partkey = l_partkey
        and p_brand = 'Brand#23'
        and p_container = 'MED BOX'
        and l_quantity < (
            select
                0.2 * avg(l_quantity)
            from
                read_parquet('/datasets/toaugspurger/tpch-rs/scale-10/lineitem/*.parquet') as lineitem
            where
                l_partkey = p_partkey
        )
)
TO 'duckdb-q17.parquet'
(FORMAT 'parquet')
;

shows the different result for avg_yearly:

❯ diff -y (uvx parquet-tools show /tmp/q17.parquet | psub) (uvx parquet-tools show duckdb-q17.parquet | psub)                                                                                                                                                                       (rapidsmpf-dev) 
+--------------+                                                +--------------+
|   avg_yearly |                                                |   avg_yearly |
|--------------|                                                |--------------|
|       223947 |                                              | |  3.29549e+06 |
+--------------+                                                +--------------+

@TomAugspurger
Contributor

I checked out an earlier commit (c21cb30) and confirmed that it gets the same (probably incorrect) 223947 as HEAD. So the recent changes didn't cause a regression there.

@quasiben
Member Author

Shoot! I was testing against SF3K and thought I was getting the correct results. I'll double-check.

@TomAugspurger
Contributor

It's also possible I've messed up the expected result. I haven't looked carefully.

@quasiben
Member Author

@TomAugspurger and I went through a variety of scale factors (10, 100, 1K, 3K) and validated that all worked multi-GPU. However, with a single-GPU run the values were incorrect. We suspect something may be incorrect with reading/distributing the parquet files, or that an assumption about the distribution is only valid for N+1 ranks.

@quasiben
Member Author

@TomAugspurger I think I resolved the issue in f67178f -- carved out a special case for single-rank computation.

Resolved conflict in cpp/benchmarks/streaming/ndsh/CMakeLists.txt by
combining query lists from both branches (q01, q03, q09, q17).