44 #ifndef KOKKOS_TASKSCHEDULER_HPP
45 #define KOKKOS_TASKSCHEDULER_HPP
49 #include <Kokkos_Macros.hpp>
50 #if defined( KOKKOS_ENABLE_TASKDAG )
52 #include <Kokkos_Core_fwd.hpp>
55 #include <Kokkos_MemoryPool.hpp>
56 #include <impl/Kokkos_Tags.hpp>
64 template<
typename Arg1 =
void ,
typename Arg2 =
void >
67 template<
typename Space >
70 template<
typename Space >
71 void wait( TaskScheduler< Space >
const & );
/** \brief Type trait: primary template, evaluates to false for any type
 *         that is not matched by a more specialized definition.
 */
template< class Space >
struct is_scheduler : std::integral_constant< bool , false > {};
76 template<
typename Space >
77 struct is_scheduler< TaskScheduler< Space > > :
public std::true_type {};
81 #include <impl/Kokkos_TaskQueue.hpp>
103 template<
typename Space ,
typename ResultType ,
typename FunctorType >
120 template<
typename Arg1 ,
typename Arg2 >
124 template<
typename >
friend class TaskScheduler ;
125 template<
typename ,
typename >
friend class Future ;
126 template<
typename ,
typename ,
typename >
friend class Impl::TaskBase ;
128 enum { Arg1_is_space = Kokkos::is_space< Arg1 >::value };
129 enum { Arg2_is_space = Kokkos::is_space< Arg2 >::value };
130 enum { Arg1_is_value = ! Arg1_is_space &&
131 ! std::is_same< Arg1 , void >::value };
132 enum { Arg2_is_value = ! Arg2_is_space &&
133 ! std::is_same< Arg2 , void >::value };
135 static_assert( ! ( Arg1_is_space && Arg2_is_space )
136 ,
"Future cannot be given two spaces" );
138 static_assert( ! ( Arg1_is_value && Arg2_is_value )
139 ,
"Future cannot be given two value types" );
142 typename std::conditional< Arg1_is_value , Arg1 ,
143 typename std::conditional< Arg2_is_value , Arg2 ,
void
147 typename std::conditional< Arg1_is_space , Arg1 ,
148 typename std::conditional< Arg2_is_space , Arg2 ,
void
// Alias for the Impl::TaskBase< void , void , void > specialization; the
// explicit constructor of this class accepts a pointer to this type.
151 using task_base = Impl::TaskBase< void , void , void > ;
// Task queue type for this Future's Space; its static assign() is used by
// the members of this class to update the held task pointer.
152 using queue_type = Impl::TaskQueue< Space > ;
156 KOKKOS_INLINE_FUNCTION
explicit
157 Future( task_base * task ) : m_task(0)
158 {
if ( task ) queue_type::assign( & m_task , task ); }
// Execution space taken from the Space this Future was instantiated with.
164 using execution_space =
typename Space::execution_space ;
// Public name for ValueType (ValueType itself is declared outside this span).
165 using value_type = ValueType ;
169 KOKKOS_INLINE_FUNCTION
170 bool is_null()
const {
return 0 == m_task ; }
172 KOKKOS_INLINE_FUNCTION
173 int reference_count()
const
174 {
return 0 != m_task ? m_task->reference_count() : 0 ; }
178 KOKKOS_INLINE_FUNCTION
180 {
if ( m_task ) queue_type::assign( & m_task , (task_base*)0 ); }
184 KOKKOS_INLINE_FUNCTION
185 ~Future() { clear(); }
189 KOKKOS_INLINE_FUNCTION
190 constexpr Future() noexcept : m_task(0) {}
192 KOKKOS_INLINE_FUNCTION
193 Future( Future && rhs )
194 : m_task( rhs.m_task ) { rhs.m_task = 0 ; }
196 KOKKOS_INLINE_FUNCTION
197 Future(
const Future & rhs )
199 {
if ( rhs.m_task ) queue_type::assign( & m_task , rhs.m_task ); }
201 KOKKOS_INLINE_FUNCTION
202 Future & operator = ( Future && rhs )
205 m_task = rhs.m_task ;
210 KOKKOS_INLINE_FUNCTION
211 Future & operator = (
const Future & rhs )
213 if ( m_task || rhs.m_task ) queue_type::assign( & m_task , rhs.m_task );
219 template<
class A1 ,
class A2 >
220 KOKKOS_INLINE_FUNCTION
221 Future( Future<A1,A2> && rhs )
222 : m_task( rhs.m_task )
225 ( std::is_same< Space , void >::value ||
226 std::is_same< Space ,
typename Future<A1,A2>::Space >::value
227 ,
"Assigned Futures must have the same space" );
230 ( std::is_same< value_type , void >::value ||
231 std::is_same< value_type ,
typename Future<A1,A2>::value_type >::value
232 ,
"Assigned Futures must have the same value_type" );
237 template<
class A1 ,
class A2 >
238 KOKKOS_INLINE_FUNCTION
239 Future(
const Future<A1,A2> & rhs )
243 ( std::is_same< Space , void >::value ||
244 std::is_same< Space ,
typename Future<A1,A2>::Space >::value
245 ,
"Assigned Futures must have the same space" );
248 ( std::is_same< value_type , void >::value ||
249 std::is_same< value_type ,
typename Future<A1,A2>::value_type >::value
250 ,
"Assigned Futures must have the same value_type" );
252 if ( rhs.m_task ) queue_type::assign( & m_task , rhs.m_task );
255 template<
class A1 ,
class A2 >
256 KOKKOS_INLINE_FUNCTION
257 Future & operator = (
const Future<A1,A2> & rhs )
260 ( std::is_same< Space , void >::value ||
261 std::is_same< Space ,
typename Future<A1,A2>::Space >::value
262 ,
"Assigned Futures must have the same space" );
265 ( std::is_same< value_type , void >::value ||
266 std::is_same< value_type ,
typename Future<A1,A2>::value_type >::value
267 ,
"Assigned Futures must have the same value_type" );
269 if ( m_task || rhs.m_task ) queue_type::assign( & m_task , rhs.m_task );
273 template<
class A1 ,
class A2 >
274 KOKKOS_INLINE_FUNCTION
275 Future & operator = ( Future<A1,A2> && rhs )
278 ( std::is_same< Space , void >::value ||
279 std::is_same< Space ,
typename Future<A1,A2>::Space >::value
280 ,
"Assigned Futures must have the same space" );
283 ( std::is_same< value_type , void >::value ||
284 std::is_same< value_type ,
typename Future<A1,A2>::value_type >::value
285 ,
"Assigned Futures must have the same value_type" );
288 m_task = rhs.m_task ;
295 KOKKOS_INLINE_FUNCTION
296 int is_ready() const noexcept
297 {
return ( 0 == m_task ) || ( ((task_base*) task_base::LockTag) == m_task->m_wait ); }
299 KOKKOS_INLINE_FUNCTION
300 const typename Impl::TaskResult< ValueType >::reference_type
304 Kokkos::abort(
"Kokkos:::Future::get ERROR: is_null()");
306 return Impl::TaskResult< ValueType >::get( m_task );
/** \brief Type trait: primary template, evaluates to false.
 *
 *  The first parameter is the type being queried; the second, optional
 *  parameter names an execution space and defaults to void.
 */
template< class , class ExecSpace = void >
struct is_future : std::integral_constant< bool , false > {};
314 template<
typename Arg1 ,
typename Arg2 ,
typename ExecSpace >
315 struct is_future< Future<Arg1,Arg2> , ExecSpace >
316 :
public std::integral_constant
318 ( std::is_same< ExecSpace , void >::value ||
319 std::is_same< ExecSpace
320 , typename Future<Arg1,Arg2>::execution_space >::value )
330 enum class TaskPriority : int { High = 0
344 template<
int TaskEnum ,
typename DepFutureType >
345 struct TaskPolicyData
347 using execution_space =
typename DepFutureType::execution_space ;
348 using scheduler_type = TaskScheduler< execution_space > ;
350 enum :
int { m_task_type = TaskEnum };
352 scheduler_type
const * m_scheduler ;
353 DepFutureType
const m_dependence ;
// Default construction is disabled; the remaining constructors of this
// struct all take at least a priority plus a scheduler and/or a future.
356 TaskPolicyData() = delete ;
// Move and copy construction use the compiler-generated versions.
357 TaskPolicyData( TaskPolicyData && ) = default ;
358 TaskPolicyData( TaskPolicyData
const & ) = default ;
// NOTE(review): m_dependence is declared `DepFutureType const`, so these
// defaulted assignment operators are most likely defined as deleted by the
// compiler -- confirm whether assignment is actually intended to work.
359 TaskPolicyData & operator = ( TaskPolicyData && ) = default ;
360 TaskPolicyData & operator = ( TaskPolicyData
const & ) = default ;
362 KOKKOS_INLINE_FUNCTION
363 TaskPolicyData( DepFutureType
const & arg_future
364 , Kokkos::TaskPriority
const & arg_priority )
366 , m_dependence( arg_future )
367 , m_priority( static_cast<int>( arg_priority ) )
370 KOKKOS_INLINE_FUNCTION
371 TaskPolicyData( scheduler_type
const & arg_scheduler
372 , Kokkos::TaskPriority
const & arg_priority )
373 : m_scheduler( & arg_scheduler )
375 , m_priority( static_cast<int>( arg_priority ) )
378 KOKKOS_INLINE_FUNCTION
379 TaskPolicyData( scheduler_type
const & arg_scheduler
380 , DepFutureType
const & arg_future
381 , Kokkos::TaskPriority
const & arg_priority )
382 : m_scheduler( & arg_scheduler )
383 , m_dependence( arg_future )
384 , m_priority( static_cast<int>( arg_priority ) )
396 template<
typename ExecSpace >
// Tracker type for the shared allocation held in m_track.
401 using track_type = Kokkos::Impl::SharedAllocationTracker ;
// Task queue type for this scheduler's execution space.
402 using queue_type = Kokkos::Impl::TaskQueue< ExecSpace > ;
// Alias for the Impl::TaskBase< void , void , void > specialization.
403 using task_base = Impl::TaskBase< void , void , void > ;
// Pointer to the queue this scheduler operates on; null after default
// construction.
406 queue_type * m_queue ;
// Execution space this scheduler was instantiated for.
412 using execution_space = ExecSpace ;
// Memory space and memory pool types are taken from the queue type.
413 using memory_space =
typename queue_type::memory_space ;
414 using memory_pool =
typename queue_type::memory_pool ;
416 typename Kokkos::Impl::TaskQueueSpecialization< ExecSpace >::member_type ;
418 KOKKOS_INLINE_FUNCTION
419 TaskScheduler() : m_track(), m_queue(0) {}
421 KOKKOS_INLINE_FUNCTION
422 TaskScheduler( TaskScheduler && rhs )
423 : m_track( rhs.m_track ), m_queue( rhs.m_queue ) {}
425 KOKKOS_INLINE_FUNCTION
426 TaskScheduler( TaskScheduler
const & rhs )
427 : m_track( rhs.m_track ), m_queue( rhs.m_queue ) {}
429 KOKKOS_INLINE_FUNCTION
430 TaskScheduler & operator = ( TaskScheduler && rhs )
431 { m_track = rhs.m_track ; m_queue = rhs.m_queue ;
return *this ; }
433 KOKKOS_INLINE_FUNCTION
434 TaskScheduler & operator = ( TaskScheduler
const & rhs )
435 { m_track = rhs.m_track ; m_queue = rhs.m_queue ;
return *this ; }
437 TaskScheduler( memory_pool
const & arg_memory_pool )
441 typedef Kokkos::Impl::SharedAllocationRecord
442 < memory_space ,
typename queue_type::Destroy >
445 record_type * record =
446 record_type::allocate( memory_space()
451 m_queue =
new( record->data() ) queue_type( arg_memory_pool );
453 record->m_destroy.m_queue = m_queue ;
455 m_track.assign_allocated_record_to_uninitialized( record );
458 TaskScheduler( memory_space
const & arg_memory_space
459 ,
size_t const mempool_capacity
460 ,
unsigned const mempool_min_block_size
461 ,
unsigned const mempool_max_block_size
462 ,
unsigned const mempool_superblock_size
464 : TaskScheduler( memory_pool( arg_memory_space
466 , mempool_min_block_size
467 , mempool_max_block_size
468 , mempool_superblock_size ) )
473 KOKKOS_INLINE_FUNCTION
474 memory_pool * memory() const noexcept
475 {
return m_queue ? &( m_queue->m_memory ) : (memory_pool*) 0 ; }
479 template<
typename FunctorType >
481 size_t spawn_allocation_size()
const
482 {
return m_queue->template spawn_allocation_size< FunctorType >(); }
486 size_t when_all_allocation_size(
int narg )
const
487 {
return m_queue->when_all_allocation_size( narg ); }
491 template<
int TaskEnum ,
typename DepFutureType ,
typename FunctorType >
492 KOKKOS_FUNCTION
static
494 spawn( Impl::TaskPolicyData<TaskEnum,DepFutureType>
const & arg_policy
495 ,
typename task_base::function_type arg_function
496 , FunctorType && arg_functor
499 using value_type =
typename FunctorType::value_type ;
500 using future_type = Future< value_type , execution_space > ;
501 using task_type = Impl::TaskBase< execution_space
505 queue_type *
const queue =
506 arg_policy.m_scheduler ? arg_policy.m_scheduler->m_queue : (
507 arg_policy.m_dependence.m_task
508 ?
static_cast<queue_type*
>(arg_policy.m_dependence.m_task->m_queue)
512 Kokkos::abort(
"Kokkos spawn requires scheduler or non-null Future");
515 if ( arg_policy.m_dependence.m_task != 0 &&
516 arg_policy.m_dependence.m_task->m_queue != queue ) {
517 Kokkos::abort(
"Kokkos spawn given incompatible scheduler and Future");
524 queue->iff_single_thread_recursive_execute();
532 const size_t alloc_size =
533 queue->template spawn_allocation_size< FunctorType >();
536 reinterpret_cast< task_type *
>(queue->allocate(alloc_size) );
544 new ( f.m_task ) task_type( std::move(arg_functor) );
546 f.m_task->m_apply = arg_function ;
547 f.m_task->m_queue = queue ;
548 f.m_task->m_next = arg_policy.m_dependence.m_task ;
549 f.m_task->m_ref_count = 2 ;
550 f.m_task->m_alloc_size = alloc_size ;
551 f.m_task->m_task_type = arg_policy.m_task_type ;
552 f.m_task->m_priority = arg_policy.m_priority ;
554 Kokkos::memory_fence();
561 queue->schedule_runnable( f.m_task );
569 template<
typename FunctorType ,
typename A1 ,
typename A2 >
570 KOKKOS_FUNCTION
static
572 respawn( FunctorType * arg_self
573 , Future<A1,A2>
const & arg_dependence
574 , TaskPriority
const & arg_priority
579 using value_type =
typename FunctorType::value_type ;
580 using task_type = Impl::TaskBase< execution_space
584 task_type *
const task =
static_cast< task_type *
>( arg_self );
586 task->m_priority =
static_cast<int>(arg_priority);
588 task->add_dependence( arg_dependence.m_task );
593 template<
typename FunctorType >
594 KOKKOS_FUNCTION
static
596 respawn( FunctorType * arg_self
597 , TaskScheduler
const &
598 , TaskPriority
const & arg_priority
603 using value_type =
typename FunctorType::value_type ;
604 using task_type = Impl::TaskBase< execution_space
608 task_type *
const task =
static_cast< task_type *
>( arg_self );
610 task->m_priority =
static_cast<int>(arg_priority);
612 task->add_dependence( (task_base*) 0 );
621 template<
typename A1 ,
typename A2 >
622 KOKKOS_FUNCTION
static
623 Future< execution_space >
624 when_all( Future< A1 , A2 >
const arg[] ,
int narg )
626 using future_type = Future< execution_space > ;
632 queue_type * queue = 0 ;
634 for (
int i = 0 ; i < narg ; ++i ) {
635 task_base *
const t = arg[i].m_task ;
638 Kokkos::atomic_increment( &(t->m_ref_count) );
640 queue =
static_cast< queue_type *
>( t->m_queue );
642 else if ( queue != static_cast< queue_type * >( t->m_queue ) ) {
643 Kokkos::abort(
"Kokkos when_all Futures must be in the same scheduler" );
650 size_t const alloc_size = queue->when_all_allocation_size( narg );
653 reinterpret_cast< task_base *
>( queue->allocate( alloc_size ) );
661 new( f.m_task ) task_base();
663 f.m_task->m_queue = queue ;
664 f.m_task->m_ref_count = 2 ;
665 f.m_task->m_alloc_size = alloc_size ;
666 f.m_task->m_dep_count = narg ;
667 f.m_task->m_task_type = task_base::Aggregate ;
671 task_base *
volatile *
const dep =
672 f.m_task->aggregate_dependences();
674 for (
int i = 0 ; i < narg ; ++i ) { dep[i] = arg[i].m_task ; }
676 Kokkos::memory_fence();
678 queue->schedule_aggregate( f.m_task );
689 Future< execution_space >
690 when_all(
int narg , F
const func )
692 using input_type = decltype( func(0) );
693 using future_type = Future< execution_space > ;
695 static_assert( is_future< input_type >::value
696 ,
"Functor must return a Kokkos::Future" );
700 if ( 0 == narg )
return f ;
702 size_t const alloc_size = m_queue->when_all_allocation_size( narg );
705 reinterpret_cast< task_base *
>( m_queue->allocate( alloc_size ) );
713 new( f.m_task ) task_base();
715 f.m_task->m_queue = m_queue ;
716 f.m_task->m_ref_count = 2 ;
717 f.m_task->m_alloc_size = alloc_size ;
718 f.m_task->m_dep_count = narg ;
719 f.m_task->m_task_type = task_base::Aggregate ;
723 task_base *
volatile *
const dep =
724 f.m_task->aggregate_dependences();
726 for (
int i = 0 ; i < narg ; ++i ) {
727 const input_type arg_f = func(i);
728 if ( 0 != arg_f.m_task ) {
730 if ( m_queue != static_cast< queue_type * >( arg_f.m_task->m_queue ) ) {
731 Kokkos::abort(
"Kokkos when_all Futures must be in the same scheduler" );
734 Kokkos::atomic_increment( &(arg_f.m_task->m_ref_count) );
735 dep[i] = arg_f.m_task ;
739 Kokkos::memory_fence();
741 m_queue->schedule_aggregate( f.m_task );
749 KOKKOS_INLINE_FUNCTION
750 int allocation_capacity() const noexcept
751 {
return m_queue->m_memory.capacity(); }
753 KOKKOS_INLINE_FUNCTION
754 int allocated_task_count() const noexcept
755 {
return m_queue->m_count_alloc ; }
757 KOKKOS_INLINE_FUNCTION
758 int allocated_task_count_max() const noexcept
759 {
return m_queue->m_max_alloc ; }
761 KOKKOS_INLINE_FUNCTION
762 long allocated_task_count_accum() const noexcept
763 {
return m_queue->m_accum_alloc ; }
767 template<
typename S >
769 void Kokkos::wait( Kokkos::TaskScheduler< S >
const & );
783 template<
typename T >
784 Kokkos::Impl::TaskPolicyData
785 < Kokkos::Impl::TaskBase<void,void,void>::TaskTeam
786 ,
typename std::conditional< Kokkos::is_future< T >::value , T ,
789 KOKKOS_INLINE_FUNCTION
790 TaskTeam( T
const & arg
791 , TaskPriority
const & arg_priority = TaskPriority::Regular
794 static_assert( Kokkos::is_future<T>::value ||
795 Kokkos::is_scheduler<T>::value
796 ,
"Kokkos TaskTeam argument must be Future or TaskScheduler" );
799 Kokkos::Impl::TaskPolicyData
800 < Kokkos::Impl::TaskBase<void,void,void>::TaskTeam
801 ,
typename std::conditional< Kokkos::is_future< T >::value , T ,
803 >( arg , arg_priority );
806 template<
typename E ,
typename F >
808 TaskPolicyData< Kokkos::Impl::TaskBase<void,void,void>::TaskTeam , F >
809 KOKKOS_INLINE_FUNCTION
810 TaskTeam( TaskScheduler<E>
const & arg_scheduler
811 , F
const & arg_future
812 ,
typename std::enable_if< Kokkos::is_future<F>::value ,
813 TaskPriority >::type
const & arg_priority = TaskPriority::Regular
817 Kokkos::Impl::TaskPolicyData
818 < Kokkos::Impl::TaskBase<void,void,void>::TaskTeam , F >
819 ( arg_scheduler , arg_future , arg_priority );
824 template<
typename T >
825 Kokkos::Impl::TaskPolicyData
826 < Kokkos::Impl::TaskBase<void,void,void>::TaskSingle
827 ,
typename std::conditional< Kokkos::is_future< T >::value , T ,
830 KOKKOS_INLINE_FUNCTION
831 TaskSingle( T
const & arg
832 , TaskPriority
const & arg_priority = TaskPriority::Regular
835 static_assert( Kokkos::is_future<T>::value ||
836 Kokkos::is_scheduler<T>::value
837 ,
"Kokkos TaskSingle argument must be Future or TaskScheduler" );
840 Kokkos::Impl::TaskPolicyData
841 < Kokkos::Impl::TaskBase<void,void,void>::TaskSingle
842 ,
typename std::conditional< Kokkos::is_future< T >::value , T ,
844 >( arg , arg_priority );
847 template<
typename E ,
typename F >
849 TaskPolicyData< Kokkos::Impl::TaskBase<void,void,void>::TaskSingle , F >
850 KOKKOS_INLINE_FUNCTION
851 TaskSingle( TaskScheduler<E>
const & arg_scheduler
852 , F
const & arg_future
853 ,
typename std::enable_if< Kokkos::is_future<F>::value ,
854 TaskPriority >::type
const & arg_priority = TaskPriority::Regular
858 Kokkos::Impl::TaskPolicyData
859 < Kokkos::Impl::TaskBase<void,void,void>::TaskSingle , F >
860 ( arg_scheduler , arg_future , arg_priority );
871 template<
int TaskEnum
872 ,
typename DepFutureType
873 ,
typename FunctorType >
874 Future<
typename FunctorType::value_type
875 ,
typename DepFutureType::execution_space >
876 host_spawn( Impl::TaskPolicyData<TaskEnum,DepFutureType>
const & arg_policy
877 , FunctorType && arg_functor
880 using exec_space =
typename DepFutureType::execution_space ;
881 using scheduler = TaskScheduler< exec_space > ;
883 typedef Impl::TaskBase< exec_space
884 ,
typename FunctorType::value_type
888 static_assert( TaskEnum == task_type::TaskTeam ||
889 TaskEnum == task_type::TaskSingle
890 ,
"Kokkos host_spawn requires TaskTeam or TaskSingle" );
894 typename task_type::function_type
const ptr =
895 Kokkos::Impl::TaskQueueSpecialization< exec_space >::
896 template get_function_pointer< task_type >();
898 return scheduler::spawn( arg_policy , ptr , std::move(arg_functor) );
907 template<
int TaskEnum
908 ,
typename DepFutureType
909 ,
typename FunctorType >
910 Future<
typename FunctorType::value_type
911 ,
typename DepFutureType::execution_space >
912 KOKKOS_INLINE_FUNCTION
913 task_spawn( Impl::TaskPolicyData<TaskEnum,DepFutureType>
const & arg_policy
914 , FunctorType && arg_functor
917 using exec_space =
typename DepFutureType::execution_space ;
918 using scheduler = TaskScheduler< exec_space > ;
920 typedef Impl::TaskBase< exec_space
921 ,
typename FunctorType::value_type
925 #if defined( KOKKOS_ACTIVE_EXECUTION_MEMORY_SPACE_HOST ) && \
926 defined( KOKKOS_ENABLE_CUDA )
928 static_assert( ! std::is_same< Kokkos::Cuda , exec_space >::value
929 ,
"Error calling Kokkos::task_spawn for Cuda space within Host code" );
933 static_assert( TaskEnum == task_type::TaskTeam ||
934 TaskEnum == task_type::TaskSingle
935 ,
"Kokkos host_spawn requires TaskTeam or TaskSingle" );
937 typename task_type::function_type
const ptr = task_type::apply ;
939 return scheduler::spawn( arg_policy , ptr , std::move(arg_functor) );
947 template<
typename FunctorType ,
typename T >
949 KOKKOS_INLINE_FUNCTION
952 , TaskPriority
const & arg_priority = TaskPriority::Regular
955 static_assert( Kokkos::is_future<T>::value ||
956 Kokkos::is_scheduler<T>::value
957 ,
"Kokkos respawn argument must be Future or TaskScheduler" );
959 TaskScheduler< typename T::execution_space >::
960 respawn( arg_self , arg , arg_priority );
965 template<
typename A1 ,
typename A2 >
966 KOKKOS_INLINE_FUNCTION
967 Future< typename Future< A1 , A2 >::execution_space >
968 when_all( Future< A1 , A2 >
const arg[]
972 return TaskScheduler< typename Future<A1,A2>::execution_space >::
973 when_all( arg , narg );
979 template<
typename ExecSpace >
981 void wait( TaskScheduler< ExecSpace >
const & scheduler )
982 { scheduler.m_queue->execute(); }
bool is_null(const boost::shared_ptr< T > &p)
Future< typename FunctorType::value_type, typename DepFutureType::execution_space > KOKKOS_INLINE_FUNCTION task_spawn(Impl::TaskPolicyData< TaskEnum, DepFutureType > const &arg_policy, FunctorType &&arg_functor)
A task spawns a task with options.
void KOKKOS_INLINE_FUNCTION respawn(FunctorType *arg_self, T const &arg, TaskPriority const &arg_priority=TaskPriority::Regular)
A task respawns itself with options.
Future< typename FunctorType::value_type, typename DepFutureType::execution_space > host_spawn(Impl::TaskPolicyData< TaskEnum, DepFutureType > const &arg_policy, FunctorType &&arg_functor)
A host control thread spawns a task with options.