Kokkos_HPX.hpp
//@HEADER
// ************************************************************************
//
// Kokkos v. 4.0
// Copyright (2022) National Technology & Engineering
// Solutions of Sandia, LLC (NTESS).
//
// Under the terms of Contract DE-NA0003525 with NTESS,
// the U.S. Government retains certain rights in this software.
//
// Part of Kokkos, under the Apache License v2.0 with LLVM Exceptions.
// See https://kokkos.org/LICENSE for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//@HEADER

#ifndef KOKKOS_IMPL_PUBLIC_INCLUDE
#include <Kokkos_Macros.hpp>
static_assert(false,
              "Including non-public Kokkos header files is not allowed.");
#endif
#ifndef KOKKOS_HPX_HPP
#define KOKKOS_HPX_HPP

#include <Kokkos_Macros.hpp>
#if defined(KOKKOS_ENABLE_HPX)

#include <Kokkos_Core_fwd.hpp>

#include <Kokkos_HostSpace.hpp>
#include <cstddef>
#include <iosfwd>

#ifdef KOKKOS_ENABLE_HBWSPACE
#include <Kokkos_HBWSpace.hpp>
#endif

#include <Kokkos_Layout.hpp>
#include <Kokkos_MemoryTraits.hpp>
#include <Kokkos_Parallel.hpp>
#include <Kokkos_ScratchSpace.hpp>
#include <Kokkos_TaskScheduler.hpp>
#include <impl/Kokkos_ConcurrentBitset.hpp>
#include <impl/Kokkos_FunctorAnalysis.hpp>
#include <impl/Kokkos_Tools.hpp>
#include <impl/Kokkos_TaskQueue.hpp>
#include <impl/Kokkos_InitializationSettings.hpp>

#include <KokkosExp_MDRangePolicy.hpp>

#include <hpx/local/algorithm.hpp>
#include <hpx/local/barrier.hpp>
#include <hpx/local/condition_variable.hpp>
#include <hpx/local/execution.hpp>
#include <hpx/local/future.hpp>
#include <hpx/local/init.hpp>
#include <hpx/local/mutex.hpp>
#include <hpx/local/runtime.hpp>
#include <hpx/local/thread.hpp>

#include <Kokkos_UniqueToken.hpp>

#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <type_traits>
#include <vector>

// There are currently two different implementations for the parallel dispatch
// functions:
//
// - 0: The HPX way. Unfortunately, this currently comes with unnecessary
//      overheads.
// - 1: The manual way. This also uses for_loop, but spawns only one task per
//      worker thread. It is significantly faster in most cases.
//
// In the long run 0 should be the preferred implementation, but until HPX is
// improved 1 will be the default.
#ifndef KOKKOS_HPX_IMPLEMENTATION
#define KOKKOS_HPX_IMPLEMENTATION 1
#endif

#if (KOKKOS_HPX_IMPLEMENTATION < 0) || (KOKKOS_HPX_IMPLEMENTATION > 1)
#error "You have chosen an invalid value for KOKKOS_HPX_IMPLEMENTATION"
#endif
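
// To override the default, define KOKKOS_HPX_IMPLEMENTATION before this
// header is included, typically on the compile line, e.g.
//
//   c++ -DKOKKOS_HPX_IMPLEMENTATION=0 -c my_app.cpp
//
// Any value other than 0 or 1 triggers the #error above.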

// [note 1]
//
// When using the asynchronous backend and independent instances, we explicitly
// reset the shared data at the end of a parallel task (execute_task). We do
// this to avoid circular references with shared pointers that would otherwise
// never be released.
//
// The HPX instance holds shared data for the instance in a shared_ptr. One of
// the pieces of shared data is the future that we use to sequence parallel
// dispatches. When a parallel task is launched, a copy of the closure
// (ParallelFor, ParallelReduce, etc.) is captured in the task. The closure
// also holds the policy, the policy holds the HPX instance, and the instance
// holds the shared data (for use of buffers in the parallel task). When
// attaching a continuation to a future, the continuation is stored in the
// future (shared state). This means that there is a cycle future ->
// continuation -> closure -> policy -> HPX -> shared data -> future. We break
// this by releasing the shared data early, as (the pointer to) the shared
// data will not be used anymore by the closure at the end of execute_task.
//
// We also mark the shared instance data as mutable so that we can reset it
// from the const execute_task member function.

namespace Kokkos {
namespace Impl {
class thread_buffer {
  static constexpr std::size_t m_cache_line_size = 64;

  std::size_t m_num_threads;
  std::size_t m_size_per_thread;
  std::size_t m_size_total;
  char *m_data;

  void pad_to_cache_line(std::size_t &size) {
    size = ((size + m_cache_line_size - 1) / m_cache_line_size) *
           m_cache_line_size;
  }

 public:
  thread_buffer()
      : m_num_threads(0),
        m_size_per_thread(0),
        m_size_total(0),
        m_data(nullptr) {}
  thread_buffer(const std::size_t num_threads,
                const std::size_t size_per_thread)
      : thread_buffer() {
    // Delegate to the default constructor so that m_size_total and m_data
    // are zero-initialized before resize reads them.
    resize(num_threads, size_per_thread);
  }
  ~thread_buffer() { delete[] m_data; }

  thread_buffer(const thread_buffer &) = delete;
  thread_buffer(thread_buffer &&) = delete;
  thread_buffer &operator=(const thread_buffer &) = delete;
  thread_buffer &operator=(thread_buffer &&) = delete;

  void resize(const std::size_t num_threads,
              const std::size_t size_per_thread) {
    m_num_threads = num_threads;
    m_size_per_thread = size_per_thread;

    pad_to_cache_line(m_size_per_thread);

    std::size_t size_total_new = m_num_threads * m_size_per_thread;

    if (m_size_total < size_total_new) {
      delete[] m_data;
      m_data = new char[size_total_new];
      m_size_total = size_total_new;
    }
  }

  char *get(std::size_t thread_num) {
    assert(thread_num < m_num_threads);
    if (m_data == nullptr) {
      return nullptr;
    }
    return &m_data[thread_num * m_size_per_thread];
  }

  std::size_t size_per_thread() const noexcept { return m_size_per_thread; }
  std::size_t size_total() const noexcept { return m_size_total; }
};
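
// A minimal usage sketch (illustrative only): one cache-line-padded slot per
// worker thread, e.g. for per-thread partial reduction results.
//
//   Kokkos::Impl::thread_buffer buffer;
//   buffer.resize(/*num_threads=*/4, /*size_per_thread=*/sizeof(double));
//   // Each thread t gets a private region padded to the cache line size:
//   double *slot = reinterpret_cast<double *>(buffer.get(t));
//   *slot = 0.0;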
}  // namespace Impl

namespace Experimental {
class HPX {
 public:
  static constexpr uint32_t impl_default_instance_id() { return 1; }

 private:
  static bool m_hpx_initialized;
  uint32_t m_instance_id = impl_default_instance_id();

#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
  static std::atomic<uint32_t> m_next_instance_id;

 public:
  enum class instance_mode { default_, independent };

 private:
  static uint32_t m_active_parallel_region_count;
  static hpx::spinlock m_active_parallel_region_count_mutex;
  static hpx::condition_variable_any m_active_parallel_region_count_cond;

  struct instance_data {
    instance_data() = default;
    instance_data(hpx::shared_future<void> future) : m_future(future) {}
    Kokkos::Impl::thread_buffer m_buffer;
    hpx::shared_future<void> m_future = hpx::make_ready_future<void>();
    hpx::spinlock m_future_mutex;
  };

  mutable std::shared_ptr<instance_data> m_independent_instance_data;
  static instance_data m_default_instance_data;

  std::reference_wrapper<Kokkos::Impl::thread_buffer> m_buffer;
  std::reference_wrapper<hpx::shared_future<void>> m_future;
  std::reference_wrapper<hpx::spinlock> m_future_mutex;
#else
  static Kokkos::Impl::thread_buffer m_default_buffer;
#endif

 public:
  using execution_space = HPX;
  using memory_space = HostSpace;
  using device_type = Kokkos::Device<execution_space, memory_space>;
  using array_layout = LayoutRight;
  using size_type = memory_space::size_type;
  using scratch_memory_space = ScratchMemorySpace<HPX>;

#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
  HPX() noexcept
      : m_instance_id(impl_default_instance_id()),
        m_buffer(m_default_instance_data.m_buffer),
        m_future(m_default_instance_data.m_future),
        m_future_mutex(m_default_instance_data.m_future_mutex) {}

  HPX(instance_mode mode)
      : m_instance_id(mode == instance_mode::independent
                          ? m_next_instance_id++
                          : impl_default_instance_id()),
        m_independent_instance_data(mode == instance_mode::independent
                                        ? (new instance_data())
                                        : nullptr),
        m_buffer(mode == instance_mode::independent
                     ? m_independent_instance_data->m_buffer
                     : m_default_instance_data.m_buffer),
        m_future(mode == instance_mode::independent
                     ? m_independent_instance_data->m_future
                     : m_default_instance_data.m_future),
        m_future_mutex(mode == instance_mode::independent
                           ? m_independent_instance_data->m_future_mutex
                           : m_default_instance_data.m_future_mutex) {}

  HPX(hpx::shared_future<void> future)
      : m_instance_id(m_next_instance_id++),
        m_independent_instance_data(new instance_data(future)),
        m_buffer(m_independent_instance_data->m_buffer),
        m_future(m_independent_instance_data->m_future),
        m_future_mutex(m_independent_instance_data->m_future_mutex) {}

  HPX(HPX &&other) = default;
  HPX &operator=(HPX &&other) = default;
  HPX(const HPX &other) = default;
  HPX &operator=(const HPX &other) = default;
#else
  HPX() noexcept {}
#endif
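
  // A minimal usage sketch (illustrative only; the instance_mode constructor
  // requires KOKKOS_ENABLE_HPX_ASYNC_DISPATCH):
  //
  //   Kokkos::Experimental::HPX exec{
  //       Kokkos::Experimental::HPX::instance_mode::independent};
  //   Kokkos::parallel_for(
  //       Kokkos::RangePolicy<Kokkos::Experimental::HPX>(exec, 0, n), f);
  //   exec.fence();  // waits only for work submitted to this instance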

  void print_configuration(std::ostream &os, bool /*verbose*/ = false) const {
    os << "HPX backend\n";
    os << "HPX Execution Space:\n";
    os << "  KOKKOS_ENABLE_HPX: yes\n";
    os << "\nHPX Runtime Configuration:\n";
  }
  uint32_t impl_instance_id() const noexcept { return m_instance_id; }

#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
  static bool in_parallel(HPX const &instance = HPX()) noexcept {
    return !instance.impl_get_future().is_ready();
  }
#else
  static bool in_parallel(HPX const & = HPX()) noexcept { return false; }
#endif

#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
  static void impl_decrement_active_parallel_region_count() {
    std::unique_lock<hpx::spinlock> l(m_active_parallel_region_count_mutex);
    if (--m_active_parallel_region_count == 0) {
      l.unlock();
      m_active_parallel_region_count_cond.notify_all();
    }
  }

  static void impl_increment_active_parallel_region_count() {
    std::unique_lock<hpx::spinlock> l(m_active_parallel_region_count_mutex);
    ++m_active_parallel_region_count;
  }
#endif

  void fence(
      const std::string &name =
          "Kokkos::Experimental::HPX::fence: Unnamed Instance Fence") const {
    Kokkos::Tools::Experimental::Impl::profile_fence_event<
        Kokkos::Experimental::HPX>(
        name,
        Kokkos::Tools::Experimental::Impl::DirectFenceIDHandle{
            impl_instance_id()},
        [&]() {
#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
          impl_get_future().wait();
          // Reset the future to free variables that may have been captured in
          // parallel regions.
          impl_get_future() = hpx::make_ready_future<void>();
#endif
        });
  }

  static void impl_static_fence(const std::string &name) {
    Kokkos::Tools::Experimental::Impl::profile_fence_event<
        Kokkos::Experimental::HPX>(
        name,
        Kokkos::Tools::Experimental::SpecialSynchronizationCases::
            GlobalDeviceSynchronization,
        [&]() {
#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
          std::unique_lock<hpx::spinlock> l(
              m_active_parallel_region_count_mutex);
          m_active_parallel_region_count_cond.wait(
              l, [&]() { return m_active_parallel_region_count == 0; });
          // Reset the future to free variables that may have been captured in
          // parallel regions (however, we don't have access to futures from
          // instances other than the default instance; those are only
          // released by fence).
          HPX().impl_get_future() = hpx::make_ready_future<void>();
#endif
        });
  }

  static hpx::execution::parallel_executor impl_get_executor() {
    return hpx::execution::parallel_executor();
  }

  static bool is_asynchronous(HPX const & = HPX()) noexcept {
#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
    return true;
#else
    return false;
#endif
  }

#ifdef KOKKOS_ENABLE_DEPRECATED_CODE_3
  template <typename F>
  KOKKOS_DEPRECATED static void partition_master(
      F const &, int requested_num_partitions = 0, int = 0) {
    if (requested_num_partitions > 1) {
      Kokkos::abort(
          "Kokkos::Experimental::HPX::partition_master: can't partition an "
          "HPX instance\n");
    }
  }
#endif

  static int concurrency();
  static void impl_initialize(InitializationSettings const &);
  static bool impl_is_initialized() noexcept;
  static void impl_finalize();

  static int impl_thread_pool_size() noexcept {
    hpx::runtime *rt = hpx::get_runtime_ptr();
    if (rt == nullptr) {
      return 0;
    } else {
      if (hpx::threads::get_self_ptr() == nullptr) {
        return hpx::resource::get_thread_pool(0).get_os_thread_count();
      } else {
        return hpx::this_thread::get_pool()->get_os_thread_count();
      }
    }
  }

  static int impl_thread_pool_rank() noexcept {
    hpx::runtime *rt = hpx::get_runtime_ptr();
    if (rt == nullptr) {
      return 0;
    } else {
      if (hpx::threads::get_self_ptr() == nullptr) {
        return 0;
      } else {
        return hpx::this_thread::get_pool()->get_pool_index();
      }
    }
  }

  static int impl_thread_pool_size(int depth) {
    if (depth == 0) {
      return impl_thread_pool_size();
    } else {
      return 1;
    }
  }

  static int impl_max_hardware_threads() noexcept {
    return hpx::threads::hardware_concurrency();
  }

  static int impl_hardware_thread_id() noexcept {
    return hpx::get_worker_thread_num();
  }

  Kokkos::Impl::thread_buffer &impl_get_buffer() const noexcept {
#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
    return m_buffer.get();
#else
    return m_default_buffer;
#endif
  }

#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
  hpx::shared_future<void> &impl_get_future() const noexcept {
    return m_future;
  }

  hpx::spinlock &impl_get_future_mutex() const noexcept {
    return m_future_mutex;
  }
#endif

#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
  struct [[nodiscard]] reset_on_exit_parallel {
    HPX const &m_space;
    reset_on_exit_parallel(HPX const &space) : m_space(space) {}
    ~reset_on_exit_parallel() {
      // See [note 1] for an explanation. m_independent_instance_data is
      // marked mutable.
      m_space.m_independent_instance_data.reset();

      HPX::impl_decrement_active_parallel_region_count();
    }
  };

  // This struct is identical to the above except that it does not reset the
  // shared data. It does, however, still decrement the parallel region count.
  // It is meant for use in parallel regions which do not capture the
  // execution space instance.
  struct [[nodiscard]] reset_count_on_exit_parallel {
    reset_count_on_exit_parallel() = default;
    ~reset_count_on_exit_parallel() {
      HPX::impl_decrement_active_parallel_region_count();
    }
  };
#else
  struct [[nodiscard]] reset_on_exit_parallel {
    reset_on_exit_parallel(HPX const &) {}
    ~reset_on_exit_parallel() = default;
  };

  struct [[nodiscard]] reset_count_on_exit_parallel {
    reset_count_on_exit_parallel() = default;
    ~reset_count_on_exit_parallel() = default;
  };
#endif

  static constexpr const char *name() noexcept { return "HPX"; }

 private:
  friend bool operator==(HPX const &lhs, HPX const &rhs) {
    return lhs.m_instance_id == rhs.m_instance_id;
  }
  friend bool operator!=(HPX const &lhs, HPX const &rhs) {
    return !(lhs == rhs);
  }
};
}  // namespace Experimental

namespace Tools {
namespace Experimental {
template <>
struct DeviceTypeTraits<Kokkos::Experimental::HPX> {
  static constexpr DeviceType id = DeviceType::HPX;
  static int device_id(const Kokkos::Experimental::HPX &) { return 0; }
};
}  // namespace Experimental
}  // namespace Tools

namespace Impl {

#if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
template <typename Closure>
inline void dispatch_execute_task(Closure *closure,
                                  Kokkos::Experimental::HPX const &instance,
                                  bool force_synchronous = false) {
  Kokkos::Experimental::HPX::impl_increment_active_parallel_region_count();

  Closure closure_copy = *closure;

  {
    std::unique_lock<hpx::spinlock> l(instance.impl_get_future_mutex());
    hpx::util::ignore_lock(&instance.impl_get_future_mutex());
    hpx::shared_future<void> &fut = instance.impl_get_future();

    fut = fut.then(hpx::execution::parallel_executor(
                       hpx::threads::thread_schedule_hint(0)),
                   [closure_copy](hpx::shared_future<void> &&) {
                     return closure_copy.execute_task();
                   });
  }

  if (force_synchronous) {
    instance.fence(
        "Kokkos::Experimental::Impl::HPX::dispatch_execute_task: fence due to "
        "forced synchronization");
  }
}
#else
template <typename Closure>
inline void dispatch_execute_task(Closure *closure,
                                  Kokkos::Experimental::HPX const &,
                                  bool = false) {
  closure->execute_task();
}
#endif
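
// How the sequencing above behaves, as a sketch (illustrative only): each
// dispatch chains onto the instance's shared future, so two dispatches on the
// same instance run one after the other, while dispatches on different
// independent instances may overlap.
//
//   Kokkos::parallel_for(RangePolicy<HPX>(instance, 0, n), f);  // task A
//   Kokkos::parallel_for(RangePolicy<HPX>(instance, 0, n), g);  // task B,
//                                  // attached as a continuation of task A
//   instance.fence();  // waits for B (and therefore A)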
}  // namespace Impl
}  // namespace Kokkos

namespace Kokkos {
namespace Impl {
template <>
struct MemorySpaceAccess<Kokkos::Experimental::HPX::memory_space,
                         Kokkos::Experimental::HPX::scratch_memory_space> {
  enum : bool { assignable = false };
  enum : bool { accessible = true };
  enum : bool { deepcopy = false };
};

}  // namespace Impl
}  // namespace Kokkos

namespace Kokkos {
namespace Experimental {
template <>
class UniqueToken<HPX, UniqueTokenScope::Instance> {
 private:
  using buffer_type = Kokkos::View<uint32_t *, Kokkos::HostSpace>;
  int m_count;
  buffer_type m_buffer_view;
  uint32_t volatile *m_buffer;

 public:
  using execution_space = HPX;
  using size_type = int;

  UniqueToken(execution_space const & = execution_space()) noexcept
      : m_count(execution_space::impl_max_hardware_threads()),
        m_buffer_view(buffer_type()),
        m_buffer(nullptr) {}

  UniqueToken(size_type max_size, execution_space const & = execution_space())
      : m_count(max_size > execution_space::impl_max_hardware_threads()
                    ? execution_space::impl_max_hardware_threads()
                    : max_size),
        m_buffer_view(
            max_size > execution_space::impl_max_hardware_threads()
                ? buffer_type()
                : buffer_type("UniqueToken::m_buffer_view",
                              ::Kokkos::Impl::concurrent_bitset::buffer_bound(
                                  m_count))),
        m_buffer(m_buffer_view.data()) {}

  /// \brief upper bound for acquired values, i.e. 0 <= value < size()
  KOKKOS_INLINE_FUNCTION
  int size() const noexcept { return m_count; }

  /// \brief acquire value such that 0 <= value < size()
  KOKKOS_INLINE_FUNCTION
  int acquire() const noexcept {
    KOKKOS_IF_ON_HOST((
        if (m_buffer == nullptr) {
          return execution_space::impl_hardware_thread_id();
        } else {
          const ::Kokkos::pair<int, int> result =
              ::Kokkos::Impl::concurrent_bitset::acquire_bounded(
                  m_buffer, m_count, ::Kokkos::Impl::clock_tic() % m_count);

          if (result.first < 0) {
            ::Kokkos::abort(
                "UniqueToken<HPX> failure to acquire tokens, no tokens "
                "available");
          }
          return result.first;
        }))

    KOKKOS_IF_ON_DEVICE((return 0;))
  }

  /// \brief release an acquired value
  KOKKOS_INLINE_FUNCTION
  void release(int i) const noexcept {
    KOKKOS_IF_ON_HOST((if (m_buffer != nullptr) {
      ::Kokkos::Impl::concurrent_bitset::release(m_buffer, i);
    }))

    KOKKOS_IF_ON_DEVICE(((void)i;))
  }
};
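
// A minimal usage sketch (illustrative only): acquire a per-task slot inside
// a parallel region, use it to index per-slot data, and release it when done.
//
//   Kokkos::Experimental::UniqueToken<Kokkos::Experimental::HPX> token;
//   Kokkos::View<double *, Kokkos::HostSpace> slots("slots", token.size());
//   Kokkos::parallel_for(
//       Kokkos::RangePolicy<Kokkos::Experimental::HPX>(0, n),
//       KOKKOS_LAMBDA(const int) {
//         const int id = token.acquire();  // unique among concurrent callers
//         slots(id) += 1.0;
//         token.release(id);
//       });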

template <>
class UniqueToken<HPX, UniqueTokenScope::Global> {
 public:
  using execution_space = HPX;
  using size_type = int;
  UniqueToken(execution_space const & = execution_space()) noexcept {}

  // NOTE: Currently this assumes that there is no oversubscription.
  // hpx::get_num_worker_threads can't be used directly because it may yield
  // its task (problematic if called after hpx::get_worker_thread_num).
  int size() const noexcept { return HPX::impl_max_hardware_threads(); }
  int acquire() const noexcept { return HPX::impl_hardware_thread_id(); }
  void release(int) const noexcept {}
};
}  // namespace Experimental
}  // namespace Kokkos

namespace Kokkos {
namespace Impl {

struct HPXTeamMember {
 public:
  using execution_space = Kokkos::Experimental::HPX;
  using scratch_memory_space =
      Kokkos::ScratchMemorySpace<Kokkos::Experimental::HPX>;
  using team_handle = HPXTeamMember;

 private:
  scratch_memory_space m_team_shared;

  int m_league_size;
  int m_league_rank;
  int m_team_size;
  int m_team_rank;

 public:
  KOKKOS_INLINE_FUNCTION
  const scratch_memory_space &team_shmem() const {
    return m_team_shared.set_team_thread_mode(0, 1, 0);
  }

  KOKKOS_INLINE_FUNCTION
  const execution_space::scratch_memory_space &team_scratch(const int) const {
    return m_team_shared.set_team_thread_mode(0, 1, 0);
  }

  KOKKOS_INLINE_FUNCTION
  const execution_space::scratch_memory_space &thread_scratch(
      const int) const {
    return m_team_shared.set_team_thread_mode(0, team_size(), team_rank());
  }

  KOKKOS_INLINE_FUNCTION int league_rank() const noexcept {
    return m_league_rank;
  }

  KOKKOS_INLINE_FUNCTION int league_size() const noexcept {
    return m_league_size;
  }

  KOKKOS_INLINE_FUNCTION int team_rank() const noexcept { return m_team_rank; }
  KOKKOS_INLINE_FUNCTION int team_size() const noexcept { return m_team_size; }

  template <class... Properties>
  constexpr KOKKOS_INLINE_FUNCTION HPXTeamMember(
      const TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>
          &policy,
      const int team_rank, const int league_rank, void *scratch,
      size_t scratch_size) noexcept
      : m_team_shared(scratch, scratch_size, scratch, scratch_size),
        m_league_size(policy.league_size()),
        m_league_rank(league_rank),
        m_team_size(policy.team_size()),
        m_team_rank(team_rank) {}

  KOKKOS_INLINE_FUNCTION
  void team_barrier() const {}

  template <class ValueType>
  KOKKOS_INLINE_FUNCTION void team_broadcast(ValueType &, const int &) const {}

  template <class Closure, class ValueType>
  KOKKOS_INLINE_FUNCTION void team_broadcast(const Closure &closure,
                                             ValueType &value,
                                             const int &) const {
    closure(value);
  }

  template <class ValueType, class JoinOp>
  KOKKOS_INLINE_FUNCTION ValueType team_reduce(const ValueType &value,
                                               const JoinOp &) const {
    return value;
  }

  template <class ReducerType>
  KOKKOS_INLINE_FUNCTION std::enable_if_t<is_reducer<ReducerType>::value>
  team_reduce(const ReducerType &) const {}

  template <typename Type>
  KOKKOS_INLINE_FUNCTION Type
  team_scan(const Type &value, Type *const global_accum = nullptr) const {
    if (global_accum) {
      Kokkos::atomic_fetch_add(global_accum, value);
    }

    return 0;
  }
};

template <class... Properties>
class TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>
    : public PolicyTraits<Properties...> {
  int m_league_size;
  int m_team_size;
  std::size_t m_team_scratch_size[2];
  std::size_t m_thread_scratch_size[2];
  int m_chunk_size;

 public:
  using traits = PolicyTraits<Properties...>;

  using execution_policy = TeamPolicyInternal;

  using member_type = HPXTeamMember;

  using execution_space = Kokkos::Experimental::HPX;

  // NOTE: Max size is 1 for simplicity. In most cases more than 1 is not
  // necessary on CPU. Implement later if there is a need.
  template <class FunctorType>
  inline static int team_size_max(const FunctorType &) {
    return 1;
  }

  template <class FunctorType>
  inline static int team_size_recommended(const FunctorType &) {
    return 1;
  }

  template <class FunctorType>
  inline static int team_size_recommended(const FunctorType &, const int &) {
    return 1;
  }

  template <class FunctorType>
  int team_size_max(const FunctorType &, const ParallelForTag &) const {
    return 1;
  }

  template <class FunctorType>
  int team_size_max(const FunctorType &, const ParallelReduceTag &) const {
    return 1;
  }

  template <class FunctorType, class ReducerType>
  int team_size_max(const FunctorType &, const ReducerType &,
                    const ParallelReduceTag &) const {
    return 1;
  }

  template <class FunctorType>
  int team_size_recommended(const FunctorType &, const ParallelForTag &) const {
    return 1;
  }

  template <class FunctorType>
  int team_size_recommended(const FunctorType &,
                            const ParallelReduceTag &) const {
    return 1;
  }

  template <class FunctorType, class ReducerType>
  int team_size_recommended(const FunctorType &, const ReducerType &,
                            const ParallelReduceTag &) const {
    return 1;
  }

  static int vector_length_max() { return 1; }

  inline int impl_vector_length() noexcept { return 1; }
  inline bool impl_auto_team_size() noexcept { return false; }
  inline bool impl_auto_vector_length() noexcept { return false; }
  inline void impl_set_vector_length(int) noexcept {}
  inline void impl_set_team_size(int) noexcept {}

 private:
  inline void init(const int league_size_request,
                   const int team_size_request) {
    m_league_size = league_size_request;
    const int max_team_size = 1;  // TODO: Can't use team_size_max(...)
                                  // because it requires a functor as argument.
    m_team_size =
        team_size_request > max_team_size ? max_team_size : team_size_request;

    if (m_chunk_size > 0) {
      if (!Impl::is_integral_power_of_two(m_chunk_size))
        Kokkos::abort("TeamPolicy blocking granularity must be power of two");
    } else {
      int new_chunk_size = 1;
      while (new_chunk_size * 4 * Kokkos::Experimental::HPX::concurrency() <
             m_league_size) {
        new_chunk_size *= 2;
      }

      if (new_chunk_size < 128) {
        new_chunk_size = 1;
        while ((new_chunk_size * Kokkos::Experimental::HPX::concurrency() <
                m_league_size) &&
               (new_chunk_size < 128))
          new_chunk_size *= 2;
      }

      m_chunk_size = new_chunk_size;
    }
  }
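
  // Worked example of the heuristic above (illustrative numbers): with
  // concurrency() == 16 and m_league_size == 100000, the first loop doubles
  // new_chunk_size while 4 * 16 * new_chunk_size < 100000, giving 2048; since
  // 2048 >= 128 it is kept. With m_league_size == 1000 the first loop stops
  // at 16 (< 128), so the second loop re-derives a chunk size capped at 128:
  // it doubles while 16 * new_chunk_size < 1000, giving 64.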

 public:
  inline int team_size() const { return m_team_size; }
  inline int league_size() const { return m_league_size; }

  size_t scratch_size(const int &level, int team_size_ = -1) const {
    if (team_size_ < 0) {
      team_size_ = m_team_size;
    }
    return m_team_scratch_size[level] +
           team_size_ * m_thread_scratch_size[level];
  }

  inline static int scratch_size_max(int level) {
    return (level == 0 ? 1024 * 32 :         // Roughly L1 size
                         20 * 1024 * 1024);  // Limit to keep compatibility
                                             // with CUDA
  }

 public:
  template <class ExecSpace, class... OtherProperties>
  friend class TeamPolicyInternal;

  const typename traits::execution_space &space() const {
    static typename traits::execution_space m_space;
    return m_space;
  }

  template <class... OtherProperties>
  TeamPolicyInternal(const TeamPolicyInternal<Kokkos::Experimental::HPX,
                                              OtherProperties...> &p) {
    m_league_size = p.m_league_size;
    m_team_size = p.m_team_size;
    m_team_scratch_size[0] = p.m_team_scratch_size[0];
    m_thread_scratch_size[0] = p.m_thread_scratch_size[0];
    m_team_scratch_size[1] = p.m_team_scratch_size[1];
    m_thread_scratch_size[1] = p.m_thread_scratch_size[1];
    m_chunk_size = p.m_chunk_size;
  }

  TeamPolicyInternal(const typename traits::execution_space &,
                     int league_size_request, int team_size_request,
                     int /* vector_length_request */ = 1)
      : m_team_scratch_size{0, 0},
        m_thread_scratch_size{0, 0},
        m_chunk_size(0) {
    init(league_size_request, team_size_request);
  }

  TeamPolicyInternal(const typename traits::execution_space &,
                     int league_size_request, const Kokkos::AUTO_t &,
                     int /* vector_length_request */ = 1)
      : m_team_scratch_size{0, 0},
        m_thread_scratch_size{0, 0},
        m_chunk_size(0) {
    init(league_size_request, 1);
  }

  TeamPolicyInternal(const typename traits::execution_space &,
                     int league_size_request,
                     const Kokkos::AUTO_t &, /* team_size_request */
                     const Kokkos::AUTO_t & /* vector_length_request */)
      : m_team_scratch_size{0, 0},
        m_thread_scratch_size{0, 0},
        m_chunk_size(0) {
    init(league_size_request, 1);
  }

  TeamPolicyInternal(const typename traits::execution_space &,
                     int league_size_request, int team_size_request,
                     const Kokkos::AUTO_t & /* vector_length_request */)
      : m_team_scratch_size{0, 0},
        m_thread_scratch_size{0, 0},
        m_chunk_size(0) {
    init(league_size_request, team_size_request);
  }

  TeamPolicyInternal(int league_size_request,
                     const Kokkos::AUTO_t &, /* team_size_request */
                     const Kokkos::AUTO_t & /* vector_length_request */)
      : m_team_scratch_size{0, 0},
        m_thread_scratch_size{0, 0},
        m_chunk_size(0) {
    init(league_size_request, 1);
  }

  TeamPolicyInternal(int league_size_request, int team_size_request,
                     const Kokkos::AUTO_t & /* vector_length_request */)
      : m_team_scratch_size{0, 0},
        m_thread_scratch_size{0, 0},
        m_chunk_size(0) {
    init(league_size_request, team_size_request);
  }

  TeamPolicyInternal(int league_size_request, int team_size_request,
                     int /* vector_length_request */ = 1)
      : m_team_scratch_size{0, 0},
        m_thread_scratch_size{0, 0},
        m_chunk_size(0) {
    init(league_size_request, team_size_request);
  }

  TeamPolicyInternal(int league_size_request, const Kokkos::AUTO_t &,
                     int /* vector_length_request */ = 1)
      : m_team_scratch_size{0, 0},
        m_thread_scratch_size{0, 0},
        m_chunk_size(0) {
    init(league_size_request, 1);
  }

  inline int chunk_size() const { return m_chunk_size; }

  inline TeamPolicyInternal &set_chunk_size(
      typename traits::index_type chunk_size_) {
    m_chunk_size = chunk_size_;
    return *this;
  }

  inline TeamPolicyInternal &set_scratch_size(const int &level,
                                              const PerTeamValue &per_team) {
    m_team_scratch_size[level] = per_team.value;
    return *this;
  }

  inline TeamPolicyInternal &set_scratch_size(
      const int &level, const PerThreadValue &per_thread) {
    m_thread_scratch_size[level] = per_thread.value;
    return *this;
  }

  inline TeamPolicyInternal &set_scratch_size(
      const int &level, const PerTeamValue &per_team,
      const PerThreadValue &per_thread) {
    m_team_scratch_size[level] = per_team.value;
    m_thread_scratch_size[level] = per_thread.value;
    return *this;
  }
};
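
// A minimal usage sketch (illustrative only): the team size is always 1 on
// this backend, so AUTO resolves to 1; per-team scratch is still honored.
//
//   using policy_type = Kokkos::TeamPolicy<Kokkos::Experimental::HPX>;
//   policy_type policy(league_size, Kokkos::AUTO);
//   policy.set_scratch_size(0, Kokkos::PerTeam(1024));
//   Kokkos::parallel_for(policy,
//                        KOKKOS_LAMBDA(const policy_type::member_type &m) {
//                          // m.team_size() == 1 here
//                        });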
}  // namespace Impl
}  // namespace Kokkos

namespace Kokkos {
namespace Impl {

template <typename Policy>
typename Policy::member_type get_hpx_adjusted_chunk_size(
    Policy const &policy) {
  const int concurrency = Kokkos::Experimental::HPX::concurrency();
  const typename Policy::member_type n = policy.end() - policy.begin();
  typename Policy::member_type new_chunk_size = policy.chunk_size();

  while (n >= 4 * concurrency * new_chunk_size) {
    new_chunk_size *= 2;
  }

  return new_chunk_size;
}
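
// Worked example (illustrative numbers): with concurrency() == 16, a range of
// n == 100000 iterations, and an initial chunk_size() of 1, the loop doubles
// the chunk while n >= 4 * 16 * chunk, i.e. while chunk <= 1562, so the
// adjusted chunk size is 2048. The loop exit condition guarantees fewer than
// four chunks per worker thread.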

template <class FunctorType, class... Traits>
class ParallelFor<FunctorType, Kokkos::RangePolicy<Traits...>,
                  Kokkos::Experimental::HPX> {
 private:
  using Policy = Kokkos::RangePolicy<Traits...>;
  using WorkTag = typename Policy::work_tag;
  using WorkRange = typename Policy::WorkRange;
  using Member = typename Policy::member_type;

  const FunctorType m_functor;
  const Policy m_policy;

  template <class TagType>
  static std::enable_if_t<std::is_void<TagType>::value> execute_functor(
      const FunctorType &functor, const Member i) {
    functor(i);
  }

  template <class TagType>
  static std::enable_if_t<!std::is_void<TagType>::value> execute_functor(
      const FunctorType &functor, const Member i) {
    const TagType t{};
    functor(t, i);
  }

  template <class TagType>
  static std::enable_if_t<std::is_void<TagType>::value> execute_functor_range(
      const FunctorType &functor, const Member i_begin, const Member i_end) {
    for (Member i = i_begin; i < i_end; ++i) {
      functor(i);
    }
  }

  template <class TagType>
  static std::enable_if_t<!std::is_void<TagType>::value> execute_functor_range(
      const FunctorType &functor, const Member i_begin, const Member i_end) {
    const TagType t{};
    for (Member i = i_begin; i < i_end; ++i) {
      functor(t, i);
    }
  }

 public:
  void execute() const {
    Kokkos::Impl::dispatch_execute_task(this, m_policy.space());
  }

  void execute_task() const {
    // See [note 1] for an explanation.
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_policy.space());

    auto exec = Kokkos::Experimental::HPX::impl_get_executor();

    using hpx::execution::par;
    using hpx::execution::static_chunk_size;

#if KOKKOS_HPX_IMPLEMENTATION == 0
    using hpx::for_loop;

    for_loop(par.on(exec).with(static_chunk_size(m_policy.chunk_size())),
             m_policy.begin(), m_policy.end(), [this](const Member i) {
               execute_functor<WorkTag>(m_functor, i);
             });

#elif KOKKOS_HPX_IMPLEMENTATION == 1
    using hpx::for_loop_strided;

    const Member chunk_size = get_hpx_adjusted_chunk_size(m_policy);

    for_loop_strided(
        par.on(exec), m_policy.begin(), m_policy.end(), chunk_size,
        [this, chunk_size](const Member i_begin) {
          const Member i_end =
              (std::min)(i_begin + chunk_size, m_policy.end());
          execute_functor_range<WorkTag>(m_functor, i_begin, i_end);
        });
#endif
  }

  inline ParallelFor(const FunctorType &arg_functor, Policy arg_policy)
      : m_functor(arg_functor), m_policy(arg_policy) {}
};

template <class FunctorType, class... Traits>
class ParallelFor<FunctorType, Kokkos::MDRangePolicy<Traits...>,
                  Kokkos::Experimental::HPX> {
 private:
  using MDRangePolicy = Kokkos::MDRangePolicy<Traits...>;
  using Policy = typename MDRangePolicy::impl_range_policy;
  using WorkTag = typename MDRangePolicy::work_tag;
  using WorkRange = typename Policy::WorkRange;
  using Member = typename Policy::member_type;
  using iterate_type =
      typename Kokkos::Impl::HostIterateTile<MDRangePolicy, FunctorType,
                                             WorkTag, void>;

  const iterate_type m_iter;
  const Policy m_policy;

 public:
  void execute() const { dispatch_execute_task(this, m_iter.m_rp.space()); }

  inline void execute_task() const {
    // See [note 1] for an explanation.
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_iter.m_rp.space());

    auto exec = Kokkos::Experimental::HPX::impl_get_executor();

    using hpx::execution::par;
    using hpx::execution::static_chunk_size;

#if KOKKOS_HPX_IMPLEMENTATION == 0
    using hpx::for_loop;

    for_loop(par.on(exec).with(
                 static_chunk_size(get_hpx_adjusted_chunk_size(m_policy))),
             m_policy.begin(), m_policy.end(),
             [this](const Member i) { m_iter(i); });

#elif KOKKOS_HPX_IMPLEMENTATION == 1
    using hpx::for_loop_strided;

    const Member chunk_size = get_hpx_adjusted_chunk_size(m_policy);

    for_loop_strided(par.on(exec), m_policy.begin(), m_policy.end(),
                     chunk_size, [this, chunk_size](const Member i_begin) {
                       const Member i_end =
                           (std::min)(i_begin + chunk_size, m_policy.end());
                       for (Member i = i_begin; i < i_end; ++i) {
                         m_iter(i);
                       }
                     });
#endif
  }

  inline ParallelFor(const FunctorType &arg_functor, MDRangePolicy arg_policy)
      : m_iter(arg_policy, arg_functor),
        m_policy(Policy(0, arg_policy.m_num_tiles).set_chunk_size(1)) {}

  template <typename Policy, typename Functor>
  static int max_tile_size_product(const Policy &, const Functor &) {
    // 1024 is a guess for a reasonable upper bound on the tile size product;
    // it is not a hardware constraint.
    return 1024;
  }
};
}  // namespace Impl
}  // namespace Kokkos

namespace Kokkos {
namespace Impl {
template <class FunctorType, class ReducerType, class... Traits>
class ParallelReduce<FunctorType, Kokkos::RangePolicy<Traits...>, ReducerType,
                     Kokkos::Experimental::HPX> {
 private:
  using Policy = Kokkos::RangePolicy<Traits...>;
  using WorkTag = typename Policy::work_tag;
  using WorkRange = typename Policy::WorkRange;
  using Member = typename Policy::member_type;
  using ReducerConditional =
      Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
                         FunctorType, ReducerType>;
  using ReducerTypeFwd = typename ReducerConditional::type;
  using Analysis =
      FunctorAnalysis<FunctorPatternInterface::REDUCE, Policy, ReducerTypeFwd>;
  using value_type = typename Analysis::value_type;
  using pointer_type = typename Analysis::pointer_type;
  using reference_type = typename Analysis::reference_type;

  const FunctorType m_functor;
  const Policy m_policy;
  const ReducerType m_reducer;
  const pointer_type m_result_ptr;

  bool m_force_synchronous;

  template <class TagType>
  inline static std::enable_if_t<std::is_void<TagType>::value> execute_functor(
      const FunctorType &functor, const Member i, reference_type update) {
    functor(i, update);
  }

  template <class TagType>
  inline static std::enable_if_t<!std::is_void<TagType>::value>
  execute_functor(const FunctorType &functor, const Member i,
                  reference_type update) {
    const TagType t{};
    functor(t, i, update);
  }

  template <class TagType>
  inline std::enable_if_t<std::is_void<TagType>::value> execute_functor_range(
      reference_type update, const Member i_begin, const Member i_end) const {
    for (Member i = i_begin; i < i_end; ++i) {
      m_functor(i, update);
    }
  }

  template <class TagType>
  inline std::enable_if_t<!std::is_void<TagType>::value> execute_functor_range(
      reference_type update, const Member i_begin, const Member i_end) const {
    const TagType t{};

    for (Member i = i_begin; i < i_end; ++i) {
      m_functor(t, i, update);
    }
  }

  class value_type_wrapper {
   private:
    std::size_t m_value_size;
    char *m_value_buffer;

   public:
    value_type_wrapper() : m_value_size(0), m_value_buffer(nullptr) {}

    value_type_wrapper(const std::size_t value_size)
        : m_value_size(value_size), m_value_buffer(new char[m_value_size]) {}

    value_type_wrapper(const value_type_wrapper &other)
        : m_value_size(0), m_value_buffer(nullptr) {
      if (this != &other) {
        m_value_buffer = new char[other.m_value_size];
        m_value_size = other.m_value_size;

        std::copy(other.m_value_buffer, other.m_value_buffer + m_value_size,
                  m_value_buffer);
      }
    }

    ~value_type_wrapper() { delete[] m_value_buffer; }

    value_type_wrapper(value_type_wrapper &&other)
        : m_value_size(0), m_value_buffer(nullptr) {
      if (this != &other) {
        m_value_buffer = other.m_value_buffer;
        m_value_size = other.m_value_size;

        other.m_value_buffer = nullptr;
        other.m_value_size = 0;
      }
    }

    value_type_wrapper &operator=(const value_type_wrapper &other) {
      if (this != &other) {
        delete[] m_value_buffer;
        m_value_buffer = new char[other.m_value_size];
        m_value_size = other.m_value_size;

        std::copy(other.m_value_buffer, other.m_value_buffer + m_value_size,
                  m_value_buffer);
      }

      return *this;
    }

    value_type_wrapper &operator=(value_type_wrapper &&other) {
      if (this != &other) {
        delete[] m_value_buffer;
        m_value_buffer = other.m_value_buffer;
        m_value_size = other.m_value_size;

        other.m_value_buffer = nullptr;
        other.m_value_size = 0;
      }

      return *this;
    }

    pointer_type pointer() const {
      return reinterpret_cast<pointer_type>(m_value_buffer);
    }

    reference_type reference() const {
      return Analysis::Reducer::reference(
          reinterpret_cast<pointer_type>(m_value_buffer));
    }
  };

 public:
  void execute() const {
    if (m_policy.end() <= m_policy.begin()) {
      if (m_result_ptr) {
        typename Analysis::Reducer final_reducer(
            &ReducerConditional::select(m_functor, m_reducer));

        final_reducer.init(m_result_ptr);
        final_reducer.final(m_result_ptr);
      }
      return;
    }
    dispatch_execute_task(this, m_policy.space(), m_force_synchronous);
  }

  inline void execute_task() const {
    // See [note 1] for an explanation.
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_policy.space());

    typename Analysis::Reducer final_reducer(
        &ReducerConditional::select(m_functor, m_reducer));

    const std::size_t value_size =
        Analysis::value_size(ReducerConditional::select(m_functor, m_reducer));

    auto exec = Kokkos::Experimental::HPX::impl_get_executor();

    using hpx::for_loop;
    using hpx::execution::par;
    using hpx::execution::static_chunk_size;

#if KOKKOS_HPX_IMPLEMENTATION == 0
    // NOTE: This version makes the most use of HPX functionality, but
    // requires the struct value_type_wrapper to handle different
    // reference_types. It is also significantly slower than the version
    // below due to not reusing the buffer used by other functions.
    using hpx::parallel::reduction;

    value_type_wrapper final_value(value_size);
    value_type_wrapper identity(value_size);

    final_reducer.init(final_value.pointer());
    final_reducer.init(identity.pointer());

    for_loop(par.on(exec).with(
                 static_chunk_size(get_hpx_adjusted_chunk_size(m_policy))),
             m_policy.begin(), m_policy.end(),
             reduction(final_value, identity,
                       [final_reducer](
                           value_type_wrapper &a,
                           value_type_wrapper &b) -> value_type_wrapper & {
                         final_reducer.join(a.pointer(), b.pointer());
                         return a;
                       }),
             [this](Member i, value_type_wrapper &update) {
               execute_functor<WorkTag>(m_functor, i, update.reference());
             });

    pointer_type final_value_ptr = final_value.pointer();

#elif KOKKOS_HPX_IMPLEMENTATION == 1
    using hpx::for_loop_strided;

    const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();

    thread_buffer &buffer = m_policy.space().impl_get_buffer();
    buffer.resize(num_worker_threads, value_size);

    for_loop(par.on(exec).with(static_chunk_size(1)), 0, num_worker_threads,
             [&buffer, final_reducer](const int t) noexcept {
               final_reducer.init(
                   reinterpret_cast<pointer_type>(buffer.get(t)));
             });

    const Member chunk_size = get_hpx_adjusted_chunk_size(m_policy);

    for_loop_strided(
        par.on(exec), m_policy.begin(), m_policy.end(), chunk_size,
        [this, &buffer, chunk_size](const Member i_begin) {
          reference_type update = Analysis::Reducer::reference(
              reinterpret_cast<pointer_type>(buffer.get(
                  Kokkos::Experimental::HPX::impl_hardware_thread_id())));
          const Member i_end =
              (std::min)(i_begin + chunk_size, m_policy.end());
          execute_functor_range<WorkTag>(update, i_begin, i_end);
        });

    for (int i = 1; i < num_worker_threads; ++i) {
      final_reducer.join(reinterpret_cast<pointer_type>(buffer.get(0)),
                         reinterpret_cast<pointer_type>(buffer.get(i)));
    }

    pointer_type final_value_ptr =
        reinterpret_cast<pointer_type>(buffer.get(0));
#endif

    final_reducer.final(final_value_ptr);

    if (m_result_ptr != nullptr) {
      const int n = Analysis::value_count(
          ReducerConditional::select(m_functor, m_reducer));

      for (int j = 0; j < n; ++j) {
        m_result_ptr[j] = final_value_ptr[j];
      }
    }
  }

  template <class ViewType>
  inline ParallelReduce(
      const FunctorType &arg_functor, Policy arg_policy,
      const ViewType &arg_view,
      std::enable_if_t<Kokkos::is_view<ViewType>::value &&
                           !Kokkos::is_reducer<ReducerType>::value,
                       void *> = nullptr)
      : m_functor(arg_functor),
        m_policy(arg_policy),
        m_reducer(InvalidType()),
        m_result_ptr(arg_view.data()),
        m_force_synchronous(!arg_view.impl_track().has_record()) {}

  inline ParallelReduce(const FunctorType &arg_functor, Policy arg_policy,
                        const ReducerType &reducer)
      : m_functor(arg_functor),
        m_policy(arg_policy),
        m_reducer(reducer),
        m_result_ptr(reducer.view().data()),
        m_force_synchronous(!reducer.view().impl_track().has_record()) {}
};

template <class FunctorType, class ReducerType, class... Traits>
class ParallelReduce<FunctorType, Kokkos::MDRangePolicy<Traits...>, ReducerType,
                     Kokkos::Experimental::HPX> {
 private:
  using MDRangePolicy = Kokkos::MDRangePolicy<Traits...>;
  using Policy = typename MDRangePolicy::impl_range_policy;
  using WorkTag = typename MDRangePolicy::work_tag;
  using WorkRange = typename Policy::WorkRange;
  using Member = typename Policy::member_type;
  using ReducerConditional =
      Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
                         FunctorType, ReducerType>;
  using ReducerTypeFwd = typename ReducerConditional::type;
  using Analysis = FunctorAnalysis<FunctorPatternInterface::REDUCE,
                                   MDRangePolicy, ReducerTypeFwd>;

  using pointer_type = typename Analysis::pointer_type;
  using value_type = typename Analysis::value_type;
  using reference_type = typename Analysis::reference_type;
  using iterate_type =
      typename Kokkos::Impl::HostIterateTile<MDRangePolicy, FunctorType,
                                             WorkTag, reference_type>;

  const iterate_type m_iter;
  const Policy m_policy;
  const ReducerType m_reducer;
  const pointer_type m_result_ptr;

  bool m_force_synchronous;

 public:
  void execute() const {
    dispatch_execute_task(this, m_iter.m_rp.space(), m_force_synchronous);
  }

  inline void execute_task() const {
    // See [note 1] for an explanation.
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_iter.m_rp.space());

    const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
    const std::size_t value_size = Analysis::value_size(
        ReducerConditional::select(m_iter.m_func, m_reducer));

    thread_buffer &buffer = m_iter.m_rp.space().impl_get_buffer();
    buffer.resize(num_worker_threads, value_size);

    using hpx::for_loop;
    using hpx::execution::par;
    using hpx::execution::static_chunk_size;

    auto exec = Kokkos::Experimental::HPX::impl_get_executor();

    typename Analysis::Reducer final_reducer(
        &ReducerConditional::select(m_iter.m_func, m_reducer));

#if KOKKOS_HPX_IMPLEMENTATION == 0

    for_loop(par.on(exec).with(static_chunk_size(1)), 0, num_worker_threads,
             [&buffer, final_reducer](std::size_t t) {
               final_reducer.init(
                   reinterpret_cast<pointer_type>(buffer.get(t)));
             });

    for_loop(par.on(exec).with(
                 static_chunk_size(get_hpx_adjusted_chunk_size(m_policy))),
             m_policy.begin(), m_policy.end(), [this, &buffer](const Member i) {
               reference_type update = Analysis::Reducer::reference(
                   reinterpret_cast<pointer_type>(buffer.get(
                       Kokkos::Experimental::HPX::impl_hardware_thread_id())));
               m_iter(i, update);
             });

#elif KOKKOS_HPX_IMPLEMENTATION == 1
    using hpx::for_loop_strided;

    for_loop(par.on(exec).with(static_chunk_size(1)), std::size_t(0),
             num_worker_threads, [&buffer, final_reducer](const std::size_t t) {
               final_reducer.init(
                   reinterpret_cast<pointer_type>(buffer.get(t)));
             });

    const Member chunk_size = get_hpx_adjusted_chunk_size(m_policy);

    for_loop_strided(
        par.on(exec), m_policy.begin(), m_policy.end(), chunk_size,
        [this, &buffer, chunk_size](const Member i_begin) {
          reference_type update = Analysis::Reducer::reference(
              reinterpret_cast<pointer_type>(buffer.get(
                  Kokkos::Experimental::HPX::impl_hardware_thread_id())));
          const Member i_end =
              (std::min)(i_begin + chunk_size, m_policy.end());

          for (Member i = i_begin; i < i_end; ++i) {
            m_iter(i, update);
          }
        });
#endif

    for (int i = 1; i < num_worker_threads; ++i) {
      final_reducer.join(reinterpret_cast<pointer_type>(buffer.get(0)),
                         reinterpret_cast<pointer_type>(buffer.get(i)));
    }

    final_reducer.final(reinterpret_cast<pointer_type>(buffer.get(0)));

    if (m_result_ptr != nullptr) {
      const int n = Analysis::value_count(
          ReducerConditional::select(m_iter.m_func, m_reducer));

      for (int j = 0; j < n; ++j) {
        m_result_ptr[j] = reinterpret_cast<pointer_type>(buffer.get(0))[j];
      }
    }
  }

  template <class ViewType>
  inline ParallelReduce(
      const FunctorType &arg_functor, MDRangePolicy arg_policy,
      const ViewType &arg_view,
      std::enable_if_t<Kokkos::is_view<ViewType>::value &&
                           !Kokkos::is_reducer<ReducerType>::value,
                       void *> = nullptr)
      : m_iter(arg_policy, arg_functor),
        m_policy(Policy(0, arg_policy.m_num_tiles).set_chunk_size(1)),
        m_reducer(InvalidType()),
        m_result_ptr(arg_view.data()),
        m_force_synchronous(!arg_view.impl_track().has_record()) {}

  inline ParallelReduce(const FunctorType &arg_functor,
                        MDRangePolicy arg_policy, const ReducerType &reducer)
      : m_iter(arg_policy, arg_functor),
        m_policy(Policy(0, arg_policy.m_num_tiles).set_chunk_size(1)),
        m_reducer(reducer),
        m_result_ptr(reducer.view().data()),
        m_force_synchronous(!reducer.view().impl_track().has_record()) {}

  template <typename Policy, typename Functor>
  static int max_tile_size_product(const Policy &, const Functor &) {
    // 1024 is a guess for a reasonable upper bound on the tile size product;
    // it is not a hardware constraint.
    return 1024;
  }
};
}  // namespace Impl
}  // namespace Kokkos

namespace Kokkos {
namespace Impl {

template <class FunctorType, class... Traits>
class ParallelScan<FunctorType, Kokkos::RangePolicy<Traits...>,
                   Kokkos::Experimental::HPX> {
 private:
  using Policy = Kokkos::RangePolicy<Traits...>;
  using WorkTag = typename Policy::work_tag;
  using WorkRange = typename Policy::WorkRange;
  using Member = typename Policy::member_type;
  using Analysis =
      FunctorAnalysis<FunctorPatternInterface::SCAN, Policy, FunctorType>;
  using pointer_type = typename Analysis::pointer_type;
  using reference_type = typename Analysis::reference_type;
  using value_type = typename Analysis::value_type;

  const FunctorType m_functor;
  const Policy m_policy;

  template <class TagType>
  inline static std::enable_if_t<std::is_void<TagType>::value>
  execute_functor_range(const FunctorType &functor, const Member i_begin,
                        const Member i_end, reference_type update,
                        const bool final) {
    for (Member i = i_begin; i < i_end; ++i) {
      functor(i, update, final);
    }
  }

  template <class TagType>
  inline static std::enable_if_t<!std::is_void<TagType>::value>
  execute_functor_range(const FunctorType &functor, const Member i_begin,
                        const Member i_end, reference_type update,
                        const bool final) {
    const TagType t{};
    for (Member i = i_begin; i < i_end; ++i) {
      functor(t, i, update, final);
    }
  }

 public:
  void execute() const { dispatch_execute_task(this, m_policy.space()); }

  inline void execute_task() const {
    // See [note 1] for an explanation.
    Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
        m_policy.space());

    const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
    const int value_count = Analysis::value_count(m_functor);
    const std::size_t value_size = Analysis::value_size(m_functor);

    thread_buffer &buffer = m_policy.space().impl_get_buffer();
    buffer.resize(num_worker_threads, 2 * value_size);

    using hpx::barrier;
    using hpx::for_loop;
    using hpx::execution::par;
    using hpx::execution::static_chunk_size;

    barrier<> bar(num_worker_threads);
    auto exec = Kokkos::Experimental::HPX::impl_get_executor();

    typename Analysis::Reducer final_reducer(&m_functor);

    for_loop(
        par.on(exec).with(static_chunk_size(1)), 0, num_worker_threads,
        [this, &bar, &buffer, num_worker_threads, value_count, value_size,
         final_reducer](int t) {
          reference_type update_sum = final_reducer.init(
              reinterpret_cast<pointer_type>(buffer.get(t)));

          const WorkRange range(m_policy, t, num_worker_threads);
          execute_functor_range<WorkTag>(m_functor, range.begin(), range.end(),
                                         update_sum, false);

          bar.arrive_and_wait();

          if (t == 0) {
            final_reducer.init(
                reinterpret_cast<pointer_type>(buffer.get(0) + value_size));

            for (int i = 1; i < num_worker_threads; ++i) {
              pointer_type ptr_1_prev =
                  reinterpret_cast<pointer_type>(buffer.get(i - 1));
              pointer_type ptr_2_prev = reinterpret_cast<pointer_type>(
                  buffer.get(i - 1) + value_size);
              pointer_type ptr_2 =
                  reinterpret_cast<pointer_type>(buffer.get(i) + value_size);

              for (int j = 0; j < value_count; ++j) {
                ptr_2[j] = ptr_2_prev[j];
              }

              final_reducer.join(ptr_2, ptr_1_prev);
            }
          }

          bar.arrive_and_wait();

          reference_type update_base = Analysis::Reducer::reference(
              reinterpret_cast<pointer_type>(buffer.get(t) + value_size));

          execute_functor_range<WorkTag>(m_functor, range.begin(), range.end(),
                                         update_base, true);
        });
  }

  inline ParallelScan(const FunctorType &arg_functor, const Policy &arg_policy)
      : m_functor(arg_functor), m_policy(arg_policy) {}
};
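
// The scan above is the usual two-pass algorithm. Worked example
// (illustrative numbers): summing {1,1,1,1,1,1} with 2 worker threads, the
// first pass produces per-thread partial sums {3,3}; thread 0 then turns
// these into per-thread offsets {0,3} (an exclusive scan of the partials);
// the second pass re-runs the functor with final == true, starting each
// thread from its offset, so the thread handling the second {1,1,1} with
// offset 3 produces {4,5,6}.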
1643 
1644 template <class FunctorType, class ReturnType, class... Traits>
1645 class ParallelScanWithTotal<FunctorType, Kokkos::RangePolicy<Traits...>,
1646  ReturnType, Kokkos::Experimental::HPX> {
1647  private:
1648  using Policy = Kokkos::RangePolicy<Traits...>;
1649  using WorkTag = typename Policy::work_tag;
1650  using WorkRange = typename Policy::WorkRange;
1651  using Member = typename Policy::member_type;
1652  using Analysis =
1653  FunctorAnalysis<FunctorPatternInterface::SCAN, Policy, FunctorType>;
1654  using pointer_type = typename Analysis::pointer_type;
1655  using reference_type = typename Analysis::reference_type;
1656  using value_type = typename Analysis::value_type;
1657 
1658  const FunctorType m_functor;
1659  const Policy m_policy;
1660  pointer_type m_result_ptr;
1661 
1662  template <class TagType>
1663  inline static std::enable_if_t<std::is_void<TagType>::value>
1664  execute_functor_range(const FunctorType &functor, const Member i_begin,
1665  const Member i_end, reference_type update,
1666  const bool final) {
1667  for (Member i = i_begin; i < i_end; ++i) {
1668  functor(i, update, final);
1669  }
1670  }
1671 
1672  template <class TagType>
1673  inline static std::enable_if_t<!std::is_void<TagType>::value>
1674  execute_functor_range(const FunctorType &functor, const Member i_begin,
1675  const Member i_end, reference_type update,
1676  const bool final) {
1677  const TagType t{};
1678  for (Member i = i_begin; i < i_end; ++i) {
1679  functor(t, i, update, final);
1680  }
1681  }
1682 
1683  public:
1684  void execute() const { dispatch_execute_task(this, m_policy.space()); }
1685 
1686  inline void execute_task() const {
1687  // See [note 1] for an explanation.
1688  Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
1689  m_policy.space());
1690 
1691  const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
1692  const int value_count = Analysis::value_count(m_functor);
1693  const std::size_t value_size = Analysis::value_size(m_functor);
1694 
1695  thread_buffer &buffer = m_policy.space().impl_get_buffer();
1696  buffer.resize(num_worker_threads, 2 * value_size);
1697 
1698  using hpx::barrier;
1699  using hpx::for_loop;
1700  using hpx::execution::par;
1701  using hpx::execution::static_chunk_size;
1702 
1703  barrier<> bar(num_worker_threads);
1704  auto exec = Kokkos::Experimental::HPX::impl_get_executor();
1705 
1706  typename Analysis::Reducer final_reducer(&m_functor);
1707 
1708  for_loop(
1709  par.on(exec).with(static_chunk_size(1)), 0, num_worker_threads,
1710  [this, &bar, &buffer, num_worker_threads, value_count, value_size,
1711  final_reducer](int t) {
1712  reference_type update_sum =
1713  final_reducer.init(reinterpret_cast<pointer_type>(buffer.get(t)));
1714 
1715  const WorkRange range(m_policy, t, num_worker_threads);
1716  execute_functor_range<WorkTag>(m_functor, range.begin(), range.end(),
1717  update_sum, false);
1718 
1719  bar.arrive_and_wait();
1720 
1721  if (t == 0) {
1722  final_reducer.init(
1723  reinterpret_cast<pointer_type>(buffer.get(0) + value_size));
1724 
1725  for (int i = 1; i < num_worker_threads; ++i) {
1726  pointer_type ptr_1_prev =
1727  reinterpret_cast<pointer_type>(buffer.get(i - 1));
1728  pointer_type ptr_2_prev = reinterpret_cast<pointer_type>(
1729  buffer.get(i - 1) + value_size);
1730  pointer_type ptr_2 =
1731  reinterpret_cast<pointer_type>(buffer.get(i) + value_size);
1732 
1733  for (int j = 0; j < value_count; ++j) {
1734  ptr_2[j] = ptr_2_prev[j];
1735  }
1736 
1737  final_reducer.join(ptr_2, ptr_1_prev);
1738  }
1739  }
1740 
1741  bar.arrive_and_wait();
1742 
1743  reference_type update_base = Analysis::Reducer::reference(
1744  reinterpret_cast<pointer_type>(buffer.get(t) + value_size));
1745 
1746  execute_functor_range<WorkTag>(m_functor, range.begin(), range.end(),
1747  update_base, true);
1748 
1749  if (t == num_worker_threads - 1) {
1750  *m_result_ptr = update_base;
1751  }
1752  });
1753  }
1754 
1755  template <class ViewType>
1756  ParallelScanWithTotal(const FunctorType &arg_functor,
1757  const Policy &arg_policy,
1758  const ViewType &arg_result_view)
1759  : m_functor(arg_functor),
1760  m_policy(arg_policy),
1761  m_result_ptr(arg_result_view.data()) {
1762  static_assert(
1763  Kokkos::Impl::MemorySpaceAccess<typename ViewType::memory_space,
1764  Kokkos::HostSpace>::accessible,
1765  "Kokkos::HPX parallel_scan result must be host-accessible!");
1766  }
1767 };
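// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of this header): the overload
// that also returns the grand total is handled by ParallelScanWithTotal.
// The result must live in host-accessible memory, per the static_assert in
// the constructor above; 'offsets' and 'counts' are placeholder views.
//
//   int total = 0;
//   Kokkos::parallel_scan(
//       Kokkos::RangePolicy<Kokkos::Experimental::HPX>(0, n),
//       KOKKOS_LAMBDA(const int i, int &update, const bool final) {
//         if (final) offsets(i) = update;
//         update += counts(i);
//       },
//       total);
// ---------------------------------------------------------------------------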
1768 } // namespace Impl
1769 } // namespace Kokkos
1770 
1771 namespace Kokkos {
1772 namespace Impl {
1773 template <class FunctorType, class... Properties>
1774 class ParallelFor<FunctorType, Kokkos::TeamPolicy<Properties...>,
1775  Kokkos::Experimental::HPX> {
1776  private:
1777  using Policy = TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>;
1778  using WorkTag = typename Policy::work_tag;
1779  using Member = typename Policy::member_type;
1780  using memory_space = Kokkos::HostSpace;
1781 
1782  const FunctorType m_functor;
1783  const Policy m_policy;
1784  const int m_league;
1785  const std::size_t m_shared;
1786 
1787  template <class TagType>
1788  inline static std::enable_if_t<std::is_void<TagType>::value> execute_functor(
1789  const FunctorType &functor, const Policy &policy, const int league_rank,
1790  char *local_buffer, const std::size_t local_buffer_size) {
1791  functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size));
1792  }
1793 
1794  template <class TagType>
1795  inline static std::enable_if_t<!std::is_void<TagType>::value> execute_functor(
1796  const FunctorType &functor, const Policy &policy, const int league_rank,
1797  char *local_buffer, const std::size_t local_buffer_size) {
1798  const TagType t{};
1799  functor(t, Member(policy, 0, league_rank, local_buffer, local_buffer_size));
1800  }
1801 
1802  template <class TagType>
1803  inline static std::enable_if_t<std::is_void<TagType>::value>
1804  execute_functor_range(const FunctorType &functor, const Policy &policy,
1805  const int league_rank_begin, const int league_rank_end,
1806  char *local_buffer,
1807  const std::size_t local_buffer_size) {
1808  for (int league_rank = league_rank_begin; league_rank < league_rank_end;
1809  ++league_rank) {
1810  functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size));
1811  }
1812  }
1813 
1814  template <class TagType>
1815  inline static std::enable_if_t<!std::is_void<TagType>::value>
1816  execute_functor_range(const FunctorType &functor, const Policy &policy,
1817  const int league_rank_begin, const int league_rank_end,
1818  char *local_buffer,
1819  const std::size_t local_buffer_size) {
1820  const TagType t{};
1821  for (int league_rank = league_rank_begin; league_rank < league_rank_end;
1822  ++league_rank) {
1823  functor(t,
1824  Member(policy, 0, league_rank, local_buffer, local_buffer_size));
1825  }
1826  }
1827 
1828  public:
1829  void execute() const { dispatch_execute_task(this, m_policy.space()); }
1830 
1831  inline void execute_task() const {
1832  // See [note 1] for an explanation.
1833  Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
1834  m_policy.space());
1835 
1836  const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
1837 
1838  thread_buffer &buffer = m_policy.space().impl_get_buffer();
1839  buffer.resize(num_worker_threads, m_shared);
1840 
1841  auto exec = Kokkos::Experimental::HPX::impl_get_executor();
1842 
1843  using hpx::execution::par;
1844  using hpx::execution::static_chunk_size;
1845 
1846 #if KOKKOS_HPX_IMPLEMENTATION == 0
1847  using hpx::for_loop;
1848 
1849  for_loop(
1850  par.on(exec).with(static_chunk_size(m_policy.chunk_size())), 0,
1851  m_policy.league_size(), [this, &buffer](const int league_rank) {
1852  execute_functor<WorkTag>(
1853  m_functor, m_policy, league_rank,
1854  buffer.get(Kokkos::Experimental::HPX::impl_hardware_thread_id()),
1855  m_shared);
1856  });
1857 
1858 #elif KOKKOS_HPX_IMPLEMENTATION == 1
1859  using hpx::for_loop_strided;
1860 
1861  for_loop_strided(
1862  par.on(exec), 0, m_policy.league_size(), m_policy.chunk_size(),
1863  [this, &buffer](const int league_rank_begin) {
1864  const int league_rank_end =
1865  (std::min)(league_rank_begin + m_policy.chunk_size(),
1866  m_policy.league_size());
1867  execute_functor_range<WorkTag>(
1868  m_functor, m_policy, league_rank_begin, league_rank_end,
1869  buffer.get(Kokkos::Experimental::HPX::impl_hardware_thread_id()),
1870  m_shared);
1871  });
1872 #endif
1873  }
1874 
1875  ParallelFor(const FunctorType &arg_functor, const Policy &arg_policy)
1876  : m_functor(arg_functor),
1877  m_policy(arg_policy),
1878  m_league(arg_policy.league_size()),
1879  m_shared(arg_policy.scratch_size(0) + arg_policy.scratch_size(1) +
1880  FunctorTeamShmemSize<FunctorType>::value(
1881  arg_functor, arg_policy.team_size())) {}
1882 };
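// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of this header): a TeamPolicy
// parallel_for on the HPX backend. Each league rank maps onto one host
// task, and per-team scratch is requested through the standard
// set_scratch_size API; 'league_size' is a placeholder.
//
//   using policy_type = Kokkos::TeamPolicy<Kokkos::Experimental::HPX>;
//   using member_type = policy_type::member_type;
//
//   Kokkos::parallel_for(
//       policy_type(league_size, Kokkos::AUTO)
//           .set_scratch_size(0, Kokkos::PerTeam(1024)),
//       KOKKOS_LAMBDA(const member_type &team) {
//         const int league_rank = team.league_rank();
//         // ... per-team work ...
//       });
// ---------------------------------------------------------------------------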
1883 
1884 template <class FunctorType, class ReducerType, class... Properties>
1885 class ParallelReduce<FunctorType, Kokkos::TeamPolicy<Properties...>,
1886  ReducerType, Kokkos::Experimental::HPX> {
1887  private:
1888  using Policy = TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>;
1889  using Member = typename Policy::member_type;
1890  using WorkTag = typename Policy::work_tag;
1891  using ReducerConditional =
1892  Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
1893  FunctorType, ReducerType>;
1894  using ReducerTypeFwd = typename ReducerConditional::type;
1895  using Analysis =
1896  FunctorAnalysis<FunctorPatternInterface::REDUCE, Policy, ReducerTypeFwd>;
1897  using pointer_type = typename Analysis::pointer_type;
1898  using reference_type = typename Analysis::reference_type;
1899  using value_type = typename Analysis::value_type;
1900 
1901  const FunctorType m_functor;
1902  const int m_league;
1903  const Policy m_policy;
1904  const ReducerType m_reducer;
1905  pointer_type m_result_ptr;
1906  const std::size_t m_shared;
1907 
1908  bool m_force_synchronous;
1909 
1910  template <class TagType>
1911  inline static std::enable_if_t<std::is_void<TagType>::value> execute_functor(
1912  const FunctorType &functor, const Policy &policy, const int league_rank,
1913  char *local_buffer, const std::size_t local_buffer_size,
1914  reference_type update) {
1915  functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size),
1916  update);
1917  }
1918 
1919  template <class TagType>
1920  inline static std::enable_if_t<!std::is_void<TagType>::value> execute_functor(
1921  const FunctorType &functor, const Policy &policy, const int league_rank,
1922  char *local_buffer, const std::size_t local_buffer_size,
1923  reference_type update) {
1924  const TagType t{};
1925  functor(t, Member(policy, 0, league_rank, local_buffer, local_buffer_size),
1926  update);
1927  }
1928 
1929  template <class TagType>
1930  inline static std::enable_if_t<std::is_void<TagType>::value>
1931  execute_functor_range(const FunctorType &functor, const Policy &policy,
1932  const int league_rank_begin, const int league_rank_end,
1933  char *local_buffer, const std::size_t local_buffer_size,
1934  reference_type update) {
1935  for (int league_rank = league_rank_begin; league_rank < league_rank_end;
1936  ++league_rank) {
1937  functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size),
1938  update);
1939  }
1940  }
1941 
1942  template <class TagType>
1943  inline static std::enable_if_t<!std::is_void<TagType>::value>
1944  execute_functor_range(const FunctorType &functor, const Policy &policy,
1945  const int league_rank_begin, const int league_rank_end,
1946  char *local_buffer, const std::size_t local_buffer_size,
1947  reference_type update) {
1948  const TagType t{};
1949  for (int league_rank = league_rank_begin; league_rank < league_rank_end;
1950  ++league_rank) {
1951  functor(t,
1952  Member(policy, 0, league_rank, local_buffer, local_buffer_size),
1953  update);
1954  }
1955  }
1956 
1957  public:
1958  void execute() const {
1959  if (m_policy.league_size() * m_policy.team_size() == 0) {
1960  if (m_result_ptr) {
1961  typename Analysis::Reducer final_reducer(
1962  &ReducerConditional::select(m_functor, m_reducer));
1963  final_reducer.init(m_result_ptr);
1964  final_reducer.final(m_result_ptr);
1965  }
1966  return;
1967  }
1968  dispatch_execute_task(this, m_policy.space());
1969  }
1970 
1971  inline void execute_task() const {
1972  // See [note 1] for an explanation.
1973  Kokkos::Experimental::HPX::reset_on_exit_parallel reset_on_exit(
1974  m_policy.space());
1975 
1976  const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
1977  const std::size_t value_size =
1978  Analysis::value_size(ReducerConditional::select(m_functor, m_reducer));
1979 
1980  thread_buffer &buffer = m_policy.space().impl_get_buffer();
1981  buffer.resize(num_worker_threads, value_size + m_shared);
1982 
1983  auto exec = Kokkos::Experimental::HPX::impl_get_executor();
1984 
1985  using hpx::for_loop;
1986  using hpx::execution::par;
1987  using hpx::execution::static_chunk_size;
1988 
1989  typename Analysis::Reducer final_reducer(
1990  &ReducerConditional::select(m_functor, m_reducer));
1991 
1992 #if KOKKOS_HPX_IMPLEMENTATION == 0
1993 
1994  for_loop(
1995  par.on(exec).with(static_chunk_size(1)), 0, num_worker_threads,
1996  [&buffer, final_reducer](const std::size_t t) {
1997  final_reducer.init(reinterpret_cast<pointer_type>(buffer.get(t)));
1998  });
1999 
2000  for_loop(par.on(exec).with(static_chunk_size(m_policy.chunk_size())), 0,
2001  m_policy.league_size(),
2002  [this, &buffer, value_size](const int league_rank) {
2003  std::size_t t =
2004  Kokkos::Experimental::HPX::impl_hardware_thread_id();
2005  reference_type update = Analysis::Reducer::reference(
2006  reinterpret_cast<pointer_type>(buffer.get(t)));
2007 
2008  execute_functor<WorkTag>(m_functor, m_policy, league_rank,
2009  buffer.get(t) + value_size, m_shared,
2010  update);
2011  });
2012 
2013 #elif KOKKOS_HPX_IMPLEMENTATION == 1
2014  using hpx::for_loop_strided;
2015 
2016  for_loop(
2017  par.on(exec).with(static_chunk_size(1)), 0, num_worker_threads,
 2018  [&buffer, final_reducer](const std::size_t t) {
2019  final_reducer.init(reinterpret_cast<pointer_type>(buffer.get(t)));
2020  });
2021 
2022  for_loop_strided(
2023  par.on(exec), 0, m_policy.league_size(), m_policy.chunk_size(),
 2024  [this, &buffer, value_size](const int league_rank_begin) {
2025  std::size_t t = Kokkos::Experimental::HPX::impl_hardware_thread_id();
2026  reference_type update = Analysis::Reducer::reference(
2027  reinterpret_cast<pointer_type>(buffer.get(t)));
2028  const int league_rank_end =
2029  (std::min)(league_rank_begin + m_policy.chunk_size(),
2030  m_policy.league_size());
2031  execute_functor_range<WorkTag>(
2032  m_functor, m_policy, league_rank_begin, league_rank_end,
2033  buffer.get(t) + value_size, m_shared, update);
2034  });
2035 #endif
2036 
2037  const pointer_type ptr = reinterpret_cast<pointer_type>(buffer.get(0));
2038  for (int t = 1; t < num_worker_threads; ++t) {
2039  final_reducer.join(ptr, reinterpret_cast<pointer_type>(buffer.get(t)));
2040  }
2041 
2042  final_reducer.final(ptr);
2043 
2044  if (m_result_ptr) {
2045  const int n = Analysis::value_count(
2046  ReducerConditional::select(m_functor, m_reducer));
2047 
2048  for (int j = 0; j < n; ++j) {
2049  m_result_ptr[j] = ptr[j];
2050  }
2051  }
2052  }
2053 
2054  template <class ViewType>
2055  ParallelReduce(const FunctorType &arg_functor, const Policy &arg_policy,
2056  const ViewType &arg_result,
2057  std::enable_if_t<Kokkos::is_view<ViewType>::value &&
2058  !Kokkos::is_reducer<ReducerType>::value,
2059  void *> = nullptr)
2060  : m_functor(arg_functor),
2061  m_league(arg_policy.league_size()),
2062  m_policy(arg_policy),
2063  m_reducer(InvalidType()),
2064  m_result_ptr(arg_result.data()),
2065  m_shared(arg_policy.scratch_size(0) + arg_policy.scratch_size(1) +
2066  FunctorTeamShmemSize<FunctorType>::value(
2067  m_functor, arg_policy.team_size())),
2068  m_force_synchronous(!arg_result.impl_track().has_record()) {}
2069 
2070  inline ParallelReduce(const FunctorType &arg_functor, Policy arg_policy,
2071  const ReducerType &reducer)
2072  : m_functor(arg_functor),
2073  m_league(arg_policy.league_size()),
2074  m_policy(arg_policy),
2075  m_reducer(reducer),
2076  m_result_ptr(reducer.view().data()),
2077  m_shared(arg_policy.scratch_size(0) + arg_policy.scratch_size(1) +
2078  FunctorTeamShmemSize<FunctorType>::value(
2079  arg_functor, arg_policy.team_size())),
2080  m_force_synchronous(!reducer.view().impl_track().has_record()) {}
2081 };
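// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of this header): a TeamPolicy
// parallel_reduce. Each task accumulates into its worker thread's slot in
// the shared buffer, and the partials are joined serially afterwards, as in
// execute_task above. 'member_type' is as in the previous sketch; 'work' is
// a placeholder.
//
//   double total = 0.0;
//   Kokkos::parallel_reduce(
//       Kokkos::TeamPolicy<Kokkos::Experimental::HPX>(league_size,
//                                                     Kokkos::AUTO),
//       KOKKOS_LAMBDA(const member_type &team, double &sum) {
//         sum += work(team.league_rank());
//       },
//       total);
// ---------------------------------------------------------------------------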
2082 } // namespace Impl
2083 } // namespace Kokkos
2084 
2085 namespace Kokkos {
2086 
2087 template <typename iType>
2088 KOKKOS_INLINE_FUNCTION
2089  Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2090  TeamThreadRange(const Impl::HPXTeamMember &thread, const iType &count) {
2091  return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2092  thread, count);
2093 }
2094 
2095 template <typename iType1, typename iType2>
2096 KOKKOS_INLINE_FUNCTION Impl::TeamThreadRangeBoundariesStruct<
2097  std::common_type_t<iType1, iType2>, Impl::HPXTeamMember>
2098 TeamThreadRange(const Impl::HPXTeamMember &thread, const iType1 &i_begin,
2099  const iType2 &i_end) {
2100  using iType = std::common_type_t<iType1, iType2>;
2101  return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2102  thread, iType(i_begin), iType(i_end));
2103 }
2104 
2105 template <typename iType>
2106 KOKKOS_INLINE_FUNCTION
2107  Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2108  TeamVectorRange(const Impl::HPXTeamMember &thread, const iType &count) {
2109  return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2110  thread, count);
2111 }
2112 
2113 template <typename iType1, typename iType2>
2114 KOKKOS_INLINE_FUNCTION Impl::TeamThreadRangeBoundariesStruct<
2115  std::common_type_t<iType1, iType2>, Impl::HPXTeamMember>
2116 TeamVectorRange(const Impl::HPXTeamMember &thread, const iType1 &i_begin,
2117  const iType2 &i_end) {
2118  using iType = std::common_type_t<iType1, iType2>;
2119  return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2120  thread, iType(i_begin), iType(i_end));
2121 }
2122 
2123 template <typename iType>
2124 KOKKOS_INLINE_FUNCTION
2125  Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2126  ThreadVectorRange(const Impl::HPXTeamMember &thread, const iType &count) {
2127  return Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2128  thread, count);
2129 }
2130 
2131 template <typename iType1, typename iType2>
2132 KOKKOS_INLINE_FUNCTION Impl::ThreadVectorRangeBoundariesStruct<
2133  std::common_type_t<iType1, iType2>, Impl::HPXTeamMember>
2134 ThreadVectorRange(const Impl::HPXTeamMember &thread, const iType1 &i_begin,
2135  const iType2 &i_end) {
2136  using iType = std::common_type_t<iType1, iType2>;
2137  return Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
2138  thread, iType(i_begin), iType(i_end));
2139 }
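// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of this header): the nested
// execution policies above are used inside a team lambda. On this host
// backend TeamThreadRange, TeamVectorRange, and ThreadVectorRange all yield
// plain sequential bounds structs; 'm' and 'k' are placeholder extents.
//
//   KOKKOS_LAMBDA(const member_type &team) {
//     Kokkos::parallel_for(Kokkos::TeamThreadRange(team, m),
//                          [&](const int i) {
//       Kokkos::parallel_for(Kokkos::ThreadVectorRange(team, k),
//                            [&](const int j) {
//         // ... innermost work on (i, j) ...
//       });
//     });
//   }
// ---------------------------------------------------------------------------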
2140 
2141 KOKKOS_INLINE_FUNCTION
2142 Impl::ThreadSingleStruct<Impl::HPXTeamMember> PerTeam(
2143  const Impl::HPXTeamMember &thread) {
2144  return Impl::ThreadSingleStruct<Impl::HPXTeamMember>(thread);
2145 }
2146 
2147 KOKKOS_INLINE_FUNCTION
2148 Impl::VectorSingleStruct<Impl::HPXTeamMember> PerThread(
2149  const Impl::HPXTeamMember &thread) {
2150  return Impl::VectorSingleStruct<Impl::HPXTeamMember>(thread);
2151 }
2152 
2158 template <typename iType, class Lambda>
2159 KOKKOS_INLINE_FUNCTION void parallel_for(
2160  const Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2161  &loop_boundaries,
2162  const Lambda &lambda) {
2163  for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2164  i += loop_boundaries.increment)
2165  lambda(i);
2166 }
2167 
2174 template <typename iType, class Lambda, typename ValueType>
2175 KOKKOS_INLINE_FUNCTION void parallel_reduce(
2176  const Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2177  &loop_boundaries,
2178  const Lambda &lambda, ValueType &result) {
2179  result = ValueType();
2180  for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2181  i += loop_boundaries.increment) {
2182  lambda(i, result);
2183  }
2184 }
2185 
2191 template <typename iType, class Lambda>
2192 KOKKOS_INLINE_FUNCTION void parallel_for(
2193  const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2194  &loop_boundaries,
2195  const Lambda &lambda) {
2196 #ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
2197 #pragma ivdep
2198 #endif
2199  for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2200  i += loop_boundaries.increment) {
2201  lambda(i);
2202  }
2203 }
2204 
2211 template <typename iType, class Lambda, typename ValueType>
2212 KOKKOS_INLINE_FUNCTION void parallel_reduce(
2213  const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2214  &loop_boundaries,
2215  const Lambda &lambda, ValueType &result) {
2216  result = ValueType();
2217 #ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
2218 #pragma ivdep
2219 #endif
2220  for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2221  i += loop_boundaries.increment) {
2222  lambda(i, result);
2223  }
2224 }
2225 
2226 template <typename iType, class Lambda, typename ReducerType>
2227 KOKKOS_INLINE_FUNCTION void parallel_reduce(
2228  const Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2229  &loop_boundaries,
2230  const Lambda &lambda, const ReducerType &reducer) {
2231  reducer.init(reducer.reference());
2232  for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2233  i += loop_boundaries.increment) {
2234  lambda(i, reducer.reference());
2235  }
2236 }
2237 
2238 template <typename iType, class Lambda, typename ReducerType>
2239 KOKKOS_INLINE_FUNCTION void parallel_reduce(
2240  const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2241  &loop_boundaries,
2242  const Lambda &lambda, const ReducerType &reducer) {
2243  reducer.init(reducer.reference());
2244 #ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
2245 #pragma ivdep
2246 #endif
2247  for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2248  i += loop_boundaries.increment) {
2249  lambda(i, reducer.reference());
2250  }
2251 }
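// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of this header): the reducer
// overloads above accept any Kokkos reducer, e.g. Kokkos::Max, in place of
// a plain sum variable; 'values' is a placeholder view.
//
//   double team_max;
//   Kokkos::parallel_reduce(Kokkos::TeamThreadRange(team, m),
//                           [&](const int i, double &lmax) {
//                             lmax = lmax < values(i) ? values(i) : lmax;
//                           },
//                           Kokkos::Max<double>(team_max));
// ---------------------------------------------------------------------------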
2252 
2253 template <typename iType, class FunctorType>
2254 KOKKOS_INLINE_FUNCTION void parallel_scan(
2255  Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember> const
2256  &loop_boundaries,
2257  const FunctorType &lambda) {
2258  using value_type = typename Kokkos::Impl::FunctorAnalysis<
2259  Kokkos::Impl::FunctorPatternInterface::SCAN, void,
2260  FunctorType>::value_type;
2261 
2262  value_type scan_val = value_type();
2263 
2264  // Intra-member scan
2265  for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2266  i += loop_boundaries.increment) {
2267  lambda(i, scan_val, false);
2268  }
2269 
 2270  // After team_scan, 'scan_val' holds this thread's exclusive prefix
 2271  // over the team
2271  scan_val = loop_boundaries.thread.team_scan(scan_val);
2272 
2273  for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2274  i += loop_boundaries.increment) {
2275  lambda(i, scan_val, true);
2276  }
2277 }
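// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of this header): a nested scan
// over a TeamThreadRange. The closure runs twice, first to accumulate the
// per-thread partials and then, after team_scan establishes the exclusive
// offsets, with final == true; 'offsets' and 'counts' are placeholders.
//
//   Kokkos::parallel_scan(Kokkos::TeamThreadRange(team, m),
//                         [&](const int i, int &update, const bool final) {
//                           if (final) offsets(i) = update;
//                           update += counts(i);
//                         });
// ---------------------------------------------------------------------------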
2278 
2290 template <typename iType, class FunctorType>
2291 KOKKOS_INLINE_FUNCTION void parallel_scan(
2292  const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2293  &loop_boundaries,
2294  const FunctorType &lambda) {
2295  using value_type =
2296  typename Impl::FunctorAnalysis<Impl::FunctorPatternInterface::SCAN,
2297  TeamPolicy<Experimental::HPX>,
2298  FunctorType>::value_type;
2299 
2300  value_type scan_val = value_type();
2301 
2302 #ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
2303 #pragma ivdep
2304 #endif
2305  for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2306  i += loop_boundaries.increment) {
2307  lambda(i, scan_val, true);
2308  }
2309 }
2310 
2314 template <typename iType, class FunctorType, typename ReducerType>
2315 KOKKOS_INLINE_FUNCTION std::enable_if_t<Kokkos::is_reducer<ReducerType>::value>
2316 parallel_scan(
2317  const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
2318  &loop_boundaries,
2319  const FunctorType &lambda, const ReducerType &reducer) {
2320  typename ReducerType::value_type scan_val;
2321  reducer.init(scan_val);
2322 
2323 #ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
2324 #pragma ivdep
2325 #endif
2326  for (iType i = loop_boundaries.start; i < loop_boundaries.end;
2327  i += loop_boundaries.increment) {
2328  lambda(i, scan_val, true);
2329  }
2330 }
2331 
2332 template <class FunctorType>
2333 KOKKOS_INLINE_FUNCTION void single(
2334  const Impl::VectorSingleStruct<Impl::HPXTeamMember> &,
2335  const FunctorType &lambda) {
2336  lambda();
2337 }
2338 
2339 template <class FunctorType>
2340 KOKKOS_INLINE_FUNCTION void single(
2341  const Impl::ThreadSingleStruct<Impl::HPXTeamMember> &,
2342  const FunctorType &lambda) {
2343  lambda();
2344 }
2345 
2346 template <class FunctorType, class ValueType>
2347 KOKKOS_INLINE_FUNCTION void single(
2348  const Impl::VectorSingleStruct<Impl::HPXTeamMember> &,
2349  const FunctorType &lambda, ValueType &val) {
2350  lambda(val);
2351 }
2352 
2353 template <class FunctorType, class ValueType>
2354 KOKKOS_INLINE_FUNCTION void single(
2355  const Impl::ThreadSingleStruct<Impl::HPXTeamMember> &,
2356  const FunctorType &lambda, ValueType &val) {
2357  lambda(val);
2358 }
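// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of this header): single()
// executes a closure once per team (PerTeam) or once per calling thread
// (PerThread); on this backend every overload simply invokes the lambda.
// The broadcast form additionally writes the result into 'val'.
// 'shared_flag' and 'compute' are placeholders.
//
//   Kokkos::single(Kokkos::PerTeam(team), [&]() {
//     shared_flag() = 1;
//   });
//
//   int token;
//   Kokkos::single(Kokkos::PerTeam(team), [&](int &v) { v = compute(); },
//                  token);
// ---------------------------------------------------------------------------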
2359 
2360 } // namespace Kokkos
2361 
2362 #include <HPX/Kokkos_HPX_Task.hpp>
2363 
2364 #endif /* #if defined( KOKKOS_ENABLE_HPX ) */
2365 #endif /* #ifndef KOKKOS_HPX_HPP */