Kokkos Core Kernels Package  Version of the Day
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
Kokkos_WorkGraphPolicy.hpp
1 /*
2 //@HEADER
3 // ************************************************************************
4 //
5 // Kokkos v. 2.0
6 // Copyright (2014) Sandia Corporation
7 //
8 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
9 // the U.S. Government retains certain rights in this software.
10 //
11 // Redistribution and use in source and binary forms, with or without
12 // modification, are permitted provided that the following conditions are
13 // met:
14 //
15 // 1. Redistributions of source code must retain the above copyright
16 // notice, this list of conditions and the following disclaimer.
17 //
18 // 2. Redistributions in binary form must reproduce the above copyright
19 // notice, this list of conditions and the following disclaimer in the
20 // documentation and/or other materials provided with the distribution.
21 //
22 // 3. Neither the name of the Corporation nor the names of the
23 // contributors may be used to endorse or promote products derived from
24 // this software without specific prior written permission.
25 //
26 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
27 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
28 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
29 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
30 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
31 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
32 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
33 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
34 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
35 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
36 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
37 //
38 // Questions? Contact Christian R. Trott (crtrott@sandia.gov)
39 //
40 // ************************************************************************
41 //@HEADER
42 */
43 
44 #ifndef KOKKOS_WORKGRAPHPOLICY_HPP
45 #define KOKKOS_WORKGRAPHPOLICY_HPP
46 
namespace Kokkos {
namespace Impl {

// Forward declaration only: no definition of WorkGraphExec appears in this
// header, and nothing in this header refers to it.
// NOTE(review): presumably defined by a backend-specific header — confirm.
template< class FunctorType , class ExecSpace , class ... PolicyArgs >
class WorkGraphExec;

} // namespace Impl
} // namespace Kokkos
54 
55 namespace Kokkos {
56 
57 template< class ... Properties >
58 class WorkGraphPolicy
59 {
60 public:
61 
62  using execution_policy = WorkGraphPolicy<Properties ... >;
63  using self_type = WorkGraphPolicy<Properties ... >;
64  using traits = Kokkos::Impl::PolicyTraits<Properties ... >;
65  using index_type = typename traits::index_type;
66  using member_type = index_type;
67  using work_tag = typename traits::work_tag;
68  using execution_space = typename traits::execution_space;
69  using memory_space = typename execution_space::memory_space;
71 
72  enum : std::int32_t {
73  END_TOKEN = -1 ,
74  BEGIN_TOKEN = -2 ,
75  COMPLETED_TOKEN = -3 };
76 
77 private:
78 
80 
81  // Let N = m_graph.numRows(), the total work
82  // m_queue[ 0 .. N-1] = the ready queue
83  // m_queue[ N .. 2*N-1] = the waiting queue counts
84  // m_queue[2*N .. 2*N+2] = the ready queue hints
85 
86  graph_type const m_graph;
87  ints_type m_queue ;
88 
89  KOKKOS_INLINE_FUNCTION
90  void push_work( const std::int32_t w ) const noexcept
91  {
92  const std::int32_t N = m_graph.numRows();
93 
94  std::int32_t volatile * const ready_queue = & m_queue[0] ;
95  std::int32_t volatile * const end_hint = & m_queue[2*N+1] ;
96 
97  // Push work to end of queue
98  const std::int32_t j = atomic_fetch_add( end_hint , 1 );
99 
100  if ( ( N <= j ) ||
101  ( END_TOKEN != atomic_exchange(ready_queue+j,w) ) ) {
102  // ERROR: past the end of queue or did not replace END_TOKEN
103  Kokkos::abort("WorkGraphPolicy push_work error");
104  }
105 
106  memory_fence();
107  }
108 
109 public:
110 
125  KOKKOS_INLINE_FUNCTION
126  std::int32_t pop_work() const noexcept
127  {
128  const std::int32_t N = m_graph.numRows();
129 
130  std::int32_t volatile * const ready_queue = & m_queue[0] ;
131  std::int32_t volatile * const begin_hint = & m_queue[2*N] ;
132 
133  // begin hint is guaranteed to be less than or equal to
134  // actual begin location in the queue.
135 
136  for ( std::int32_t i = *begin_hint ; i < N ; ++i ) {
137 
138  const std::int32_t w = ready_queue[i] ;
139 
140  if ( w == END_TOKEN ) { return END_TOKEN ; }
141 
142  if ( ( w != BEGIN_TOKEN ) &&
143  ( w == atomic_compare_exchange(ready_queue+i,w,(std::int32_t)BEGIN_TOKEN) ) ) {
144  // Attempt to claim ready work index succeeded,
145  // update the hint and return work index
146  atomic_increment( begin_hint );
147  return w ;
148  }
149  // arrive here when ready_queue[i] == BEGIN_TOKEN
150  }
151 
152  return COMPLETED_TOKEN ;
153  }
154 
155 
156  KOKKOS_INLINE_FUNCTION
157  void completed_work( std::int32_t w ) const noexcept
158  {
159  Kokkos::memory_fence();
160 
161  // Make sure the completed work function's memory accesses are flushed.
162 
163  const std::int32_t N = m_graph.numRows();
164 
165  std::int32_t volatile * const count_queue = & m_queue[N] ;
166 
167  const std::int32_t B = m_graph.row_map(w);
168  const std::int32_t E = m_graph.row_map(w+1);
169 
170  for ( std::int32_t i = B ; i < E ; ++i ) {
171  const std::int32_t j = m_graph.entries(i);
172  if ( 1 == atomic_fetch_add(count_queue+j,-1) ) {
173  push_work(j);
174  }
175  }
176  }
177 
178  struct TagInit {};
179  struct TagCount {};
180  struct TagReady {};
181 
188  KOKKOS_INLINE_FUNCTION
189  void operator()( const TagInit , int i ) const noexcept
190  { m_queue[i] = i < m_graph.numRows() ? END_TOKEN : 0 ; }
191 
192  KOKKOS_INLINE_FUNCTION
193  void operator()( const TagCount , int i ) const noexcept
194  {
195  std::int32_t volatile * const count_queue =
196  & m_queue[ m_graph.numRows() ] ;
197 
198  atomic_increment( count_queue + m_graph.entries[i] );
199  }
200 
201  KOKKOS_INLINE_FUNCTION
202  void operator()( const TagReady , int w ) const noexcept
203  {
204  std::int32_t const * const count_queue =
205  & m_queue[ m_graph.numRows() ] ;
206 
207  if ( 0 == count_queue[w] ) push_work(w);
208  }
209 
210  WorkGraphPolicy( const graph_type & arg_graph )
211  : m_graph(arg_graph)
212  , m_queue( view_alloc( "queue" , WithoutInitializing )
213  , arg_graph.numRows() * 2 + 2 )
214  {
215  { // Initialize
216  using policy_type = RangePolicy<std::int32_t, execution_space, TagInit>;
218  const closure_type closure(*this, policy_type(0, m_queue.size()));
219  closure.execute();
220  execution_space::fence();
221  }
222 
223  { // execute-after counts
224  using policy_type = RangePolicy<std::int32_t, execution_space, TagCount>;
226  const closure_type closure(*this,policy_type(0,m_graph.entries.size()));
227  closure.execute();
228  execution_space::fence();
229  }
230 
231  { // Scheduling ready tasks
232  using policy_type = RangePolicy<std::int32_t, execution_space, TagReady>;
234  const closure_type closure(*this,policy_type(0,m_graph.numRows()));
235  closure.execute();
236  execution_space::fence();
237  }
238  }
239 };
240 
241 } // namespace Kokkos
242 
243 #ifdef KOKKOS_ENABLE_SERIAL
244 #include "impl/Kokkos_Serial_WorkGraphPolicy.hpp"
245 #endif
246 
247 #ifdef KOKKOS_ENABLE_OPENMP
248 #include "OpenMP/Kokkos_OpenMP_WorkGraphPolicy.hpp"
249 #endif
250 
251 #ifdef KOKKOS_ENABLE_CUDA
252 #include "Cuda/Kokkos_Cuda_WorkGraphPolicy.hpp"
253 #endif
254 
255 #ifdef KOKKOS_ENABLE_THREADS
256 #include "Threads/Kokkos_Threads_WorkGraphPolicy.hpp"
257 #endif
258 
259 #endif /* #define KOKKOS_WORKGRAPHPOLICY_HPP */
Impl::ViewCtorProp< typename Impl::ViewCtorProp< void, Args >::type... > view_alloc(Args const &...args)
Create View allocation parameter bundle from argument list.
Compressed row storage array.
Definition: Kokkos_Crs.hpp:83
Implementation of the ParallelFor operator that has a partial specialization for the device...