45 #ifndef KOKKOS_WORKGRAPHPOLICY_HPP
46 #define KOKKOS_WORKGRAPHPOLICY_HPP
// NOTE(review): forward-declaration fragment — the entity being declared
// (presumably the Impl::ParallelFor specialization for WorkGraphPolicy, per
// the trailing doc residue in this excerpt) falls in lines elided from this
// extraction; confirm against the full header.
51 template <
class functor_type,
class execution_space,
class... policy_args>
// Execution policy for traversing a directed acyclic graph of work items:
// an item becomes eligible to run once all of its predecessors in the graph
// have completed (see completed_work / push_work below).
59 template <
class... Properties>
60 class WorkGraphPolicy :
public Kokkos::Impl::PolicyTraits<Properties...> {
// This policy serves as its own execution policy and self type.
62 using execution_policy = WorkGraphPolicy<Properties...>;
63 using self_type = WorkGraphPolicy<Properties...>;
// Traits resolved from the property pack: index type, execution space, etc.
64 using traits = Kokkos::Impl::PolicyTraits<Properties...>;
65 using index_type =
typename traits::index_type;
66 using member_type = index_type;
67 using execution_space =
typename traits::execution_space;
68 using memory_space =
typename execution_space::memory_space;
// The task dependence graph, in compressed-row-storage form (rows = work
// items, entries = successor indices).  Declared const: the graph is fixed
// once the policy is constructed.
// NOTE(review): graph_type's definition falls in lines elided from this
// excerpt (original lines 69-84).
85 graph_type
const m_graph;
88 KOKKOS_INLINE_FUNCTION
// Append ready work item 'w' to the ready queue.  Aborts if the claimed
// slot index is past the end of the queue or the slot did not still hold
// END_TOKEN (both indicate queue corruption).
89 void push_work(
const std::int32_t w)
const noexcept {
90 const std::int32_t N = m_graph.numRows();
// m_queue layout as used across this class: [0, N) ready queue,
// [N, 2N) dependence counters, m_queue[2N] pop hint, m_queue[2N+1] push hint.
92 std::int32_t
volatile*
const ready_queue = &m_queue[0];
93 std::int32_t
volatile*
const end_hint = &m_queue[2 * N + 1];
// Atomically claim the next free slot at the end of the ready queue.
96 const std::int32_t j = atomic_fetch_add(end_hint, 1);
// Swap 'w' into the claimed slot; the previous value must be END_TOKEN.
98 if ((N <= j) || (END_TOKEN != atomic_exchange(ready_queue + j, w))) {
100 Kokkos::abort(
"WorkGraphPolicy push_work error");
// NOTE(review): closing braces elided from this excerpt (original lines
// 101-120 missing between here and pop_work).
121 KOKKOS_INLINE_FUNCTION
// Claim a ready work item.  Returns the claimed item's index, or
// COMPLETED_TOKEN once the whole graph has been processed.  (The branch
// bodies for the END_TOKEN and successful-claim cases fall in lines elided
// from this excerpt.)
122 std::int32_t pop_work() const noexcept {
123 const std::int32_t N = m_graph.numRows();
124 std::int32_t
volatile*
const ready_queue = &m_queue[0];
// Hint for the first queue slot that may still hold an unclaimed item;
// slots before *begin_hint have already been claimed.
126 std::int32_t
volatile*
const begin_hint = &m_queue[2 * N];
131 for (std::int32_t i = *begin_hint; i < N; ++i) {
132 const std::int32_t w = ready_queue[i];
// END_TOKEN: this slot has not yet been filled by push_work.
134 if (w == END_TOKEN) {
// Try to claim slot i by swapping its value for BEGIN_TOKEN; exactly one
// caller wins the compare-exchange (w itself cannot be BEGIN_TOKEN —
// that would mean the slot is already claimed).
138 if ((w != BEGIN_TOKEN) &&
139 (w == atomic_compare_exchange(ready_queue + i, w,
140 (std::int32_t)BEGIN_TOKEN))) {
// Best-effort advance of the scan hint past the slot just claimed.
143 atomic_increment(begin_hint);
// Reached when every slot has been claimed: graph traversal is complete.
149 return COMPLETED_TOKEN;
152 KOKKOS_INLINE_FUNCTION
// Mark work item 'w' complete: decrement the outstanding-predecessor count
// of each of w's successors.  (The body of the count-reached-zero branch is
// elided from this excerpt — presumably it pushes the successor via
// push_work; confirm against the full header.)
153 void completed_work(std::int32_t w)
const noexcept {
// Make all memory effects of the completed work item visible before any
// successor is released.
154 Kokkos::memory_fence();
158 const std::int32_t N = m_graph.numRows();
// Per-item outstanding-predecessor counters live in m_queue[N, 2N).
160 std::int32_t
volatile*
const count_queue = &m_queue[N];
// [B, E) is the range of w's outgoing edges in the CRS graph.
162 const std::int32_t B = m_graph.row_map(w);
163 const std::int32_t E = m_graph.row_map(w + 1);
165 for (std::int32_t i = B; i < E; ++i) {
166 const std::int32_t j = m_graph.entries(i);
// atomic_fetch_add returns the previous value: 1 means this was successor
// j's last unmet dependency.
167 if (1 == atomic_fetch_add(count_queue + j, -1)) {
183 KOKKOS_INLINE_FUNCTION
// TagInit pass: initialize queue storage.  The first numRows() slots (the
// ready queue) get END_TOKEN; the rest (counters and the two hint slots)
// get zero.
184 void operator()(
const TagInit,
int i)
const noexcept {
185 m_queue[i] = i < m_graph.numRows() ? END_TOKEN : 0;
188 KOKKOS_INLINE_FUNCTION
// TagCount pass: tally predecessor counts.  For each graph edge i,
// increment the counter of the edge's destination item.
189 void operator()(
const TagCount,
int i)
const noexcept {
190 std::int32_t
volatile*
const count_queue = &m_queue[m_graph.numRows()];
// NOTE(review): entries is indexed with operator[] here but with
// operator() in completed_work (m_graph.entries(i)) — both are valid for
// a rank-1 view, but confirm and pick one style.
192 atomic_increment(count_queue + m_graph.entries[i]);
195 KOKKOS_INLINE_FUNCTION
// TagReady pass: seed the ready queue with every work item that has no
// unmet dependencies (counter already zero after the TagCount pass).
196 void operator()(
const TagReady,
int w)
const noexcept {
// Read-only view of the counters; no atomicity needed between the count
// and ready passes (they are separated by a fence in the constructor).
197 std::int32_t
const*
const count_queue = &m_queue[m_graph.numRows()];
199 if (0 == count_queue[w]) push_work(w);
202 execution_space space()
const {
return execution_space(); }
// Construct the policy from a dependence graph: allocate the work queue
// (2N+2 ints — N ready slots, N dependence counters, pop hint, push hint;
// uninitialized on allocation, filled by the TagInit pass) and run three
// setup passes over the device.
// NOTE(review): closure_type's definition, the closure.execute() calls, and
// the scoping braces that let policy_type be re-declared per pass are all
// elided from this excerpt — confirm against the full header.
204 WorkGraphPolicy(
const graph_type& arg_graph)
205 : m_graph(arg_graph),
206 m_queue(
view_alloc(
"queue", WithoutInitializing),
207 arg_graph.numRows() * 2 + 2) {
// Pass 1 (TagInit): fill every queue slot with END_TOKEN / 0.
209 using policy_type = RangePolicy<std::int32_t, execution_space, TagInit>;
211 const closure_type closure(*
this, policy_type(0, m_queue.size()));
// Fence so each pass completes before the next one reads its results.
213 execution_space().fence();
// Pass 2 (TagCount): one increment per graph edge to count predecessors.
217 using policy_type = RangePolicy<std::int32_t, execution_space, TagCount>;
219 const closure_type closure(*
this, policy_type(0, m_graph.entries.size()));
221 execution_space().fence();
// Pass 3 (TagReady): push every item whose predecessor count is zero.
225 using policy_type = RangePolicy<std::int32_t, execution_space, TagReady>;
227 const closure_type closure(*
this, policy_type(0, m_graph.numRows()));
229 execution_space().fence();
236 #ifdef KOKKOS_ENABLE_SERIAL
237 #include "impl/Kokkos_Serial_WorkGraphPolicy.hpp"
240 #ifdef KOKKOS_ENABLE_OPENMP
241 #include "OpenMP/Kokkos_OpenMP_WorkGraphPolicy.hpp"
244 #ifdef KOKKOS_ENABLE_CUDA
245 #include "Cuda/Kokkos_Cuda_WorkGraphPolicy.hpp"
248 #ifdef KOKKOS_ENABLE_THREADS
249 #include "Threads/Kokkos_Threads_WorkGraphPolicy.hpp"
252 #ifdef KOKKOS_ENABLE_HPX
253 #include "HPX/Kokkos_HPX_WorkGraphPolicy.hpp"
Compressed row storage (CRS) array.
Impl::ViewCtorProp< typename Impl::ViewCtorProp< void, Args >::type...> view_alloc(Args const &...args)
Create View allocation parameter bundle from argument list.
Implementation of the ParallelFor operator that has a partial specialization for the device...