44 #ifndef KOKKOS_WORKGRAPHPOLICY_HPP
45 #define KOKKOS_WORKGRAPHPOLICY_HPP
// NOTE(review): this whole region is garbled by extraction — the leading
// numerals fused into each line (50, 57, 58, ...) are original source line
// numbers, not code, and several source lines are missing between them.
// Reconcile against the upstream Kokkos_WorkGraphPolicy.hpp before building.
//
// Forward-declaration fragment: a template over a functor type, an execution
// space, and policy arguments — presumably the per-backend execution helper
// (Impl::WorkGraphExec in upstream Kokkos); TODO confirm against upstream.
50 template<
class functor_type ,
class execution_space,
class ... policy_args >
// Primary policy class: WorkGraphPolicy executes a DAG of work items, each
// item becoming runnable once all of its predecessors complete. It derives
// its trait set (index type, execution space, ...) from PolicyTraits.
57 template<
class ... Properties >
58 class WorkGraphPolicy:
public Kokkos::Impl::PolicyTraits<Properties ... >
// Public type aliases required of an execution policy.
62 using execution_policy = WorkGraphPolicy<Properties ... >;
63 using self_type = WorkGraphPolicy<Properties ... >;
64 using traits = Kokkos::Impl::PolicyTraits<Properties ... >;
// Index/member type used to identify a work item (one row of the graph).
65 using index_type =
typename traits::index_type;
66 using member_type = index_type;
67 using execution_space =
typename traits::execution_space;
68 using memory_space =
typename execution_space::memory_space;
// NOTE(review): the enum head is missing here — upstream declares the queue
// sentinel tokens (presumably END_TOKEN = -1, BEGIN_TOKEN = -2, then the
// COMPLETED_TOKEN below); only the final enumerator survived extraction.
// TODO confirm the full enum against upstream.
74 COMPLETED_TOKEN = -3 };
// The dependence graph (compressed-row storage: row_map + entries).
// NOTE(review): the graph_type alias and the m_queue view member (referenced
// throughout the methods below) are missing from this extraction.
85 graph_type
const m_graph;
// push_work(w): publish work item w as ready to run.
// Atomically reserves the next slot in the ready queue via the end-hint
// counter (m_queue[2*N+1]), then stores w into that slot with an atomic
// exchange. The slot must previously hold END_TOKEN; anything else means the
// queue invariant was violated, so abort.
// NOTE(review): extraction dropped the opening brace of the body and the
// `if (` head (original line ~99) that the condition on line "100" belongs
// to, plus the closing braces — restore from upstream before compiling.
88 KOKKOS_INLINE_FUNCTION
89 void push_work(
const std::int32_t w )
const noexcept
91 const std::int32_t N = m_graph.numRows();
// Queue layout (per the indexing here and in pop_work/completed_work):
// [0, N)    ready queue, [N, 2N) dependence counts,
// [2N]      begin hint,  [2N+1]  end hint.
93 std::int32_t
volatile *
const ready_queue = & m_queue[0] ;
94 std::int32_t
volatile *
const end_hint = & m_queue[2*N+1] ;
// Claim a unique slot index j for this push.
97 const std::int32_t j = atomic_fetch_add( end_hint , 1 );
// Store w; the previous value must be END_TOKEN (slot unused) or the
// queue state is corrupt.
100 ( END_TOKEN != atomic_exchange(ready_queue+j,w) ) ) {
102 Kokkos::abort(
"WorkGraphPolicy push_work error");
// pop_work(): claim the next ready work item.
// Scans the ready queue starting at the begin hint; an entry is claimed by
// CAS-ing its value w to BEGIN_TOKEN. Returns END_TOKEN when the scan hits
// an unfilled slot (no work currently available — caller should retry), and
// COMPLETED_TOKEN when the scan runs past all N rows (graph fully consumed).
// NOTE(review): the `return w;` for the successfully-claimed item (original
// line ~147) and the closing braces are missing from this extraction.
124 KOKKOS_INLINE_FUNCTION
125 std::int32_t pop_work() const noexcept
127 const std::int32_t N = m_graph.numRows();
129 std::int32_t
volatile *
const ready_queue = & m_queue[0] ;
// begin hint lives at m_queue[2N]; it is a hint only — entries before it
// are already claimed, so the scan may start there.
130 std::int32_t
volatile *
const begin_hint = & m_queue[2*N] ;
135 for ( std::int32_t i = *begin_hint ; i < N ; ++i ) {
137 const std::int32_t w = ready_queue[i] ;
// Unfilled slot: nothing ready right now.
139 if ( w == END_TOKEN ) {
return END_TOKEN ; }
// Skip slots another thread already claimed (BEGIN_TOKEN); otherwise
// try to claim this one — the CAS succeeds iff it still holds w.
141 if ( ( w != BEGIN_TOKEN ) &&
142 ( w == atomic_compare_exchange(ready_queue+i,w,(std::int32_t)BEGIN_TOKEN) ) ) {
// Advance the hint past the slot we just consumed.
145 atomic_increment( begin_hint );
// All N slots scanned and claimed: the whole graph is done.
151 return COMPLETED_TOKEN ;
// completed_work(w): record that work item w has finished and release its
// successors. For each outgoing edge of row w, atomically decrement the
// successor's dependence count; the thread that takes a count from 1 to 0
// (fetch_add returns 1) is responsible for making that successor ready.
// NOTE(review): the call that actually pushes the now-ready successor
// (presumably push_work(j), original line ~172) and the closing braces are
// missing from this extraction — restore from upstream.
155 KOKKOS_INLINE_FUNCTION
156 void completed_work( std::int32_t w )
const noexcept
// Ensure all memory effects of the completed work are visible before the
// count decrement can release a successor to another thread.
158 Kokkos::memory_fence();
162 const std::int32_t N = m_graph.numRows();
// Dependence counts occupy m_queue[N, 2N).
164 std::int32_t
volatile *
const count_queue = & m_queue[N] ;
// CRS row [B, E) holds the successors (entries) of w.
166 const std::int32_t B = m_graph.row_map(w);
167 const std::int32_t E = m_graph.row_map(w+1);
169 for ( std::int32_t i = B ; i < E ; ++i ) {
170 const std::int32_t j = m_graph.entries(i);
// fetch_add(-1) returning 1 means this decrement zeroed the count.
171 if ( 1 == atomic_fetch_add(count_queue+j,-1) ) {
// Init pass (runs over all m_queue.size() == 2N+2 entries): the first N
// slots (the ready queue) start as END_TOKEN (empty); the dependence counts
// and the begin/end hints start at 0.
187 KOKKOS_INLINE_FUNCTION
188 void operator()(
const TagInit ,
int i )
const noexcept
189 { m_queue[i] = i < m_graph.numRows() ? END_TOKEN : 0 ; }
// Count pass (runs over all graph edges): for edge i, increment the
// dependence count of its target vertex. After this pass, count_queue[v]
// holds v's in-degree.
// NOTE(review): this uses m_graph.entries[i] while completed_work uses
// m_graph.entries(i) — verify both accessors exist on the graph view, or
// reconcile with upstream (left untouched here).
191 KOKKOS_INLINE_FUNCTION
192 void operator()(
const TagCount ,
int i )
const noexcept
// Dependence counts occupy m_queue[N, 2N).
194 std::int32_t
volatile *
const count_queue =
195 & m_queue[ m_graph.numRows() ] ;
197 atomic_increment( count_queue + m_graph.entries[i] );
// Ready pass (runs over all N vertices): any vertex with zero remaining
// dependencies is immediately runnable, so seed it into the ready queue.
// Counts are only read here (const pointer), so no atomics are needed.
200 KOKKOS_INLINE_FUNCTION
201 void operator()(
const TagReady ,
int w )
const noexcept
203 std::int32_t
const *
const count_queue =
204 & m_queue[ m_graph.numRows() ] ;
206 if ( 0 == count_queue[w] ) push_work(w);
// Constructor: takes the dependence graph, allocates the combined queue
// (2N ready+count slots plus 2 hint slots, uninitialized), then runs three
// parallel setup passes — Init (reset queue), Count (tally in-degrees),
// Ready (seed zero-dependence vertices) — fencing after each.
// NOTE(review): extraction dropped the member-init of m_graph (original
// line ~210, presumably `: m_graph(arg_graph)`), the closure_type aliases,
// and the `closure.execute();` call after each closure construction —
// restore from upstream before compiling.
209 WorkGraphPolicy(
const graph_type & arg_graph )
211 , m_queue(
view_alloc(
"queue" , WithoutInitializing )
212 , arg_graph.numRows() * 2 + 2 )
// Pass 1: initialize all 2N+2 queue entries.
215 using policy_type = RangePolicy<std::int32_t, execution_space, TagInit>;
217 const closure_type closure(*
this, policy_type(0, m_queue.size()));
219 execution_space().fence();
// Pass 2: accumulate dependence counts over every edge.
223 using policy_type = RangePolicy<std::int32_t, execution_space, TagCount>;
225 const closure_type closure(*
this,policy_type(0,m_graph.entries.size()));
227 execution_space().fence();
// Pass 3: push every vertex whose dependence count is zero.
231 using policy_type = RangePolicy<std::int32_t, execution_space, TagReady>;
233 const closure_type closure(*
this,policy_type(0,m_graph.numRows()));
235 execution_space().fence();
242 #ifdef KOKKOS_ENABLE_SERIAL
243 #include "impl/Kokkos_Serial_WorkGraphPolicy.hpp"
246 #ifdef KOKKOS_ENABLE_OPENMP
247 #include "OpenMP/Kokkos_OpenMP_WorkGraphPolicy.hpp"
250 #ifdef KOKKOS_ENABLE_CUDA
251 #include "Cuda/Kokkos_Cuda_WorkGraphPolicy.hpp"
254 #ifdef KOKKOS_ENABLE_THREADS
255 #include "Threads/Kokkos_Threads_WorkGraphPolicy.hpp"
258 #ifdef KOKKOS_ENABLE_HPX
259 #include "HPX/Kokkos_HPX_WorkGraphPolicy.hpp"
// NOTE(review): the four lines below are documentation-tool tooltip residue
// (doxygen/code-browser hover text) that leaked into the file during
// extraction — they are not source code. Preserved here as comments:
//   Impl::ViewCtorProp< typename Impl::ViewCtorProp< void, Args >::type... > view_alloc(Args const &...args)
//   Create View allocation parameter bundle from argument list.
//   Compressed row storage array.
//   Implementation of the ParallelFor operator that has a partial specialization for the device...