MPI main-child application

We illustrate the concept of a main-child application using our research code respy. As a use case, we are interested in capturing the uncertainty in the model’s prediction of average final schooling. For that purpose, we start a main process that distributes parameter values, sampled from the imposed distributions of the discount factor and the return to schooling, to a set of child processes.

We can start the script using the terminal.

mpiexec -n 1 -usize 5 python main.py

This starts a single main process and sets the size of the MPI universe to five, leaving enough room to spawn the additional child processes.
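If we want to verify this setting from within Python, we can query the optional MPI.UNIVERSE_SIZE attribute. This is only a minimal sketch and not part of the application itself; implementations that do not provide the attribute simply return None.

from mpi4py import MPI

# The universe size is an optional attribute of MPI.COMM_WORLD and may be None
# if the MPI implementation does not provide it.
universe_size = MPI.COMM_WORLD.Get_attr(MPI.UNIVERSE_SIZE)
print(f"universe size: {universe_size}")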

import shutil
import glob
import sys
import os

if "PMI_SIZE" not in os.environ.keys():
    raise AssertionError("requires MPI access")
from mpi4py import MPI

import chaospy as cp
import numpy as np

from auxiliary import aggregate_results
from auxiliary import TAGS


if __name__ == "__main__":

    # We specify the number of draws and number of children.
    num_samples, num_children = 5, 2

    # We draw a sample from the joint distribution of the parameters that is subsequently
    # distributed to the child processes.
    distribution = cp.J(cp.Uniform(0.92, 0.98), cp.Uniform(0.03, 0.08))
    samples = distribution.sample(num_samples, rule="random").T

    info = MPI.Info.Create()
    info.update({"wdir": os.getcwd()})

    # We remove leftover directories from previous runs and then start all child processes,
    # each of which works in its own directory.
    for dirname in glob.glob("subdir_child_*"):
        shutil.rmtree(dirname)

    file_ = os.path.join(os.path.dirname(os.path.realpath(__file__)), "child.py")
    comm = MPI.COMM_SELF.Spawn(
        sys.executable, args=[file_], maxprocs=num_children, info=info
    )

    # We send all problem-specific information once and for all.
    prob_info = dict()
    prob_info["num_params"] = samples.shape[1]
    comm.bcast(prob_info, root=MPI.ROOT)

    status = MPI.Status()
    for sample in samples:

        comm.recv(status=status)
        rank_sender = status.Get_source()

        comm.send(None, tag=TAGS.RUN, dest=rank_sender)

        sample = np.array(sample, dtype="float64")
        comm.Send([sample, MPI.DOUBLE], dest=rank_sender)

    # We are done, so we terminate all child processes properly and finally shut down the
    # communicator. We wait for all children to acknowledge the exit message to make sure that
    # all tasks are not only started but actually finished before we continue.
    for rank in range(num_children):
        comm.send(None, tag=TAGS.EXIT, dest=rank)

    for rank in range(num_children):
        comm.recv()
    comm.Disconnect()

    rslt = aggregate_results()

The behavior of the child processes is governed by the following script.

#!/usr/bin/env python
"""This script provides all capabilities for the child processes."""

import os

# In this script, MPI is our only level of parallelism, so we restrict all threading libraries to a
# single thread. This needs to be done right at the beginning of the script, before the numerical
# libraries are imported.
update = {
    "NUMBA_NUM_THREADS": "1",
    "OMP_NUM_THREADS": "1",
    "OPENBLAS_NUM_THREADS": "1",
    "NUMEXPR_NUM_THREADS": "1",
    "MKL_NUM_THREADS": "1",
}
os.environ.update(update)

from mpi4py import MPI
import pandas as pd
import numpy as np
import respy as rp

from auxiliary import TAGS


if __name__ == "__main__":
    comm = MPI.Comm.Get_parent()
    num_children, rank = comm.Get_size(), comm.Get_rank()
    status = MPI.Status()

    # We receive the problem-specific information that the main process broadcasts once and for all.
    prob_info = comm.bcast(None, root=0)

    subdir = f"subdir_child_{rank}"
    os.mkdir(subdir)
    os.chdir(subdir)

    # We now set up the simulation function of `respy` based on one of its example models.
    params, options, _ = rp.get_example_model("kw_94_one")
    simulate = rp.get_simulate_func(params, options)

    rslt = list()
    while True:

        # Signal readiness to the main process.
        comm.send(None, dest=0)

        # Receive instructions and act accordingly.
        comm.recv(status=status)
        tag = status.Get_tag()

        if tag == TAGS.EXIT:
            # We collect the results in a DataFrame and write them to disk.
            df = pd.DataFrame(rslt, columns=["qoi", "delta", "exp_edu"])
            df.index.name = "sample"
            df.to_pickle(f"rslt_child_{rank}.pkl")

            # Now we are ready to acknowledge completion and disconnect.
            comm.send(None, dest=0)
            comm.Disconnect()
            break

        elif tag == TAGS.RUN:
            # We are asked to evaluate the quantity of interest for a new draw of the parameters
            # and update them accordingly.
            sample = np.empty(prob_info["num_params"], dtype="float64")
            comm.Recv([sample, MPI.DOUBLE])

            (
                params.loc["delta", "value"],
                params.loc[("wage_a", "exp_edu"), "value"],
            ) = sample

            stat = simulate(params).groupby("Identifier")["Experience_Edu"].max().mean()
            rslt.append([stat, *sample])

        else:
            raise AssertionError(f"unknown message tag: {tag}")
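
Both scripts import TAGS and aggregate_results from an auxiliary module that is not shown here. The following is a hypothetical, minimal sketch of what such a module might look like, assuming the child results are collected from the subdir_child_* directories created above.

"""Hypothetical sketch of the auxiliary module shared by the main and child processes."""
import glob
from enum import IntEnum

import pandas as pd


class TAGS(IntEnum):
    """Message tags for the communication between the main and child processes."""

    RUN = 1
    EXIT = 2


def aggregate_results():
    """Collect the result files written by the child processes into a single DataFrame."""
    fnames = sorted(glob.glob("subdir_child_*/rslt_child_*.pkl"))
    return pd.concat([pd.read_pickle(fname) for fname in fnames], ignore_index=True)

Using an IntEnum keeps the tags readable in the scripts while they remain plain integers for MPI.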