from .jobfolder import JobFolderProcess
[docs]class PoolProcess(JobFolderProcess):
""" Executes folder in child processes.
Much as its base class,
:py:class:`~pylada.process.jobfolder.JobFolderProcess`, this process
specialization is intended to run jobs in a jobfolder in parallel [*]_.
However, it allows to customize the number of processors dedicated to
each job, rather than use the same number of processors for each job.
The customization is done *via* the function :py:attr:`processalloc`. It
takes one argument, the executable jobfolder, and returns an integer
signifying the requested number of processors.
.. code-block:: python
def processalloc(folder):
return (len(folder.structure) // 2) * 2
process = PoolProcess(jobfolder, outdir='here', processalloc=processalloc)
process.start(comm)
try: process.wait()
except Fail: pass
The interface is much the same as any other process. However, it takes as
argument this :py:attr:`processalloc` function, on top of the jobfolder
itself. In this case, each folder will be launched with approximately as
many processors as there are atoms in the structure [*]_.
Once it is launched, the :py:class:`PoolProcess` instance will attempt to
run as many jobs as possible in parallel, until there it runs out of
processors to allocate. Howe many processors, and which machines, is
determined by the communicator passed to :py:meth:`start`. Each time an
executable folder is finished [*]_, it tries again to pack jobs into the
available processor pool.
.. note::
Upon failure, :py:exc:`~pylada.process.Fail` is raised only
once all the folders have been executed, not when the failure is
detected.
.. [*] Several job-folders are executed simultaneously, not
withstanding the possibility that each of these is also executed in
parallel *via* MPI.
.. [*] Apparently, this is a pretty good rule-of-thumb for VASP
calculations.
.. [*] More, specifically, each time
:py:meth:`~pylada.process.jobfolder.JobFolderProcess.poll` is called.
"""
[docs] def __init__( self, jobfolder, outdir, processalloc, maxtrials=1,
keepalive=False, **kwargs ):
""" Initializes a process.
:param jobfolder:
Jobfolder for which executable folders should be launched.
The name of the folders to launch are determined which
:py:meth:`__init__` is acalled. If ``jobfolder`` changes, then one
should call :py:meth:`update`.
:type jobfolder: :py:class:`~pylada.jobfolder.jobfolder.JobFolder`
:param str outdir:
Path where the python child process should be executed.
:param processalloc:
Function which determines how many processors each job requires.
This is determined for each job when this instance is created. To
change :py:attr:`~pylada.process.jobfolder.JobFolderProcess.jobfolder`,
one should call :py:meth:`update`.
:type processalloc:
(:py:class:`~pylada.jobfolder.jobfolder.JobFolder`)->int
:param bool keepalive:
Whether to relinquish communicator once jobs are completed. If
True, the communicator is not relinquished. The jobfolder can be
:py:meth:`updated <update>` and new jobs started. To finally
relinquish the communicator,
:py:attr:`~pylada.process.jobfolder.JobFolderProcess.keepalive`
should be set to False. Both
:py:meth:`~pylada.process.jobfolder.JobFolderProcess.kill` and
:py:meth:`~pylada.process.jobfolder.JobFolderProcess.terminate` ignore
this attribute and relinquish the communicator. However, since both
side effects, this may not be the best way to do so.
:param int maxtrials:
Maximum number of times to try re-launching each process upon
failure.
:param kwargs:
Keyword arguments to the functionals in the executable folders. These
arguments will be applied indiscriminately to all folders.
"""
super(PoolProcess, self).__init__( jobfolder, outdir, maxtrials,
keepalive=keepalive, **kwargs )
del self.nbpools # not needed here.
self.processalloc = processalloc
""" Determines number of processors to allocate to each job.
This is a function which takes a
:py:class:`~pylada.jobfolder.jobfolder.JobFolder` instance and returns an
integer.
"""
self._alloc = {}
""" Maps job vs rquested process allocation. """
for name in self._torun:
self._alloc[name] = self.processalloc(self.jobfolder[name])
assert len(set(self._alloc.keys())) == len(self._alloc)
def _next(self):
""" Adds more processes.
This is the subroutine to overload in a derived class which would
implement some sort of scheduling.
"""
from os.path import join
from ..error import IndexError
from .call import CallProcess
from .iterator import IteratorProcess
# nothing else to do.
if len(self._torun) == 0: return
# no more machines to allocate...
if self._comm['n'] == 0: return
# cannot add more processes.
if isinstance(self.processalloc, int):
return super(PoolProcess, self)._next()
# split processes into local comms. Make sure we don't oversuscribe.
jobs = self._getjobs()
assert sum(self._alloc[u] for u in jobs) <= self._comm['n']
try:
# Loop until all requisite number of processes is created,
# or until run out of jobs, or until run out of comms.
for name in jobs:
self._torun = self._torun - set([name])
nprocs = self._alloc.pop(name)
# checks folder is still valid.
if name not in self.jobfolder:
raise IndexError("Job-folder {0} no longuer exists.".format(name))
jobfolder = self.jobfolder[name]
if not jobfolder.is_executable:
raise IndexError("Job-folder {0} is no longuer executable.".format(name))
# creates parameter dictionary.
params = jobfolder.params.copy()
params.update(self.params)
params['maxtrials'] = self.maxtrials
# chooses between an iterator process and a call process.
if hasattr(jobfolder.functional, 'iter'):
process = IteratorProcess(jobfolder.functional, join(self.outdir, name), **params)
else:
process = CallProcess(self.functional, join(self.outdir, name), **params)
# appends process and starts it.
self.process.append((name, process))
try: process.start(self._comm.lend(nprocs))
except Exception as e:
self.errors[name] = e
name, process = self.process.pop(-1)
process._cleanup()
except:
self.terminate()
raise
def _getjobs(self):
""" List of jobs to run. """
from operator import itemgetter
N = self._comm['n']
# creates list of possible jobs.
availables = sorted( [(key, u) for key, u in self._alloc.iteritems() if u <= N],
key=itemgetter(1) )
if len(availables) == 0: return []
# hecks first if any jobs fits exactly the available number of nodes.
if availables[-1][1] == N: return [availables[-1][0]]
# creates a map of bins:
bins = {}
for key, u in availables:
if u not in bins: bins[u] = 1
else: bins[u] += 1
def get(n, bins, xvec):
""" Loops over possible combinations. """
from random import choice
key = choice(list(bins.keys()))
for u in xrange(min(bins[key], n // key), -1, -1):
newbins = bins.copy()
del newbins[key]
newn = n - u * key
if newn == 0:
yield xvec + [(key, u)], True
break
for v in list(newbins.keys()):
if v > newn: del newbins[v]
if len(newbins) == 0:
yield xvec + [(key, u)], False
continue
for othervec, perfect in get(newn, newbins, xvec + [(key, u)]):
yield othervec, perfect
if perfect: return
xvec = []
nprocs, njobs = 0, 0
for u, perfect in get(N, bins, []):
if perfect: xvec = u; break
p, j = sum(a*b for a, b in u), sum(a for a, b in u)
if p > nprocs or (p == nprocs and j < njobs):
xvec, nprocs, njobs = list(u), p, j
# now we have a vector with the right number of jobs, but not what those
# jobs are.
results = []
for key, value in xvec:
withkeyprocs = [name for name, n in availables if n == key]
results.extend(withkeyprocs[:value])
return results
[docs] def start(self, comm):
""" Start executing job-folders. """
from .process import Process
from .mpi import MPISizeError
if isinstance(self.processalloc, int):
self.nbpools = comm['n'] // self.processalloc
return super(PoolProcess, self).start(comm)
if Process.start(self, comm): return True
# check max job size.
toolarge = [key for key, u in self._alloc.iteritems() if u > comm['n']]
if len(toolarge):
raise MPISizeError( "The following jobs require too many processors:\n"\
"{0}\n".format(toolarge) )
self._next()
return False
[docs] def update(self, jobfolder, deleteold=False):
""" Updates list of jobs.
Adds jobfolders which are not in ``self.jobfolder`` but in the input.
Deletes those which in ``self.jobfolder`` but not in the input.
Does nothing if job is currently running.
Finished jobs are not updated.
If ``deleteold`` is True, then removed finished jobs from job-folder.
Processes jobfolder from root, even if passed a child folder.
"""
running = set([n for n in self.process])
for name, value in jobfolder.root.iteritems():
if name in running: continue
elif name not in self.jobfolder.root:
newjob = self.jobfolder.root / name
newjob.functional = value.functional
newjob.params.update(value.params)
for key, value in value.__dict__.iteritems():
if key in ['children', 'params', '_functional', 'parent']: continue
setattr(self, key, value)
self._torun.add(name)
self._alloc[name] = self.processalloc(self.jobfolder.root[name])
elif name not in self._finished:
self.jobfolder.root[name] = value
self._alloc[name] = self.processalloc(self.jobfolder.root[name])
for name in self.jobfolder.root.iterkeys():
if name in self._finished and deleteold:
del self.jobfolder.root[name]
self._alloc.pop(name)
elif name not in jobfolder.root:
if name in running: continue
del self.jobfolder.root[name]
if name in self._torun: self._torun.remove(name)
self._alloc.pop(name, None)