-
Notifications
You must be signed in to change notification settings - Fork 1k
Refactor distinct using static_map insert_or_apply
#16484
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: branch-24.10
Are you sure you want to change the base?
Changes from all commits
eb5a6ea
86c2a22
f2d8eef
5b66240
00fa59b
213e1ce
9f7db16
9d93e60
c10e047
78561d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,118 +22,105 @@ | |
| namespace cudf::detail { | ||
|
|
||
| template <typename RowHasher> | ||
| rmm::device_uvector<size_type> reduce_by_row(hash_set_type<RowHasher>& set, | ||
| rmm::device_uvector<size_type> reduce_by_row(hash_map_type<RowHasher>& map, | ||
| size_type num_rows, | ||
| duplicate_keep_option keep, | ||
| rmm::cuda_stream_view stream, | ||
| rmm::device_async_resource_ref mr) | ||
| { | ||
| auto output_indices = rmm::device_uvector<size_type>(num_rows, stream, mr); | ||
| if ((keep == duplicate_keep_option::KEEP_FIRST) or (keep == duplicate_keep_option::KEEP_LAST)) { | ||
| auto output_indices = rmm::device_uvector<size_type>(num_rows, stream, mr); | ||
|
|
||
| auto pairs = | ||
| thrust::make_transform_iterator(thrust::counting_iterator<size_type>(0), | ||
| cuda::proclaim_return_type<cuco::pair<size_type, size_type>>( | ||
| [] __device__(size_type const i) { | ||
| return cuco::pair<size_type, size_type>{i, i}; | ||
| })); | ||
|
|
||
| if (keep == duplicate_keep_option::KEEP_FIRST) { | ||
| map.insert_or_apply_async(pairs, pairs + num_rows, min_op{}, stream.value()); | ||
| } else { | ||
| map.insert_or_apply_async(pairs, pairs + num_rows, max_op{}, stream.value()); | ||
| } | ||
|
|
||
| // If we don't care about order, just gather indices of distinct keys taken from set. | ||
| if (keep == duplicate_keep_option::KEEP_ANY) { | ||
| auto const iter = thrust::counting_iterator<cudf::size_type>{0}; | ||
| set.insert_async(iter, iter + num_rows, stream.value()); | ||
| auto const output_end = set.retrieve_all(output_indices.begin(), stream.value()); | ||
| auto const [_, output_end] = | ||
| map.retrieve_all(thrust::make_discard_iterator(), output_indices.begin(), stream.value()); | ||
| output_indices.resize(thrust::distance(output_indices.begin(), output_end), stream); | ||
| return output_indices; | ||
| } | ||
|
|
||
| auto reduction_results = rmm::device_uvector<size_type>(num_rows, stream, mr); | ||
| thrust::uninitialized_fill(rmm::exec_policy_nosync(stream), | ||
| reduction_results.begin(), | ||
| reduction_results.end(), | ||
| reduction_init_value(keep)); | ||
|
|
||
| auto set_ref = set.ref(cuco::op::insert_and_find); | ||
|
|
||
| thrust::for_each(rmm::exec_policy_nosync(stream), | ||
| thrust::make_counting_iterator(0), | ||
| thrust::make_counting_iterator(num_rows), | ||
| [set_ref, keep, reduction_results = reduction_results.begin()] __device__( | ||
| size_type const idx) mutable { | ||
| auto const [inserted_idx_ptr, _] = set_ref.insert_and_find(idx); | ||
|
|
||
| auto ref = cuda::atomic_ref<size_type, cuda::thread_scope_device>{ | ||
| reduction_results[*inserted_idx_ptr]}; | ||
| if (keep == duplicate_keep_option::KEEP_FIRST) { | ||
| // Store the smallest index of all rows that are equal. | ||
| ref.fetch_min(idx, cuda::memory_order_relaxed); | ||
| } else if (keep == duplicate_keep_option::KEEP_LAST) { | ||
| // Store the greatest index of all rows that are equal. | ||
| ref.fetch_max(idx, cuda::memory_order_relaxed); | ||
| } else { | ||
| // Count the number of rows in each group of rows that are compared equal. | ||
| ref.fetch_add(size_type{1}, cuda::memory_order_relaxed); | ||
| } | ||
| }); | ||
|
|
||
| auto const map_end = [&] { | ||
| if (keep == duplicate_keep_option::KEEP_NONE) { | ||
| // Reduction results with `KEEP_NONE` are either group sizes of equal rows, or `0`. | ||
| // Thus, we only output index of the rows in the groups having group size of `1`. | ||
| return thrust::copy_if( | ||
| rmm::exec_policy(stream), | ||
| thrust::make_counting_iterator(0), | ||
| thrust::make_counting_iterator(num_rows), | ||
| output_indices.begin(), | ||
| cuda::proclaim_return_type<bool>( | ||
| [reduction_results = reduction_results.begin()] __device__(auto const idx) { | ||
| return reduction_results[idx] == size_type{1}; | ||
| })); | ||
| } | ||
| auto keys = rmm::device_uvector<size_type>(num_rows, stream); | ||
| auto values = rmm::device_uvector<size_type>(num_rows, stream); | ||
|
|
||
| auto pairs = thrust::make_transform_iterator( | ||
| thrust::counting_iterator<size_type>(0), | ||
| cuda::proclaim_return_type<cuco::pair<size_type, size_type>>([] __device__(size_type const i) { | ||
| return cuco::pair<size_type, size_type>{i, 1}; | ||
| })); | ||
|
|
||
| map.insert_or_apply_async(pairs, pairs + num_rows, plus_op{}, stream.value()); | ||
| auto const [keys_end, _] = map.retrieve_all(keys.begin(), values.begin(), stream.value()); | ||
|
|
||
| auto num_distinct_keys = thrust::distance(keys.begin(), keys_end); | ||
| keys.resize(num_distinct_keys, stream); | ||
| values.resize(num_distinct_keys, stream); | ||
|
|
||
| auto output_indices = rmm::device_uvector<size_type>(num_distinct_keys, stream, mr); | ||
|
|
||
| auto const output_iter = cudf::detail::make_counting_transform_iterator( | ||
| size_type(0), | ||
| cuda::proclaim_return_type<size_type>( | ||
| [keys = keys.begin(), values = values.begin()] __device__(auto const idx) { | ||
| return values[idx] == size_type{1} ? keys[idx] : -1; | ||
| })); | ||
|
|
||
| // Reduction results with `KEEP_FIRST` and `KEEP_LAST` are row indices of the first/last row in | ||
| // each group of equal rows (which are the desired output indices), or the value given by | ||
| // `reduction_init_value()`. | ||
| return thrust::copy_if( | ||
| rmm::exec_policy(stream), | ||
| reduction_results.begin(), | ||
| reduction_results.end(), | ||
| output_indices.begin(), | ||
| cuda::proclaim_return_type<bool>([init_value = reduction_init_value(keep)] __device__( | ||
| auto const idx) { return idx != init_value; })); | ||
| }(); | ||
| auto const map_end = thrust::copy_if( | ||
| rmm::exec_policy_nosync(stream), | ||
| output_iter, | ||
| output_iter + num_distinct_keys, | ||
| output_indices.begin(), | ||
| cuda::proclaim_return_type<bool>([] __device__(auto const idx) { return idx != -1; })); | ||
|
Comment on lines
+78
to
+83
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please add back all the explanation comments (and new comments for new code). Although I wrote the original code, now I hardly understand what it is doing without the comments.
||
|
|
||
| output_indices.resize(thrust::distance(output_indices.begin(), map_end), stream); | ||
| return output_indices; | ||
| } | ||
|
|
||
| template rmm::device_uvector<size_type> reduce_by_row( | ||
| hash_set_type<cudf::experimental::row::equality::device_row_comparator< | ||
| hash_map_type<cudf::experimental::row::equality::device_row_comparator< | ||
| false, | ||
| cudf::nullate::DYNAMIC, | ||
| cudf::experimental::row::equality::nan_equal_physical_equality_comparator>>& set, | ||
| cudf::experimental::row::equality::nan_equal_physical_equality_comparator>>& map, | ||
| size_type num_rows, | ||
| duplicate_keep_option keep, | ||
| rmm::cuda_stream_view stream, | ||
| rmm::device_async_resource_ref mr); | ||
|
|
||
| template rmm::device_uvector<size_type> reduce_by_row( | ||
| hash_set_type<cudf::experimental::row::equality::device_row_comparator< | ||
| hash_map_type<cudf::experimental::row::equality::device_row_comparator< | ||
| true, | ||
| cudf::nullate::DYNAMIC, | ||
| cudf::experimental::row::equality::nan_equal_physical_equality_comparator>>& set, | ||
| cudf::experimental::row::equality::nan_equal_physical_equality_comparator>>& map, | ||
| size_type num_rows, | ||
| duplicate_keep_option keep, | ||
| rmm::cuda_stream_view stream, | ||
| rmm::device_async_resource_ref mr); | ||
|
|
||
| template rmm::device_uvector<size_type> reduce_by_row( | ||
| hash_set_type<cudf::experimental::row::equality::device_row_comparator< | ||
| hash_map_type<cudf::experimental::row::equality::device_row_comparator< | ||
| false, | ||
| cudf::nullate::DYNAMIC, | ||
| cudf::experimental::row::equality::physical_equality_comparator>>& set, | ||
| cudf::experimental::row::equality::physical_equality_comparator>>& map, | ||
| size_type num_rows, | ||
| duplicate_keep_option keep, | ||
| rmm::cuda_stream_view stream, | ||
| rmm::device_async_resource_ref mr); | ||
|
|
||
| template rmm::device_uvector<size_type> reduce_by_row( | ||
| hash_set_type<cudf::experimental::row::equality::device_row_comparator< | ||
| hash_map_type<cudf::experimental::row::equality::device_row_comparator< | ||
| true, | ||
| cudf::nullate::DYNAMIC, | ||
| cudf::experimental::row::equality::physical_equality_comparator>>& set, | ||
| cudf::experimental::row::equality::physical_equality_comparator>>& map, | ||
| size_type num_rows, | ||
| duplicate_keep_option keep, | ||
| rmm::cuda_stream_view stream, | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This header is now empty. Can we delete it and remove it from CMakeLists.txt?
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,11 +23,13 @@ | |
| #include <rmm/device_uvector.hpp> | ||
| #include <rmm/resource_ref.hpp> | ||
|
|
||
| #include <cuco/static_map.cuh> | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now with including the |
||
| #include <cuco/static_set.cuh> | ||
| #include <cuda/functional> | ||
| #include <thrust/copy.h> | ||
| #include <thrust/distance.h> | ||
| #include <thrust/iterator/counting_iterator.h> | ||
| #include <thrust/iterator/discard_iterator.h> | ||
|
|
||
| namespace cudf::detail { | ||
|
|
||
|
|
@@ -47,6 +49,31 @@ auto constexpr reduction_init_value(duplicate_keep_option keep) | |
| } | ||
| } | ||
|
|
||
| struct plus_op { | ||
| template <cuda::thread_scope Scope> | ||
| __device__ void operator()(cuda::atomic_ref<size_type, Scope> ref, size_type const val) | ||
|
Comment on lines
+52
to
+54
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please do not put |
||
| { | ||
| ref.fetch_add(static_cast<size_type>(1), cuda::memory_order_relaxed); | ||
| } | ||
| }; | ||
|
|
||
| struct min_op { | ||
| template <cuda::thread_scope Scope> | ||
| __device__ void operator()(cuda::atomic_ref<size_type, Scope> ref, size_type const val) | ||
| { | ||
| ref.fetch_min(val, cuda::memory_order_relaxed); | ||
| } | ||
| }; | ||
|
|
||
| struct max_op { | ||
| template <cuda::thread_scope Scope> | ||
| __device__ void operator()(cuda::atomic_ref<size_type, Scope> ref, size_type const val) | ||
| { | ||
| ref.fetch_max(val, cuda::memory_order_relaxed); | ||
| } | ||
| }; | ||
|
|
||
| // The static_set type used to process `keep_any` option | ||
| template <typename RowHasher> | ||
| using hash_set_type = | ||
| cuco::static_set<size_type, | ||
|
|
@@ -60,12 +87,27 @@ using hash_set_type = | |
| cudf::detail::cuco_allocator<char>, | ||
| cuco::storage<1>>; | ||
|
|
||
| // The static_map type used to process `keep_first`, `keep_last` and `keep_none` option | ||
| template <typename RowHasher> | ||
| using hash_map_type = | ||
| cuco::static_map<size_type, | ||
| size_type, | ||
| cuco::extent<int64_t>, | ||
| cuda::thread_scope_device, | ||
| RowHasher, | ||
| cuco::linear_probing<1, | ||
| cudf::experimental::row::hash::device_row_hasher< | ||
| cudf::hashing::detail::default_hash, | ||
| cudf::nullate::DYNAMIC>>, | ||
| cudf::detail::cuco_allocator<char>, | ||
| cuco::storage<1>>; | ||
|
|
||
| /** | ||
| * @brief Perform a reduction on groups of rows that are compared equal and returns output indices | ||
| * of the occurrences of the distinct elements based on `keep` parameter. | ||
| * | ||
| * This is essentially a reduce-by-key operation with keys are non-contiguous rows and are compared | ||
| * equal. A hash set is used to find groups of equal rows. | ||
| * equal. A hash map is used to find groups of equal rows. | ||
| * | ||
| * Depending on the `keep` parameter, the reduction operation for each row group is: | ||
| * - If `keep == KEEP_ANY` : order does not matter. | ||
|
|
@@ -79,16 +121,15 @@ using hash_set_type = | |
| * the `reduction_init_value()` function. Then, the reduction result for each row group is written | ||
| * into the output array at the index of an unspecified row in the group. | ||
| * | ||
| * @param set The auxiliary set to perform reduction | ||
| * @param set_size The number of elements in set | ||
| * @param map The auxiliary map to perform reduction | ||
| * @param num_rows The number of all input rows | ||
| * @param keep The parameter to determine what type of reduction to perform | ||
| * @param stream CUDA stream used for device memory operations and kernel launches | ||
| * @param mr Device memory resource used to allocate the returned vector | ||
| * @return A device_uvector containing the output indices | ||
| */ | ||
| template <typename RowHasher> | ||
| rmm::device_uvector<size_type> reduce_by_row(hash_set_type<RowHasher>& set, | ||
| rmm::device_uvector<size_type> reduce_by_row(hash_map_type<RowHasher>& map, | ||
| size_type num_rows, | ||
| duplicate_keep_option keep, | ||
| rmm::cuda_stream_view stream, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Likewise, this file is now empty and should be removed and deleted from CMakeLists.txt.