45 #ifndef KOKKOS_TASKSCHEDULER_HPP
46 #define KOKKOS_TASKSCHEDULER_HPP
50 #include <Kokkos_Macros.hpp>
51 #if defined(KOKKOS_ENABLE_TASKDAG)
53 #include <Kokkos_Core_fwd.hpp>
54 #include <Kokkos_TaskScheduler_fwd.hpp>
57 #include <Kokkos_MemoryPool.hpp>
58 #include <impl/Kokkos_Tags.hpp>
60 #include <Kokkos_Future.hpp>
61 #include <impl/Kokkos_TaskQueue.hpp>
62 #include <impl/Kokkos_SingleTaskQueue.hpp>
63 #include <impl/Kokkos_TaskQueueMultiple.hpp>
64 #include <impl/Kokkos_TaskPolicyData.hpp>
65 #include <impl/Kokkos_TaskTeamMember.hpp>
66 #include <impl/Kokkos_SimpleTaskScheduler.hpp>
75 template <
class,
class>
80 template <
class ExecSpace,
class QueueType>
81 class BasicTaskScheduler :
public Impl::TaskSchedulerBase {
83 using scheduler_type = BasicTaskScheduler;
84 using execution_space = ExecSpace;
85 using queue_type = QueueType;
86 using memory_space =
typename queue_type::memory_space;
87 using memory_pool =
typename queue_type::memory_pool;
88 using specialization = Impl::TaskQueueSpecialization<BasicTaskScheduler>;
89 using member_type =
typename specialization::member_type;
90 using team_scheduler_type = BasicTaskScheduler;
91 template <
class Functor>
92 using runnable_task_type =
93 Impl::Task<scheduler_type, typename Functor::value_type, Functor>;
94 template <
class ValueType>
95 using future_type = Kokkos::BasicFuture<ValueType, BasicTaskScheduler>;
96 template <
class FunctorType>
97 using future_type_for_functor = future_type<typename FunctorType::value_type>;
100 using track_type = Kokkos::Impl::SharedAllocationTracker;
101 using task_base = Impl::TaskBase;
108 template <
typename,
typename>
109 friend class Impl::TaskQueue;
111 friend struct Impl::TaskQueueSpecialization;
112 template <
typename,
typename>
113 friend class Impl::TaskQueueSpecializationConstrained;
114 template <
typename,
typename>
115 friend class Impl::TaskTeamMemberAdapter;
116 template <
typename,
typename>
117 friend class Impl::TaskExec;
121 KOKKOS_INLINE_FUNCTION
122 BasicTaskScheduler(track_type arg_track, queue_type* arg_queue)
123 : m_track(std::move(arg_track)), m_queue(std::move(arg_queue)) {}
125 KOKKOS_INLINE_FUNCTION
126 team_scheduler_type get_team_scheduler(
int team_rank)
const {
127 return {m_track, &m_queue->get_team_queue(team_rank)};
132 KOKKOS_INLINE_FUNCTION
133 static constexpr task_base* _get_task_ptr(std::nullptr_t) {
return nullptr; }
135 template <
class ValueType>
136 KOKKOS_INLINE_FUNCTION
static constexpr task_base* _get_task_ptr(
137 future_type<ValueType>&& f) {
141 template <
int TaskEnum,
typename DepTaskType,
typename FunctorType>
143 Kokkos::BasicFuture<typename FunctorType::value_type, scheduler_type>
144 _spawn_impl(DepTaskType* arg_predecessor_task, TaskPriority arg_priority,
145 typename task_base::function_type arg_function,
146 typename task_base::destroy_type ,
147 FunctorType&& arg_functor) {
148 using functor_future_type =
149 future_type_for_functor<typename std::decay<FunctorType>::type>;
151 Impl::Task<BasicTaskScheduler,
typename functor_future_type::value_type,
163 functor_future_type f;
167 const size_t alloc_size =
168 m_queue->template spawn_allocation_size<FunctorType>();
170 void* task_storage = m_queue->allocate(alloc_size);
178 new (task_storage) task_type(std::forward<FunctorType>(arg_functor));
180 f.m_task->m_apply = arg_function;
182 f.m_task->m_queue = m_queue;
183 f.m_task->m_next = arg_predecessor_task;
184 f.m_task->m_ref_count = 2;
185 f.m_task->m_alloc_size = alloc_size;
186 f.m_task->m_task_type = TaskEnum;
187 f.m_task->m_priority = (int16_t)arg_priority;
189 Kokkos::memory_fence();
196 m_queue->schedule_runnable(f.m_task);
205 KOKKOS_INLINE_FUNCTION
206 BasicTaskScheduler() : m_track(), m_queue(nullptr) {}
208 KOKKOS_INLINE_FUNCTION
209 BasicTaskScheduler(BasicTaskScheduler&& rhs) noexcept
210 : m_track(rhs.m_track),
212 m_queue(std::move(rhs.m_queue)) {}
214 KOKKOS_INLINE_FUNCTION
215 BasicTaskScheduler(BasicTaskScheduler
const& rhs)
216 : m_track(rhs.m_track), m_queue(rhs.m_queue) {}
218 KOKKOS_INLINE_FUNCTION
219 BasicTaskScheduler& operator=(BasicTaskScheduler&& rhs) noexcept {
220 m_track = rhs.m_track;
222 m_queue = std::move(rhs.m_queue);
226 KOKKOS_INLINE_FUNCTION
227 BasicTaskScheduler& operator=(BasicTaskScheduler
const& rhs) {
228 m_track = rhs.m_track;
229 m_queue = rhs.m_queue;
233 explicit BasicTaskScheduler(memory_pool
const& arg_memory_pool) noexcept
234 : m_track(), m_queue(
nullptr) {
235 typedef Kokkos::Impl::SharedAllocationRecord<memory_space,
236 typename queue_type::Destroy>
239 record_type* record =
240 record_type::allocate(memory_space(),
"TaskQueue",
sizeof(queue_type));
242 m_queue =
new (record->data()) queue_type(arg_memory_pool);
244 record->m_destroy.m_queue = m_queue;
246 m_track.assign_allocated_record_to_uninitialized(record);
249 BasicTaskScheduler(memory_space
const& arg_memory_space,
250 size_t const mempool_capacity,
251 unsigned const mempool_min_block_size
253 unsigned const mempool_max_block_size
255 unsigned const mempool_superblock_size
257 : BasicTaskScheduler(memory_pool(
258 arg_memory_space, mempool_capacity, mempool_min_block_size,
259 mempool_max_block_size, mempool_superblock_size)) {}
263 KOKKOS_INLINE_FUNCTION
264 queue_type& queue() const noexcept {
265 KOKKOS_EXPECTS(m_queue !=
nullptr);
269 KOKKOS_INLINE_FUNCTION
270 memory_pool* memory() const noexcept {
271 return m_queue ? &(m_queue->m_memory) : (memory_pool*)0;
276 template <
typename FunctorType>
277 KOKKOS_FUNCTION
size_t spawn_allocation_size()
const {
278 return m_queue->template spawn_allocation_size<FunctorType>();
283 size_t when_all_allocation_size(
int narg)
const {
284 return m_queue->when_all_allocation_size(narg);
289 template <
int TaskEnum,
typename DepFutureType,
typename FunctorType>
290 KOKKOS_FUNCTION
static Kokkos::BasicFuture<
typename FunctorType::value_type,
292 spawn(Impl::TaskPolicyWithScheduler<TaskEnum, scheduler_type, DepFutureType>&&
294 typename task_base::function_type arg_function,
295 typename task_base::destroy_type arg_destroy,
296 FunctorType&& arg_functor) {
297 return std::move(arg_policy.scheduler())
298 .
template _spawn_impl<TaskEnum>(
299 _get_task_ptr(std::move(arg_policy.predecessor())),
300 arg_policy.priority(), arg_function, arg_destroy,
301 std::forward<FunctorType>(arg_functor));
304 template <
int TaskEnum,
typename DepFutureType,
typename FunctorType>
306 future_type_for_functor<typename std::decay<FunctorType>::type>
308 Impl::TaskPolicyWithPredecessor<TaskEnum, DepFutureType>&& arg_policy,
309 FunctorType&& arg_functor) {
310 using task_type = runnable_task_type<FunctorType>;
311 typename task_type::function_type
const ptr = task_type::apply;
312 typename task_type::destroy_type
const dtor = task_type::destroy;
314 return _spawn_impl<TaskEnum>(
315 _get_task_ptr(std::move(arg_policy).predecessor()),
316 arg_policy.priority(), ptr, dtor,
317 std::forward<FunctorType>(arg_functor));
320 template <
typename FunctorType,
typename ValueType,
typename Scheduler>
321 KOKKOS_FUNCTION
static void respawn(
322 FunctorType* arg_self,
323 BasicFuture<ValueType, Scheduler>
const& arg_dependence,
324 TaskPriority
const& arg_priority) {
327 using value_type =
typename FunctorType::value_type;
328 using task_type = Impl::Task<BasicTaskScheduler, value_type, FunctorType>;
330 task_type*
const task =
static_cast<task_type*
>(arg_self);
332 task->m_priority =
static_cast<int>(arg_priority);
334 task->add_dependence(arg_dependence.m_task);
339 template <
typename FunctorType>
340 KOKKOS_FUNCTION
static void respawn(FunctorType* arg_self,
341 BasicTaskScheduler
const&,
342 TaskPriority
const& arg_priority) {
345 using value_type =
typename FunctorType::value_type;
346 using task_type = Impl::Task<BasicTaskScheduler, value_type, FunctorType>;
348 task_type*
const task =
static_cast<task_type*
>(arg_self);
350 task->m_priority =
static_cast<int>(arg_priority);
352 task->add_dependence(
nullptr);
361 template <
typename ValueType>
362 KOKKOS_FUNCTION BasicFuture<void, scheduler_type> when_all(
363 BasicFuture<ValueType, BasicTaskScheduler>
const arg[],
int narg) {
367 queue_type* q = m_queue;
371 for (
int i = 0; i < narg; ++i) {
372 task_base*
const t = arg[i].m_task;
375 Kokkos::atomic_increment(&(t->m_ref_count));
376 if (q != static_cast<queue_type const*>(t->m_queue)) {
378 "Kokkos when_all Futures must be in the same scheduler");
386 size_t const alloc_size = q->when_all_allocation_size(narg);
388 f.m_task =
reinterpret_cast<task_base*
>(q->allocate(alloc_size));
396 new (f.m_task) task_base();
398 f.m_task->m_queue = q;
399 f.m_task->m_ref_count = 2;
400 f.m_task->m_alloc_size =
static_cast<int32_t
>(alloc_size);
401 f.m_task->m_dep_count = narg;
402 f.m_task->m_task_type = task_base::Aggregate;
406 task_base*
volatile*
const dep = f.m_task->aggregate_dependences();
408 for (
int i = 0; i < narg; ++i) {
409 dep[i] = arg[i].m_task;
412 Kokkos::memory_fence();
414 q->schedule_aggregate(f.m_task);
424 KOKKOS_FUNCTION BasicFuture<void, scheduler_type> when_all(
int narg,
426 using input_type = decltype(func(0));
428 static_assert(is_future<input_type>::value,
429 "Functor must return a Kokkos::Future");
433 if (0 == narg)
return f;
435 size_t const alloc_size = m_queue->when_all_allocation_size(narg);
437 f.m_task =
reinterpret_cast<task_base*
>(m_queue->allocate(alloc_size));
444 new (f.m_task) task_base();
448 f.m_task->m_queue = m_queue;
449 f.m_task->m_ref_count = 2;
450 f.m_task->m_alloc_size =
static_cast<int32_t
>(alloc_size);
451 f.m_task->m_dep_count = narg;
452 f.m_task->m_task_type = task_base::Aggregate;
458 task_base*
volatile*
const dep = f.m_task->aggregate_dependences();
460 for (
int i = 0; i < narg; ++i) {
461 const input_type arg_f = func(i);
462 if (
nullptr != arg_f.m_task) {
470 Kokkos::atomic_increment(&(arg_f.m_task->m_ref_count));
471 dep[i] = arg_f.m_task;
475 Kokkos::memory_fence();
477 m_queue->schedule_aggregate(f.m_task);
485 KOKKOS_INLINE_FUNCTION
486 int allocation_capacity() const noexcept {
487 return m_queue->m_memory.capacity();
490 KOKKOS_INLINE_FUNCTION
491 int allocated_task_count() const noexcept {
return m_queue->m_count_alloc; }
493 KOKKOS_INLINE_FUNCTION
494 int allocated_task_count_max() const noexcept {
return m_queue->m_max_alloc; }
496 KOKKOS_INLINE_FUNCTION
497 long allocated_task_count_accum() const noexcept {
498 return m_queue->m_accum_alloc;
503 template <
class S,
class Q>
504 friend void wait(Kokkos::BasicTaskScheduler<S, Q>
const&);
517 template <
class T,
class Scheduler>
518 Impl::TaskPolicyWithPredecessor<Impl::TaskType::TaskTeam,
519 Kokkos::BasicFuture<T, Scheduler> >
520 KOKKOS_INLINE_FUNCTION
521 TaskTeam(Kokkos::BasicFuture<T, Scheduler> arg_future,
522 TaskPriority arg_priority = TaskPriority::Regular) {
523 return {std::move(arg_future), arg_priority};
526 template <
class Scheduler>
527 Impl::TaskPolicyWithScheduler<Impl::TaskType::TaskTeam, Scheduler>
528 KOKKOS_INLINE_FUNCTION
529 TaskTeam(Scheduler arg_scheduler,
530 typename std::enable_if<Kokkos::is_scheduler<Scheduler>::value,
531 TaskPriority>::type arg_priority =
532 TaskPriority::Regular) {
533 return {std::move(arg_scheduler), arg_priority};
536 template <
class Scheduler,
class PredecessorFuture>
537 Impl::TaskPolicyWithScheduler<Kokkos::Impl::TaskType::TaskTeam, Scheduler,
539 KOKKOS_INLINE_FUNCTION TaskTeam(
540 Scheduler arg_scheduler, PredecessorFuture arg_future,
541 typename std::enable_if<Kokkos::is_scheduler<Scheduler>::value &&
542 Kokkos::is_future<PredecessorFuture>::value,
543 TaskPriority>::type arg_priority =
544 TaskPriority::Regular) {
545 static_assert(std::is_same<
typename PredecessorFuture::scheduler_type,
547 "Can't create a task policy from a scheduler and a future from "
548 "a different scheduler");
550 return {std::move(arg_scheduler), std::move(arg_future), arg_priority};
555 template <
class T,
class Scheduler>
556 Impl::TaskPolicyWithPredecessor<Impl::TaskType::TaskSingle,
557 Kokkos::BasicFuture<T, Scheduler> >
558 KOKKOS_INLINE_FUNCTION
559 TaskSingle(Kokkos::BasicFuture<T, Scheduler> arg_future,
560 TaskPriority arg_priority = TaskPriority::Regular) {
561 return {std::move(arg_future), arg_priority};
564 template <
class Scheduler>
565 Impl::TaskPolicyWithScheduler<Impl::TaskType::TaskSingle, Scheduler>
566 KOKKOS_INLINE_FUNCTION
567 TaskSingle(Scheduler arg_scheduler,
568 typename std::enable_if<Kokkos::is_scheduler<Scheduler>::value,
569 TaskPriority>::type arg_priority =
570 TaskPriority::Regular) {
571 return {std::move(arg_scheduler), arg_priority};
574 template <
class Scheduler,
class PredecessorFuture>
575 Impl::TaskPolicyWithScheduler<Kokkos::Impl::TaskType::TaskSingle, Scheduler,
577 KOKKOS_INLINE_FUNCTION TaskSingle(
578 Scheduler arg_scheduler, PredecessorFuture arg_future,
579 typename std::enable_if<Kokkos::is_scheduler<Scheduler>::value &&
580 Kokkos::is_future<PredecessorFuture>::value,
581 TaskPriority>::type arg_priority =
582 TaskPriority::Regular) {
583 static_assert(std::is_same<
typename PredecessorFuture::scheduler_type,
585 "Can't create a task policy from a scheduler and a future from "
586 "a different scheduler");
588 return {std::move(arg_scheduler), std::move(arg_future), arg_priority};
599 template <
int TaskEnum,
typename Scheduler,
typename DepFutureType,
600 typename FunctorType>
601 typename Scheduler::template future_type_for_functor<
602 typename std::decay<FunctorType>::type>
603 host_spawn(Impl::TaskPolicyWithScheduler<TaskEnum, Scheduler, DepFutureType>
605 FunctorType&& arg_functor) {
606 using scheduler_type = Scheduler;
608 typename scheduler_type::template runnable_task_type<FunctorType>;
610 static_assert(TaskEnum == Impl::TaskType::TaskTeam ||
611 TaskEnum == Impl::TaskType::TaskSingle,
612 "Kokkos host_spawn requires TaskTeam or TaskSingle");
616 typename task_type::function_type ptr;
617 typename task_type::destroy_type dtor;
618 Kokkos::Impl::TaskQueueSpecialization<
619 scheduler_type>::template get_function_pointer<task_type>(ptr, dtor);
621 return scheduler_type::spawn(std::move(arg_policy), ptr, dtor,
622 std::forward<FunctorType>(arg_functor));
631 template <
int TaskEnum,
typename Scheduler,
typename DepFutureType,
632 typename FunctorType>
633 typename Scheduler::template future_type_for_functor<
634 typename std::decay<FunctorType>::type>
635 KOKKOS_INLINE_FUNCTION
636 task_spawn(Impl::TaskPolicyWithScheduler<TaskEnum, Scheduler, DepFutureType>
638 FunctorType&& arg_functor) {
639 using scheduler_type = Scheduler;
642 typename scheduler_type::template runnable_task_type<FunctorType>;
644 #if defined(KOKKOS_ACTIVE_EXECUTION_MEMORY_SPACE_HOST) && \
645 defined(KOKKOS_ENABLE_CUDA)
654 static_assert(TaskEnum == Impl::TaskType::TaskTeam ||
655 TaskEnum == Impl::TaskType::TaskSingle,
656 "Kokkos task_spawn requires TaskTeam or TaskSingle");
658 typename task_type::function_type
const ptr = task_type::apply;
659 typename task_type::destroy_type
const dtor = task_type::destroy;
661 return scheduler_type::spawn(std::move(arg_policy), ptr, dtor,
662 std::forward<FunctorType>(arg_functor));
670 template <
typename FunctorType,
typename T>
671 void KOKKOS_INLINE_FUNCTION
673 TaskPriority
const& arg_priority = TaskPriority::Regular) {
674 static_assert(Kokkos::is_future<T>::value || Kokkos::is_scheduler<T>::value,
675 "Kokkos respawn argument must be Future or TaskScheduler");
677 T::scheduler_type::respawn(arg_self, arg, arg_priority);
693 template <
class ExecSpace,
class QueueType>
694 inline void wait(BasicTaskScheduler<ExecSpace, QueueType>
const& scheduler) {
695 using scheduler_type = BasicTaskScheduler<ExecSpace, QueueType>;
696 scheduler_type::specialization::execute(scheduler);
void KOKKOS_INLINE_FUNCTION respawn(FunctorType *arg_self, T const &arg, TaskPriority const &arg_priority=TaskPriority::Regular)
A task respawns itself with options.
Scheduler::template future_type_for_functor< typename std::decay< FunctorType >::type > host_spawn(Impl::TaskPolicyWithScheduler< TaskEnum, Scheduler, DepFutureType > arg_policy, FunctorType &&arg_functor)
A host control thread spawns a task with options.
Scheduler::template future_type_for_functor< typename std::decay< FunctorType >::type > KOKKOS_INLINE_FUNCTION task_spawn(Impl::TaskPolicyWithScheduler< TaskEnum, Scheduler, DepFutureType > arg_policy, FunctorType &&arg_functor)
A task spawns a task with options.