Parameter Space Enumeration

Parameter Space

Often I run into this problem: I need to compute a function for its entire parameter space. For example; I want to make a surface plot and have two set of input parameters. The parameter space $S$ now is a set of tuples $(x,y)$, and the function I want to compute the surface plot for is a function that maps these tuples to the image $z$.

Then, we can construct a higher order function that does this for lists of $x$ and $y$, making all possible tuples by taking the Cartesian product $X\times Y$, applies the function $f :: x \to y \to z$, and stores the resulting $z$ in a datastructure

$$ \forall X, Y.~~~ compute :: (x \to y \to z) \to X \to Y \to [z] $$

Polyvariadic Functions

Other times I want to compute the extrema of the function and the parameter space $S$ can have an arbitrary number of parameters. For a function with $N$ varying input parameters, the parameter space $S$ forms an $N-$dimensional hypercube. The computations to be done correspond to the $N-$tuples formed by taking the Cartesian product of all subsets $S_1 \times S_2 \times \ldots \times S_n$. Let's call the result $r$:

$$ \forall S_1, S_2 \ldots, S_N. ~~~ compute :: (s_1 \to s_2 \to \ldots \to r) \to S_1 \to S_2 \to \ldots \to [r] $$

Since these computations are all unrelated these kinds of jobs can easily be parallelised so they can be computed in a cluster or supercomputer.

In C++, the type of collection (data structure) used to represent $S$ matters and as such we need to take great care to quantify the possible allocator and elements contained within:

In [ ]:
// The total size of the parameter space = S_1 x S_2 x ... S_n

size_t combinations()
{
  return 1;
}

template< typename element_t_
        , typename allocator_t_
        , template<typename, typename> typename collection_t_
        , typename ... collection_ts_>
size_t combinations(const collection_t_<element_t_, allocator_t_> &collection, collection_ts_ ... remainder)
{
  return collection.size() * combinations(remainder...);
}

Invariants

Typically along with the varying parameters in $S$ we also have some arguments that remain fixed, I refer to these as the invariants $i$.

$$ \forall S_1, S_2 \ldots, S_N. ~~~ compute :: (i \to s_1 \to s_2 \to \ldots \to r) \to i \to S_1 \to S_2 \to \ldots \to [r] $$

Result

We want to pass on the set of result to another function. We also pass the postprocessing function so that at a later stage we can decide whether we want to do the postprocessing centrally or distributed:

$$ postprocess :: [r] \to * $$$$ \forall S_1, S_2 \ldots, S_N. ~~~ compute :: (i \to s_1 \to s_2 \to \ldots \to r) \to ([r] \to *) \to i \to S_1 \to S_2 \to \ldots \to * $$

Architecture

The following choices are made based on the setup in my home laboratory. For simplicity we assume the following about our cluster setup:

  • We have one central machine that has fast connections to all other machines
  • The central machine has means to store the results

From now on I refer to the parameter space $N-$tuples as the set of tasks. Each task has an index:

$$ i \in \mathbb{N}. ~~~ indexate :: i \to S \to task $$
In [ ]:
template<typename element_t_, typename allocator_t_, template<typename, typename> typename collection_t_>
tuple<element_t_> indexate(size_t index, const collection_t_<element_t_, allocator_t_> &collection)
{
    auto iterator_ = collection.begin();
    std::advance(iterator_, index% collection.size());
    return tuple<T>(*iterator_);
}

template<typename element_t_, typename allocator_t_, template<typename, typename> typename collection_t_, typename ... Rest>
auto indexate(size_t index, const collection_t_<element_t_, allocator_t_> &collection, Rest ... rest)
{
    auto iterator_ = collection.begin();
    std::advance(iterator_, (index / combinations(rest ...)) % collection.size());
    return tuple_cat(tuple<element_t_>(*iterator_), indexate(index, rest...));
}

Task

I make a simple struct to store both the input arguments and the result so it can be transfered and stored easily. I use std::index_sequence_for to unpack the tuple to function arguments.

In [ ]:
#pragma pack(push, 1) 
template <typename result_t_, typename ... arguments_ts_>
struct task
{
  function<result_t_(const invariants &, size_t, arguments_ts_...)> computation;
  size_t task_index;
  invariants invariants_;
  tuple<arguments_ts_...> parameters;
  result_t_ result;

  task(function<result_t_(const invariants &, size_t, arguments_ts_...)> computation
  , const tuple<arguments_ts_...> &parameters
  , const invariants &i
  , size_t seed)
    : computation(computation)
    , parameters(parameters)
    , invariants_(i)
    , task_index(task_index)
  {
    
  }

  template<size_t ... integers_>
  result_t_ call_unpack(index_sequence<integers_...>)
  {
    return computation(invariants_, task_index, get<integers_>(parameters)...);
  }

  void run()
  {
    result = call_unpack(index_sequence_for<arguments_ts_...>{});
  }
};
#pragma pack(pop) 

template <typename result_t_, typename ...arguments_ts_, typename ...parameter_space_>
task<result_t_, arguments_ts_...> make_task
  ( size_t task_index
  , const invariants &i
  , result_t_(*f)(const invariants &, size_t, arguments_ts_...)
  , parameter_space_ ... ps_ts_)
{
  
  return task<result_t_, arguments_ts_...>(f, detail::indexate(task_index, ps_ts_ ...), i, task_index);
}

We let the central machine assign the $N-$tuples that are input to the function. We could keep track of which jobs have been completed and which have failed but for now assume the job always succeeds in finite time.

For debugging, portability and cases where we want to run the computation on a signle machine we add a backup case where the jobs are parallelised by means of threads.

Compute

In [ ]:
template<typename result_t_, typename ... variant_ts_, typename ... collection_ts_>
void compute( result_t_(*f)(const invariants &, size_t, variant_ts_ ...) 
            , size_t samples
            , const invariants &i
            , collection_ts_ ... cs)
{
  const size_t combinations_ = combinations(cs ...);
  const size_t tasks_ = samples * combinations_;

  mpi::communicator world;
  const size_t scheduler_ = 0;
  if(world.rank() == scheduler_){
    std::cout << tasks_ << " tasks total." << std::endl;
    std::atomic<size_t> progress_ = 0;
    condition_variable update_;
    vector<thread> pool_;
    mutex mutex_;
    vector<result> results_;
    size_t update_every = std::max<size_t>(1, tasks_ / 1000);
    if(world.size() > 1){
      for(size_t t = 1; t < world.size(); ++t){
        pool_.push_back(thread([&update_every, &progress_, &world, &mutex_, &results_, &tasks_](int worker)
        {
          for(size_t p = 0; p < tasks_ + world.size(); ++p){
            mpi::status s = world.recv(worker, 0);
            size_t task_index_ = progress_.fetch_add(1, std::memory_order_relaxed);
            if(!(task_index_ % update_every)){
              std::cout << task_index_ << '/' << tasks_ << std::endl;
            }
            world.send(worker, 0, task_index_);
            if(task_index_ >= tasks_){
              break;
            }
            vector<result> r;
            world.recv(worker, 0, r);
            {
              std::lock_guard<mutex> l(mutex_);
              results_.insert(std::end(results_), std::begin(r), std::end(r));
            }
          }
        }, static_cast<int>(t)));
      }
    }else{
      size_t concurrency_ = std::max<size_t>(1, thread::hardware_concurrency());
      size_t threads_ = std::min<size_t>(tasks_, concurrency_);
      for(size_t t = 0; t < threads_; ++t){
        pool_.push_back(thread([&update_every, tasks_, &progress_, &mutex_, &results_, &i, &f, &cs...]()
        {
          std::chrono::system_clock::time_point start = std::chrono::system_clock::now();
          std::time_t start_time = std::chrono::system_clock::to_time_t(start);
          for(size_t u = 0; u < tasks_; ++u){
            size_t task_index_ = progress_.fetch_add(1, std::memory_order_relaxed);
            if(task_index_ >= tasks_){
              break;
            }
            if(!(task_index_ % update_every)){
              std::lock_guard<mutex> l(mutex_);
              if(0 == task_index_){
                std::cout << task_index_ << '/' << tasks_ << ' ' << std::ctime(&start_time) << std::endl;

              } else{
                auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - start);

                size_t remaining_tasks_ = tasks_ - task_index_;
                long double ratio = (double)(remaining_tasks_) / task_index_;
                auto mult_ = elapsed * ratio;
                auto forecast_ = std::chrono::system_clock::now() + std::chrono::duration_cast<std::chrono::seconds>(mult_);
                std::time_t forecast_c = std::chrono::system_clock::to_time_t(forecast_);
                auto str_ = std::ctime(&forecast_c);
                if(nullptr != str_){
                  std::cout << task_index_ << '/' << tasks_ << " -> " << str_;
                }
              }
            }
            auto t = make_task(task_index_, i, f, cs ...);
            t.run();
            {
              std::lock_guard<mutex> l(mutex_);
              results_.insert(std::end(results_), std::begin(t.result), std::end(t.result));
            }
          }
        }));
      }
    }

    for(auto &t : pool_){
      t.join();
    }

    postprocess(tasks_, results_, cs ...);

  } else{
    for(size_t completed_ = 0; completed_ <= tasks_; ++completed_){
      world.send(scheduler_, 0);
      size_t task_index_;
      mpi::status s = world.recv(scheduler_, 0, task_index_);
      if(task_index_ >= tasks_){
        break;
      }
      auto t = make_task(task_index_, i, f, cs ...);
      t.run();
      world.send(scheduler_, 0, t.result);
    }
  }
}

Dependencies

  • C++14
  • Boost.MPI
  • An MPI implementation such as OpenMPI
  • A supercomputer :-)