17 #ifndef KOKKOS_IMPL_PUBLIC_INCLUDE
18 #include <Kokkos_Macros.hpp>
20 "Including non-public Kokkos header files is not allowed.");
22 #ifndef KOKKOS_TASKSCHEDULER_HPP
23 #define KOKKOS_TASKSCHEDULER_HPP
27 #include <Kokkos_Macros.hpp>
28 #if defined(KOKKOS_ENABLE_TASKDAG)
30 #include <Kokkos_Core_fwd.hpp>
31 #include <Kokkos_TaskScheduler_fwd.hpp>
34 #include <Kokkos_MemoryPool.hpp>
36 #include <Kokkos_Future.hpp>
37 #include <impl/Kokkos_TaskQueue.hpp>
38 #include <impl/Kokkos_SingleTaskQueue.hpp>
39 #include <impl/Kokkos_TaskQueueMultiple.hpp>
40 #include <impl/Kokkos_TaskPolicyData.hpp>
41 #include <impl/Kokkos_TaskTeamMember.hpp>
42 #include <impl/Kokkos_SimpleTaskScheduler.hpp>
51 template <
class,
class>
56 template <
class ExecSpace,
class QueueType>
57 class BasicTaskScheduler :
public Impl::TaskSchedulerBase {
// Type aliases describing this scheduler, its queue, and the task and future
// types built on top of it.
59 using scheduler_type = BasicTaskScheduler;
60 using execution_space = ExecSpace;
61 using queue_type = QueueType;
// The memory space and memory pool types are taken from the queue type.
62 using memory_space =
typename queue_type::memory_space;
63 using memory_pool =
typename queue_type::memory_pool;
// Queue specialization keyed on this scheduler type; it supplies member_type.
64 using specialization = Impl::TaskQueueSpecialization<BasicTaskScheduler>;
65 using member_type =
typename specialization::member_type;
// The team scheduler type is this same scheduler type.
66 using team_scheduler_type = BasicTaskScheduler;
// Task type for a functor run by this scheduler; the task's value type is
// Functor::value_type.
67 template <
class Functor>
68 using runnable_task_type =
69 Impl::Task<scheduler_type, typename Functor::value_type, Functor>;
// Future of ValueType bound to this scheduler type.
70 template <
class ValueType>
71 using future_type = Kokkos::BasicFuture<ValueType, BasicTaskScheduler>;
// Future whose value type is FunctorType::value_type.
72 template <
class FunctorType>
73 using future_type_for_functor = future_type<typename FunctorType::value_type>;
// Helper aliases used by the member functions of this class.
76 using track_type = Kokkos::Impl::SharedAllocationTracker;
77 using task_base = Impl::TaskBase;
// Friend declarations: the Impl queue, specialization, team-member adapter and
// TaskExec templates are granted access to this class.
84 template <
typename,
typename>
85 friend class Impl::TaskQueue;
// NOTE(review): no template header is visible for this declaration in this
// view — confirm against the full file.
87 friend struct Impl::TaskQueueSpecialization;
88 template <
typename,
typename>
89 friend class Impl::TaskQueueSpecializationConstrained;
90 template <
typename,
typename>
91 friend class Impl::TaskTeamMemberAdapter;
92 template <
typename,
typename>
93 friend class Impl::TaskExec;
97 KOKKOS_INLINE_FUNCTION
98 BasicTaskScheduler(track_type arg_track, queue_type* arg_queue)
99 : m_track(std::move(arg_track)), m_queue(std::move(arg_queue)) {}
101 KOKKOS_INLINE_FUNCTION
102 team_scheduler_type get_team_scheduler(
int team_rank)
const {
103 return {m_track, &m_queue->get_team_queue(team_rank)};
108 KOKKOS_INLINE_FUNCTION
109 static constexpr task_base* _get_task_ptr(std::nullptr_t) {
return nullptr; }
// Overload of _get_task_ptr taking an rvalue future of this scheduler's
// future_type and yielding a task_base pointer.
// NOTE(review): the body of this overload is not visible in this view;
// presumably it returns the future's task pointer — confirm.
111 template <
class ValueType>
112 KOKKOS_INLINE_FUNCTION
static constexpr task_base* _get_task_ptr(
113 future_type<ValueType>&& f) {
// Builds a task around arg_functor in storage allocated from the queue, fills
// in its bookkeeping fields, and submits it with schedule_runnable().
//   arg_predecessor_task  stored in the new task's m_next field
//   arg_priority          stored in m_priority after a cast to int16_t
//   arg_function          stored in m_apply
//   (unnamed destroy_type parameter) not used by the visible code
//   arg_functor           forwarded into the task's constructor
// NOTE(review): parts of this definition are not visible in this view (the
// task_type alias, the assignment to f.m_task, any allocation-failure
// handling, and the return statement) — confirm against the full file.
117 template <
int TaskEnum,
typename DepTaskType,
typename FunctorType>
119 Kokkos::BasicFuture<typename FunctorType::value_type, scheduler_type>
120 _spawn_impl(DepTaskType* arg_predecessor_task, TaskPriority arg_priority,
121 typename task_base::function_type arg_function,
122 typename task_base::destroy_type ,
123 FunctorType&& arg_functor) {
// Future type for the decayed functor type; `f` below is of this type.
124 using functor_future_type =
125 future_type_for_functor<std::decay_t<FunctorType>>;
127 Impl::Task<BasicTaskScheduler,
typename functor_future_type::value_type,
139 functor_future_type f;
// Ask the queue how many bytes a task for FunctorType needs, then allocate
// that much storage from the queue.
143 const size_t alloc_size =
144 m_queue->template spawn_allocation_size<FunctorType>();
146 void* task_storage = m_queue->allocate(alloc_size);
// Construct the task in place, forwarding the functor into it.
154 new (task_storage) task_type(std::forward<FunctorType>(arg_functor));
156 f.m_task->m_apply = arg_function;
158 f.m_task->m_queue = m_queue;
159 f.m_task->m_next = arg_predecessor_task;
// The reference count starts at 2.
// NOTE(review): presumably one reference for the future and one released when
// the task completes — confirm.
160 f.m_task->m_ref_count = 2;
161 f.m_task->m_alloc_size = alloc_size;
162 f.m_task->m_task_type = TaskEnum;
163 f.m_task->m_priority = (int16_t)arg_priority;
// Fence after the field writes above and before the task is handed to the
// queue.
165 Kokkos::memory_fence();
172 m_queue->schedule_runnable(f.m_task);
181 KOKKOS_INLINE_FUNCTION
182 BasicTaskScheduler() : m_track(), m_queue(nullptr) {}
184 KOKKOS_INLINE_FUNCTION
185 BasicTaskScheduler(BasicTaskScheduler&& rhs) noexcept
186 : m_track(rhs.m_track),
188 m_queue(std::move(rhs.m_queue)) {}
190 KOKKOS_INLINE_FUNCTION
191 BasicTaskScheduler(BasicTaskScheduler
const& rhs)
192 : m_track(rhs.m_track), m_queue(rhs.m_queue) {}
// Move assignment. The tracker is assigned from rhs.m_track without std::move,
// and the queue pointer is taken from rhs.
// NOTE(review): the return statement and closing brace are not visible in
// this view.
194 KOKKOS_INLINE_FUNCTION
195 BasicTaskScheduler& operator=(BasicTaskScheduler&& rhs) noexcept {
196 m_track = rhs.m_track;
// std::move on a raw pointer performs a plain copy.
198 m_queue = std::move(rhs.m_queue);
// Copy assignment: assigns the tracker and the queue pointer from rhs.
// NOTE(review): the return statement and closing brace are not visible in
// this view.
202 KOKKOS_INLINE_FUNCTION
203 BasicTaskScheduler& operator=(BasicTaskScheduler
const& rhs) {
204 m_track = rhs.m_track;
205 m_queue = rhs.m_queue;
// Creates a scheduler whose queue is constructed from arg_memory_pool inside
// a newly allocated shared-allocation record.
// NOTE(review): the line introducing the record_type alias is not visible in
// this view.
// NOTE(review): this constructor is declared noexcept yet performs an
// allocation — confirm that allocate() cannot throw.
209 explicit BasicTaskScheduler(memory_pool
const& arg_memory_pool) noexcept
210 : m_track(), m_queue(
nullptr) {
212 Kokkos::Impl::SharedAllocationRecord<memory_space,
213 typename queue_type::Destroy>;
// Allocate a record in memory_space sized for one queue_type object, labelled
// "Kokkos::TaskQueue".
215 record_type* record = record_type::allocate(
216 memory_space(),
"Kokkos::TaskQueue",
sizeof(queue_type));
// Construct the queue in the record's data area.
218 m_queue =
new (record->data()) queue_type(arg_memory_pool);
// Store the queue pointer in the record's m_destroy member.
220 record->m_destroy.m_queue = m_queue;
// Hand the record to this scheduler's tracker.
222 m_track.assign_allocated_record_to_uninitialized(record);
// Convenience constructor: builds a memory_pool from the memory space,
// capacity, and min/max/superblock sizes, then delegates to the constructor
// that takes a memory_pool.
// NOTE(review): the separators between the last three parameters and the
// closing parenthesis of the parameter list are not visible in this view.
225 BasicTaskScheduler(memory_space
const& arg_memory_space,
226 size_t const mempool_capacity,
227 unsigned const mempool_min_block_size
229 unsigned const mempool_max_block_size
231 unsigned const mempool_superblock_size
233 : BasicTaskScheduler(memory_pool(
234 arg_memory_space, mempool_capacity, mempool_min_block_size,
235 mempool_max_block_size, mempool_superblock_size)) {}
// Accessor returning a reference to the queue type.
// KOKKOS_EXPECTS checks the precondition that the queue pointer is not null.
// NOTE(review): the return statement is not visible in this view; presumably
// it returns *m_queue — confirm.
239 KOKKOS_INLINE_FUNCTION
240 queue_type& queue() const noexcept {
241 KOKKOS_EXPECTS(m_queue !=
nullptr);
245 KOKKOS_INLINE_FUNCTION
246 memory_pool* memory() const noexcept {
247 return m_queue ? &(m_queue->m_memory) : (memory_pool*)0;
252 template <
typename FunctorType>
253 KOKKOS_FUNCTION
size_t spawn_allocation_size()
const {
254 return m_queue->template spawn_allocation_size<FunctorType>();
259 size_t when_all_allocation_size(
int narg)
const {
260 return m_queue->when_all_allocation_size(narg);
// Static spawn entry point taking a policy that carries a scheduler. It calls
// _spawn_impl<TaskEnum> on the policy's scheduler, passing the predecessor's
// task pointer, the policy's priority, the apply/destroy function pointers,
// and the forwarded functor.
// NOTE(review): parts of the return type and the policy parameter's name are
// not visible in this view — confirm against the full file.
265 template <
int TaskEnum,
typename DepFutureType,
typename FunctorType>
266 KOKKOS_FUNCTION
static Kokkos::BasicFuture<
typename FunctorType::value_type,
268 spawn(Impl::TaskPolicyWithScheduler<TaskEnum, scheduler_type, DepFutureType>&&
270 typename task_base::function_type arg_function,
271 typename task_base::destroy_type arg_destroy,
272 FunctorType&& arg_functor) {
// The scheduler obtained from the policy is cast to an rvalue before the
// member call; the predecessor is likewise passed to _get_task_ptr as an
// rvalue.
273 return std::move(arg_policy.scheduler())
274 .
template _spawn_impl<TaskEnum>(
275 _get_task_ptr(std::move(arg_policy.predecessor())),
276 arg_policy.priority(), arg_function, arg_destroy,
277 std::forward<FunctorType>(arg_functor));
280 template <
int TaskEnum,
typename DepFutureType,
typename FunctorType>
281 KOKKOS_FUNCTION future_type_for_functor<std::decay_t<FunctorType>> spawn(
282 Impl::TaskPolicyWithPredecessor<TaskEnum, DepFutureType>&& arg_policy,
283 FunctorType&& arg_functor) {
284 using task_type = runnable_task_type<FunctorType>;
285 typename task_type::function_type
const ptr = task_type::apply;
286 typename task_type::destroy_type
const dtor = task_type::destroy;
288 return _spawn_impl<TaskEnum>(
289 _get_task_ptr(std::move(arg_policy).predecessor()),
290 arg_policy.priority(), ptr, dtor,
291 std::forward<FunctorType>(arg_functor));
294 template <
typename FunctorType,
typename ValueType,
typename Scheduler>
295 KOKKOS_FUNCTION
static void respawn(
296 FunctorType* arg_self,
297 BasicFuture<ValueType, Scheduler>
const& arg_dependence,
298 TaskPriority
const& arg_priority) {
301 using value_type =
typename FunctorType::value_type;
302 using task_type = Impl::Task<BasicTaskScheduler, value_type, FunctorType>;
304 task_type*
const task =
static_cast<task_type*
>(arg_self);
306 task->m_priority =
static_cast<int>(arg_priority);
308 task->add_dependence(arg_dependence.m_task);
313 template <
typename FunctorType>
314 KOKKOS_FUNCTION
static void respawn(FunctorType* arg_self,
315 BasicTaskScheduler
const&,
316 TaskPriority
const& arg_priority) {
319 using value_type =
typename FunctorType::value_type;
320 using task_type = Impl::Task<BasicTaskScheduler, value_type, FunctorType>;
322 task_type*
const task =
static_cast<task_type*
>(arg_self);
324 task->m_priority =
static_cast<int>(arg_priority);
326 task->add_dependence(
nullptr);
// Builds an aggregate task that depends on the tasks of the narg futures in
// arg[], and hands it to the queue with schedule_aggregate().
// NOTE(review): several lines of this definition are not visible in this view
// (the declaration of `f`, any null checks, the call that reports the error
// message below, the closing braces and the return statement) — confirm
// against the full file.
335 template <
typename ValueType>
336 KOKKOS_FUNCTION BasicFuture<void, scheduler_type> when_all(
337 BasicFuture<ValueType, BasicTaskScheduler>
const arg[],
int narg) {
341 queue_type* q = m_queue;
// First pass: take a reference on every input task and compare its queue
// against this scheduler's queue.
345 for (
int i = 0; i < narg; ++i) {
346 task_base*
const t = arg[i].m_task;
// Sequentially consistent, device-scope atomic increment of the input task's
// reference count.
350 desul::atomic_inc(&(t->m_ref_count), desul::MemoryOrderSeqCst(),
351 desul::MemoryScopeDevice());
352 if (q != static_cast<queue_type const*>(t->m_queue)) {
354 "Kokkos when_all Futures must be in the same scheduler");
// Allocate storage for the aggregate from the queue and construct a task_base
// in it.
362 size_t const alloc_size = q->when_all_allocation_size(narg);
364 f.m_task =
reinterpret_cast<task_base*
>(q->allocate(alloc_size));
372 new (f.m_task) task_base();
374 f.m_task->m_queue = q;
// The reference count starts at 2.
// NOTE(review): presumably one reference for the future and one released on
// completion — confirm.
375 f.m_task->m_ref_count = 2;
376 f.m_task->m_alloc_size =
static_cast<int32_t
>(alloc_size);
377 f.m_task->m_dep_count = narg;
378 f.m_task->m_task_type = task_base::Aggregate;
// Second pass: record each input task in the aggregate's dependence array.
382 task_base*
volatile*
const dep = f.m_task->aggregate_dependences();
384 for (
int i = 0; i < narg; ++i) {
385 dep[i] = arg[i].m_task;
// Fence after the writes above and before the aggregate is scheduled.
388 Kokkos::memory_fence();
390 q->schedule_aggregate(f.m_task);
// Builds an aggregate task over narg futures, each obtained by calling
// func(i), and hands it to the queue with schedule_aggregate().
// NOTE(review): several lines of this definition are not visible in this view
// (the template header, the functor parameter, the declaration of `f`, any
// allocation-failure handling, the closing braces and the final return) —
// confirm against the full file.
400 KOKKOS_FUNCTION BasicFuture<void, scheduler_type> when_all(
int narg,
// input_type is whatever func returns for an int index; it must be a future.
402 using input_type = decltype(func(0));
404 static_assert(is_future<input_type>::value,
405 "Functor must return a Kokkos::Future");
// With no dependences, `f` is returned without allocating anything.
409 if (0 == narg)
return f;
411 size_t const alloc_size = m_queue->when_all_allocation_size(narg);
413 f.m_task =
reinterpret_cast<task_base*
>(m_queue->allocate(alloc_size));
420 new (f.m_task) task_base();
424 f.m_task->m_queue = m_queue;
// The reference count starts at 2.
// NOTE(review): presumably one reference for the future and one released on
// completion — confirm.
425 f.m_task->m_ref_count = 2;
426 f.m_task->m_alloc_size =
static_cast<int32_t
>(alloc_size);
427 f.m_task->m_dep_count = narg;
428 f.m_task->m_task_type = task_base::Aggregate;
434 task_base*
volatile*
const dep = f.m_task->aggregate_dependences();
// For each index, fetch the future from func. When it has a task, take a
// reference on that task and record it as a dependence.
436 for (
int i = 0; i < narg; ++i) {
437 const input_type arg_f = func(i);
438 if (
nullptr != arg_f.m_task) {
447 desul::atomic_inc(&(arg_f.m_task->m_ref_count),
448 desul::MemoryOrderSeqCst(),
449 desul::MemoryScopeDevice());
450 dep[i] = arg_f.m_task;
// Fence after the writes above and before the aggregate is scheduled.
454 Kokkos::memory_fence();
456 m_queue->schedule_aggregate(f.m_task);
464 KOKKOS_INLINE_FUNCTION
465 int allocation_capacity() const noexcept {
466 return m_queue->m_memory.capacity();
469 KOKKOS_INLINE_FUNCTION
470 int allocated_task_count() const noexcept {
return m_queue->m_count_alloc; }
472 KOKKOS_INLINE_FUNCTION
473 int allocated_task_count_max() const noexcept {
return m_queue->m_max_alloc; }
475 KOKKOS_INLINE_FUNCTION
476 long allocated_task_count_accum() const noexcept {
477 return m_queue->m_accum_alloc;
// Grants the free function template wait(), for any BasicTaskScheduler<S, Q>,
// friend access to this class.
482 template <
class S,
class Q>
483 friend void wait(Kokkos::BasicTaskScheduler<S, Q>
const&);
496 template <
class T,
class Scheduler>
497 Impl::TaskPolicyWithPredecessor<Impl::TaskType::TaskTeam,
498 Kokkos::BasicFuture<T, Scheduler>>
499 KOKKOS_INLINE_FUNCTION
500 TaskTeam(Kokkos::BasicFuture<T, Scheduler> arg_future,
501 TaskPriority arg_priority = TaskPriority::Regular) {
502 return {std::move(arg_future), arg_priority};
505 template <
class Scheduler>
506 Impl::TaskPolicyWithScheduler<Impl::TaskType::TaskTeam, Scheduler>
507 KOKKOS_INLINE_FUNCTION TaskTeam(
508 Scheduler arg_scheduler,
509 std::enable_if_t<Kokkos::is_scheduler<Scheduler>::value, TaskPriority>
510 arg_priority = TaskPriority::Regular) {
511 return {std::move(arg_scheduler), arg_priority};
// Creates a TaskTeam policy from a scheduler, a predecessor future, and a
// priority (TaskPriority::Regular by default). The enable_if restricts this
// overload to a scheduler type paired with a future type.
// NOTE(review): parts of the return type, of the enable_if type and of the
// static_assert condition are not visible in this view — confirm against the
// full file.
514 template <
class Scheduler,
class PredecessorFuture>
515 Impl::TaskPolicyWithScheduler<Kokkos::Impl::TaskType::TaskTeam, Scheduler,
517 KOKKOS_INLINE_FUNCTION
518 TaskTeam(Scheduler arg_scheduler, PredecessorFuture arg_future,
519 std::enable_if_t<Kokkos::is_scheduler<Scheduler>::value &&
520 Kokkos::is_future<PredecessorFuture>::value,
522 arg_priority = TaskPriority::Regular) {
// Compile-time check involving the future's scheduler_type; the message
// states that the future must come from the same scheduler.
523 static_assert(std::is_same<
typename PredecessorFuture::scheduler_type,
525 "Can't create a task policy from a scheduler and a future from "
526 "a different scheduler");
528 return {std::move(arg_scheduler), std::move(arg_future), arg_priority};
533 template <
class T,
class Scheduler>
534 Impl::TaskPolicyWithPredecessor<Impl::TaskType::TaskSingle,
535 Kokkos::BasicFuture<T, Scheduler>>
536 KOKKOS_INLINE_FUNCTION
537 TaskSingle(Kokkos::BasicFuture<T, Scheduler> arg_future,
538 TaskPriority arg_priority = TaskPriority::Regular) {
539 return {std::move(arg_future), arg_priority};
542 template <
class Scheduler>
543 Impl::TaskPolicyWithScheduler<Impl::TaskType::TaskSingle, Scheduler>
544 KOKKOS_INLINE_FUNCTION TaskSingle(
545 Scheduler arg_scheduler,
546 std::enable_if_t<Kokkos::is_scheduler<Scheduler>::value, TaskPriority>
547 arg_priority = TaskPriority::Regular) {
548 return {std::move(arg_scheduler), arg_priority};
// Creates a TaskSingle policy from a scheduler, a predecessor future, and a
// priority (TaskPriority::Regular by default). The enable_if restricts this
// overload to a scheduler type paired with a future type.
// NOTE(review): parts of the return type, of the enable_if type and of the
// static_assert condition are not visible in this view — confirm against the
// full file.
551 template <
class Scheduler,
class PredecessorFuture>
552 Impl::TaskPolicyWithScheduler<Kokkos::Impl::TaskType::TaskSingle, Scheduler,
554 KOKKOS_INLINE_FUNCTION
555 TaskSingle(Scheduler arg_scheduler, PredecessorFuture arg_future,
556 std::enable_if_t<Kokkos::is_scheduler<Scheduler>::value &&
557 Kokkos::is_future<PredecessorFuture>::value,
559 arg_priority = TaskPriority::Regular) {
// Compile-time check involving the future's scheduler_type; the message
// states that the future must come from the same scheduler.
560 static_assert(std::is_same<
typename PredecessorFuture::scheduler_type,
562 "Can't create a task policy from a scheduler and a future from "
563 "a different scheduler");
565 return {std::move(arg_scheduler), std::move(arg_future), arg_priority};
// Spawns a task through Scheduler::spawn using a scheduler-carrying policy.
// The apply/destroy function pointers are filled in by
// TaskQueueSpecialization<scheduler_type>::get_function_pointer.
// NOTE(review): the policy parameter's name and the line introducing the
// task_type alias are not visible in this view — confirm against the full
// file.
576 template <
int TaskEnum,
typename Scheduler,
typename DepFutureType,
577 typename FunctorType>
578 typename Scheduler::template future_type_for_functor<std::decay_t<FunctorType>>
579 host_spawn(Impl::TaskPolicyWithScheduler<TaskEnum, Scheduler, DepFutureType>
581 FunctorType&& arg_functor) {
582 using scheduler_type = Scheduler;
584 typename scheduler_type::template runnable_task_type<FunctorType>;
// Only team and single task kinds may be spawned here.
586 static_assert(TaskEnum == Impl::TaskType::TaskTeam ||
587 TaskEnum == Impl::TaskType::TaskSingle,
588 "Kokkos host_spawn requires TaskTeam or TaskSingle");
// ptr and dtor are declared uninitialized and passed to get_function_pointer,
// which is expected to set them.
592 typename task_type::function_type ptr;
593 typename task_type::destroy_type dtor;
594 Kokkos::Impl::TaskQueueSpecialization<
595 scheduler_type>::template get_function_pointer<task_type>(ptr, dtor);
597 return scheduler_type::spawn(std::move(arg_policy), ptr, dtor,
598 std::forward<FunctorType>(arg_functor));
// Spawns a task through Scheduler::spawn using a scheduler-carrying policy.
// The apply/destroy function pointers are taken directly from the task type's
// apply and destroy members.
// NOTE(review): the policy parameter's name and the line introducing the
// task_type alias are not visible in this view — confirm against the full
// file.
607 template <
int TaskEnum,
typename Scheduler,
typename DepFutureType,
608 typename FunctorType>
609 typename Scheduler::template future_type_for_functor<std::decay_t<FunctorType>>
610 KOKKOS_INLINE_FUNCTION
611 task_spawn(Impl::TaskPolicyWithScheduler<TaskEnum, Scheduler, DepFutureType>
613 FunctorType&& arg_functor) {
614 using scheduler_type = Scheduler;
617 typename scheduler_type::template runnable_task_type<FunctorType>;
// Only team and single task kinds may be spawned here.
619 static_assert(TaskEnum == Impl::TaskType::TaskTeam ||
620 TaskEnum == Impl::TaskType::TaskSingle,
621 "Kokkos task_spawn requires TaskTeam or TaskSingle");
623 typename task_type::function_type
const ptr = task_type::apply;
624 typename task_type::destroy_type
const dtor = task_type::destroy;
626 return scheduler_type::spawn(std::move(arg_policy), ptr, dtor,
627 std::forward<FunctorType>(arg_functor));
635 template <
typename FunctorType,
typename T>
636 void KOKKOS_INLINE_FUNCTION
637 respawn(FunctorType* arg_self, T
const& arg,
638 TaskPriority
const& arg_priority = TaskPriority::Regular) {
639 static_assert(Kokkos::is_future<T>::value || Kokkos::is_scheduler<T>::value,
640 "Kokkos respawn argument must be Future or TaskScheduler");
642 T::scheduler_type::respawn(arg_self, arg, arg_priority);
658 template <
class ExecSpace,
class QueueType>
659 inline void wait(BasicTaskScheduler<ExecSpace, QueueType>
const& scheduler) {
660 using scheduler_type = BasicTaskScheduler<ExecSpace, QueueType>;
661 scheduler_type::specialization::execute(scheduler);