44 #ifndef KOKKOS_HPX_HPP
45 #define KOKKOS_HPX_HPP
47 #include <Kokkos_Macros.hpp>
48 #if defined(KOKKOS_ENABLE_HPX)
50 #include <Kokkos_Core_fwd.hpp>
52 #include <Kokkos_HostSpace.hpp>
56 #ifdef KOKKOS_ENABLE_HBWSPACE
57 #include <Kokkos_HBWSpace.hpp>
60 #include <Kokkos_HostSpace.hpp>
62 #include <Kokkos_MemoryTraits.hpp>
64 #include <Kokkos_ScratchSpace.hpp>
65 #include <Kokkos_TaskScheduler.hpp>
66 #include <impl/Kokkos_FunctorAdapter.hpp>
67 #include <impl/Kokkos_FunctorAnalysis.hpp>
68 #include <impl/Kokkos_Profiling_Interface.hpp>
69 #include <impl/Kokkos_Tags.hpp>
70 #include <impl/Kokkos_TaskQueue.hpp>
72 #include <KokkosExp_MDRangePolicy.hpp>
74 #include <hpx/apply.hpp>
75 #include <hpx/hpx_start.hpp>
76 #include <hpx/lcos/local/barrier.hpp>
77 #include <hpx/lcos/local/counting_semaphore.hpp>
78 #include <hpx/parallel/algorithms/for_loop.hpp>
79 #include <hpx/parallel/algorithms/reduce.hpp>
80 #include <hpx/parallel/executors/static_chunk_size.hpp>
81 #include <hpx/runtime.hpp>
82 #include <hpx/runtime/threads/run_as_hpx_thread.hpp>
83 #include <hpx/runtime/threads/threadmanager.hpp>
89 #include <type_traits>
103 #ifndef KOKKOS_HPX_IMPLEMENTATION
104 #define KOKKOS_HPX_IMPLEMENTATION 1
107 #if (KOKKOS_HPX_IMPLEMENTATION < 0) || (KOKKOS_HPX_IMPLEMENTATION > 1)
108 #error "You have chosen an invalid value for KOKKOS_HPX_IMPLEMENTATION"
// Per-thread scratch storage used by the HPX backend's parallel patterns.
// A single heap allocation is sliced into one chunk per worker thread; each
// chunk is padded to a (fixed, assumed) 64-byte cache line so neighboring
// threads' slices do not share a cache line (avoids false sharing).
class thread_buffer {
  static constexpr std::size_t m_cache_line_size = 64;

  // In-class initializers guarantee well-defined values in every
  // constructor. Previously the (num_threads, size_per_thread) constructor
  // called resize() while m_size_total and m_data were still uninitialized,
  // so resize() compared against and replaced indeterminate values.
  std::size_t m_num_threads = 0;
  std::size_t m_size_per_thread = 0;
  std::size_t m_size_total = 0;
  char *m_data = nullptr;

  // Round size up to the next multiple of the cache line size.
  void pad_to_cache_line(std::size_t &size) {
    size = ((size + m_cache_line_size - 1) / m_cache_line_size) *
           m_cache_line_size;
  }

public:
  thread_buffer() = default;

  // Construct with an immediate allocation sized for num_threads slices of
  // (padded) size_per_thread bytes each.
  thread_buffer(const std::size_t num_threads,
                const std::size_t size_per_thread) {
    resize(num_threads, size_per_thread);
  }

  ~thread_buffer() { delete[] m_data; }

  // Non-copyable, non-movable: the buffer is a long-lived backend singleton.
  // NOTE: the original deleted a by-value operator=(thread_buffer), which
  // also suppressed moves but was unidiomatic; delete the move overload
  // explicitly instead.
  thread_buffer(const thread_buffer &) = delete;
  thread_buffer(thread_buffer &&) = delete;
  thread_buffer &operator=(const thread_buffer &) = delete;
  thread_buffer &operator=(thread_buffer &&) = delete;

  // Grow (never shrink) the backing allocation so that it can hold
  // num_threads slices of size_per_thread bytes, padded per slice.
  // Existing contents are NOT preserved across a reallocation.
  void resize(const std::size_t num_threads,
              const std::size_t size_per_thread) {
    m_num_threads = num_threads;
    m_size_per_thread = size_per_thread;

    pad_to_cache_line(m_size_per_thread);

    std::size_t size_total_new = m_num_threads * m_size_per_thread;

    if (m_size_total < size_total_new) {
      // Release the previous allocation before growing so repeated calls
      // do not leak.
      delete[] m_data;
      m_data = new char[size_total_new];
      m_size_total = size_total_new;
    }
  }

  // Pointer to the start of thread_num's slice, or nullptr when no
  // allocation has been made yet.
  char *get(std::size_t thread_num) {
    assert(thread_num < m_num_threads);
    if (m_data == nullptr) {
      return nullptr;
    }
    return &m_data[thread_num * m_size_per_thread];
  }

  std::size_t size_per_thread() const noexcept { return m_size_per_thread; }
  std::size_t size_total() const noexcept { return m_size_total; }
};
170 namespace Experimental {
173 static bool m_hpx_initialized;
174 static Kokkos::Impl::thread_buffer m_buffer;
175 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
176 static hpx::future<void> m_future;
180 using execution_space = HPX;
181 using memory_space = HostSpace;
182 using device_type = Kokkos::Device<execution_space, memory_space>;
183 using array_layout = LayoutRight;
184 using size_type = memory_space::size_type;
185 using scratch_memory_space = ScratchMemorySpace<HPX>;
189 const bool =
false) {
190 std::cout <<
"HPX backend" << std::endl;
193 static bool in_parallel(HPX
const & = HPX()) noexcept {
return false; }
194 static void impl_static_fence(HPX
const & = HPX())
195 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
197 if (hpx::threads::get_self_ptr() ==
nullptr) {
198 hpx::threads::run_as_hpx_thread([]() { impl_get_future().wait(); });
200 impl_get_future().wait();
208 #ifdef KOKKOS_ENABLE_DEPRECATED_CODE
209 static void fence(HPX
const & = HPX()) {
216 static bool is_asynchronous(HPX
const & = HPX()) noexcept {
217 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
224 static std::vector<HPX> partition(...) {
225 Kokkos::abort(
"Kokkos::Experimental::HPX::partition_master: can't partition an HPX "
227 return std::vector<HPX>();
230 template <
typename F>
231 static void partition_master(F
const &f,
int requested_num_partitions = 0,
232 int requested_partition_size = 0) {
233 if (requested_num_partitions > 1) {
234 Kokkos::abort(
"Kokkos::Experimental::HPX::partition_master: can't partition an "
239 static int concurrency();
240 static void impl_initialize(
int thread_count);
241 static void impl_initialize();
242 static bool impl_is_initialized() noexcept;
243 static
void impl_finalize();
245 static
int impl_thread_pool_size() noexcept {
246 hpx::runtime *rt = hpx::get_runtime_ptr();
250 if (hpx::threads::get_self_ptr() ==
nullptr) {
251 return hpx::resource::get_thread_pool(0).get_os_thread_count();
253 return hpx::this_thread::get_pool()->get_os_thread_count();
258 static int impl_thread_pool_rank() noexcept {
259 hpx::runtime *rt = hpx::get_runtime_ptr();
263 if (hpx::threads::get_self_ptr() ==
nullptr) {
266 return hpx::this_thread::get_pool()->get_pool_index();
271 static int impl_thread_pool_size(
int depth) {
273 return impl_thread_pool_size();
279 static int impl_max_hardware_threads() noexcept {
280 return hpx::threads::hardware_concurrency();
283 static int impl_hardware_thread_id() noexcept {
284 return hpx::get_worker_thread_num();
287 static Kokkos::Impl::thread_buffer &impl_get_buffer() noexcept {
290 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
291 static hpx::future<void> &impl_get_future() noexcept {
return m_future; }
294 static constexpr
const char *name() noexcept {
return "HPX"; }
299 template <
typename Closure>
300 inline void dispatch_execute_task(Closure *closure) {
301 #if defined(KOKKOS_ENABLE_HPX_ASYNC_DISPATCH)
302 if (hpx::threads::get_self_ptr() ==
nullptr) {
303 hpx::threads::run_as_hpx_thread([closure]() {
304 hpx::future<void> &fut = Kokkos::Experimental::HPX::impl_get_future();
305 Closure closure_copy = *closure;
306 fut = fut.then([closure_copy](hpx::future<void> &&) {
307 closure_copy.execute_task();
311 hpx::future<void> &fut = Kokkos::Experimental::HPX::impl_get_future();
312 Closure closure_copy = *closure;
314 [closure_copy](hpx::future<void> &&) { closure_copy.execute_task(); });
317 if (hpx::threads::get_self_ptr() ==
nullptr) {
318 hpx::threads::run_as_hpx_thread([closure]() { closure->execute_task(); });
320 closure->execute_task();
330 struct MemorySpaceAccess<Kokkos::Experimental::HPX::memory_space,
331 Kokkos::Experimental::HPX::scratch_memory_space> {
332 enum { assignable =
false };
333 enum { accessible =
true };
334 enum { deepcopy =
false };
338 struct VerifyExecutionCanAccessMemorySpace<
339 Kokkos::Experimental::HPX::memory_space,
340 Kokkos::Experimental::HPX::scratch_memory_space> {
341 enum { value =
true };
342 inline static void verify(
void) {}
343 inline static void verify(
const void *) {}
349 namespace Experimental {
350 template <>
class UniqueToken<HPX, UniqueTokenScope::Instance> {
352 using execution_space = HPX;
353 using size_type = int;
354 UniqueToken(execution_space
const & = execution_space()) noexcept {}
359 int size() const noexcept {
return HPX::impl_max_hardware_threads(); }
360 int acquire() const noexcept {
return HPX::impl_hardware_thread_id(); }
361 void release(
int) const noexcept {}
364 template <>
class UniqueToken<HPX, UniqueTokenScope::Global> {
366 using execution_space = HPX;
367 using size_type = int;
368 UniqueToken(execution_space
const & = execution_space()) noexcept {}
373 int size() const noexcept {
return HPX::impl_max_hardware_threads(); }
374 int acquire() const noexcept {
return HPX::impl_hardware_thread_id(); }
375 void release(
int) const noexcept {}
383 struct HPXTeamMember {
385 using execution_space = Kokkos::Experimental::HPX;
386 using scratch_memory_space =
390 scratch_memory_space m_team_shared;
391 std::size_t m_team_shared_size;
399 KOKKOS_INLINE_FUNCTION
400 const scratch_memory_space &team_shmem()
const {
401 return m_team_shared.set_team_thread_mode(0, 1, 0);
404 KOKKOS_INLINE_FUNCTION
405 const execution_space::scratch_memory_space &team_scratch(
const int)
const {
406 return m_team_shared.set_team_thread_mode(0, 1, 0);
409 KOKKOS_INLINE_FUNCTION
410 const execution_space::scratch_memory_space &thread_scratch(
const int)
const {
411 return m_team_shared.set_team_thread_mode(0, team_size(), team_rank());
414 KOKKOS_INLINE_FUNCTION
int league_rank() const noexcept {
415 return m_league_rank;
418 KOKKOS_INLINE_FUNCTION
int league_size() const noexcept {
419 return m_league_size;
422 KOKKOS_INLINE_FUNCTION
int team_rank() const noexcept {
return m_team_rank; }
423 KOKKOS_INLINE_FUNCTION
int team_size() const noexcept {
return m_team_size; }
425 template <
class... Properties>
426 constexpr KOKKOS_INLINE_FUNCTION
427 HPXTeamMember(
const TeamPolicyInternal<Kokkos::Experimental::HPX,
428 Properties...> &policy,
429 const int team_rank,
const int league_rank,
void *scratch,
430 int scratch_size) noexcept
431 : m_team_shared(scratch, scratch_size, scratch, scratch_size),
432 m_team_shared_size(scratch_size), m_league_size(policy.league_size()),
433 m_league_rank(league_rank), m_team_size(policy.team_size()),
434 m_team_rank(team_rank) {}
436 KOKKOS_INLINE_FUNCTION
437 void team_barrier()
const {}
439 template <
class ValueType>
440 KOKKOS_INLINE_FUNCTION
void team_broadcast(ValueType &,
const int &)
const {
441 static_assert(std::is_trivially_default_constructible<ValueType>(),
442 "Only trivial constructible types can be broadcasted");
445 template <
class Closure,
class ValueType>
446 KOKKOS_INLINE_FUNCTION
void team_broadcast(
const Closure &, ValueType &,
448 static_assert(std::is_trivially_default_constructible<ValueType>(),
449 "Only trivial constructible types can be broadcasted");
452 template <
class ValueType,
class JoinOp>
453 KOKKOS_INLINE_FUNCTION ValueType team_reduce(
const ValueType &value,
454 const JoinOp &)
const {
458 template <
class ReducerType>
459 KOKKOS_INLINE_FUNCTION
460 typename std::enable_if<is_reducer<ReducerType>::value>::type
461 team_reduce(
const ReducerType &reducer)
const {}
463 template <
typename Type>
464 KOKKOS_INLINE_FUNCTION Type
465 team_scan(
const Type &value, Type *
const global_accum =
nullptr)
const {
467 Kokkos::atomic_fetch_add(global_accum, value);
474 template <
class... Properties>
475 class TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>
476 :
public PolicyTraits<Properties...> {
477 using traits = PolicyTraits<Properties...>;
481 std::size_t m_team_scratch_size[2];
482 std::size_t m_thread_scratch_size[2];
486 using member_type = HPXTeamMember;
490 template <
class FunctorType>
491 inline static int team_size_max(
const FunctorType &) {
495 template <
class FunctorType>
496 inline static int team_size_recommended(
const FunctorType &) {
500 template <
class FunctorType>
501 inline static int team_size_recommended(
const FunctorType &,
const int &) {
505 template <
class FunctorType>
506 int team_size_max(
const FunctorType &,
const ParallelForTag &)
const {
510 template <
class FunctorType>
511 int team_size_max(
const FunctorType &,
const ParallelReduceTag &)
const {
514 template <
class FunctorType>
515 int team_size_recommended(
const FunctorType &,
const ParallelForTag &)
const {
518 template <
class FunctorType>
519 int team_size_recommended(
const FunctorType &,
520 const ParallelReduceTag &)
const {
525 inline void init(
const int league_size_request,
const int team_size_request) {
526 m_league_size = league_size_request;
527 const int max_team_size = 1;
530 team_size_request > max_team_size ? max_team_size : team_size_request;
532 if (m_chunk_size > 0) {
533 if (!Impl::is_integral_power_of_two(m_chunk_size))
534 Kokkos::abort(
"TeamPolicy blocking granularity must be power of two");
536 int new_chunk_size = 1;
537 while (new_chunk_size * 4 * Kokkos::Experimental::HPX::concurrency() <
542 if (new_chunk_size < 128) {
544 while ((new_chunk_size * Kokkos::Experimental::HPX::concurrency() <
546 (new_chunk_size < 128))
550 m_chunk_size = new_chunk_size;
555 inline int team_size()
const {
return m_team_size; }
556 inline int league_size()
const {
return m_league_size; }
558 inline size_t scratch_size(
const int &level,
int team_size_ = -1)
const {
559 if (team_size_ < 0) {
560 team_size_ = m_team_size;
562 return m_team_scratch_size[level] +
563 team_size_ * m_thread_scratch_size[level];
567 template <
class ExecSpace,
class... OtherProperties>
568 friend class TeamPolicyInternal;
570 template <
class... OtherProperties>
572 const TeamPolicyInternal<Kokkos::Experimental::HPX, OtherProperties...> &p) {
573 m_league_size = p.m_league_size;
574 m_team_size = p.m_team_size;
575 m_team_scratch_size[0] = p.m_team_scratch_size[0];
576 m_thread_scratch_size[0] = p.m_thread_scratch_size[0];
577 m_team_scratch_size[1] = p.m_team_scratch_size[1];
578 m_thread_scratch_size[1] = p.m_thread_scratch_size[1];
579 m_chunk_size = p.m_chunk_size;
582 TeamPolicyInternal(
const typename traits::execution_space &,
583 int league_size_request,
int team_size_request,
585 : m_team_scratch_size{0, 0}, m_thread_scratch_size{0, 0},
587 init(league_size_request, team_size_request);
590 TeamPolicyInternal(
const typename traits::execution_space &,
591 int league_size_request,
592 const Kokkos::AUTO_t &team_size_request,
594 : m_team_scratch_size{0, 0}, m_thread_scratch_size{0, 0},
596 init(league_size_request, 1);
599 TeamPolicyInternal(
int league_size_request,
int team_size_request,
601 : m_team_scratch_size{0, 0}, m_thread_scratch_size{0, 0},
603 init(league_size_request, team_size_request);
606 TeamPolicyInternal(
int league_size_request,
607 const Kokkos::AUTO_t &team_size_request,
609 : m_team_scratch_size{0, 0}, m_thread_scratch_size{0, 0},
611 init(league_size_request, 1);
614 inline int chunk_size()
const {
return m_chunk_size; }
616 inline TeamPolicyInternal &
617 set_chunk_size(
typename traits::index_type chunk_size_) {
618 m_chunk_size = chunk_size_;
622 inline TeamPolicyInternal &set_scratch_size(
const int &level,
623 const PerTeamValue &per_team) {
624 m_team_scratch_size[level] = per_team.value;
628 inline TeamPolicyInternal &
629 set_scratch_size(
const int &level,
const PerThreadValue &per_thread) {
630 m_thread_scratch_size[level] = per_thread.value;
634 inline TeamPolicyInternal &
635 set_scratch_size(
const int &level,
const PerTeamValue &per_team,
636 const PerThreadValue &per_thread) {
637 m_team_scratch_size[level] = per_team.value;
638 m_thread_scratch_size[level] = per_thread.value;
648 template <
class FunctorType,
class... Traits>
649 class ParallelFor<FunctorType, Kokkos::RangePolicy<Traits...>,
650 Kokkos::Experimental::HPX> {
653 using WorkTag =
typename Policy::work_tag;
654 using WorkRange =
typename Policy::WorkRange;
655 using Member =
typename Policy::member_type;
657 const FunctorType m_functor;
658 const Policy m_policy;
660 template <
class TagType>
661 static typename std::enable_if<std::is_same<TagType, void>::value>::type
662 execute_functor(
const FunctorType &functor,
const Member i) {
666 template <
class TagType>
667 static typename std::enable_if<!std::is_same<TagType, void>::value>::type
668 execute_functor(
const FunctorType &functor,
const Member i) {
673 template <
class TagType>
674 static typename std::enable_if<std::is_same<TagType, void>::value>::type
675 execute_functor_range(
const FunctorType &functor,
const Member i_begin,
676 const Member i_end) {
677 for (Member i = i_begin; i < i_end; ++i) {
682 template <
class TagType>
683 static typename std::enable_if<!std::is_same<TagType, void>::value>::type
684 execute_functor_range(
const FunctorType &functor,
const Member i_begin,
685 const Member i_end) {
687 for (Member i = i_begin; i < i_end; ++i) {
693 void execute()
const { Kokkos::Impl::dispatch_execute_task(
this); }
695 void execute_task()
const {
696 #if KOKKOS_HPX_IMPLEMENTATION == 0
697 using hpx::parallel::for_loop;
698 using hpx::parallel::execution::par;
699 using hpx::parallel::execution::static_chunk_size;
701 for_loop(par.with(static_chunk_size(m_policy.chunk_size())),
702 m_policy.begin(), m_policy.end(), [
this](
const Member i) {
703 execute_functor<WorkTag>(m_functor, i);
706 #elif KOKKOS_HPX_IMPLEMENTATION == 1
708 using hpx::lcos::local::counting_semaphore;
710 counting_semaphore sem(0);
711 std::size_t num_tasks = 0;
713 for (Member i_begin = m_policy.begin(); i_begin < m_policy.end();
714 i_begin += m_policy.chunk_size()) {
715 apply([
this, &sem, i_begin]() {
717 (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
718 execute_functor_range<WorkTag>(m_functor, i_begin, i_end);
730 inline ParallelFor(
const FunctorType &arg_functor, Policy arg_policy)
731 : m_functor(arg_functor), m_policy(arg_policy) {}
734 template <
class FunctorType,
class... Traits>
735 class ParallelFor<FunctorType, Kokkos::MDRangePolicy<Traits...>,
736 Kokkos::Experimental::HPX> {
738 using MDRangePolicy = Kokkos::MDRangePolicy<Traits...>;
739 using Policy =
typename MDRangePolicy::impl_range_policy;
740 using WorkTag =
typename MDRangePolicy::work_tag;
741 using WorkRange =
typename Policy::WorkRange;
742 using Member =
typename Policy::member_type;
744 typename Kokkos::Impl::HostIterateTile<MDRangePolicy, FunctorType,
747 const FunctorType m_functor;
748 const MDRangePolicy m_mdr_policy;
749 const Policy m_policy;
752 void execute()
const { dispatch_execute_task(
this); }
754 inline void execute_task()
const {
755 #if KOKKOS_HPX_IMPLEMENTATION == 0
756 using hpx::parallel::for_loop;
757 using hpx::parallel::execution::par;
758 using hpx::parallel::execution::static_chunk_size;
760 for_loop(par.with(static_chunk_size(m_policy.chunk_size())),
761 m_policy.begin(), m_policy.end(), [
this](
const Member i) {
762 iterate_type(m_mdr_policy, m_functor)(i);
765 #elif KOKKOS_HPX_IMPLEMENTATION == 1
767 using hpx::lcos::local::counting_semaphore;
769 counting_semaphore sem(0);
770 std::size_t num_tasks = 0;
772 for (Member i_begin = m_policy.begin(); i_begin < m_policy.end();
773 i_begin += m_policy.chunk_size()) {
774 apply([
this, &sem, i_begin]() {
776 (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
777 for (Member i = i_begin; i < i_end; ++i) {
778 iterate_type(m_mdr_policy, m_functor)(i);
791 inline ParallelFor(
const FunctorType &arg_functor, MDRangePolicy arg_policy)
792 : m_functor(arg_functor), m_mdr_policy(arg_policy),
793 m_policy(Policy(0, m_mdr_policy.m_num_tiles).set_chunk_size(1)) {}
800 template <
class FunctorType,
class ReducerType,
class... Traits>
801 class ParallelReduce<FunctorType, Kokkos::RangePolicy<Traits...>, ReducerType,
802 Kokkos::Experimental::HPX> {
805 using WorkTag =
typename Policy::work_tag;
806 using WorkRange =
typename Policy::WorkRange;
807 using Member =
typename Policy::member_type;
809 FunctorAnalysis<FunctorPatternInterface::REDUCE, Policy, FunctorType>;
810 using ReducerConditional =
811 Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
812 FunctorType, ReducerType>;
813 using ReducerTypeFwd =
typename ReducerConditional::type;
815 typename Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
816 WorkTag,
void>::type;
817 using ValueInit = Kokkos::Impl::FunctorValueInit<ReducerTypeFwd, WorkTagFwd>;
818 using ValueJoin = Kokkos::Impl::FunctorValueJoin<ReducerTypeFwd, WorkTagFwd>;
819 using ValueOps = Kokkos::Impl::FunctorValueOps<ReducerTypeFwd, WorkTagFwd>;
820 using value_type =
typename Analysis::value_type;
821 using pointer_type =
typename Analysis::pointer_type;
822 using reference_type =
typename Analysis::reference_type;
824 const FunctorType m_functor;
825 const Policy m_policy;
826 const ReducerType m_reducer;
827 const pointer_type m_result_ptr;
829 bool m_force_synchronous;
831 template <
class TagType>
833 typename std::enable_if<std::is_same<TagType, void>::value>::type
834 execute_functor(
const FunctorType &functor,
const Member i,
835 reference_type update) {
839 template <
class TagType>
841 typename std::enable_if<!std::is_same<TagType, void>::value>::type
842 execute_functor(
const FunctorType &functor,
const Member i,
843 reference_type update) {
845 functor(t, i, update);
848 template <
class TagType>
849 inline typename std::enable_if<std::is_same<TagType, void>::value>::type
850 execute_functor_range(reference_type update,
const Member i_begin,
851 const Member i_end)
const {
852 for (Member i = i_begin; i < i_end; ++i) {
853 m_functor(i, update);
857 template <
class TagType>
858 inline typename std::enable_if<!std::is_same<TagType, void>::value>::type
859 execute_functor_range(reference_type update,
const Member i_begin,
860 const Member i_end)
const {
863 for (Member i = i_begin; i < i_end; ++i) {
864 m_functor(t, i, update);
868 class value_type_wrapper {
870 std::size_t m_value_size;
871 char *m_value_buffer;
874 value_type_wrapper() : m_value_size(0), m_value_buffer(nullptr) {}
876 value_type_wrapper(
const std::size_t value_size)
877 : m_value_size(value_size), m_value_buffer(new char[m_value_size]) {}
879 value_type_wrapper(
const value_type_wrapper &other)
880 : m_value_size(0), m_value_buffer(nullptr) {
881 if (
this != &other) {
882 m_value_buffer =
new char[other.m_value_size];
883 m_value_size = other.m_value_size;
885 std::copy(other.m_value_buffer, other.m_value_buffer + m_value_size,
890 ~value_type_wrapper() {
delete[] m_value_buffer; }
892 value_type_wrapper(value_type_wrapper &&other)
893 : m_value_size(0), m_value_buffer(nullptr) {
894 if (
this != &other) {
895 m_value_buffer = other.m_value_buffer;
896 m_value_size = other.m_value_size;
898 other.m_value_buffer =
nullptr;
899 other.m_value_size = 0;
903 value_type_wrapper &operator=(
const value_type_wrapper &other) {
904 if (
this != &other) {
905 delete[] m_value_buffer;
906 m_value_buffer =
new char[other.m_value_size];
907 m_value_size = other.m_value_size;
909 std::copy(other.m_value_buffer, other.m_value_buffer + m_value_size,
916 value_type_wrapper &operator=(value_type_wrapper &&other) {
917 if (
this != &other) {
918 delete[] m_value_buffer;
919 m_value_buffer = other.m_value_buffer;
920 m_value_size = other.m_value_size;
922 other.m_value_buffer =
nullptr;
923 other.m_value_size = 0;
929 pointer_type pointer()
const {
930 return reinterpret_cast<pointer_type
>(m_value_buffer);
933 reference_type reference()
const {
934 return ValueOps::reference(
935 reinterpret_cast<pointer_type>(m_value_buffer));
940 void execute()
const {
941 dispatch_execute_task(
this);
944 inline void execute_task()
const {
945 const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
947 std::size_t value_size =
948 Analysis::value_size(ReducerConditional::select(m_functor, m_reducer));
950 using hpx::parallel::for_loop;
951 using hpx::parallel::execution::par;
953 #if KOKKOS_HPX_IMPLEMENTATION == 0
958 using hpx::parallel::reduction;
959 using hpx::parallel::execution::static_chunk_size;
961 value_type_wrapper final_value(value_size);
962 value_type_wrapper identity(value_size);
964 ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
965 final_value.pointer());
966 ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
969 for_loop(par.with(static_chunk_size(m_policy.chunk_size())),
970 m_policy.begin(), m_policy.end(),
971 reduction(final_value, identity,
972 [
this](value_type_wrapper &a,
973 value_type_wrapper &b) -> value_type_wrapper & {
975 ReducerConditional::select(m_functor, m_reducer),
976 a.pointer(), b.pointer());
979 [
this](Member i, value_type_wrapper &update) {
980 execute_functor<WorkTag>(m_functor, i, update.reference());
983 pointer_type final_value_ptr = final_value.pointer();
985 #elif KOKKOS_HPX_IMPLEMENTATION == 1
986 thread_buffer &buffer = Kokkos::Experimental::HPX::impl_get_buffer();
987 buffer.resize(num_worker_threads, value_size);
989 for_loop(par, 0, num_worker_threads, [
this, &buffer](std::size_t t) {
990 ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
991 reinterpret_cast<pointer_type>(buffer.get(t)));
995 using hpx::lcos::local::counting_semaphore;
997 counting_semaphore sem(0);
998 std::size_t num_tasks = 0;
1000 for (Member i_begin = m_policy.begin(); i_begin < m_policy.end();
1001 i_begin += m_policy.chunk_size()) {
1002 apply([
this, &buffer, &sem, i_begin]() {
1003 reference_type update =
1004 ValueOps::reference(reinterpret_cast<pointer_type>(
1005 buffer.get(Kokkos::Experimental::HPX::impl_hardware_thread_id())));
1006 const Member i_end =
1007 (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
1008 execute_functor_range<WorkTag>(update, i_begin, i_end);
1016 sem.wait(num_tasks);
1018 for (
int i = 1; i < num_worker_threads; ++i) {
1019 ValueJoin::join(ReducerConditional::select(m_functor, m_reducer),
1020 reinterpret_cast<pointer_type>(buffer.get(0)),
1021 reinterpret_cast<pointer_type>(buffer.get(i)));
1024 pointer_type final_value_ptr =
1025 reinterpret_cast<pointer_type
>(buffer.get(0));
1028 Kokkos::Impl::FunctorFinal<ReducerTypeFwd, WorkTagFwd>::final(
1029 ReducerConditional::select(m_functor, m_reducer), final_value_ptr);
1031 if (m_result_ptr !=
nullptr) {
1032 const int n = Analysis::value_count(
1033 ReducerConditional::select(m_functor, m_reducer));
1035 for (
int j = 0; j < n; ++j) {
1036 m_result_ptr[j] = final_value_ptr[j];
1041 template <
class ViewType>
1042 inline ParallelReduce(
1043 const FunctorType &arg_functor, Policy arg_policy,
1044 const ViewType &arg_view,
1045 typename std::enable_if<Kokkos::is_view<ViewType>::value &&
1046 !Kokkos::is_reducer_type<ReducerType>::value,
1047 void *>::type = NULL)
1048 : m_functor(arg_functor), m_policy(arg_policy), m_reducer(InvalidType()),
1049 m_result_ptr(arg_view.data()),
1050 m_force_synchronous(!arg_view.impl_track().has_record()) {}
1052 inline ParallelReduce(
const FunctorType &arg_functor, Policy arg_policy,
1053 const ReducerType &reducer)
1054 : m_functor(arg_functor), m_policy(arg_policy), m_reducer(reducer),
1055 m_result_ptr(reducer.view().data()),
1056 m_force_synchronous(!reducer.view().impl_track().has_record()) {}
1059 template <
class FunctorType,
class ReducerType,
class... Traits>
1060 class ParallelReduce<FunctorType, Kokkos::MDRangePolicy<Traits...>, ReducerType,
1061 Kokkos::Experimental::HPX> {
1063 using MDRangePolicy = Kokkos::MDRangePolicy<Traits...>;
1064 using Policy =
typename MDRangePolicy::impl_range_policy;
1065 using WorkTag =
typename MDRangePolicy::work_tag;
1066 using WorkRange =
typename Policy::WorkRange;
1067 using Member =
typename Policy::member_type;
1068 using Analysis = FunctorAnalysis<FunctorPatternInterface::REDUCE,
1069 MDRangePolicy, FunctorType>;
1070 using ReducerConditional =
1071 Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
1072 FunctorType, ReducerType>;
1073 using ReducerTypeFwd =
typename ReducerConditional::type;
1075 typename Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
1076 WorkTag,
void>::type;
1077 using ValueInit = Kokkos::Impl::FunctorValueInit<ReducerTypeFwd, WorkTagFwd>;
1078 using ValueJoin = Kokkos::Impl::FunctorValueJoin<ReducerTypeFwd, WorkTagFwd>;
1079 using ValueOps = Kokkos::Impl::FunctorValueOps<ReducerTypeFwd, WorkTagFwd>;
1080 using pointer_type =
typename Analysis::pointer_type;
1081 using value_type =
typename Analysis::value_type;
1082 using reference_type =
typename Analysis::reference_type;
1083 using iterate_type =
1084 typename Kokkos::Impl::HostIterateTile<MDRangePolicy, FunctorType,
1085 WorkTag, reference_type>;
1087 const FunctorType m_functor;
1088 const MDRangePolicy m_mdr_policy;
1089 const Policy m_policy;
1090 const ReducerType m_reducer;
1091 const pointer_type m_result_ptr;
1093 bool m_force_synchronous;
1096 void execute()
const {
1097 dispatch_execute_task(
this);
1100 inline void execute_task()
const {
1101 const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
1102 const std::size_t value_size =
1103 Analysis::value_size(ReducerConditional::select(m_functor, m_reducer));
1105 thread_buffer &buffer = Kokkos::Experimental::HPX::impl_get_buffer();
1106 buffer.resize(num_worker_threads, value_size);
1108 using hpx::parallel::for_loop;
1109 using hpx::parallel::execution::par;
1111 for_loop(par, 0, num_worker_threads, [
this, &buffer](std::size_t t) {
1112 ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
1113 reinterpret_cast<pointer_type>(buffer.get(t)));
1116 #if KOKKOS_HPX_IMPLEMENTATION == 0
1117 using hpx::parallel::execution::static_chunk_size;
1119 for_loop(par.with(static_chunk_size(m_policy.chunk_size())),
1120 m_policy.begin(), m_policy.end(), [
this, &buffer](
const Member i) {
1121 reference_type update = ValueOps::reference(
1122 reinterpret_cast<pointer_type>(buffer.get(
1123 Kokkos::Experimental::HPX::impl_hardware_thread_id())));
1124 iterate_type(m_mdr_policy, m_functor, update)(i);
1127 #elif KOKKOS_HPX_IMPLEMENTATION == 1
1129 using hpx::lcos::local::counting_semaphore;
1131 counting_semaphore sem(0);
1132 std::size_t num_tasks = 0;
1134 for (Member i_begin = m_policy.begin(); i_begin < m_policy.end();
1135 i_begin += m_policy.chunk_size()) {
1136 apply([
this, &buffer, &sem, i_begin]() {
1137 reference_type update =
1138 ValueOps::reference(reinterpret_cast<pointer_type>(
1139 buffer.get(Kokkos::Experimental::HPX::impl_hardware_thread_id())));
1140 const Member i_end =
1141 (std::min)(i_begin + m_policy.chunk_size(), m_policy.end());
1143 for (Member i = i_begin; i < i_end; ++i) {
1144 iterate_type(m_mdr_policy, m_functor, update)(i);
1153 sem.wait(num_tasks);
1156 for (
int i = 1; i < num_worker_threads; ++i) {
1157 ValueJoin::join(ReducerConditional::select(m_functor, m_reducer),
1158 reinterpret_cast<pointer_type>(buffer.get(0)),
1159 reinterpret_cast<pointer_type>(buffer.get(i)));
1162 Kokkos::Impl::FunctorFinal<ReducerTypeFwd, WorkTagFwd>::final(
1163 ReducerConditional::select(m_functor, m_reducer),
1164 reinterpret_cast<pointer_type>(buffer.get(0)));
1166 if (m_result_ptr !=
nullptr) {
1167 const int n = Analysis::value_count(
1168 ReducerConditional::select(m_functor, m_reducer));
1170 for (
int j = 0; j < n; ++j) {
1171 m_result_ptr[j] =
reinterpret_cast<pointer_type
>(buffer.get(0))[j];
1176 template <
class ViewType>
1177 inline ParallelReduce(
1178 const FunctorType &arg_functor, MDRangePolicy arg_policy,
1179 const ViewType &arg_view,
1180 typename std::enable_if<Kokkos::is_view<ViewType>::value &&
1181 !Kokkos::is_reducer_type<ReducerType>::value,
1182 void *>::type = NULL)
1183 : m_functor(arg_functor), m_mdr_policy(arg_policy),
1184 m_policy(Policy(0, m_mdr_policy.m_num_tiles).set_chunk_size(1)),
1185 m_reducer(InvalidType()), m_result_ptr(arg_view.data()),
1186 m_force_synchronous(!arg_view.impl_track().has_record()) {}
1188 inline ParallelReduce(
const FunctorType &arg_functor,
1189 MDRangePolicy arg_policy,
const ReducerType &reducer)
1190 : m_functor(arg_functor), m_mdr_policy(arg_policy),
1191 m_policy(Policy(0, m_mdr_policy.m_num_tiles).set_chunk_size(1)),
1192 m_reducer(reducer), m_result_ptr(reducer.view().data()),
1193 m_force_synchronous(!reducer.view().impl_track().has_record()) {}
1201 template <
class FunctorType,
class... Traits>
1202 class ParallelScan<FunctorType, Kokkos::RangePolicy<Traits...>,
1203 Kokkos::Experimental::HPX> {
1206 using WorkTag =
typename Policy::work_tag;
1207 using WorkRange =
typename Policy::WorkRange;
1208 using Member =
typename Policy::member_type;
1210 FunctorAnalysis<FunctorPatternInterface::SCAN, Policy, FunctorType>;
1211 using ValueInit = Kokkos::Impl::FunctorValueInit<FunctorType, WorkTag>;
1212 using ValueJoin = Kokkos::Impl::FunctorValueJoin<FunctorType, WorkTag>;
1213 using ValueOps = Kokkos::Impl::FunctorValueOps<FunctorType, WorkTag>;
1214 using pointer_type =
typename Analysis::pointer_type;
1215 using reference_type =
typename Analysis::reference_type;
1216 using value_type =
typename Analysis::value_type;
1218 const FunctorType m_functor;
1219 const Policy m_policy;
1221 template <
class TagType>
1223 typename std::enable_if<std::is_same<TagType, void>::value>::type
1224 execute_functor_range(
const FunctorType &functor,
const Member i_begin,
1225 const Member i_end, reference_type update,
1227 for (Member i = i_begin; i < i_end; ++i) {
1228 functor(i, update,
final);
1232 template <
class TagType>
1234 typename std::enable_if<!std::is_same<TagType, void>::value>::type
1235 execute_functor_range(
const FunctorType &functor,
const Member i_begin,
1236 const Member i_end, reference_type update,
1239 for (Member i = i_begin; i < i_end; ++i) {
1240 functor(t, i, update,
final);
1245 void execute()
const { dispatch_execute_task(
this); }
1247 inline void execute_task()
const {
1248 const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
1249 const int value_count = Analysis::value_count(m_functor);
1250 const std::size_t value_size = Analysis::value_size(m_functor);
1252 thread_buffer &buffer = Kokkos::Experimental::HPX::impl_get_buffer();
1253 buffer.resize(num_worker_threads, 2 * value_size);
1255 using hpx::lcos::local::barrier;
1256 using hpx::parallel::for_loop;
1257 using hpx::parallel::execution::par;
1258 using hpx::parallel::execution::static_chunk_size;
1260 barrier bar(num_worker_threads);
1262 for_loop(par.with(static_chunk_size(1)), 0, num_worker_threads,
1263 [
this, &buffer, &bar, num_worker_threads, value_count,
1264 value_size](std::size_t
const t) {
1265 reference_type update_sum = ValueInit::init(
1266 m_functor, reinterpret_cast<pointer_type>(buffer.get(t)));
1268 const WorkRange range(m_policy, t, num_worker_threads);
1269 execute_functor_range<WorkTag>(m_functor, range.begin(),
1270 range.end(), update_sum,
false);
1275 ValueInit::init(m_functor, reinterpret_cast<pointer_type>(
1276 buffer.get(0) + value_size));
1278 for (
int i = 1; i < num_worker_threads; ++i) {
1279 pointer_type ptr_1_prev =
1280 reinterpret_cast<pointer_type
>(buffer.get(i - 1));
1281 pointer_type ptr_2_prev =
reinterpret_cast<pointer_type
>(
1282 buffer.get(i - 1) + value_size);
1283 pointer_type ptr_2 =
reinterpret_cast<pointer_type
>(
1284 buffer.get(i) + value_size);
1286 for (
int j = 0; j < value_count; ++j) {
1287 ptr_2[j] = ptr_2_prev[j];
1290 ValueJoin::join(m_functor, ptr_2, ptr_1_prev);
1296 reference_type update_base = ValueOps::reference(
1297 reinterpret_cast<pointer_type>(buffer.get(t) + value_size));
1299 execute_functor_range<WorkTag>(m_functor, range.begin(),
1300 range.end(), update_base,
true);
1304 inline ParallelScan(
const FunctorType &arg_functor,
const Policy &arg_policy)
1305 : m_functor(arg_functor), m_policy(arg_policy) {}
1308 template <
class FunctorType,
class ReturnType,
class... Traits>
1309 class ParallelScanWithTotal<FunctorType, Kokkos::RangePolicy<Traits...>,
1313 using WorkTag =
typename Policy::work_tag;
1314 using WorkRange =
typename Policy::WorkRange;
1315 using Member =
typename Policy::member_type;
1317 FunctorAnalysis<FunctorPatternInterface::SCAN, Policy, FunctorType>;
1318 using ValueInit = Kokkos::Impl::FunctorValueInit<FunctorType, WorkTag>;
1319 using ValueJoin = Kokkos::Impl::FunctorValueJoin<FunctorType, WorkTag>;
1320 using ValueOps = Kokkos::Impl::FunctorValueOps<FunctorType, WorkTag>;
1321 using pointer_type =
typename Analysis::pointer_type;
1322 using reference_type =
typename Analysis::reference_type;
1323 using value_type =
typename Analysis::value_type;
1325 const FunctorType m_functor;
1326 const Policy m_policy;
1329 template <
class TagType>
1331 typename std::enable_if<std::is_same<TagType, void>::value>::type
1332 execute_functor_range(
const FunctorType &functor,
const Member i_begin,
1333 const Member i_end, reference_type update,
1335 for (Member i = i_begin; i < i_end; ++i) {
1336 functor(i, update,
final);
1340 template <
class TagType>
1342 typename std::enable_if<!std::is_same<TagType, void>::value>::type
1343 execute_functor_range(
const FunctorType &functor,
const Member i_begin,
1344 const Member i_end, reference_type update,
1347 for (Member i = i_begin; i < i_end; ++i) {
1348 functor(t, i, update,
final);
1353 void execute()
const { dispatch_execute_task(
this); }
1355 inline void execute_task()
const {
1356 const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
1357 const int value_count = Analysis::value_count(m_functor);
1358 const std::size_t value_size = Analysis::value_size(m_functor);
1360 thread_buffer &buffer = Kokkos::Experimental::HPX::impl_get_buffer();
1361 buffer.resize(num_worker_threads, 2 * value_size);
1363 using hpx::lcos::local::barrier;
1364 using hpx::parallel::for_loop;
1365 using hpx::parallel::execution::par;
1366 using hpx::parallel::execution::static_chunk_size;
1368 barrier bar(num_worker_threads);
1370 for_loop(par.with(static_chunk_size(1)), 0, num_worker_threads,
1371 [
this, &buffer, &bar, num_worker_threads, value_count,
1372 value_size](std::size_t
const t) {
1373 reference_type update_sum = ValueInit::init(
1374 m_functor, reinterpret_cast<pointer_type>(buffer.get(t)));
1376 const WorkRange range(m_policy, t, num_worker_threads);
1377 execute_functor_range<WorkTag>(m_functor, range.begin(),
1378 range.end(), update_sum,
false);
1383 ValueInit::init(m_functor, reinterpret_cast<pointer_type>(
1384 buffer.get(0) + value_size));
1386 for (
int i = 1; i < num_worker_threads; ++i) {
1387 pointer_type ptr_1_prev =
1388 reinterpret_cast<pointer_type
>(buffer.get(i - 1));
1389 pointer_type ptr_2_prev =
reinterpret_cast<pointer_type
>(
1390 buffer.get(i - 1) + value_size);
1391 pointer_type ptr_2 =
reinterpret_cast<pointer_type
>(
1392 buffer.get(i) + value_size);
1394 for (
int j = 0; j < value_count; ++j) {
1395 ptr_2[j] = ptr_2_prev[j];
1398 ValueJoin::join(m_functor, ptr_2, ptr_1_prev);
1404 reference_type update_base = ValueOps::reference(
1405 reinterpret_cast<pointer_type>(buffer.get(t) + value_size));
1407 execute_functor_range<WorkTag>(m_functor, range.begin(),
1408 range.end(), update_base,
true);
1410 if (t == std::size_t(num_worker_threads - 1)) {
1411 m_returnvalue = update_base;
1416 inline ParallelScanWithTotal(
const FunctorType &arg_functor,
1417 const Policy &arg_policy,
1419 : m_functor(arg_functor), m_policy(arg_policy),
1420 m_returnvalue(arg_returnvalue) {}
1427 template <
class FunctorType,
class... Properties>
1428 class ParallelFor<FunctorType, Kokkos::TeamPolicy<Properties...>,
1429 Kokkos::Experimental::HPX> {
1431 using Policy = TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>;
1432 using WorkTag =
typename Policy::work_tag;
1433 using Member =
typename Policy::member_type;
1436 const FunctorType m_functor;
1437 const Policy m_policy;
1439 const std::size_t m_shared;
1441 template <
class TagType>
1443 typename std::enable_if<std::is_same<TagType, void>::value>::type
1444 execute_functor(
const FunctorType &functor,
const Policy &policy,
1445 const int league_rank,
char *local_buffer,
1446 const std::size_t local_buffer_size) {
1447 functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size));
1450 template <
class TagType>
1452 typename std::enable_if<!std::is_same<TagType, void>::value>::type
1453 execute_functor(
const FunctorType &functor,
const Policy &policy,
1454 const int league_rank,
char *local_buffer,
1455 const std::size_t local_buffer_size) {
1457 functor(t, Member(policy, 0, league_rank, local_buffer, local_buffer_size));
1460 template <
class TagType>
1462 typename std::enable_if<std::is_same<TagType, void>::value>::type
1463 execute_functor_range(
const FunctorType &functor,
const Policy &policy,
1464 const int league_rank_begin,
1465 const int league_rank_end,
char *local_buffer,
1466 const std::size_t local_buffer_size) {
1467 for (
int league_rank = league_rank_begin; league_rank < league_rank_end;
1469 functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size));
1473 template <
class TagType>
1475 typename std::enable_if<!std::is_same<TagType, void>::value>::type
1476 execute_functor_range(
const FunctorType &functor,
const Policy &policy,
1477 const int league_rank_begin,
1478 const int league_rank_end,
char *local_buffer,
1479 const std::size_t local_buffer_size) {
1481 for (
int league_rank = league_rank_begin; league_rank < league_rank_end;
1484 Member(policy, 0, league_rank, local_buffer, local_buffer_size));
1489 void execute()
const { dispatch_execute_task(
this); }
1491 inline void execute_task()
const {
1492 const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
1494 thread_buffer &buffer = Kokkos::Experimental::HPX::impl_get_buffer();
1495 buffer.resize(num_worker_threads, m_shared);
1497 #if KOKKOS_HPX_IMPLEMENTATION == 0
1498 using hpx::parallel::for_loop;
1499 using hpx::parallel::execution::par;
1500 using hpx::parallel::execution::static_chunk_size;
1502 for_loop(par.with(static_chunk_size(m_policy.chunk_size())), 0,
1503 m_policy.league_size(), [
this, &buffer](
const int league_rank) {
1504 execute_functor<WorkTag>(
1505 m_functor, m_policy, league_rank,
1506 buffer.get(Kokkos::Experimental::HPX::impl_hardware_thread_id()),
1510 #elif KOKKOS_HPX_IMPLEMENTATION == 1
1512 using hpx::lcos::local::counting_semaphore;
1514 counting_semaphore sem(0);
1515 std::size_t num_tasks = 0;
1517 for (
int league_rank_begin = 0; league_rank_begin < m_policy.league_size();
1518 league_rank_begin += m_policy.chunk_size()) {
1519 apply([
this, &buffer, &sem, league_rank_begin]() {
1520 const int league_rank_end = (std::min)(
1521 league_rank_begin + m_policy.chunk_size(), m_policy.league_size());
1522 execute_functor_range<WorkTag>(
1523 m_functor, m_policy, league_rank_begin, league_rank_end,
1524 buffer.get(Kokkos::Experimental::HPX::impl_hardware_thread_id()), m_shared);
1532 sem.wait(num_tasks);
1536 ParallelFor(
const FunctorType &arg_functor,
const Policy &arg_policy)
1537 : m_functor(arg_functor), m_policy(arg_policy),
1538 m_league(arg_policy.league_size()),
1539 m_shared(arg_policy.scratch_size(0) + arg_policy.scratch_size(1) +
1540 FunctorTeamShmemSize<FunctorType>::value(
1541 arg_functor, arg_policy.team_size())) {}
1544 template <
class FunctorType,
class ReducerType,
class... Properties>
1545 class ParallelReduce<FunctorType, Kokkos::TeamPolicy<Properties...>,
1546 ReducerType, Kokkos::Experimental::HPX> {
1548 using Policy = TeamPolicyInternal<Kokkos::Experimental::HPX, Properties...>;
1550 FunctorAnalysis<FunctorPatternInterface::REDUCE, Policy, FunctorType>;
1551 using Member =
typename Policy::member_type;
1552 using WorkTag =
typename Policy::work_tag;
1553 using ReducerConditional =
1554 Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
1555 FunctorType, ReducerType>;
1556 using ReducerTypeFwd =
typename ReducerConditional::type;
1558 typename Kokkos::Impl::if_c<std::is_same<InvalidType, ReducerType>::value,
1559 WorkTag,
void>::type;
1560 using ValueInit = Kokkos::Impl::FunctorValueInit<ReducerTypeFwd, WorkTagFwd>;
1561 using ValueJoin = Kokkos::Impl::FunctorValueJoin<ReducerTypeFwd, WorkTagFwd>;
1562 using ValueOps = Kokkos::Impl::FunctorValueOps<ReducerTypeFwd, WorkTagFwd>;
1563 using pointer_type =
typename Analysis::pointer_type;
1564 using reference_type =
typename Analysis::reference_type;
1565 using value_type =
typename Analysis::value_type;
1567 const FunctorType m_functor;
1569 const Policy m_policy;
1570 const ReducerType m_reducer;
1571 pointer_type m_result_ptr;
1572 const std::size_t m_shared;
1574 bool m_force_synchronous;
1576 template <
class TagType>
1578 typename std::enable_if<std::is_same<TagType, void>::value>::type
1579 execute_functor(
const FunctorType &functor,
const Policy &policy,
1580 const int league_rank,
char *local_buffer,
1581 const std::size_t local_buffer_size,
1582 reference_type update) {
1583 functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size),
1587 template <
class TagType>
1589 typename std::enable_if<!std::is_same<TagType, void>::value>::type
1590 execute_functor(
const FunctorType &functor,
const Policy &policy,
1591 const int league_rank,
char *local_buffer,
1592 const std::size_t local_buffer_size,
1593 reference_type update) {
1595 functor(t, Member(policy, 0, league_rank, local_buffer, local_buffer_size),
1599 template <
class TagType>
1601 typename std::enable_if<std::is_same<TagType, void>::value>::type
1602 execute_functor_range(
const FunctorType &functor,
const Policy &policy,
1603 const int league_rank_begin,
1604 const int league_rank_end,
char *local_buffer,
1605 const std::size_t local_buffer_size,
1606 reference_type update) {
1607 for (
int league_rank = league_rank_begin; league_rank < league_rank_end;
1609 functor(Member(policy, 0, league_rank, local_buffer, local_buffer_size),
1614 template <
class TagType>
1616 typename std::enable_if<!std::is_same<TagType, void>::value>::type
1617 execute_functor_range(
const FunctorType &functor,
const Policy &policy,
1618 const int league_rank_begin,
1619 const int league_rank_end,
char *local_buffer,
1620 const std::size_t local_buffer_size,
1621 reference_type update) {
1623 for (
int league_rank = league_rank_begin; league_rank < league_rank_end;
1626 Member(policy, 0, league_rank, local_buffer, local_buffer_size),
1632 void execute()
const {
1633 dispatch_execute_task(
this);
1636 inline void execute_task()
const {
1637 const int num_worker_threads = Kokkos::Experimental::HPX::concurrency();
1638 const std::size_t value_size =
1639 Analysis::value_size(ReducerConditional::select(m_functor, m_reducer));
1641 thread_buffer &buffer = Kokkos::Experimental::HPX::impl_get_buffer();
1642 buffer.resize(num_worker_threads, value_size + m_shared);
1644 using hpx::parallel::for_loop;
1645 using hpx::parallel::execution::par;
1647 for_loop(par, 0, num_worker_threads, [
this, &buffer](std::size_t t) {
1648 ValueInit::init(ReducerConditional::select(m_functor, m_reducer),
1649 reinterpret_cast<pointer_type>(buffer.get(t)));
1652 #if KOKKOS_HPX_IMPLEMENTATION == 0
1653 using hpx::parallel::execution::static_chunk_size;
1655 hpx::parallel::for_loop(
1656 par.with(static_chunk_size(m_policy.chunk_size())), 0,
1657 m_policy.league_size(),
1658 [
this, &buffer, value_size](
const int league_rank) {
1659 std::size_t t = Kokkos::Experimental::HPX::impl_hardware_thread_id();
1660 reference_type update = ValueOps::reference(
1661 reinterpret_cast<pointer_type>(buffer.get(t)));
1663 execute_functor<WorkTag>(m_functor, m_policy, league_rank,
1664 buffer.get(t) + value_size, m_shared,
1668 #elif KOKKOS_HPX_IMPLEMENTATION == 1
1670 using hpx::lcos::local::counting_semaphore;
1672 counting_semaphore sem(0);
1673 std::size_t num_tasks = 0;
1675 for (
int league_rank_begin = 0; league_rank_begin < m_policy.league_size();
1676 league_rank_begin += m_policy.chunk_size()) {
1677 apply([
this, &buffer, &sem, league_rank_begin, value_size]() {
1678 std::size_t t = Kokkos::Experimental::HPX::impl_hardware_thread_id();
1679 reference_type update =
1680 ValueOps::reference(reinterpret_cast<pointer_type>(buffer.get(t)));
1681 const int league_rank_end = (std::min)(
1682 league_rank_begin + m_policy.chunk_size(), m_policy.league_size());
1683 execute_functor_range<WorkTag>(
1684 m_functor, m_policy, league_rank_begin, league_rank_end,
1685 buffer.get(t) + value_size, m_shared, update);
1693 sem.wait(num_tasks);
1696 const pointer_type ptr =
reinterpret_cast<pointer_type
>(buffer.get(0));
1697 for (
int t = 1; t < num_worker_threads; ++t) {
1698 ValueJoin::join(ReducerConditional::select(m_functor, m_reducer), ptr,
1699 reinterpret_cast<pointer_type>(buffer.get(t)));
1702 Kokkos::Impl::FunctorFinal<ReducerTypeFwd, WorkTagFwd>::final(
1703 ReducerConditional::select(m_functor, m_reducer), ptr);
1706 const int n = Analysis::value_count(
1707 ReducerConditional::select(m_functor, m_reducer));
1709 for (
int j = 0; j < n; ++j) {
1710 m_result_ptr[j] = ptr[j];
1715 template <
class ViewType>
1717 const FunctorType &arg_functor,
const Policy &arg_policy,
1718 const ViewType &arg_result,
1719 typename std::enable_if<Kokkos::is_view<ViewType>::value &&
1720 !Kokkos::is_reducer_type<ReducerType>::value,
1721 void *>::type = NULL)
1722 : m_functor(arg_functor), m_league(arg_policy.league_size()),
1723 m_policy(arg_policy), m_reducer(InvalidType()),
1724 m_result_ptr(arg_result.data()),
1725 m_shared(arg_policy.scratch_size(0) + arg_policy.scratch_size(1) +
1726 FunctorTeamShmemSize<FunctorType>::value(
1727 m_functor, arg_policy.team_size())),
1728 m_force_synchronous(!arg_result.impl_track().has_record()) {}
1730 inline ParallelReduce(
const FunctorType &arg_functor, Policy arg_policy,
1731 const ReducerType &reducer)
1732 : m_functor(arg_functor), m_league(arg_policy.league_size()),
1733 m_policy(arg_policy), m_reducer(reducer),
1734 m_result_ptr(reducer.view().data()),
1735 m_shared(arg_policy.scratch_size(0) + arg_policy.scratch_size(1) +
1736 FunctorTeamShmemSize<FunctorType>::value(
1737 arg_functor, arg_policy.team_size())),
1738 m_force_synchronous(!reducer.view().impl_track().has_record()) {}
1745 template <
typename iType>
1746 KOKKOS_INLINE_FUNCTION
1747 Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
1748 TeamThreadRange(
const Impl::HPXTeamMember &thread,
const iType &count) {
1749 return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
1753 template <
typename iType1,
typename iType2>
1754 KOKKOS_INLINE_FUNCTION Impl::TeamThreadRangeBoundariesStruct<
1755 typename std::common_type<iType1, iType2>::type, Impl::HPXTeamMember>
1756 TeamThreadRange(
const Impl::HPXTeamMember &thread,
const iType1 &i_begin,
1757 const iType2 &i_end) {
1758 using iType =
typename std::common_type<iType1, iType2>::type;
1759 return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
1760 thread, iType(i_begin), iType(i_end));
1763 template <
typename iType>
1764 KOKKOS_INLINE_FUNCTION
1765 Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
1766 TeamVectorRange(
const Impl::HPXTeamMember &thread,
const iType &count) {
1767 return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
1771 template <
typename iType1,
typename iType2>
1772 KOKKOS_INLINE_FUNCTION Impl::TeamThreadRangeBoundariesStruct<
1773 typename std::common_type<iType1, iType2>::type, Impl::HPXTeamMember>
1774 TeamVectorRange(
const Impl::HPXTeamMember &thread,
const iType1 &i_begin,
1775 const iType2 &i_end) {
1776 using iType =
typename std::common_type<iType1, iType2>::type;
1777 return Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
1778 thread, iType(i_begin), iType(i_end));
1781 template <
typename iType>
1782 KOKKOS_INLINE_FUNCTION
1783 Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
1785 return Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
1789 template <
typename iType>
1790 KOKKOS_INLINE_FUNCTION
1791 Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
1793 const iType &i_end) {
1794 return Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>(
1795 thread, i_begin, i_end);
1798 KOKKOS_INLINE_FUNCTION
1799 Impl::ThreadSingleStruct<Impl::HPXTeamMember>
1800 PerTeam(
const Impl::HPXTeamMember &thread) {
1801 return Impl::ThreadSingleStruct<Impl::HPXTeamMember>(thread);
1804 KOKKOS_INLINE_FUNCTION
1805 Impl::VectorSingleStruct<Impl::HPXTeamMember>
1806 PerThread(
const Impl::HPXTeamMember &thread) {
1807 return Impl::VectorSingleStruct<Impl::HPXTeamMember>(thread);
1815 template <
typename iType,
class Lambda>
1817 const Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
1819 const Lambda &lambda) {
1820 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
1821 i += loop_boundaries.increment)
1831 template <
typename iType,
class Lambda,
typename ValueType>
1833 const Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
1835 const Lambda &lambda, ValueType &result) {
1836 result = ValueType();
1837 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
1838 i += loop_boundaries.increment) {
1848 template <
typename iType,
class Lambda>
1850 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
1852 const Lambda &lambda) {
1853 #ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
1856 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
1857 i += loop_boundaries.increment) {
1868 template <
typename iType,
class Lambda,
typename ValueType>
1870 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
1872 const Lambda &lambda, ValueType &result) {
1873 result = ValueType();
1874 #ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
1877 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
1878 i += loop_boundaries.increment) {
1883 template <
typename iType,
class Lambda,
typename ReducerType>
1885 const Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
1887 const Lambda &lambda,
const ReducerType &reducer) {
1888 reducer.init(reducer.reference());
1889 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
1890 i += loop_boundaries.increment) {
1891 lambda(i, reducer.reference());
1895 template <
typename iType,
class Lambda,
typename ReducerType>
1897 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
1899 const Lambda &lambda,
const ReducerType &reducer) {
1900 reducer.init(reducer.reference());
1901 #ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
1904 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
1905 i += loop_boundaries.increment) {
1906 lambda(i, reducer.reference());
1910 template <
typename iType,
class FunctorType>
1911 KOKKOS_INLINE_FUNCTION
void parallel_scan(
1912 Impl::TeamThreadRangeBoundariesStruct<iType, Impl::HPXTeamMember>
const
1914 const FunctorType &lambda) {
1915 using value_type =
typename Kokkos::Impl::FunctorAnalysis<
1916 Kokkos::Impl::FunctorPatternInterface::SCAN, void,
1917 FunctorType>::value_type;
1919 value_type scan_val = value_type();
1922 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
1923 i += loop_boundaries.increment) {
1924 lambda(i, scan_val,
false);
1928 scan_val = loop_boundaries.thread.team_scan(scan_val);
1930 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
1931 i += loop_boundaries.increment) {
1932 lambda(i, scan_val,
true);
1947 template <
typename iType,
class FunctorType>
1948 KOKKOS_INLINE_FUNCTION
void parallel_scan(
1949 const Impl::ThreadVectorRangeBoundariesStruct<iType, Impl::HPXTeamMember>
1951 const FunctorType &lambda) {
1952 using ValueTraits = Kokkos::Impl::FunctorValueTraits<FunctorType, void>;
1953 using value_type =
typename ValueTraits::value_type;
1955 value_type scan_val = value_type();
1957 #ifdef KOKKOS_ENABLE_PRAGMA_IVDEP
1960 for (iType i = loop_boundaries.start; i < loop_boundaries.end;
1961 i += loop_boundaries.increment) {
1962 lambda(i, scan_val,
true);
1966 template <
class FunctorType>
1967 KOKKOS_INLINE_FUNCTION
void
1968 single(
const Impl::VectorSingleStruct<Impl::HPXTeamMember> &single_struct,
1969 const FunctorType &lambda) {
1973 template <
class FunctorType>
1974 KOKKOS_INLINE_FUNCTION
void
1975 single(
const Impl::ThreadSingleStruct<Impl::HPXTeamMember> &single_struct,
1976 const FunctorType &lambda) {
1980 template <
class FunctorType,
class ValueType>
1981 KOKKOS_INLINE_FUNCTION
void
1982 single(
const Impl::VectorSingleStruct<Impl::HPXTeamMember> &single_struct,
1983 const FunctorType &lambda, ValueType &val) {
1987 template <
class FunctorType,
class ValueType>
1988 KOKKOS_INLINE_FUNCTION
void
1989 single(
const Impl::ThreadSingleStruct<Impl::HPXTeamMember> &single_struct,
1990 const FunctorType &lambda, ValueType &val) {
1996 #include <HPX/Kokkos_HPX_Task.hpp>
void parallel_for(const ExecPolicy &policy, const FunctorType &functor, const std::string &str="", typename Impl::enable_if< Kokkos::Impl::is_execution_policy< ExecPolicy >::value >::type *=0)
Execute functor in parallel according to the execution policy.
void print_configuration(std::ostream &, const bool detail=false)
Print "Bill of Materials".
KOKKOS_INLINE_FUNCTION_DELETED Impl::TeamThreadRangeBoundariesStruct< iType, TeamMemberType > TeamThreadRange(const TeamMemberType &, const iType &count)=delete
Execution policy for parallel work over a threads within a team.
Scratch memory space associated with an execution space.
void parallel_reduce(const std::string &label, const PolicyType &policy, const FunctorType &functor, ReturnType &return_value, typename Impl::enable_if< Kokkos::Impl::is_execution_policy< PolicyType >::value >::type *=0)
Parallel reduction.
std::enable_if< std::is_same< typename Kokkos::View< T, P...>::array_layout, Kokkos::LayoutLeft >::value||std::is_same< typename Kokkos::View< T, P...>::array_layout, Kokkos::LayoutRight >::value >::type resize(Kokkos::View< T, P...> &v, const size_t n0=KOKKOS_IMPL_CTOR_DEFAULT_ARG, const size_t n1=KOKKOS_IMPL_CTOR_DEFAULT_ARG, const size_t n2=KOKKOS_IMPL_CTOR_DEFAULT_ARG, const size_t n3=KOKKOS_IMPL_CTOR_DEFAULT_ARG, const size_t n4=KOKKOS_IMPL_CTOR_DEFAULT_ARG, const size_t n5=KOKKOS_IMPL_CTOR_DEFAULT_ARG, const size_t n6=KOKKOS_IMPL_CTOR_DEFAULT_ARG, const size_t n7=KOKKOS_IMPL_CTOR_DEFAULT_ARG)
Resize a view with copying old data to new data at the corresponding indices.
Memory management for host memory.
KOKKOS_INLINE_FUNCTION_DELETED Impl::ThreadVectorRangeBoundariesStruct< iType, TeamMemberType > ThreadVectorRange(const TeamMemberType &, const iType &count)=delete
Execution policy for a vector parallel loop.
Declaration of various MemoryLayout options.
Declaration of parallel operators.
Execution policy for work over a range of an integral type.
KOKKOS_INLINE_FUNCTION_DELETED Impl::TeamThreadRangeBoundariesStruct< iType, TeamMemberType > TeamVectorRange(const TeamMemberType &, const iType &count)=delete
Execution policy for parallel work over a threads within a team.