Q17 #656
base: main
Conversation
    );
}

rapidsmpf::streaming::Node filter_part(
I think that the filter can be done in the read_parquet with something similar to https://github.com/rapidsai/rapidsmpf/pull/710/changes#diff-9743b2e766c061cb2f29e446eb28ac8761389f601474ecfd11cac9971deb81f7R262-R343. I can try implementing that if you'd like, since I just did it for query 4.
Done in ed723dc
// Select specific columns from the input table
rapidsmpf::streaming::Node select_columns(
    std::shared_ptr<rapidsmpf::streaming::Context> ctx,
    std::shared_ptr<rapidsmpf::streaming::Channel> ch_in,
    std::shared_ptr<rapidsmpf::streaming::Channel> ch_out,
    std::vector<cudf::size_type> indices
) {
Maybe this column selection should be part of post-processing in the read_parquet node. Having a node to do this immediately after a read_parquet seems a bit redundant.
Maybe we could apply the selection here
    cudf::io::read_parquet(options, stream, ctx->br()->device_mr()).tbl, stream
Then we might not need to do the column copy in L204
I was wondering about this too, but I wasn't sure whether cudf had a built-in way of saying "read these columns for the filter, but don't include them in the output". If it doesn't have a way of doing that, then I agree doing it after the read but before the return is probably worth doing.
@TomAugspurger I think the filter is for rows. It's more of a post-processing step in the read_parquet node AFAIU.
Yep it's a row filter. But in some cases (like this one) we only need a column for the filter. If cudf doesn't have a way to include a column for the purpose of filtering, but exclude it from the result table, then we can update our wrapper to perform that selection.
You can do it in the read_parquet by using column_name_reference in the filter rather than column_reference.
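As a rough sketch of that approach (the column names and literal below are illustrative, not taken from the Q17 code): the filter references its column by name via cudf::ast::column_name_reference, while .columns() lists only the columns that should appear in the output, so the filter column never needs a follow-up drop or copy.

    // Sketch only: read with a row filter on a column that is not in the output selection.
    #include <cudf/ast/expressions.hpp>
    #include <cudf/io/parquet.hpp>
    #include <cudf/scalar/scalar.hpp>
    #include <string>

    cudf::io::table_with_metadata read_filtered(std::string const& path)
    {
        // Row filter: l_quantity < 10, referenced by name rather than by output index.
        auto qty     = cudf::ast::column_name_reference("l_quantity");
        auto ten     = cudf::numeric_scalar<double>(10.0);
        auto lit_ten = cudf::ast::literal(ten);
        auto filter  = cudf::ast::operation(cudf::ast::ast_operator::LESS, qty, lit_ten);

        auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info{path})
                           .columns({"l_partkey", "l_extendedprice"})  // output columns only
                           .filter(filter)
                           .build();
        return cudf::io::read_parquet(options);
    }

If that behaves as expected, the separate select_columns node (and the copy mentioned above) could likely be dropped.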
std::vector<std::unique_ptr<cudf::groupby_aggregation>> sum_aggs;
sum_aggs.push_back(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
std::vector<std::unique_ptr<cudf::groupby_aggregation>> count_aggs;
count_aggs.push_back(cudf::make_count_aggregation<cudf::groupby_aggregation>());
requests.push_back(
    cudf::groupby::aggregation_request(table.column(1), std::move(sum_aggs))
);
requests.push_back(
    cudf::groupby::aggregation_request(table.column(1), std::move(count_aggs))
);
I think this could be simplified as follows, since both sum and count are happening on col 1.
Suggested change:

std::vector<std::unique_ptr<cudf::groupby_aggregation>> aggs;
aggs.push_back(cudf::make_sum_aggregation<cudf::groupby_aggregation>());
aggs.push_back(cudf::make_count_aggregation<cudf::groupby_aggregation>());
requests.push_back(
    cudf::groupby::aggregation_request(table.column(1), std::move(aggs))
);
then results will have both sum and count in a single vector
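With a single request holding both aggregations, the two outputs would then come back on the same aggregation_result, e.g. (indices follow the push order above):

    auto sum_col   = std::move(results[0].results[0]);  // sum of table.column(1)
    auto count_col = std::move(results[0].results[1]);  // count of table.column(1)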
std::vector<std::unique_ptr<cudf::column>> result;
result.push_back(std::move(keys->release()[0]));
result.push_back(std::move(results[0].results[0]));  // sum
result.push_back(std::move(results[1].results[0]));  // count
In Q09 we push back to the vector given by keys->release():

auto result = keys->release();
for (auto&& r : results) {
    std::ranges::move(r.results, std::back_inserter(result));
}

I think this is a neater way.
auto chunk_stream = chunk.stream();
auto table = chunk.table_view();

if (!table.is_empty() && table.num_columns() >= 4) {
can there be a table.num_columns() != 4 scenario?
auto sum_scalar = cudf::make_numeric_scalar(
    cudf::data_type(cudf::type_id::FLOAT64), chunk_stream, ctx->br()->device_mr()
);
static_cast<cudf::numeric_scalar<double>*>(sum_scalar.get())
    ->set_value(local_sum, chunk_stream);

std::vector<std::unique_ptr<cudf::column>> result_cols;
result_cols.push_back(
    cudf::make_column_from_scalar(
        *sum_scalar, 1, chunk_stream, ctx->br()->device_mr()
    )
);
I think we could use an rmm::device_uvector here. This will avoid an extra allocation IINM.
Suggested change:

rmm::device_uvector<double> vec(1, chunk_stream, ctx->br()->device_mr());
vec.set_element_async(0, local_sum, chunk_stream);
std::vector<std::unique_ptr<cudf::column>> result_cols;
result_cols.push_back(
    std::make_unique<cudf::column>(std::move(vec), rmm::device_buffer{}, 0)
);
auto sum_val =
    static_cast<cudf::numeric_scalar<double>*>(
        cudf::reduce(
            local_result->view().column(0),
            *cudf::make_sum_aggregation<cudf::reduce_aggregation>(),
            cudf::data_type(cudf::type_id::FLOAT64),
            chunk_stream,
            ctx->br()->device_mr()
        )
        .get()
    )
    ->value(chunk_stream);
This is the same as local_sum in the single-rank case, isn't it? Seems a bit redundant.
Also, I think we can remove the if (local_result) {...} else {...} branch and simply use the local_sum value here.
auto avg_yearly_val = sum_val / 7.0;
auto avg_yearly_scalar = cudf::make_numeric_scalar(
    cudf::data_type(cudf::type_id::FLOAT64),
    chunk_stream,
    ctx->br()->device_mr()
);
static_cast<cudf::numeric_scalar<double>*>(avg_yearly_scalar.get())
    ->set_value(avg_yearly_val, chunk_stream);

std::vector<std::unique_ptr<cudf::column>> result_cols;
result_cols.push_back(
    cudf::make_column_from_scalar(
        *avg_yearly_scalar, 1, chunk_stream, ctx->br()->device_mr()
    )
);
maybe we need a util to create a column from a single value of type T
move to a separate util
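A minimal sketch of such a util, assuming a fixed-width numeric T (the helper name is illustrative); it follows the device_uvector suggestion above and would replace the make_numeric_scalar + make_column_from_scalar pair in both places:

    #include <cudf/column/column.hpp>
    #include <memory>
    #include <rmm/cuda_stream_view.hpp>
    #include <rmm/device_buffer.hpp>
    #include <rmm/device_uvector.hpp>
    #include <rmm/resource_ref.hpp>

    // Build a single-row, non-nullable column holding `value`.
    // The copy is enqueued on `stream`; keep `value` alive (or sync the stream)
    // until it completes.
    template <typename T>
    std::unique_ptr<cudf::column> column_from_value(
        T const& value, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr
    )
    {
        rmm::device_uvector<T> data(1, stream, mr);
        data.set_element_async(0, value, stream);
        return std::make_unique<cudf::column>(std::move(data), rmm::device_buffer{}, 0);
    }

Call sites would then look like result_cols.push_back(column_from_value(avg_yearly_val, chunk_stream, ctx->br()->device_mr())).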
[](
    std::shared_ptr<rapidsmpf::streaming::Context> ctx,
    std::shared_ptr<rapidsmpf::streaming::Channel> ch_in,
    std::shared_ptr<rapidsmpf::streaming::Channel> ch_out,
    rapidsmpf::OpID tag
) -> rapidsmpf::streaming::Node {
let's move this to a separate method
Signed-off-by: niranda perera <[email protected]>
Avoid including the filter columns in the output table.
@nirandaperera I pushed a couple changes
I noticed that we're getting a different (presumably incorrect) result now. I'm not sure how long it's been like this. Running this script shows the different result.
I checked out an earlier commit (c21cb30) and confirmed that it gets the same (probably incorrect) 223947 as HEAD. So the recent changes didn't cause a regression there.
Shoot! I was testing against SF3K and thought I was getting the correct results. I'll double check.
It's also possible I've messed up the expected result. I haven't looked carefully.
@TomAugspurger and I went through a variety of scale factors (10, 100, 1K, 3K) and validated that all worked multi-GPU. However, with a single-GPU run the values were incorrect. We suspect something may be incorrect with reading/distributing the parquet files, or that our assumptions about distribution are only valid for N+1 ranks.
@TomAugspurger I think I resolved the issue in f67178f -- carved out a special case for single-rank computation.
Resolved conflict in cpp/benchmarks/streaming/ndsh/CMakeLists.txt by combining query lists from both branches (q01, q03, q09, q17).