Interface Documentation
Version: invalid
policy.hh
Go to the documentation of this file.
1 /*
2  @@@@@@@@ @@ @@@@@@ @@@@@@@@ @@
3  /@@///// /@@ @@////@@ @@////// /@@
4  /@@ /@@ @@@@@ @@ // /@@ /@@
5  /@@@@@@@ /@@ @@///@@/@@ /@@@@@@@@@/@@
6  /@@//// /@@/@@@@@@@/@@ ////////@@/@@
7  /@@ /@@/@@//// //@@ @@ /@@/@@
8  /@@ @@@//@@@@@@ //@@@@@@ @@@@@@@@ /@@
9  // /// ////// ////// //////// //
10 
11  Copyright (c) 2016, Triad National Security, LLC
12  All rights reserved.
13  */
14 #pragma once
15 
18 #include <flecsi-config.h>
19 
20 #if !defined(__FLECSI_PRIVATE__)
21 #error Do not include this file directly!
22 #endif
23 
24 #include "flecsi/exec/launch.hh"
29 #include "flecsi/run/backend.hh"
30 #include "flecsi/util/demangle.hh"
32 #include <flecsi/flog.hh>
33 
34 #include <functional>
35 #include <memory>
36 #include <type_traits>
37 
38 #if !defined(FLECSI_ENABLE_LEGION)
39 #error FLECSI_ENABLE_LEGION not defined! This file depends on Legion!
40 #endif
41 
42 #include <legion.h>
43 
44 namespace flecsi {
45 
46 inline log::devel_tag execution_tag("execution");
47 
48 namespace exec {
49 namespace detail {
50 
51 // Remove const from under a reference, if there is one.
52 template<class T>
53 struct nonconst_ref {
54  using type = T;
55 };
56 
57 template<class T>
58 struct nonconst_ref<const T &> {
59  using type = T &;
60 };
61 
62 template<class T>
63 using nonconst_ref_t = typename nonconst_ref<T>::type;
64 
65 // Serialize a tuple of converted arguments (or references to existing
66 // arguments where possible). Note that is_constructible_v<const
67 // float&,const double&> is true, so we have to check
68 // is_constructible_v<float&,double&> instead.
69 template<class... PP, class... AA>
70 auto
71 serial_arguments(std::tuple<PP...> * /* to deduce PP */, AA &&... aa) {
72  static_assert((std::is_const_v<std::remove_reference_t<const PP>> && ...),
73  "Tasks cannot accept non-const references");
74  return util::serial_put<std::tuple<std::conditional_t<
75  std::is_constructible_v<nonconst_ref_t<PP> &, nonconst_ref_t<AA>>,
76  const PP &,
77  std::decay_t<PP>>...>>(
78  {exec::replace_argument<PP>(std::forward<AA>(aa))...});
79 }
80 
81 } // namespace detail
82 } // namespace exec
83 
84 template<auto & F,
85  const exec::launch_domain & LAUNCH_DOMAIN,
86  class REDUCTION,
87  size_t ATTRIBUTES,
88  typename... ARGS>
89 decltype(auto)
90 reduce(ARGS &&... args) {
91  using namespace Legion;
92  using namespace exec;
93 
94  using traits_t = util::function_traits<decltype(F)>;
95  using RETURN = typename traits_t::return_type;
96  using param_tuple = typename traits_t::arguments_type;
97 
98  // This will guard the entire method
99  log::devel_guard guard(execution_tag);
100 
101  // Get the FleCSI runtime context
102  auto & flecsi_context = run::context::instance();
103 
104  // Get the processor type.
105  constexpr auto processor_type = mask_to_processor_type(ATTRIBUTES);
106 
107  // Get the Legion runtime and context from the current task.
108  auto legion_runtime = Legion::Runtime::get_runtime();
109  auto legion_context = Legion::Runtime::get_context();
110 
111 #if defined(FLECSI_ENABLE_FLOG)
112  const size_t tasks_executed = flecsi_context.tasks_executed();
113  if((tasks_executed > 0) &&
114  (tasks_executed % FLOG_SERIALIZATION_INTERVAL == 0)) {
115 
116  size_t processes = flecsi_context.processes();
117  LegionRuntime::Arrays::Rect<1> launch_bounds(
118  LegionRuntime::Arrays::Point<1>(0),
119  LegionRuntime::Arrays::Point<1>(processes - 1));
120  Domain launch_domain = Domain::from_rect<1>(launch_bounds);
121 
122  constexpr auto red = [] {
123  return log::flog_t::instance().packets().size();
124  };
125  Legion::ArgumentMap arg_map;
126  Legion::IndexLauncher reduction_launcher(leg::task_id<leg::verb<*red>>,
127  launch_domain,
128  Legion::TaskArgument(NULL, 0),
129  arg_map);
130 
131  Legion::Future future = legion_runtime->execute_index_space(
132  legion_context, reduction_launcher, reduction_op<fold::max<std::size_t>>);
133 
134  if(future.get_result<size_t>() > FLOG_SERIALIZATION_THRESHOLD) {
135  constexpr auto send = [] {
136  run::context::instance().set_mpi_task(log::send_to_one);
137  };
138  Legion::IndexLauncher flog_mpi_launcher(leg::task_id<leg::verb<*send>>,
139  launch_domain,
140  Legion::TaskArgument(NULL, 0),
141  arg_map);
142 
143  flog_mpi_launcher.tag = run::FLECSI_MAPPER_FORCE_RANK_MATCH;
144 
145  // Launch the MPI task
146  auto future_mpi =
147  legion_runtime->execute_index_space(legion_context, flog_mpi_launcher);
148 
149  // Force synchronization
150  future_mpi.wait_all_results(true);
151 
152  // Handoff to the MPI runtime.
153  flecsi_context.handoff_to_mpi(legion_context, legion_runtime);
154 
155  // Wait for MPI to finish execution (synchronous).
156  flecsi_context.wait_on_mpi(legion_context, legion_runtime);
157  } // if
158  } // if
159 #endif // FLECSI_ENABLE_FLOG
160 
161  size_t domain_size = LAUNCH_DOMAIN.size();
162  domain_size = domain_size == 0 ? flecsi_context.processes() : domain_size;
163 
164  ++flecsi_context.tasks_executed();
165 
166  leg::task_prologue_t pro(domain_size);
167  pro.walk<param_tuple>(args...);
168 
169  std::optional<param_tuple> mpi_args;
170  std::vector<std::byte> buf;
171  if constexpr(processor_type == task_processor_type_t::mpi) {
172  // MPI tasks must be invoked collectively from one task on each rank.
173  // We therefore can transmit merely a pointer to a tuple of the arguments.
174  // util::serial_put deliberately doesn't support this, so just memcpy it.
175  mpi_args.emplace(std::forward<ARGS>(args)...);
176  const auto p = &*mpi_args;
177  buf.resize(sizeof p);
178  std::memcpy(buf.data(), &p, sizeof p);
179  }
180  else {
181  buf = detail::serial_arguments(
182  static_cast<param_tuple *>(nullptr), std::forward<ARGS>(args)...);
183  }
184 
185  //------------------------------------------------------------------------//
186  // Single launch
187  //------------------------------------------------------------------------//
188 
189  using wrap = leg::task_wrapper<F, processor_type>;
190  const auto task = leg::task_id<wrap::execute,
191  (ATTRIBUTES & ~mpi) | 1 << static_cast<std::size_t>(wrap::LegionProcessor)>;
192 
193  if constexpr(LAUNCH_DOMAIN == single) {
194 
195  static_assert(std::is_void_v<REDUCTION>,
196  "reductions are not supported for single tasks");
197 
198  {
199  log::devel_guard guard(execution_tag);
200  flog_devel(info) << "Executing single task" << std::endl;
201  }
202 
203  TaskLauncher launcher(task, TaskArgument(buf.data(), buf.size()));
204 
205  // adding region requirements to the launcher
206  for(auto & req : pro.region_requirements()) {
207  launcher.add_region_requirement(req);
208  } // for
209 
210  // adding futures to the launcher
211  launcher.futures = std::move(pro).futures();
212 
213  static_assert(!(is_index_future<std::decay_t<ARGS>> || ...),
214  "can't use index future with single task");
215 
216  if constexpr(processor_type == task_processor_type_t::toc ||
217  processor_type == task_processor_type_t::loc) {
218  return future<RETURN>{
219  legion_runtime->execute_task(legion_context, launcher)};
220  }
221  else {
222  static_assert(
223  processor_type == task_processor_type_t::mpi, "Unknown launch type");
224  flog_fatal("Invalid launch type!"
225  << std::endl
226  << "Legion backend does not support 'single' launch"
227  << " for MPI tasks yet");
228  }
229  }
230 
231  //------------------------------------------------------------------------//
232  // Index launch
233  //------------------------------------------------------------------------//
234 
235  else {
236 
237  {
238  log::devel_guard guard(execution_tag);
239  flog_devel(info) << "Executing index task" << std::endl;
240  }
241 
242  LegionRuntime::Arrays::Rect<1> launch_bounds(
243  LegionRuntime::Arrays::Point<1>(0),
244  LegionRuntime::Arrays::Point<1>(domain_size - 1));
245  Domain launch_domain = Domain::from_rect<1>(launch_bounds);
246 
247  Legion::ArgumentMap arg_map;
248  Legion::IndexLauncher launcher(
249  task, launch_domain, TaskArgument(buf.data(), buf.size()), arg_map);
250 
251  // adding region requirement to the launcher
252  for(auto & req : pro.region_requirements()) {
253  launcher.add_region_requirement(req);
254  } // for
255 
256  // adding futures to the launcher
257  launcher.futures = std::move(pro).futures();
258  launcher.point_futures.assign(
259  pro.future_maps().begin(), pro.future_maps().end());
260 
261  if constexpr(processor_type == task_processor_type_t::toc ||
262  processor_type == task_processor_type_t::loc) {
263  flog_devel(info) << "Executing index launch on loc" << std::endl;
264 
265  if constexpr(!std::is_void_v<REDUCTION>) {
266  flog_devel(info) << "executing reduction logic for "
267  << util::type<REDUCTION>() << std::endl;
268 
269  return future<RETURN>{legion_runtime->execute_index_space(
270  legion_context, launcher, reduction_op<REDUCTION>)};
271  }
272  else {
273  // Enqueue the task.
274  Legion::FutureMap future_map =
275  legion_runtime->execute_index_space(legion_context, launcher);
276 
277  return future<RETURN, launch_type_t::index>{future_map};
278  } // else
279  }
280  else {
281  static_assert(
282  processor_type == task_processor_type_t::mpi, "Unknown launch type");
283  launcher.tag = run::FLECSI_MAPPER_FORCE_RANK_MATCH;
284 
285  // Launch the MPI task
286  const auto ret = future<RETURN, launch_type_t::index>{
287  legion_runtime->execute_index_space(legion_context, launcher)};
288  // Force synchronization
289  ret.wait(true);
290 
291  // Handoff to the MPI runtime.
292  flecsi_context.handoff_to_mpi(legion_context, legion_runtime);
293 
294  // Wait for MPI to finish execution (synchronous).
295  // We must keep mpi_args alive until then.
296  flecsi_context.wait_on_mpi(legion_context, legion_runtime);
297 
298  if constexpr(!std::is_void_v<REDUCTION>) {
299  // FIXME implement logic for reduction MPI task
300  flog_fatal("there is no implementation for the mpi"
301  " reduction task");
302  }
303  return ret;
304  }
305  } // if constexpr
306 
307  // return 0;
308 } // execute_task
309 
310 } // namespace flecsi
decltype(auto) execute(ARGS &&... args)
Definition: execution.hh:382
decltype(auto) reduce(ARGS &&... args)
Definition: policy.hh:90
Definition: policy.hh:53
Definition: flog.hh:82
Definition: function_traits.hh:29
#define flog_fatal(message)
Definition: flog.hh:358
static flog_t & instance()
Definition: state.hh:73
size_t processes()
Definition: execution.hh:294
Definition: launch.hh:95
A launch domain with a static identity but a runtime size.
Definition: launch.hh:62
Definition: control.hh:31