Kokkos Core Kernels Package  Version of the Day
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Groups Pages
Kokkos_WorkGraphPolicy.hpp
1 //@HEADER
2 // ************************************************************************
3 //
4 // Kokkos v. 4.0
5 // Copyright (2022) National Technology & Engineering
6 // Solutions of Sandia, LLC (NTESS).
7 //
8 // Under the terms of Contract DE-NA0003525 with NTESS,
9 // the U.S. Government retains certain rights in this software.
10 //
11 // Part of Kokkos, under the Apache License v2.0 with LLVM Exceptions.
12 // See https://kokkos.org/LICENSE for license information.
13 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
14 //
15 //@HEADER
16 
17 #ifndef KOKKOS_IMPL_PUBLIC_INCLUDE
18 #include <Kokkos_Macros.hpp>
19 static_assert(false,
20  "Including non-public Kokkos header files is not allowed.");
21 #endif
22 #ifndef KOKKOS_WORKGRAPHPOLICY_HPP
23 #define KOKKOS_WORKGRAPHPOLICY_HPP
24 
25 #include <impl/Kokkos_AnalyzePolicy.hpp>
26 #include <Kokkos_Crs.hpp>
27 
28 namespace Kokkos {
29 namespace Impl {
30 
31 template <class functor_type, class execution_space, class... policy_args>
32 class WorkGraphExec;
33 
34 }
35 } // namespace Kokkos
36 
37 namespace Kokkos {
38 
39 template <class... Properties>
40 class WorkGraphPolicy : public Kokkos::Impl::PolicyTraits<Properties...> {
41  public:
42  using execution_policy = WorkGraphPolicy<Properties...>;
43  using self_type = WorkGraphPolicy<Properties...>;
44  using traits = Kokkos::Impl::PolicyTraits<Properties...>;
45  using index_type = typename traits::index_type;
46  using member_type = index_type;
47  using execution_space = typename traits::execution_space;
48  using memory_space = typename execution_space::memory_space;
50 
51  enum : std::int32_t {
52  END_TOKEN = -1,
53  BEGIN_TOKEN = -2,
54  COMPLETED_TOKEN = -3
55  };
56 
57  private:
59 
60  // Let N = m_graph.numRows(), the total work
61  // m_queue[ 0 .. N-1] = the ready queue
62  // m_queue[ N .. 2*N-1] = the waiting queue counts
63  // m_queue[2*N .. 2*N+2] = the ready queue hints
64 
65  graph_type const m_graph;
66  ints_type m_queue;
67 
68  KOKKOS_INLINE_FUNCTION
69  void push_work(const std::int32_t w) const noexcept {
70  const std::int32_t N = m_graph.numRows();
71 
72  std::int32_t* const ready_queue = &m_queue[0];
73  std::int32_t* const end_hint = &m_queue[2 * N + 1];
74 
75  // Push work to end of queue
76  const std::int32_t j = atomic_fetch_add(end_hint, 1);
77 
78  if ((N <= j) || (END_TOKEN != atomic_exchange(ready_queue + j, w))) {
79  // ERROR: past the end of queue or did not replace END_TOKEN
80  Kokkos::abort("WorkGraphPolicy push_work error");
81  }
82 
83  memory_fence();
84  }
85 
86  public:
101  KOKKOS_INLINE_FUNCTION
102  std::int32_t pop_work() const noexcept {
103  const std::int32_t N = m_graph.numRows();
104 
105  std::int32_t* const ready_queue = &m_queue[0];
106  std::int32_t* const begin_hint = &m_queue[2 * N];
107 
108  // begin hint is guaranteed to be less than or equal to
109  // actual begin location in the queue.
110 
111  for (std::int32_t i = Kokkos::atomic_load(begin_hint); i < N; ++i) {
112  const std::int32_t w = Kokkos::atomic_load(&ready_queue[i]);
113 
114  if (w == END_TOKEN) {
115  return END_TOKEN;
116  }
117 
118  if ((w != BEGIN_TOKEN) &&
119  (w == atomic_compare_exchange(ready_queue + i, w,
120  (std::int32_t)BEGIN_TOKEN))) {
121  // Attempt to claim ready work index succeeded,
122  // update the hint and return work index
123  atomic_increment(begin_hint);
124  return w;
125  }
126  // arrive here when ready_queue[i] == BEGIN_TOKEN
127  }
128 
129  return COMPLETED_TOKEN;
130  }
131 
132  KOKKOS_INLINE_FUNCTION
133  void completed_work(std::int32_t w) const noexcept {
134  Kokkos::memory_fence();
135 
136  // Make sure the completed work function's memory accesses are flushed.
137 
138  const std::int32_t N = m_graph.numRows();
139 
140  std::int32_t* const count_queue = &m_queue[N];
141 
142  const std::int32_t B = m_graph.row_map(w);
143  const std::int32_t E = m_graph.row_map(w + 1);
144 
145  for (std::int32_t i = B; i < E; ++i) {
146  const std::int32_t j = m_graph.entries(i);
147  if (1 == atomic_fetch_add(count_queue + j, -1)) {
148  push_work(j);
149  }
150  }
151  }
152 
153  struct TagInit {};
154  struct TagCount {};
155  struct TagReady {};
156 
163  KOKKOS_INLINE_FUNCTION
164  void operator()(const TagInit, int i) const noexcept {
165  m_queue[i] = i < m_graph.numRows() ? END_TOKEN : 0;
166  }
167 
168  KOKKOS_INLINE_FUNCTION
169  void operator()(const TagCount, int i) const noexcept {
170  std::int32_t* const count_queue = &m_queue[m_graph.numRows()];
171 
172  atomic_increment(count_queue + m_graph.entries[i]);
173  }
174 
175  KOKKOS_INLINE_FUNCTION
176  void operator()(const TagReady, int w) const noexcept {
177  std::int32_t const* const count_queue = &m_queue[m_graph.numRows()];
178 
179  if (0 == count_queue[w]) push_work(w);
180  }
181 
182  execution_space space() const { return execution_space(); }
183 
184  WorkGraphPolicy(const graph_type& arg_graph)
185  : m_graph(arg_graph),
186  m_queue(view_alloc("queue", WithoutInitializing),
187  arg_graph.numRows() * 2 + 2) {
188  { // Initialize
189  using policy_type = RangePolicy<std::int32_t, execution_space, TagInit>;
191  const closure_type closure(*this, policy_type(0, m_queue.size()));
192  closure.execute();
193  execution_space().fence(
194  "Kokkos::WorkGraphPolicy::WorkGraphPolicy: fence after executing "
195  "graph init");
196  }
197 
198  { // execute-after counts
199  using policy_type = RangePolicy<std::int32_t, execution_space, TagCount>;
201  const closure_type closure(*this, policy_type(0, m_graph.entries.size()));
202  closure.execute();
203  execution_space().fence(
204  "Kokkos::WorkGraphPolicy::WorkGraphPolicy: fence after executing "
205  "graph count");
206  }
207 
208  { // Scheduling ready tasks
209  using policy_type = RangePolicy<std::int32_t, execution_space, TagReady>;
211  const closure_type closure(*this, policy_type(0, m_graph.numRows()));
212  closure.execute();
213  execution_space().fence(
214  "Kokkos::WorkGraphPolicy::WorkGraphPolicy: fence after executing "
215  "readied graph");
216  }
217  }
218 };
219 
220 } // namespace Kokkos
221 
222 #ifdef KOKKOS_ENABLE_SERIAL
223 #include "Serial/Kokkos_Serial_WorkGraphPolicy.hpp"
224 #endif
225 
226 #ifdef KOKKOS_ENABLE_OPENMP
227 #include "OpenMP/Kokkos_OpenMP_WorkGraphPolicy.hpp"
228 #endif
229 
230 #ifdef KOKKOS_ENABLE_CUDA
231 #include "Cuda/Kokkos_Cuda_WorkGraphPolicy.hpp"
232 #endif
233 
234 #ifdef KOKKOS_ENABLE_HIP
235 #include "HIP/Kokkos_HIP_WorkGraphPolicy.hpp"
236 #endif
237 
238 #ifdef KOKKOS_ENABLE_THREADS
239 #include "Threads/Kokkos_Threads_WorkGraphPolicy.hpp"
240 #endif
241 
242 #ifdef KOKKOS_ENABLE_HPX
243 #include "HPX/Kokkos_HPX_WorkGraphPolicy.hpp"
244 #endif
245 
246 #endif /* #define KOKKOS_WORKGRAPHPOLICY_HPP */
Compressed row storage array.
Definition: Kokkos_Crs.hpp:63
Implementation of the ParallelFor operator that has a partial specialization for the device...