44 #ifndef KOKKOS_TASKSCHEDULER_HPP
45 #define KOKKOS_TASKSCHEDULER_HPP
49 #include <Kokkos_Macros.hpp>
50 #if defined( KOKKOS_ENABLE_TASKDAG )
52 #include <Kokkos_Core_fwd.hpp>
53 #include <Kokkos_TaskScheduler_fwd.hpp>
56 #include <Kokkos_MemoryPool.hpp>
57 #include <impl/Kokkos_Tags.hpp>
59 #include <Kokkos_Future.hpp>
60 #include <impl/Kokkos_TaskQueue.hpp>
61 #include <impl/Kokkos_SingleTaskQueue.hpp>
62 #include <impl/Kokkos_TaskQueueMultiple.hpp>
63 #include <impl/Kokkos_TaskPolicyData.hpp>
64 #include <impl/Kokkos_TaskTeamMember.hpp>
65 #include <impl/Kokkos_SimpleTaskScheduler.hpp>
74 template <
class,
class>
80 template<
class ExecSpace,
class QueueType>
81 class BasicTaskScheduler :
public Impl::TaskSchedulerBase
85 using scheduler_type = BasicTaskScheduler;
86 using execution_space = ExecSpace;
87 using queue_type = QueueType;
88 using memory_space =
typename queue_type::memory_space;
89 using memory_pool =
typename queue_type::memory_pool;
90 using specialization = Impl::TaskQueueSpecialization<BasicTaskScheduler>;
91 using member_type =
typename specialization::member_type;
92 using team_scheduler_type = BasicTaskScheduler;
93 template <
class Functor>
94 using runnable_task_type = Impl::Task<scheduler_type, typename Functor::value_type, Functor>;
95 template <
class ValueType>
96 using future_type = Kokkos::BasicFuture<ValueType, BasicTaskScheduler>;
97 template <
class FunctorType>
98 using future_type_for_functor = future_type<typename FunctorType::value_type>;
102 using track_type = Kokkos::Impl::SharedAllocationTracker ;
103 using task_base = Impl::TaskBase;
106 queue_type * m_queue;
110 template <
typename,
typename>
111 friend class Impl::TaskQueue;
113 friend struct Impl::TaskQueueSpecialization;
114 template <
typename,
typename>
115 friend class Impl::TaskQueueSpecializationConstrained;
116 template <
typename,
typename>
117 friend class Impl::TaskTeamMemberAdapter;
118 template <
typename,
typename>
119 friend class Impl::TaskExec;
123 KOKKOS_INLINE_FUNCTION
125 track_type arg_track,
126 queue_type* arg_queue
128 : m_track(std::move(arg_track)),
129 m_queue(std::move(arg_queue))
132 KOKKOS_INLINE_FUNCTION
133 team_scheduler_type get_team_scheduler(
int team_rank)
const {
134 return { m_track, &m_queue->get_team_queue(team_rank) };
139 KOKKOS_INLINE_FUNCTION
140 static constexpr task_base* _get_task_ptr(std::nullptr_t) {
return nullptr; }
142 template <
class ValueType>
143 KOKKOS_INLINE_FUNCTION
144 static constexpr task_base* _get_task_ptr(future_type<ValueType>&& f)
149 template<
int TaskEnum ,
typename DepTaskType ,
typename FunctorType >
151 Kokkos::BasicFuture<typename FunctorType::value_type, scheduler_type>
153 DepTaskType* arg_predecessor_task,
154 TaskPriority arg_priority,
155 typename task_base::function_type arg_function,
156 typename task_base::destroy_type arg_destroy,
157 FunctorType&& arg_functor
160 using functor_future_type = future_type_for_functor<typename std::decay<FunctorType>::type>;
161 using task_type = Impl::Task<BasicTaskScheduler, typename functor_future_type::value_type, FunctorType>;
172 functor_future_type f ;
176 const size_t alloc_size =
177 m_queue->template spawn_allocation_size< FunctorType >();
179 void* task_storage = m_queue->allocate(alloc_size);
187 f.m_task =
new (task_storage) task_type( std::forward<FunctorType>(arg_functor) );
189 f.m_task->m_apply = arg_function;
191 f.m_task->m_queue = m_queue;
192 f.m_task->m_next = arg_predecessor_task;
193 f.m_task->m_ref_count = 2;
194 f.m_task->m_alloc_size = alloc_size;
195 f.m_task->m_task_type = TaskEnum;
196 f.m_task->m_priority = (int16_t)arg_priority;
198 Kokkos::memory_fence();
205 m_queue->schedule_runnable( f.m_task );
217 KOKKOS_INLINE_FUNCTION
218 BasicTaskScheduler() : m_track(), m_queue(0) {}
220 KOKKOS_INLINE_FUNCTION
221 BasicTaskScheduler( BasicTaskScheduler && rhs ) noexcept
222 : m_track(rhs.m_track),
223 m_queue(std::move(rhs.m_queue))
226 KOKKOS_INLINE_FUNCTION
227 BasicTaskScheduler( BasicTaskScheduler
const & rhs )
228 : m_track(rhs.m_track),
232 KOKKOS_INLINE_FUNCTION
233 BasicTaskScheduler& operator=(BasicTaskScheduler&& rhs) noexcept
235 m_track = rhs.m_track;
236 m_queue = std::move(rhs.m_queue);
240 KOKKOS_INLINE_FUNCTION
241 BasicTaskScheduler& operator=(BasicTaskScheduler
const& rhs)
243 m_track = rhs.m_track;
244 m_queue = rhs.m_queue;
248 explicit BasicTaskScheduler(memory_pool
const & arg_memory_pool) noexcept
249 : m_track(), m_queue(0)
251 typedef Kokkos::Impl::SharedAllocationRecord
252 < memory_space ,
typename queue_type::Destroy >
255 record_type * record =
256 record_type::allocate( memory_space()
261 m_queue =
new( record->data() ) queue_type( arg_memory_pool );
263 record->m_destroy.m_queue = m_queue ;
265 m_track.assign_allocated_record_to_uninitialized( record );
268 BasicTaskScheduler( memory_space
const & arg_memory_space
269 ,
size_t const mempool_capacity
270 ,
unsigned const mempool_min_block_size
271 ,
unsigned const mempool_max_block_size
272 ,
unsigned const mempool_superblock_size
274 : BasicTaskScheduler( memory_pool( arg_memory_space
276 , mempool_min_block_size
277 , mempool_max_block_size
278 , mempool_superblock_size ) )
283 KOKKOS_INLINE_FUNCTION
284 queue_type& queue() const noexcept {
285 KOKKOS_EXPECTS(m_queue !=
nullptr);
289 KOKKOS_INLINE_FUNCTION
290 memory_pool * memory() const noexcept
291 {
return m_queue ? &( m_queue->m_memory ) : (memory_pool*) 0 ; }
295 template<
typename FunctorType >
297 size_t spawn_allocation_size()
const
298 {
return m_queue->template spawn_allocation_size< FunctorType >(); }
302 size_t when_all_allocation_size(
int narg )
const
303 {
return m_queue->when_all_allocation_size( narg ); }
308 template <
int TaskEnum,
typename DepFutureType,
typename FunctorType>
309 KOKKOS_FUNCTION
static
310 Kokkos::BasicFuture<typename FunctorType::value_type, scheduler_type>
312 Impl::TaskPolicyWithScheduler<TaskEnum, scheduler_type, DepFutureType>&& arg_policy,
313 typename task_base::function_type arg_function,
314 typename task_base::destroy_type arg_destroy,
315 FunctorType&& arg_functor
318 return std::move(arg_policy.scheduler()).
template _spawn_impl<TaskEnum>(
319 _get_task_ptr(std::move(arg_policy.predecessor())),
320 arg_policy.priority(),
323 std::forward<FunctorType>(arg_functor)
327 template <
int TaskEnum,
typename DepFutureType,
typename FunctorType>
329 future_type_for_functor<typename std::decay<FunctorType>::type>
331 Impl::TaskPolicyWithPredecessor<TaskEnum, DepFutureType>&& arg_policy,
332 FunctorType&& arg_functor
335 using task_type = runnable_task_type<FunctorType>;
336 typename task_type::function_type
const ptr = task_type::apply;
337 typename task_type::destroy_type
const dtor = task_type::destroy;
339 return _spawn_impl<TaskEnum>(
340 _get_task_ptr(std::move(arg_policy).predecessor()),
341 arg_policy.priority(),
343 std::forward<FunctorType>(arg_functor)
347 template<
typename FunctorType,
typename ValueType,
typename Scheduler>
348 KOKKOS_FUNCTION
static
351 FunctorType* arg_self,
352 BasicFuture<ValueType,Scheduler>
const & arg_dependence,
353 TaskPriority
const & arg_priority
357 using value_type =
typename FunctorType::value_type ;
358 using task_type = Impl::Task<BasicTaskScheduler, value_type, FunctorType>;
360 task_type *
const task =
static_cast< task_type *
>( arg_self );
362 task->m_priority =
static_cast<int>(arg_priority);
364 task->add_dependence( arg_dependence.m_task );
369 template<
typename FunctorType >
370 KOKKOS_FUNCTION
static
373 FunctorType* arg_self,
374 BasicTaskScheduler
const &,
375 TaskPriority
const & arg_priority
380 using value_type =
typename FunctorType::value_type;
381 using task_type = Impl::Task<BasicTaskScheduler, value_type, FunctorType>;
383 task_type *
const task =
static_cast< task_type *
>( arg_self );
385 task->m_priority =
static_cast<int>(arg_priority);
387 task->add_dependence( (task_base*) 0 );
396 template<
typename ValueType>
398 BasicFuture< void, scheduler_type >
399 when_all(BasicFuture<ValueType, BasicTaskScheduler>
const arg[],
int narg)
402 future_type<void> f ;
406 queue_type* q = m_queue;
410 for (
int i = 0 ; i < narg ; ++i ) {
411 task_base *
const t = arg[i].m_task ;
412 if (
nullptr != t ) {
414 Kokkos::atomic_increment( &(t->m_ref_count) );
415 if(q != static_cast< queue_type const* >(t->m_queue)) {
416 Kokkos::abort(
"Kokkos when_all Futures must be in the same scheduler" );
423 size_t const alloc_size = q->when_all_allocation_size( narg );
426 reinterpret_cast< task_base *
>( q->allocate( alloc_size ) );
435 new( f.m_task ) task_base();
437 f.m_task->m_queue = q;
438 f.m_task->m_ref_count = 2 ;
439 f.m_task->m_alloc_size =
static_cast<int32_t
>(alloc_size);
440 f.m_task->m_dep_count = narg ;
441 f.m_task->m_task_type = task_base::Aggregate ;
445 task_base *
volatile *
const dep =
446 f.m_task->aggregate_dependences();
448 for (
int i = 0 ; i < narg ; ++i ) { dep[i] = arg[i].m_task ; }
450 Kokkos::memory_fence();
452 q->schedule_aggregate( f.m_task );
463 BasicFuture< void, scheduler_type >
464 when_all(
int narg , F
const func )
466 using input_type = decltype( func(0) );
468 static_assert( is_future< input_type >::value
469 ,
"Functor must return a Kokkos::Future" );
471 future_type<void> f ;
473 if ( 0 == narg )
return f ;
475 size_t const alloc_size = m_queue->when_all_allocation_size( narg );
478 reinterpret_cast< task_base *
>( m_queue->allocate( alloc_size ) );
486 new( f.m_task ) task_base();
490 f.m_task->m_queue = m_queue;
491 f.m_task->m_ref_count = 2 ;
492 f.m_task->m_alloc_size =
static_cast<int32_t
>(alloc_size);
493 f.m_task->m_dep_count = narg ;
494 f.m_task->m_task_type = task_base::Aggregate ;
500 task_base *
volatile *
const dep =
501 f.m_task->aggregate_dependences();
503 for (
int i = 0 ; i < narg ; ++i ) {
504 const input_type arg_f = func(i);
505 if ( 0 != arg_f.m_task ) {
512 Kokkos::atomic_increment( &(arg_f.m_task->m_ref_count) );
513 dep[i] = arg_f.m_task ;
517 Kokkos::memory_fence();
519 m_queue->schedule_aggregate( f.m_task );
527 KOKKOS_INLINE_FUNCTION
528 int allocation_capacity() const noexcept
529 {
return m_queue->m_memory.capacity(); }
531 KOKKOS_INLINE_FUNCTION
532 int allocated_task_count() const noexcept
533 {
return m_queue->m_count_alloc ; }
535 KOKKOS_INLINE_FUNCTION
536 int allocated_task_count_max() const noexcept
537 {
return m_queue->m_max_alloc ; }
539 KOKKOS_INLINE_FUNCTION
540 long allocated_task_count_accum() const noexcept
541 {
return m_queue->m_accum_alloc ; }
545 template<
class S,
class Q>
547 void wait(Kokkos::BasicTaskScheduler<S, Q>
const&);
561 template <
class T,
class Scheduler>
562 Impl::TaskPolicyWithPredecessor<
563 Impl::TaskType::TaskTeam,
564 Kokkos::BasicFuture<T, Scheduler>
566 KOKKOS_INLINE_FUNCTION
568 Kokkos::BasicFuture<T, Scheduler> arg_future,
569 TaskPriority arg_priority = TaskPriority::Regular
572 return { std::move(arg_future), arg_priority };
575 template <
class Scheduler>
576 Impl::TaskPolicyWithScheduler<
577 Impl::TaskType::TaskTeam, Scheduler
579 KOKKOS_INLINE_FUNCTION
581 Scheduler arg_scheduler,
582 typename std::enable_if<
583 Kokkos::is_scheduler<Scheduler>::value,
585 >::type arg_priority = TaskPriority::Regular
588 return { std::move(arg_scheduler), arg_priority };
593 class PredecessorFuture
595 Impl::TaskPolicyWithScheduler<
596 Kokkos::Impl::TaskType::TaskTeam,
600 KOKKOS_INLINE_FUNCTION
602 Scheduler arg_scheduler,
603 PredecessorFuture arg_future,
604 typename std::enable_if<
605 Kokkos::is_scheduler<Scheduler>::value
606 && Kokkos::is_future<PredecessorFuture>::value,
608 >::type arg_priority = TaskPriority::Regular
612 std::is_same<typename PredecessorFuture::scheduler_type, Scheduler>::value,
613 "Can't create a task policy from a scheduler and a future from a different scheduler"
616 return { std::move(arg_scheduler), std::move(arg_future), arg_priority };
621 template <
class T,
class Scheduler>
622 Impl::TaskPolicyWithPredecessor<
623 Impl::TaskType::TaskSingle,
624 Kokkos::BasicFuture<T, Scheduler>
626 KOKKOS_INLINE_FUNCTION
628 Kokkos::BasicFuture<T, Scheduler> arg_future,
629 TaskPriority arg_priority = TaskPriority::Regular
632 return { std::move(arg_future), arg_priority };
635 template <
class Scheduler>
636 Impl::TaskPolicyWithScheduler<
637 Impl::TaskType::TaskSingle, Scheduler
639 KOKKOS_INLINE_FUNCTION
641 Scheduler arg_scheduler,
642 typename std::enable_if<
643 Kokkos::is_scheduler<Scheduler>::value,
645 >::type arg_priority = TaskPriority::Regular
648 return { std::move(arg_scheduler), arg_priority };
653 class PredecessorFuture
655 Impl::TaskPolicyWithScheduler<
656 Kokkos::Impl::TaskType::TaskSingle,
660 KOKKOS_INLINE_FUNCTION
662 Scheduler arg_scheduler,
663 PredecessorFuture arg_future,
664 typename std::enable_if<
665 Kokkos::is_scheduler<Scheduler>::value
666 && Kokkos::is_future<PredecessorFuture>::value,
668 >::type arg_priority = TaskPriority::Regular
672 std::is_same<typename PredecessorFuture::scheduler_type, Scheduler>::value,
673 "Can't create a task policy from a scheduler and a future from a different scheduler"
676 return { std::move(arg_scheduler), std::move(arg_future), arg_priority };
687 template<
int TaskEnum,
typename Scheduler,
typename DepFutureType,
typename FunctorType>
688 typename Scheduler::template future_type_for_functor<typename std::decay<FunctorType>::type>
690 Impl::TaskPolicyWithScheduler<TaskEnum, Scheduler, DepFutureType> arg_policy,
691 FunctorType&& arg_functor
693 using scheduler_type = Scheduler;
695 typename scheduler_type::template runnable_task_type<FunctorType>;
698 TaskEnum == Impl::TaskType::TaskTeam || TaskEnum == Impl::TaskType::TaskSingle,
699 "Kokkos host_spawn requires TaskTeam or TaskSingle"
704 typename task_type::function_type ptr;
705 typename task_type::destroy_type dtor;
706 Kokkos::Impl::TaskQueueSpecialization< scheduler_type >::
707 template get_function_pointer< task_type >(ptr, dtor);
709 return scheduler_type::spawn(
710 std::move(arg_policy), ptr, dtor, std::forward<FunctorType>(arg_functor)
720 template<
int TaskEnum,
typename Scheduler,
typename DepFutureType,
typename FunctorType>
721 typename Scheduler::template future_type_for_functor<typename std::decay<FunctorType>::type>
722 KOKKOS_INLINE_FUNCTION
724 Impl::TaskPolicyWithScheduler<TaskEnum, Scheduler, DepFutureType> arg_policy,
725 FunctorType&& arg_functor
728 using scheduler_type = Scheduler;
731 typename scheduler_type::template runnable_task_type<FunctorType>;
733 #if defined( KOKKOS_ACTIVE_EXECUTION_MEMORY_SPACE_HOST ) && \
734 defined( KOKKOS_ENABLE_CUDA )
736 static_assert( ! std::is_same< Kokkos::Cuda , typename Scheduler::execution_space >::value
737 ,
"Error calling Kokkos::task_spawn for Cuda space within Host code" );
742 TaskEnum == Impl::TaskType::TaskTeam || TaskEnum == Impl::TaskType::TaskSingle,
743 "Kokkos task_spawn requires TaskTeam or TaskSingle"
746 typename task_type::function_type
const ptr = task_type::apply ;
747 typename task_type::destroy_type
const dtor = task_type::destroy ;
749 return scheduler_type::spawn(std::move(arg_policy), ptr, dtor,
750 std::forward<FunctorType>(arg_functor)
759 template<
typename FunctorType ,
typename T >
761 KOKKOS_INLINE_FUNCTION
764 , TaskPriority
const & arg_priority = TaskPriority::Regular
767 static_assert( Kokkos::is_future<T>::value ||
768 Kokkos::is_scheduler<T>::value
769 ,
"Kokkos respawn argument must be Future or TaskScheduler" );
771 T::scheduler_type::respawn(
772 arg_self , arg , arg_priority
789 template<
class ExecSpace,
class QueueType>
791 void wait(BasicTaskScheduler<ExecSpace, QueueType>
const& scheduler)
793 using scheduler_type = BasicTaskScheduler<ExecSpace, QueueType>;
794 scheduler_type::specialization::execute(scheduler);
Scheduler::template future_type_for_functor< typename std::decay< FunctorType >::type > host_spawn(Impl::TaskPolicyWithScheduler< TaskEnum, Scheduler, DepFutureType > arg_policy, FunctorType &&arg_functor)
A host control thread spawns a task with options.
void KOKKOS_INLINE_FUNCTION respawn(FunctorType *arg_self, T const &arg, TaskPriority const &arg_priority=TaskPriority::Regular)
A task respawns itself with options.
Scheduler::template future_type_for_functor< typename std::decay< FunctorType >::type > KOKKOS_INLINE_FUNCTION task_spawn(Impl::TaskPolicyWithScheduler< TaskEnum, Scheduler, DepFutureType > arg_policy, FunctorType &&arg_functor)
A task spawns a task with options.