44 #ifndef KOKKOS_WORKGRAPHPOLICY_HPP
45 #define KOKKOS_WORKGRAPHPOLICY_HPP
// Forward declaration of the per-backend work-graph executor; each enabled
// backend supplies a partial specialization (see the backend headers
// included at the bottom of this file).
// NOTE(review): the declared entity's name was lost in extraction; upstream
// Kokkos declares `class WorkGraphExec` here — confirm against the repo.
template< class functor_type ,
          class execution_space ,
          class ... policy_args >
class WorkGraphExec ;
57 template<
class ... Properties >
62 using execution_policy = WorkGraphPolicy<Properties ... >;
63 using self_type = WorkGraphPolicy<Properties ... >;
64 using traits = Kokkos::Impl::PolicyTraits<Properties ... >;
65 using index_type =
typename traits::index_type;
66 using member_type = index_type;
67 using work_tag =
typename traits::work_tag;
68 using execution_space =
typename traits::execution_space;
69 using memory_space =
typename execution_space::memory_space;
75 COMPLETED_TOKEN = -3 };
86 graph_type
const m_graph;
89 KOKKOS_INLINE_FUNCTION
90 void push_work(
const std::int32_t w )
const noexcept
92 const std::int32_t N = m_graph.numRows();
94 std::int32_t
volatile *
const ready_queue = & m_queue[0] ;
95 std::int32_t
volatile *
const end_hint = & m_queue[2*N+1] ;
98 const std::int32_t j = atomic_fetch_add( end_hint , 1 );
101 ( END_TOKEN != atomic_exchange(ready_queue+j,w) ) ) {
103 Kokkos::abort(
"WorkGraphPolicy push_work error");
125 KOKKOS_INLINE_FUNCTION
126 std::int32_t pop_work() const noexcept
128 const std::int32_t N = m_graph.numRows();
130 std::int32_t
volatile *
const ready_queue = & m_queue[0] ;
131 std::int32_t
volatile *
const begin_hint = & m_queue[2*N] ;
136 for ( std::int32_t i = *begin_hint ; i < N ; ++i ) {
138 const std::int32_t w = ready_queue[i] ;
140 if ( w == END_TOKEN ) {
return END_TOKEN ; }
142 if ( ( w != BEGIN_TOKEN ) &&
143 ( w == atomic_compare_exchange(ready_queue+i,w,(std::int32_t)BEGIN_TOKEN) ) ) {
146 atomic_increment( begin_hint );
152 return COMPLETED_TOKEN ;
156 KOKKOS_INLINE_FUNCTION
157 void completed_work( std::int32_t w )
const noexcept
159 Kokkos::memory_fence();
163 const std::int32_t N = m_graph.numRows();
165 std::int32_t
volatile *
const count_queue = & m_queue[N] ;
167 const std::int32_t B = m_graph.row_map(w);
168 const std::int32_t E = m_graph.row_map(w+1);
170 for ( std::int32_t i = B ; i < E ; ++i ) {
171 const std::int32_t j = m_graph.entries(i);
172 if ( 1 == atomic_fetch_add(count_queue+j,-1) ) {
188 KOKKOS_INLINE_FUNCTION
189 void operator()(
const TagInit ,
int i )
const noexcept
190 { m_queue[i] = i < m_graph.numRows() ? END_TOKEN : 0 ; }
192 KOKKOS_INLINE_FUNCTION
193 void operator()(
const TagCount ,
int i )
const noexcept
195 std::int32_t
volatile *
const count_queue =
196 & m_queue[ m_graph.numRows() ] ;
198 atomic_increment( count_queue + m_graph.entries[i] );
201 KOKKOS_INLINE_FUNCTION
202 void operator()(
const TagReady ,
int w )
const noexcept
204 std::int32_t
const *
const count_queue =
205 & m_queue[ m_graph.numRows() ] ;
207 if ( 0 == count_queue[w] ) push_work(w);
210 WorkGraphPolicy(
const graph_type & arg_graph )
212 , m_queue(
view_alloc(
"queue" , WithoutInitializing )
213 , arg_graph.numRows() * 2 + 2 )
216 using policy_type = RangePolicy<std::int32_t, execution_space, TagInit>;
218 const closure_type closure(*
this, policy_type(0, m_queue.size()));
220 execution_space::fence();
224 using policy_type = RangePolicy<std::int32_t, execution_space, TagCount>;
226 const closure_type closure(*
this,policy_type(0,m_graph.entries.size()));
228 execution_space::fence();
232 using policy_type = RangePolicy<std::int32_t, execution_space, TagReady>;
234 const closure_type closure(*
this,policy_type(0,m_graph.numRows()));
236 execution_space::fence();
243 #ifdef KOKKOS_ENABLE_SERIAL
244 #include "impl/Kokkos_Serial_WorkGraphPolicy.hpp"
247 #ifdef KOKKOS_ENABLE_OPENMP
248 #include "OpenMP/Kokkos_OpenMP_WorkGraphPolicy.hpp"
251 #ifdef KOKKOS_ENABLE_CUDA
252 #include "Cuda/Kokkos_Cuda_WorkGraphPolicy.hpp"
255 #ifdef KOKKOS_ENABLE_THREADS
256 #include "Threads/Kokkos_Threads_WorkGraphPolicy.hpp"
// NOTE(review): the following lines are documentation-viewer tooltip text
// that leaked into the source during extraction; preserved here as comments
// so the file remains compilable:
//   Impl::ViewCtorProp< typename Impl::ViewCtorProp< void, Args >::type... > view_alloc(Args const &...args)
//     Create View allocation parameter bundle from argument list.
//   Crs: Compressed row storage array.
//   ParallelFor: Implementation of the ParallelFor operator that has a
//     partial specialization for the device.