MPI main-child application¶
We illustrate the concept of a main-child application using our research code respy
. As a use case, we are interested in capturing the uncertainties in the model’s predictions about average final schooling. For that purpose we start a main process that distributes sampled parameter values from the imposed distribution of the discount factor and the return to schooling.
We can start the script using the terminal.
mpiexec -n 1 -usize 5 python main.py
This starts the main process and allows to create up to five additional child processes.
import shutil
import glob
import sys
import os
if "PMI_SIZE" not in os.environ.keys():
raise AssertionError("requires MPI access")
from mpi4py import MPI
import chaospy as cp
import numpy as np
from auxiliary import aggregate_results
from auxiliary import TAGS
if __name__ == "__main__":
# We specify the number of draws and number of children.
num_samples, num_children = 5, 2
# We draw a sample from the joint distribution of the parameters that is subsequently
# distributed to the child processes.
distribution = cp.J(cp.Uniform(0.92, 0.98), cp.Uniform(0.03, 0.08))
samples = distribution.sample(num_samples, rule="random").T
info = MPI.Info.Create()
info.update({"wdir": os.getcwd()})
# We start all child processes and make sure they can work in their own respective directory.
[shutil.rmtree(dirname) for dirname in glob.glob("subdir_child_*")]
file_ = os.path.dirname(os.path.realpath(__file__)) + "/child.py"
comm = MPI.COMM_SELF.Spawn(
sys.executable, args=[file_], maxprocs=num_children, info=info
)
# We send all problem-specific information once and for all.
prob_info = dict()
prob_info["num_params"] = samples.shape[1]
comm.bcast(prob_info, root=MPI.ROOT)
status = MPI.Status()
for sample in samples:
comm.recv(status=status)
rank_sender = status.Get_source()
comm.send(None, tag=TAGS.RUN, dest=rank_sender)
sample = np.array(sample, dtype="float64")
comm.Send([sample, MPI.DOUBLE], dest=rank_sender)
# We are done and now terminate all child processes properly and finally the turn off the
# communicator. We need for all to acknowledge the receipt to make sure we do not continue here
# before all tasks are not only started but actually finished.
[comm.send(None, tag=TAGS.EXIT, dest=rank) for rank in range(num_children)]
[comm.recv() for rank in range(num_children)]
comm.Disconnect()
rslt = aggregate_results()
The behavior of the child processes is governed in the following script.
#!/usr/bin/env python
"""This script provides all capabilities for the child processes."""
import os
# In this script we only have explicit use of MPI as our level of parallelism. This needs to be
# done right at the beginning of the script.
update = {
"NUMBA_NUM_THREADS": "1",
"OMP_NUM_THREADS": "1",
"OPENBLAS_NUM_THREADS": "1",
"NUMEXPR_NUM_THREADS": "1",
"MKL_NUM_THREADS": "1",
}
os.environ.update(update)
from mpi4py import MPI
import pandas as pd
import numpy as np
import respy as rp
from auxiliary import TAGS
if __name__ == "__main__":
comm = MPI.Comm.Get_parent()
num_slaves, rank = comm.Get_size(), comm.Get_rank()
status = MPI.Status()
# We need some additional task-specific information.
prob_info = comm.bcast(None)
subdir = f"subdir_child_{rank}"
os.mkdir(subdir)
os.chdir(subdir)
# We now set up the simulation function of `respy` and receive some task-specific information.
params, options, df = rp.get_example_model("kw_94_one")
simulate = rp.get_simulate_func(params, options)
rslt = list()
while True:
# Signal readiness
comm.send(None, dest=0)
# Receive instructions and act accordingly.
comm.recv(status=status)
tag = status.Get_tag()
if tag == TAGS.EXIT:
# We set up a container to store the results.
df = pd.DataFrame(rslt, columns=["qoi", "delta", "exp_edu"])
df.index.name = "sample"
df.to_pickle(f"rslt_child_{rank}.pkl")
# Now we are ready to acknowledge completion and disconnect.
comm.send(None, dest=0)
comm.Disconnect()
break
elif tag == TAGS.RUN:
# We are called to sample the quantity of interest and need to update the parameters
# accordingly.
sample = np.empty(prob_info["num_params"], dtype="float64")
comm.Recv([sample, MPI.DOUBLE])
(
params.loc["delta", "value"],
params.loc[("wage_a", "exp_edu"), "value"],
) = sample
stat = simulate(params).groupby("Identifier")["Experience_Edu"].max().mean()
rslt.append([stat, *sample])
else:
raise AssertionError