Kokkos Core Kernels Package  Version of the Day
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
Kokkos_WorkGraphPolicy.hpp
1 /*
2 //@HEADER
3 // ************************************************************************
4 //
5 // Kokkos v. 3.0
6 // Copyright (2020) National Technology & Engineering
7 // Solutions of Sandia, LLC (NTESS).
8 //
9 // Under the terms of Contract DE-NA0003525 with NTESS,
10 // the U.S. Government retains certain rights in this software.
11 //
12 // Redistribution and use in source and binary forms, with or without
13 // modification, are permitted provided that the following conditions are
14 // met:
15 //
16 // 1. Redistributions of source code must retain the above copyright
17 // notice, this list of conditions and the following disclaimer.
18 //
19 // 2. Redistributions in binary form must reproduce the above copyright
20 // notice, this list of conditions and the following disclaimer in the
21 // documentation and/or other materials provided with the distribution.
22 //
23 // 3. Neither the name of the Corporation nor the names of the
24 // contributors may be used to endorse or promote products derived from
25 // this software without specific prior written permission.
26 //
27 // THIS SOFTWARE IS PROVIDED BY NTESS "AS IS" AND ANY
28 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
29 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
30 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL NTESS OR THE
31 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
32 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
33 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
34 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
35 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
36 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
37 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 //
39 // Questions? Contact Christian R. Trott (crtrott@sandia.gov)
40 //
41 // ************************************************************************
42 //@HEADER
43 */
44 
45 #ifndef KOKKOS_WORKGRAPHPOLICY_HPP
46 #define KOKKOS_WORKGRAPHPOLICY_HPP
47 
namespace Kokkos {
namespace Impl {

// Forward declaration only; no definition of WorkGraphExec appears in this
// header. It is parameterized on a functor type, an execution space, and the
// pack of policy arguments.
template <class FunctorType, class ExecSpace, class... PolicyArgs>
class WorkGraphExec;

}  // namespace Impl
}  // namespace Kokkos
56 
57 namespace Kokkos {
58 
59 template <class... Properties>
60 class WorkGraphPolicy : public Kokkos::Impl::PolicyTraits<Properties...> {
61  public:
62  using execution_policy = WorkGraphPolicy<Properties...>;
63  using self_type = WorkGraphPolicy<Properties...>;
64  using traits = Kokkos::Impl::PolicyTraits<Properties...>;
65  using index_type = typename traits::index_type;
66  using member_type = index_type;
67  using execution_space = typename traits::execution_space;
68  using memory_space = typename execution_space::memory_space;
70 
71  enum : std::int32_t {
72  END_TOKEN = -1,
73  BEGIN_TOKEN = -2,
74  COMPLETED_TOKEN = -3
75  };
76 
77  private:
79 
80  // Let N = m_graph.numRows(), the total work
81  // m_queue[ 0 .. N-1] = the ready queue
82  // m_queue[ N .. 2*N-1] = the waiting queue counts
83  // m_queue[2*N .. 2*N+2] = the ready queue hints
84 
85  graph_type const m_graph;
86  ints_type m_queue;
87 
88  KOKKOS_INLINE_FUNCTION
89  void push_work(const std::int32_t w) const noexcept {
90  const std::int32_t N = m_graph.numRows();
91 
92  std::int32_t volatile* const ready_queue = &m_queue[0];
93  std::int32_t volatile* const end_hint = &m_queue[2 * N + 1];
94 
95  // Push work to end of queue
96  const std::int32_t j = atomic_fetch_add(end_hint, 1);
97 
98  if ((N <= j) || (END_TOKEN != atomic_exchange(ready_queue + j, w))) {
99  // ERROR: past the end of queue or did not replace END_TOKEN
100  Kokkos::abort("WorkGraphPolicy push_work error");
101  }
102 
103  memory_fence();
104  }
105 
106  public:
121  KOKKOS_INLINE_FUNCTION
122  std::int32_t pop_work() const noexcept {
123  const std::int32_t N = m_graph.numRows();
124 
125  std::int32_t volatile* const ready_queue = &m_queue[0];
126  std::int32_t volatile* const begin_hint = &m_queue[2 * N];
127 
128  // begin hint is guaranteed to be less than or equal to
129  // actual begin location in the queue.
130 
131  for (std::int32_t i = *begin_hint; i < N; ++i) {
132  const std::int32_t w = ready_queue[i];
133 
134  if (w == END_TOKEN) {
135  return END_TOKEN;
136  }
137 
138  if ((w != BEGIN_TOKEN) &&
139  (w == atomic_compare_exchange(ready_queue + i, w,
140  (std::int32_t)BEGIN_TOKEN))) {
141  // Attempt to claim ready work index succeeded,
142  // update the hint and return work index
143  atomic_increment(begin_hint);
144  return w;
145  }
146  // arrive here when ready_queue[i] == BEGIN_TOKEN
147  }
148 
149  return COMPLETED_TOKEN;
150  }
151 
152  KOKKOS_INLINE_FUNCTION
153  void completed_work(std::int32_t w) const noexcept {
154  Kokkos::memory_fence();
155 
156  // Make sure the completed work function's memory accesses are flushed.
157 
158  const std::int32_t N = m_graph.numRows();
159 
160  std::int32_t volatile* const count_queue = &m_queue[N];
161 
162  const std::int32_t B = m_graph.row_map(w);
163  const std::int32_t E = m_graph.row_map(w + 1);
164 
165  for (std::int32_t i = B; i < E; ++i) {
166  const std::int32_t j = m_graph.entries(i);
167  if (1 == atomic_fetch_add(count_queue + j, -1)) {
168  push_work(j);
169  }
170  }
171  }
172 
173  struct TagInit {};
174  struct TagCount {};
175  struct TagReady {};
176 
183  KOKKOS_INLINE_FUNCTION
184  void operator()(const TagInit, int i) const noexcept {
185  m_queue[i] = i < m_graph.numRows() ? END_TOKEN : 0;
186  }
187 
188  KOKKOS_INLINE_FUNCTION
189  void operator()(const TagCount, int i) const noexcept {
190  std::int32_t volatile* const count_queue = &m_queue[m_graph.numRows()];
191 
192  atomic_increment(count_queue + m_graph.entries[i]);
193  }
194 
195  KOKKOS_INLINE_FUNCTION
196  void operator()(const TagReady, int w) const noexcept {
197  std::int32_t const* const count_queue = &m_queue[m_graph.numRows()];
198 
199  if (0 == count_queue[w]) push_work(w);
200  }
201 
202  execution_space space() const { return execution_space(); }
203 
204  WorkGraphPolicy(const graph_type& arg_graph)
205  : m_graph(arg_graph),
206  m_queue(view_alloc("queue", WithoutInitializing),
207  arg_graph.numRows() * 2 + 2) {
208  { // Initialize
209  using policy_type = RangePolicy<std::int32_t, execution_space, TagInit>;
211  const closure_type closure(*this, policy_type(0, m_queue.size()));
212  closure.execute();
213  execution_space().fence();
214  }
215 
216  { // execute-after counts
217  using policy_type = RangePolicy<std::int32_t, execution_space, TagCount>;
219  const closure_type closure(*this, policy_type(0, m_graph.entries.size()));
220  closure.execute();
221  execution_space().fence();
222  }
223 
224  { // Scheduling ready tasks
225  using policy_type = RangePolicy<std::int32_t, execution_space, TagReady>;
227  const closure_type closure(*this, policy_type(0, m_graph.numRows()));
228  closure.execute();
229  execution_space().fence();
230  }
231  }
232 };
233 
234 } // namespace Kokkos
235 
236 #ifdef KOKKOS_ENABLE_SERIAL
237 #include "impl/Kokkos_Serial_WorkGraphPolicy.hpp"
238 #endif
239 
240 #ifdef KOKKOS_ENABLE_OPENMP
241 #include "OpenMP/Kokkos_OpenMP_WorkGraphPolicy.hpp"
242 #endif
243 
244 #ifdef KOKKOS_ENABLE_CUDA
245 #include "Cuda/Kokkos_Cuda_WorkGraphPolicy.hpp"
246 #endif
247 
248 #ifdef KOKKOS_ENABLE_THREADS
249 #include "Threads/Kokkos_Threads_WorkGraphPolicy.hpp"
250 #endif
251 
252 #ifdef KOKKOS_ENABLE_HPX
253 #include "HPX/Kokkos_HPX_WorkGraphPolicy.hpp"
254 #endif
255 
256 #endif /* #define KOKKOS_WORKGRAPHPOLICY_HPP */
Compressed row storage array.
Definition: Kokkos_Crs.hpp:83
Impl::ViewCtorProp< typename Impl::ViewCtorProp< void, Args >::type...> view_alloc(Args const &...args)
Create View allocation parameter bundle from argument list.
Implementation of the ParallelFor operator that has a partial specialization for the device...