Kokkos Core Kernels Package  Version of the Day
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
Kokkos_WorkGraphPolicy.hpp
1 /*
2 //@HEADER
3 // ************************************************************************
4 //
5 // Kokkos v. 2.0
6 // Copyright (2014) Sandia Corporation
7 //
8 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
9 // the U.S. Government retains certain rights in this software.
10 //
11 // Redistribution and use in source and binary forms, with or without
12 // modification, are permitted provided that the following conditions are
13 // met:
14 //
15 // 1. Redistributions of source code must retain the above copyright
16 // notice, this list of conditions and the following disclaimer.
17 //
18 // 2. Redistributions in binary form must reproduce the above copyright
19 // notice, this list of conditions and the following disclaimer in the
20 // documentation and/or other materials provided with the distribution.
21 //
22 // 3. Neither the name of the Corporation nor the names of the
23 // contributors may be used to endorse or promote products derived from
24 // this software without specific prior written permission.
25 //
26 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
27 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
28 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
29 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
30 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
31 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
32 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
33 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
34 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
35 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
36 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
37 //
38 // Questions? Contact Christian R. Trott (crtrott@sandia.gov)
39 //
40 // ************************************************************************
41 //@HEADER
42 */
43 
44 #ifndef KOKKOS_WORKGRAPHPOLICY_HPP
45 #define KOKKOS_WORKGRAPHPOLICY_HPP
46 
47 namespace Kokkos {
48 namespace Impl {
49 
// Forward declaration of the executor class template for WorkGraphPolicy,
// parameterized on the functor type, the execution space and the remaining
// policy arguments. No definition is visible in this listing.
// NOTE(review): presumably specialized by the backend headers included at the
// bottom of this file (Serial/OpenMP/Cuda/Threads/HPX) — confirm.
50 template< class functor_type , class execution_space, class ... policy_args >
51 class WorkGraphExec;
52 
53 }} // namespace Kokkos::Impl
54 
55 namespace Kokkos {
56 
// WorkGraphPolicy: execution policy that hands out work indices in an order
// constrained by a graph (m_graph).
//
// As used by the code below, row w of m_graph lists the work indices that
// wait on w:
//   - operator()(TagCount) adds one to the waiting count of the index named
//     by each graph entry;
//   - operator()(TagReady) queues every index whose waiting count is zero;
//   - completed_work(w) decrements the waiting count of each entry in row w
//     and queues the entries for which atomic_fetch_add reports the value 1.
// Consumers obtain work with pop_work() and report it with completed_work().
//
// NOTE(review): this listing carries its own line number at the start of
// every line and skips some of them (69, 78, 110-123, 181-186, 216, 224,
// 232). graph_type, ints_type and closure_type are used below but their
// declarations are not visible here — presumably they sit on the skipped
// lines; confirm against the real header.
57 template< class ... Properties >
58 class WorkGraphPolicy: public Kokkos::Impl::PolicyTraits<Properties ... >
59 {
60 public:
61 
  // Type aliases derived from the policy traits of the Properties pack.
62  using execution_policy = WorkGraphPolicy<Properties ... >;
63  using self_type = WorkGraphPolicy<Properties ... >;
64  using traits = Kokkos::Impl::PolicyTraits<Properties ... >;
65  using index_type = typename traits::index_type;
66  using member_type = index_type;
67  using execution_space = typename traits::execution_space;
68  using memory_space = typename execution_space::memory_space;
70 
  // Sentinel values stored in the ready queue and/or returned by pop_work():
  //   END_TOKEN       - initial value of every ready-queue slot (see TagInit);
  //                     pop_work() returns it on reaching such a slot.
  //   BEGIN_TOKEN     - written into a slot by pop_work() once the work index
  //                     held there has been claimed.
  //   COMPLETED_TOKEN - returned by pop_work() when its scan runs past slot
  //                     N-1 without meeting END_TOKEN or claiming work.
71  enum : std::int32_t {
72  END_TOKEN = -1 ,
73  BEGIN_TOKEN = -2 ,
74  COMPLETED_TOKEN = -3 };
75 
76 private:
77 
79 
80  // Let N = m_graph.numRows(), the total work
81  // m_queue[ 0 .. N-1] = the ready queue
82  // m_queue[ N .. 2*N-1] = the waiting queue counts
83  // m_queue[2*N .. 2*N+1] = the ready queue hints (2*N: begin hint, 2*N+1: end hint)
84 
85  graph_type const m_graph;
86  ints_type m_queue ;
87 
  // Append work index w to the ready queue.
  //
  // A slot index j is taken from the end hint via atomic_fetch_add, then w is
  // exchanged into ready_queue[j]. The call aborts through Kokkos::abort if
  // j is not below N or if the exchanged-out value was not END_TOKEN.
88  KOKKOS_INLINE_FUNCTION
89  void push_work( const std::int32_t w ) const noexcept
90  {
91  const std::int32_t N = m_graph.numRows();
92 
93  std::int32_t volatile * const ready_queue = & m_queue[0] ;
94  std::int32_t volatile * const end_hint = & m_queue[2*N+1] ;
95 
96  // Push work to end of queue
97  const std::int32_t j = atomic_fetch_add( end_hint , 1 );
98 
99  if ( ( N <= j ) ||
100  ( END_TOKEN != atomic_exchange(ready_queue+j,w) ) ) {
101  // ERROR: past the end of queue or did not replace END_TOKEN
102  Kokkos::abort("WorkGraphPolicy push_work error");
103  }
104 
  // Fence issued after the slot has been written.
105  memory_fence();
106  }
107 
108 public:
109 
  // Try to claim one ready work index.
  //
  // Returns:
  //   - a work index w, after its slot has been switched to BEGIN_TOKEN by a
  //     successful compare-exchange;
  //   - END_TOKEN, if the scan reaches a slot that still holds END_TOKEN;
  //   - COMPLETED_TOKEN, if the scan reaches N without either of the above.
124  KOKKOS_INLINE_FUNCTION
125  std::int32_t pop_work() const noexcept
126  {
127  const std::int32_t N = m_graph.numRows();
128 
129  std::int32_t volatile * const ready_queue = & m_queue[0] ;
130  std::int32_t volatile * const begin_hint = & m_queue[2*N] ;
131 
132  // begin hint is guaranteed to be less than or equal to
133  // actual begin location in the queue.
134 
135  for ( std::int32_t i = *begin_hint ; i < N ; ++i ) {
136 
137  const std::int32_t w = ready_queue[i] ;
138 
139  if ( w == END_TOKEN ) { return END_TOKEN ; }
140 
  // NOTE(review): C-style cast below; static_cast<std::int32_t> would be
  // the idiomatic spelling.
141  if ( ( w != BEGIN_TOKEN ) &&
142  ( w == atomic_compare_exchange(ready_queue+i,w,(std::int32_t)BEGIN_TOKEN) ) ) {
143  // Attempt to claim ready work index succeeded,
144  // update the hint and return work index
145  atomic_increment( begin_hint );
146  return w ;
147  }
148  // arrive here when ready_queue[i] == BEGIN_TOKEN
  // (either read directly, or the compare-exchange did not return w)
149  }
150 
151  return COMPLETED_TOKEN ;
152  }
153 
154 
  // Record that work index w has finished.
  //
  // For every entry j in row w of m_graph (positions row_map(w) up to, but
  // not including, row_map(w+1)), the waiting count of j is decremented with
  // atomic_fetch_add; when that call returns 1, j is pushed onto the ready
  // queue.
155  KOKKOS_INLINE_FUNCTION
156  void completed_work( std::int32_t w ) const noexcept
157  {
158  Kokkos::memory_fence();
159 
160  // The fence above: make sure the completed work function's memory
  // accesses are flushed.
161 
162  const std::int32_t N = m_graph.numRows();
163 
  // Waiting counts start at offset N inside m_queue.
164  std::int32_t volatile * const count_queue = & m_queue[N] ;
165 
166  const std::int32_t B = m_graph.row_map(w);
167  const std::int32_t E = m_graph.row_map(w+1);
168 
169  for ( std::int32_t i = B ; i < E ; ++i ) {
170  const std::int32_t j = m_graph.entries(i);
171  if ( 1 == atomic_fetch_add(count_queue+j,-1) ) {
172  push_work(j);
173  }
174  }
175  }
176 
  // Dispatch tags selecting which operator() below a RangePolicy invokes;
  // used by the three setup phases in the constructor.
177  struct TagInit {};
178  struct TagCount {};
179  struct TagReady {};
180 
  // TagInit: set element i of m_queue to its starting value — END_TOKEN for
  // i below numRows() (the ready queue), 0 for every other element (the
  // waiting counts and the hints).
187  KOKKOS_INLINE_FUNCTION
188  void operator()( const TagInit , int i ) const noexcept
189  { m_queue[i] = i < m_graph.numRows() ? END_TOKEN : 0 ; }
190 
  // TagCount: for graph entry i, atomically increment the waiting count of
  // the work index stored in m_graph.entries[i].
191  KOKKOS_INLINE_FUNCTION
192  void operator()( const TagCount , int i ) const noexcept
193  {
194  std::int32_t volatile * const count_queue =
195  & m_queue[ m_graph.numRows() ] ;
196 
197  atomic_increment( count_queue + m_graph.entries[i] );
198  }
199 
  // TagReady: push work index w onto the ready queue if its waiting count is
  // zero. The count is read through a plain (non-volatile) pointer here; the
  // constructor fences between the TagCount and TagReady phases.
200  KOKKOS_INLINE_FUNCTION
201  void operator()( const TagReady , int w ) const noexcept
202  {
203  std::int32_t const * const count_queue =
204  & m_queue[ m_graph.numRows() ] ;
205 
206  if ( 0 == count_queue[w] ) push_work(w);
207  }
208 
  // Build the policy from a graph.
  //
  // Stores arg_graph in m_graph, allocates m_queue with
  // arg_graph.numRows() * 2 + 2 elements without initializing them, and then
  // runs three phases over this object, each followed by an execution-space
  // fence:
  //   1. TagInit  over [0, m_queue.size())
  //   2. TagCount over [0, m_graph.entries.size())
  //   3. TagReady over [0, m_graph.numRows())
  //
  // NOTE(review): single-argument constructor that is not marked explicit —
  // confirm whether implicit conversion from graph_type is intended.
  // NOTE(review): closure_type is not declared in the visible lines of any
  // of the three scopes below (listing lines 216, 224 and 232 are missing).
209  WorkGraphPolicy( const graph_type & arg_graph )
210  : m_graph(arg_graph)
211  , m_queue( view_alloc( "queue" , WithoutInitializing )
212  , arg_graph.numRows() * 2 + 2 )
213  {
214  { // Initialize
215  using policy_type = RangePolicy<std::int32_t, execution_space, TagInit>;
217  const closure_type closure(*this, policy_type(0, m_queue.size()));
218  closure.execute();
219  execution_space().fence();
220  }
221 
222  { // execute-after counts
223  using policy_type = RangePolicy<std::int32_t, execution_space, TagCount>;
225  const closure_type closure(*this,policy_type(0,m_graph.entries.size()));
226  closure.execute();
227  execution_space().fence();
228  }
229 
230  { // Scheduling ready tasks
231  using policy_type = RangePolicy<std::int32_t, execution_space, TagReady>;
233  const closure_type closure(*this,policy_type(0,m_graph.numRows()));
234  closure.execute();
235  execution_space().fence();
236  }
237  }
238 };
239 
240 } // namespace Kokkos
241 
242 #ifdef KOKKOS_ENABLE_SERIAL
243 #include "impl/Kokkos_Serial_WorkGraphPolicy.hpp"
244 #endif
245 
246 #ifdef KOKKOS_ENABLE_OPENMP
247 #include "OpenMP/Kokkos_OpenMP_WorkGraphPolicy.hpp"
248 #endif
249 
250 #ifdef KOKKOS_ENABLE_CUDA
251 #include "Cuda/Kokkos_Cuda_WorkGraphPolicy.hpp"
252 #endif
253 
254 #ifdef KOKKOS_ENABLE_THREADS
255 #include "Threads/Kokkos_Threads_WorkGraphPolicy.hpp"
256 #endif
257 
258 #ifdef KOKKOS_ENABLE_HPX
259 #include "HPX/Kokkos_HPX_WorkGraphPolicy.hpp"
260 #endif
261 
262 #endif /* #define KOKKOS_WORKGRAPHPOLICY_HPP */
Impl::ViewCtorProp< typename Impl::ViewCtorProp< void, Args >::type... > view_alloc(Args const &...args)
Create View allocation parameter bundle from argument list.
Compressed row storage array.
Definition: Kokkos_Crs.hpp:83
Implementation of the ParallelFor operator that has a partial specialization for the device...