Skip to content
32 changes: 29 additions & 3 deletions cpp/src/stream_compaction/distinct.cu
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
#include <rmm/mr/device/per_device_resource.hpp>
#include <rmm/resource_ref.hpp>

#include <cuco/static_set.cuh>

#include <functional>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -96,18 +99,41 @@ rmm::device_uvector<size_type> distinct_indices(table_view const& input,
auto const row_equal = cudf::experimental::row::equality::self_comparator(preprocessed_input);

auto const helper_func = [&](auto const& d_equal) {
using RowHasher = std::decay_t<decltype(d_equal)>;
auto set = hash_set_type<RowHasher>{
using RowHasher = cuda::std::decay_t<decltype(d_equal)>;

// If we don't care about order, just gather indices of distinct keys taken from set.
if (keep == duplicate_keep_option::KEEP_ANY) {
auto set = hash_set_type<RowHasher>{
num_rows,
0.5, // desired load factor
cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
d_equal,
{row_hash.device_hasher(has_nulls)},
{},
{},
cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream},
stream.value()};

auto const iter = thrust::counting_iterator<cudf::size_type>{0};
set.insert_async(iter, iter + num_rows, stream.value());
auto output_indices = rmm::device_uvector<size_type>(num_rows, stream, mr);
auto const output_end = set.retrieve_all(output_indices.begin(), stream.value());
output_indices.resize(thrust::distance(output_indices.begin(), output_end), stream);
return output_indices;
}

auto map = hash_map_type<RowHasher>{
num_rows,
0.5, // desired load factor
cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
cuco::empty_value{reduction_init_value(keep)},
d_equal,
{row_hash.device_hasher(has_nulls)},
{},
{},
cudf::detail::cuco_allocator<char>{rmm::mr::polymorphic_allocator<char>{}, stream},
stream.value()};
return detail::reduce_by_row(set, num_rows, keep, stream, mr);
return reduce_by_row(map, num_rows, keep, stream, mr);
};

if (cudf::detail::has_nested_columns(input)) {
Expand Down
125 changes: 56 additions & 69 deletions cpp/src/stream_compaction/distinct_helpers.cu
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise, this file is now empty and should be deleted, and its entry removed from CMakeLists.txt.

Original file line number Diff line number Diff line change
Expand Up @@ -22,118 +22,105 @@
namespace cudf::detail {

template <typename RowHasher>
rmm::device_uvector<size_type> reduce_by_row(hash_map_type<RowHasher>& map,
                                             size_type num_rows,
                                             duplicate_keep_option keep,
                                             rmm::cuda_stream_view stream,
                                             rmm::device_async_resource_ref mr)
{
  // Note: `KEEP_ANY` is handled by the caller using a hash set (see `distinct_indices`);
  // this function only performs the reductions that need a per-group payload.

  if ((keep == duplicate_keep_option::KEEP_FIRST) or (keep == duplicate_keep_option::KEEP_LAST)) {
    auto output_indices = rmm::device_uvector<size_type>(num_rows, stream, mr);

    // Insert pairs of (row index, row index): the key identifies the group of equal
    // rows, while the payload is the candidate row index to keep for that group.
    auto pairs =
      thrust::make_transform_iterator(thrust::counting_iterator<size_type>(0),
                                      cuda::proclaim_return_type<cuco::pair<size_type, size_type>>(
                                        [] __device__(size_type const i) {
                                          return cuco::pair<size_type, size_type>{i, i};
                                        }));

    if (keep == duplicate_keep_option::KEEP_FIRST) {
      // Store the smallest index of all rows that are compared equal.
      map.insert_or_apply_async(pairs, pairs + num_rows, min_op{}, stream.value());
    } else {
      // Store the greatest index of all rows that are compared equal.
      map.insert_or_apply_async(pairs, pairs + num_rows, max_op{}, stream.value());
    }

    // Only the reduced payloads (the first/last row index of each group) are the
    // desired output; the keys are discarded.
    auto const [_, output_end] =
      map.retrieve_all(thrust::make_discard_iterator(), output_indices.begin(), stream.value());
    output_indices.resize(thrust::distance(output_indices.begin(), output_end), stream);
    return output_indices;
  }

  // `KEEP_NONE`: count the number of rows in each group of equal rows, then emit
  // only the rows belonging to groups of size one (i.e. rows with no duplicates).
  auto keys   = rmm::device_uvector<size_type>(num_rows, stream);
  auto values = rmm::device_uvector<size_type>(num_rows, stream);

  // Insert pairs of (row index, 1): the payload accumulates the group size.
  auto pairs = thrust::make_transform_iterator(
    thrust::counting_iterator<size_type>(0),
    cuda::proclaim_return_type<cuco::pair<size_type, size_type>>([] __device__(size_type const i) {
      return cuco::pair<size_type, size_type>{i, 1};
    }));

  map.insert_or_apply_async(pairs, pairs + num_rows, plus_op{}, stream.value());
  auto const [keys_end, _] = map.retrieve_all(keys.begin(), values.begin(), stream.value());

  auto num_distinct_keys = thrust::distance(keys.begin(), keys_end);
  keys.resize(num_distinct_keys, stream);
  values.resize(num_distinct_keys, stream);

  auto output_indices = rmm::device_uvector<size_type>(num_distinct_keys, stream, mr);

  // Map each distinct key to its row index when its group has size exactly one,
  // or to the sentinel `-1` otherwise.
  auto const output_iter = cudf::detail::make_counting_transform_iterator(
    size_type(0),
    cuda::proclaim_return_type<size_type>(
      [keys = keys.begin(), values = values.begin()] __device__(auto const idx) {
        return values[idx] == size_type{1} ? keys[idx] : -1;
      }));

  // Compact away the sentinel entries, keeping only indices of non-duplicated rows.
  auto const map_end = thrust::copy_if(
    rmm::exec_policy_nosync(stream),
    output_iter,
    output_iter + num_distinct_keys,
    output_indices.begin(),
    cuda::proclaim_return_type<bool>([] __device__(auto const idx) { return idx != -1; }));

  output_indices.resize(thrust::distance(output_indices.begin(), map_end), stream);
  return output_indices;
}

// Explicit instantiation: non-nested columns, NaNs compare equal.
template rmm::device_uvector<size_type> reduce_by_row(
  hash_map_type<cudf::experimental::row::equality::device_row_comparator<
    false,
    cudf::nullate::DYNAMIC,
    cudf::experimental::row::equality::nan_equal_physical_equality_comparator>>& map,
  size_type num_rows,
  duplicate_keep_option keep,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr);

// Explicit instantiation: nested columns, NaNs compare equal.
template rmm::device_uvector<size_type> reduce_by_row(
  hash_map_type<cudf::experimental::row::equality::device_row_comparator<
    true,
    cudf::nullate::DYNAMIC,
    cudf::experimental::row::equality::nan_equal_physical_equality_comparator>>& map,
  size_type num_rows,
  duplicate_keep_option keep,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr);

// Explicit instantiation: non-nested columns, physical (bitwise NaN) equality.
template rmm::device_uvector<size_type> reduce_by_row(
  hash_map_type<cudf::experimental::row::equality::device_row_comparator<
    false,
    cudf::nullate::DYNAMIC,
    cudf::experimental::row::equality::physical_equality_comparator>>& map,
  size_type num_rows,
  duplicate_keep_option keep,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr);

template rmm::device_uvector<size_type> reduce_by_row(
hash_set_type<cudf::experimental::row::equality::device_row_comparator<
hash_map_type<cudf::experimental::row::equality::device_row_comparator<
true,
cudf::nullate::DYNAMIC,
cudf::experimental::row::equality::physical_equality_comparator>>& set,
cudf::experimental::row::equality::physical_equality_comparator>>& map,
size_type num_rows,
duplicate_keep_option keep,
rmm::cuda_stream_view stream,
Expand Down
49 changes: 45 additions & 4 deletions cpp/src/stream_compaction/distinct_helpers.hpp
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This header is now empty. Can we delete it and remove it from #includes?

Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
#include <rmm/device_uvector.hpp>
#include <rmm/resource_ref.hpp>

#include <cuco/static_map.cuh>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that it includes the cuco header, this file should be renamed to _helper.cuh. Otherwise, please move these headers and their relevant code into _helper.cuh and keep only host-side code in this file.

#include <cuco/static_set.cuh>
#include <cuda/functional>
#include <thrust/copy.h>
#include <thrust/distance.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/discard_iterator.h>

namespace cudf::detail {

Expand All @@ -47,6 +49,31 @@ auto constexpr reduction_init_value(duplicate_keep_option keep)
}
}

// Callable used with `insert_or_apply` to count the number of rows in each group of
// equal rows: every application adds exactly one occurrence, regardless of the
// inserted payload value — hence the second parameter is deliberately unnamed to
// avoid unused-parameter warnings.
struct plus_op {
  template <cuda::thread_scope Scope>
  __device__ void operator()(cuda::atomic_ref<size_type, Scope> ref, size_type const)
  {
    ref.fetch_add(static_cast<size_type>(1), cuda::memory_order_relaxed);
  }
};

// Callable used with `insert_or_apply` to keep, for each group of equal rows, the
// smallest row index observed so far (atomic min on the slot's payload).
struct min_op {
  template <cuda::thread_scope Scope>
  __device__ void operator()(cuda::atomic_ref<size_type, Scope> slot, size_type const candidate)
  {
    slot.fetch_min(candidate, cuda::memory_order_relaxed);
  }
};

// Callable used with `insert_or_apply` to keep, for each group of equal rows, the
// greatest row index observed so far (atomic max on the slot's payload).
struct max_op {
  template <cuda::thread_scope Scope>
  __device__ void operator()(cuda::atomic_ref<size_type, Scope> slot, size_type const candidate)
  {
    slot.fetch_max(candidate, cuda::memory_order_relaxed);
  }
};

// The static_set type used to process `keep_any` option
template <typename RowHasher>
using hash_set_type =
cuco::static_set<size_type,
Expand All @@ -60,12 +87,27 @@ using hash_set_type =
cudf::detail::cuco_allocator<char>,
cuco::storage<1>>;

// The static_map type used to process `keep_first`, `keep_last` and `keep_none` option
// Keys are input row indices; payloads are reduced per group of equal rows via
// `insert_or_apply` (min/max index, or occurrence count — see the ops above).
template <typename RowHasher>
using hash_map_type =
  cuco::static_map<size_type,                // key: row index into the input table
                   size_type,                // payload: reduced value for the row group
                   cuco::extent<int64_t>,    // 64-bit capacity extent
                   cuda::thread_scope_device,
                   RowHasher,                // key equality: row comparator
                   cuco::linear_probing<1,   // probing with CG size 1, hashing whole rows
                                        cudf::experimental::row::hash::device_row_hasher<
                                          cudf::hashing::detail::default_hash,
                                          cudf::nullate::DYNAMIC>>,
                   cudf::detail::cuco_allocator<char>,
                   cuco::storage<1>>;        // one slot per bucket window

/**
* @brief Perform a reduction on groups of rows that are compared equal and returns output indices
* of the occurrences of the distinct elements based on `keep` parameter.
*
* This is essentially a reduce-by-key operation with keys are non-contiguous rows and are compared
* equal. A hash set is used to find groups of equal rows.
* equal. A hash map is used to find groups of equal rows.
*
* Depending on the `keep` parameter, the reduction operation for each row group is:
* - If `keep == KEEP_ANY` : order does not matter.
Expand All @@ -79,16 +121,15 @@ using hash_set_type =
* the `reduction_init_value()` function. Then, the reduction result for each row group is written
* into the output array at the index of an unspecified row in the group.
*
* @param set The auxiliary set to perform reduction
* @param set_size The number of elements in set
* @param map The auxiliary map to perform reduction
* @param num_rows The number of all input rows
* @param keep The parameter to determine what type of reduction to perform
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned vector
* @return A device_uvector containing the output indices
*/
template <typename RowHasher>
rmm::device_uvector<size_type> reduce_by_row(hash_set_type<RowHasher>& set,
rmm::device_uvector<size_type> reduce_by_row(hash_map_type<RowHasher>& map,
size_type num_rows,
duplicate_keep_option keep,
rmm::cuda_stream_view stream,
Expand Down