#!/usr/bin/env python
import copy, getpass, json, os, re, socket, sys, time, traceback
import schedMisc
import taskClass
def aaOverview():
    '''
    This is the main scheduling system.
    It reads the file ``initWork`` to create an initial work list.
    Then it repeatedly executes tasks that have satisfied their
    prerequisites, as those tasks add future tasks to the work list.

    For example, suppose the ``initWork`` file contains the line::

        alpha.sh some/directory

    Then schedMain will look in ``some/directory`` for the
    file ``alpha.preWork``.  If present, it contains the prerequisites
    for alpha.sh.

    After all the prerequisites have completed with status OK,
    schedMain will look for the script ``alpha.sh``,
    first in ``some/directory``, then in the global
    script area, ``globalDir/cmd``.
    Then schedMain will run the script ``alpha.sh`` in ``some/directory``.

    If the work list specifies a script ending in ".pbs",
    like ``alpha.pbs``, then instead of running the script schedMain will
    issue ``qsub alpha.pbs``.

    When a task ``alpha.sh`` completes, typically it writes two files::

        alpha.postOkWork     # list of tasks to start next
        alpha.status.ok      # indicates OK completion

    After detecting that the script wrote file ``alpha.status.ok``,
    schedMain adds any tasks listed in ``alpha.postOkWork`` to the work list.

    The lines in file ``initWork``, like all work lists
    (alpha.preWork, alpha.postOkWork, alpha.postErrorWork),
    have the format::

        scriptName dirPath

    where scriptName may be ``*.sh``, ``*.py``, or ``*.pbs``.
    Each work line must contain exactly these two tokens; trailing
    comments on a work line are not allowed.  Blank lines and lines
    whose first non-blank character is '#' are ignored.
    A relative dirPath is resolved against the directory the list is
    read from (the schedMain start dir for ``initWork``, the task's dir
    for post-work lists), and every dirPath must lie under ``ancDir``.
    For example::

        alpha.sh some/directory

    The **sname** is the base name of the script - for example,
    if we strip the suffix (".sh" or ".py" or ".pbs") off ``alpha.sh``
    we get the base sname, in this case ``alpha``.
    The sname is the root of a handful of files related to that script
    in that dirPath.  Using the alpha example::

        alpha.preWork        # Work list: prerequisites for this task
        alpha.postOkWork     # Work list: tasks to start after this task
                             #   writes file status.ok
        alpha.postErrorWork  # (rarely used) Work list: tasks to start after
                             #   this task writes file status.error
        alpha.status.init    # time this task was initialized
        alpha.status.submit  # time this task was submitted (like qsub)
        alpha.status.run     # time this task was started running
        alpha.status.ok      # time this task finished successfully
        alpha.status.error   # time and error message if finished unsuccessfully

    Sometimes some of the status files are omitted,
    but either status.ok or status.error MUST be written.

    To write a status file from a shell script alpha.sh use::

        date '+%Y-%m-%d_%H:%M:%S' > alpha.status.ok
        # Or:
        date '+%Y-%m-%d_%H:%M:%S' > alpha.status.error
        echo 'Some error message' >> alpha.status.error
        echo 'More error message' >> alpha.status.error

    To write a status file from a python script alpha.py use::

        import os
        import schedMisc
        schedMisc.setStatusOk( 'alpha', os.getcwd())
        # Or:
        schedMisc.setStatusError( 'alpha', os.getcwd(),
            'Some error message.\\nMore error message.')
        # Or:
        import datetime
        dateFmt = '%Y-%m-%d_%H:%M:%S'
        dateStg = datetime.datetime.now().strftime( dateFmt)
        fpath = os.path.join( os.getcwd(), 'alpha.status.ok')
        with open( fpath, 'w') as fout:
            fout.write( dateStg + '\\n')
        # Or:
        fpath = os.path.join( os.getcwd(), 'alpha.status.error')
        with open( fpath, 'w') as fout:
            fout.write( dateStg + '\\n')
            fout.write( 'Some error message\\n')
            fout.write( 'More error message\\n')
    '''
#=====================================================================
def badparms( msg):
    '''
    Reports a command-line parameter problem and terminates the process.

    Prints a blank line and "Error: schedMain.py: <msg>", then the usage
    text (the docstring of :func:`main`), all to stdout, and finally
    exits with return code 1.

    Parameters:
        msg (str): description of what was wrong with the parameters.
    '''
    errorLine = '\nError: schedMain.py: %s' % (msg,)
    print( errorLine)
    usageText = main.__doc__
    print( usageText)
    sys.exit( 1)
#=====================================================================
# Gets the command line parameters and calls scheduleAll,
# the main scheduler loop.
def main():
    '''
    For an overview, see :func:`aaOverview`.

    Command line parameters are:

    ============= ======== ===================================================
    -bugLev       <int>    Debug level. Typically 0, 1, or 5.
    -hostType     <string> System type: hostLocal or peregrine or ...
    -globalDir    <string> Dir containing global info, including subdir cmd
    -ancDir       <string> An ancestor dir of all dirs to be processed
    -initWork     <string> File containing the initial work list
    -delaySec     <float>  Schedule loop delay, seconds
    -redoAll      <bool>   n/y: on restart, redo all even if prior run was ok
    -useReadOnly  <bool>   n/y: only print status; do not start tasks
    ============= ======== ===================================================
    '''
    # Returns the return code of scheduleAll (0 ok, 1 error).
    # Parameter errors go through badparms, which exits with rc 1.
    parmSpecMap = {
        # Name           Type      Default
        'bugLev':        [ 'int',   0, ],
        'hostType':      [ 'str',   'hostLocal', ],
        'globalDir':     [ 'str',   None, ],
        'ancDir':        [ 'str',   None, ],
        'initWork':      [ 'str',   None, ],
        'delaySec':      [ 'float', None, ],
        'redoAll':       [ 'bool',  None, ],
        'useReadOnly':   [ 'bool',  False, ],
    }
    parmMap = schedMisc.getParmMap( __file__, os.getcwd(), parmSpecMap)
    if isinstance( parmMap, str):      # getParmMap returns an error msg
        badparms( parmMap)

    # These have no default and are used below as paths: report their
    # absence as a usage error instead of letting os.path fail later
    # with an opaque TypeError on None.
    for reqName in ('globalDir', 'ancDir', 'initWork'):
        if parmMap.get( reqName) is None:
            badparms('parameter -%s is required' % (reqName,))

    bugLev = parmMap['bugLev']
    hostType = parmMap['hostType']
    globalDir = parmMap['globalDir']
    ancDir = parmMap['ancDir']
    initWork = parmMap['initWork']
    delaySec = parmMap['delaySec']
    redoAll = parmMap['redoAll']
    useReadOnly = parmMap['useReadOnly']

    globalDir = os.path.abspath( globalDir)
    ancDir = os.path.abspath( ancDir)

    # Do everything
    rc = scheduleAll(
        bugLev, hostType, globalDir, ancDir,
        initWork, delaySec, redoAll, useReadOnly)
    return rc
#=====================================================================
def scheduleAll(
        bugLev,        # Debug level. Typically 0, 1 or 5
        hostType,      # System type: peregrine or stampede or ...
        globalDir,     # Dir containing global info, including subdir cmd
        ancDir,        # An ancestor dir of all dirs to be processed
        initWork,      # File containing the initial work list
        delaySec,      # Schedule loop delay, seconds
        redoAll,       # bool: on restart, redo all even if prior run was ok
        useReadOnly):  # bool: only print status; don't start tasks
    '''
    Runs the scheduler until there is no more work to be done.

    Steps:

    1. Checks that ``globalDir/<schedMisc.cmdDirName>`` exists.
    2. Refuses to start if the lock file in the user's home dir (see
       getLockPath) already has an entry for the current dir, so that
       only one schedMain runs per dir.
    3. Reads the initial work list ``initWork`` (relative to the current
       dir) into the list allTasks.
    4. Registers this process in the lock file.
    5. Repeatedly calls :func:`scheduleTasks` until it reports that
       schedMain is done, sleeping ``delaySec`` between scans (no sleep
       in read-only mode).

    Returns 0 on success, or 1 if an exception was caught in the
    scheduling loop (its traceback is printed to stdout).  Once step 4
    has happened, the lock entry for the current dir is removed on exit
    from the loop, whether or not an exception occurred.

    Raises Exception (via throwerr) if cmdDir is missing or the lock is
    already held.  Exceptions from steps 1-4 are not caught here.
    '''
    cmdDir = os.path.join( globalDir, schedMisc.cmdDirName)
    if not os.path.isdir( cmdDir):
        throwerr('scheduleAll: cmdDir not found: %s' % (cmdDir,))

    # Use lockFile in user's home dir to make sure we don't have
    # multiple schedMains running for the same directory.
    lockMap = readLockFile()
    cwd = os.getcwd()
    if cwd in lockMap:
        throwerr(
            ('scheduleAll: another schedMain is running.\n'
            + '  cwd: %s\n  prior schedMain: %s')
            % ( cwd, lockMap[cwd],))

    # Get pbsMap: jobId -> state, like schedMisc.ST_WAIT
    if hostType == 'hostLocal':
        pbsMap = {}
    else:
        pbsMap = schedMisc.getPbsIdMap( bugLev, hostType, getpass.getuser())

    # Get the initial work list from the current dir.
    wkPath = os.path.join( os.getcwd(), initWork)
    allTasks = []
    readWorkList(
        bugLev,        # Debug level. Typically 0, 1 or 5
        globalDir,     # Dir containing global info, including subdir cmd
        ancDir,        # An ancestor dir of all dirs to be processed
        wkPath,        # Path of the work list to read
        os.getcwd(),   # curDir: Current working dir
        pbsMap,        # Results from showq. jobId -> state, like ST_WAIT
        useReadOnly,   # bool: only print status; don't start tasks
        redoAll,       # redoAll: if true, force all items to run as new.
        allTasks)      # Updated: list of all known Tasks

    lockMap = writeLockFile()

    # Loop until all the tasks in allTasks are complete
    rc = 0
    try:
        schedDone = False      # assume schedMain has more work to do
        while not schedDone:
            checkLockFile( lockMap)
            # One scan through allTasks
            schedDone = scheduleTasks(
                bugLev,        # Debug level. Typically 0, 1 or 5
                hostType,      # System type: peregrine or stampede or ...
                useReadOnly,   # bool: only print status; don't start tasks
                globalDir,     # Dir containing global info, incl subdir cmd
                ancDir,        # An ancestor dir of all dirs to be processed
                initWork,      # File containing the initial work list
                allTasks)      # Updated: list of all known Tasks
            # Pause only if another scan will follow; sleeping after the
            # final scan would just delay exit.
            if not schedDone and not useReadOnly:
                time.sleep( delaySec)
    except Exception:
        print('\nscheduleAll: caught Exception:')
        traceback.print_exc( file=sys.stdout)
        rc = 1
    finally:
        if bugLev >= 1:
            print('scheduleAll: cleanup')
        removeLockEntry()
    return rc
#=====================================================================
def scheduleTasks(
        bugLev,        # Debug level. Typically 0, 1 or 5
        hostType,      # System type: peregrine or stampede or ...
        useReadOnly,   # bool: only print status; don't start tasks
        globalDir,     # Dir containing global info, including subdir cmd
        ancDir,        # An ancestor dir of all dirs to be processed
        initWork,      # File containing the initial work list
        allTasks):     # Updated: list of all known Tasks
    '''
    Performs one scan through the allTasks list.

    For each task in allTasks:

    * task.updateStatus: reads any taskName.status.* files and gets
      pbs scheduler info;
    * updateWorkList: if the task finished (OK or error), reads its
      postOkWork / postErrorWork list and adds it to allTasks.

    This is repeated until a full pass merges no new work.  Then the task
    table is printed and, unless useReadOnly, every task whose status is
    ST_INIT and whose prerequisites (task.preWorks) are all OK is started
    with task.runTask.

    Returns schedDone (bool): True if schedMain is done, i.e. no task is
    submitted, waiting, or running and no task was started in this scan.
    Always True when useReadOnly.
    '''
    # Get pbsMap: jobId -> state, like schedMisc.ST_WAIT
    if hostType == 'hostLocal':
        pbsMap = {}
    else:
        pbsMap = schedMisc.getPbsIdMap( bugLev, hostType, getpass.getuser())

    # Scan allTasks and update the status of each.  Repeat until a pass
    # adds nothing, so tasks merged in one pass get their own status (and
    # post-work) checked too.  allTasks may grow while it is iterated
    # (updateWorkList passes it down as "Updated"); iterating the live
    # list deliberately visits such new tasks in the same pass.
    isMod = True
    while isMod:
        isMod = False
        for task in allTasks:
            task.updateStatus( ancDir, pbsMap)
            # Always call updateWorkList (it has side effects), then
            # fold its result into isMod.
            isModTmp = updateWorkList(
                bugLev,        # Debug level. Typically 0, 1 or 5
                task,          # The task to check
                globalDir,     # Dir containing global info, incl subdir cmd
                ancDir,        # An ancestor dir of all dirs to be processed
                useReadOnly,   # bool: only print status; don't start tasks
                pbsMap,        # Results from showq. jobId -> state
                allTasks)      # Updated: list of all known Tasks
            isMod = isMod or isModTmp

    if bugLev >= 0:
        printTasks( ancDir, allTasks)

    # Start the tasks that are ready.
    schedDone = True      # assume nothing to do: schedMain is done
    if not useReadOnly:
        for task in allTasks:
            if bugLev >= 5:
                print('  scheduleTasks: loop hd. task: %s'
                    % task.formatShort( ancDir))
            if task.status in [
                    schedMisc.ST_SUBMIT, schedMisc.ST_WAIT, schedMisc.ST_RUN]:
                schedDone = False     # still waiting on an active task
            if task.status == schedMisc.ST_INIT:
                if schedMisc.getNumNotOk( task.preWorks) == 0:
                    if bugLev >= 0:
                        print('scheduleTasks: start task: %-15s  taskDir: %s'
                            % (task.execName,
                            schedMisc.shortName( ancDir, task.taskDir),))
                    task.runTask( hostType, globalDir, ancDir, pbsMap)
                    schedDone = False
    return schedDone
#=====================================================================
def updateWorkList(
        bugLev,        # Debug level. Typically 0, 1 or 5
        task,          # The task to check
        globalDir,     # Dir containing global info, including subdir cmd
        ancDir,        # An ancestor dir of all dirs to be processed
        useReadOnly,   # bool: only print status; don't start tasks
        pbsMap,        # Results from showq. jobId -> state, like ST_WAIT
        allTasks):     # Updated: list of all known Tasks
    '''
    Merges a finished task's follow-on work list into allTasks, at most once.

    Does not itself refresh the task's status; it acts on task.status as
    already set.  If task.status is ST_OK (or ST_ERROR) and
    task.gotPosts is false, sets task.gotPosts and, when the file
    ``<sname>.<NM_POSTOKWORK>`` (or ``<sname>.<NM_POSTERRORWORK>``)
    exists in task.taskDir, reads it with readWorkList, resolving
    relative dirs against task.taskDir.  When task.isNew is true, the
    listed tasks are forced to run as new (redoAll).

    Returns True if a post-work list was considered in this call (even
    if the file turned out not to exist), otherwise False.
    '''
    # Pick the work-list name that matches how the task finished.
    if task.status == schedMisc.ST_OK:
        if task.gotPosts:
            return False
        listName = schedMisc.NM_POSTOKWORK
    elif task.status == schedMisc.ST_ERROR:
        if task.gotPosts:
            return False
        listName = schedMisc.NM_POSTERRORWORK
    else:
        return False
    task.gotPosts = True    # never merge the same post-work twice

    # If the current task is new, force all postReqs to run also.
    forceNew = bool( task.isNew)

    postPath = os.path.join( task.taskDir, '%s.%s' % (task.sname, listName,))
    if os.path.exists( postPath):
        readWorkList(
            bugLev,          # Debug level. Typically 0, 1 or 5
            globalDir,       # Dir containing global info, incl subdir cmd
            ancDir,          # An ancestor dir of all dirs to be processed
            postPath,        # Path of the work list to read
            task.taskDir,    # curDir: Current working dir
            pbsMap,          # Results from showq. jobId -> state
            useReadOnly,     # bool: only print status; don't start tasks
            forceNew,        # If true, force all items to run as new.
            allTasks)        # Updated: list of all known Tasks
    return True
#=====================================================================
def printTasks( ancDir, tasks):
    '''
    Prints a status summary and a table of all tasks to stdout.

    Output, in order:

    * a blank line;
    * "schedMain <current date>";
    * when at least one task exists, a "task counts:" line with
      " <statusName>:<count>" for every status that has at least one
      task, in status-index order;
    * a two-line table header;
    * one row per task, produced by task.formatTable( ancDir);
    * a final blank line.

    task.status is used as an index into a list of
    schedMisc.ST_NUM_STAT counters.
    '''
    numStatus = schedMisc.ST_NUM_STAT
    tally = [0] * numStatus
    for tk in tasks:
        tally[tk.status] += 1
    summary = ''.join(
        ' %s:%d' % (schedMisc.statusNames[ix], tally[ix],)
        for ix in range( numStatus)
        if tally[ix] > 0)

    print('')
    print('schedMain %s' % (schedMisc.formatDate( schedMisc.getCurDate()),))
    if summary != '':
        print('task counts: %s' % (summary,))

    # Coord with taskClass.py: format
    rowFmt = '  %-20s  %-8s  %-3s  %-6s  %4s  %s'
    print(rowFmt
        % ('execName', 'jobId', 'new', 'status', 'npre', 'taskDir',))
    print(rowFmt
        % ('--------', '-----', '---', '------', '----', '-------',))
    for tk in tasks:
        print(tk.formatTable( ancDir))
    print('')
#=====================================================================
def isSubPath( path, ancDir):
    '''
    Returns True if ``path`` is ``ancDir`` itself or lies inside it,
    comparing whole path components.

    Unlike a plain ``path.startswith( ancDir)``, this does not treat
    ``/a/bc`` as being under ``/a/b``.  Both arguments are expected to be
    normalized absolute paths (as produced by os.path.abspath).
    '''
    if path == ancDir:
        return True
    # abspath never leaves a trailing separator except on a root such
    # as '/', so only add one when it is missing.
    if ancDir.endswith( os.sep):
        prefix = ancDir
    else:
        prefix = ancDir + os.sep
    return path.startswith( prefix)


def readWorkList(
        bugLev,        # Debug level. Typically 0, 1 or 5
        globalDir,     # Dir containing global info, including subdir cmd
        ancDir,        # An ancestor dir of all dirs to be processed
        wkPath,        # Path of the work list to read
        curDir,        # Current working dir
        pbsMap,        # Results from showq. jobId -> state, like ST_WAIT
        useReadOnly,   # bool: only print status; don't start tasks
        redoAll,       # bool. If true, force all items to run as new.
        allTasks):     # Updated: list of all known Tasks
    '''
    Reads a file of work items.
    Updates allTasks and returns the list readTasks of the Tasks named in
    the file, in file order.

    In the file wkPath, each line has the format::

        scriptName dirPath

    with exactly two whitespace-separated tokens.  The suffix (".sh" or
    ".py" or ".pbs") of scriptName is stripped to get the sname.  Blank
    lines and lines starting with '#' (after optional whitespace) are
    ignored.  A relative dirPath is resolved against curDir.  Every
    dirPath must be ancDir or lie inside it (whole path components).

    For each item, an existing Task with the same scriptName and dir
    (schedMisc.findTask) is reused; if redoAll is set and not
    useReadOnly, its status is reset.  Otherwise a new taskClass.Task is
    created, which adds itself to allTasks and (recursively) reads its
    own prerequisite list.

    Raises Exception (via throwerr) if wkPath is not absolute, does not
    exist, has a malformed line, or names a dir outside ancDir.
    '''
    if bugLev >= 5:
        print('  readWorkList entry. wkPath: %s'
            % (schedMisc.shortName( ancDir, wkPath),))
    if not os.path.isabs( wkPath):
        throwerr('readWorkList: wkPath not abs: %s' % (wkPath,))
    if not os.path.exists( wkPath):
        throwerr('readWorkList: wkPath not found: %s' % (wkPath,))

    with open( wkPath) as fin:
        lines = fin.readlines()

    readTasks = []
    for line in lines:
        line = line.strip()
        if len(line) == 0 or line.startswith('#'):
            continue
        toks = line.split()
        if len(toks) != 2:
            throwerr('readWorkList: invalid line.  wkPath: %s  line: %s'
                % (wkPath, line,))
        (execName, dirPath) = toks
        if not os.path.isabs( dirPath):
            dirPath = os.path.join( curDir, dirPath)
        dirPath = os.path.abspath( dirPath)    # get rid of ending /
        # Component-wise check: a bare startswith would accept
        # ancDir '/a/b' for dirPath '/a/bc/x'.
        if not isSubPath( dirPath, ancDir):
            throwerr(('readWorkList: worklist dir does not start with ancDir.'
                + '\n  wkPath: %s  line: %s') % (wkPath, line,))

        task = schedMisc.findTask( execName, dirPath, allTasks)
        if task is None:
            # Create Task and append to allTasks.
            # Recursive: Task() calls readWorkList for the
            # prereqs in dirPath / NM_PREWORK,
            # and readWorkList creates Tasks
            task = taskClass.Task(
                bugLev, globalDir, ancDir, execName, dirPath, pbsMap,
                useReadOnly,    # bool: only print status; don't start tasks
                redoAll,
                allTasks)       # updated
        elif redoAll and not useReadOnly:
            task.resetStatus()
            if bugLev >= 5:
                print('  readWorkList: reset task: %s'
                    % (task.formatShort( ancDir),))
        readTasks.append( task)

    if bugLev >= 5:
        print('  readWorkList exit. wkPath: %s'
            % (schedMisc.shortName( ancDir, wkPath),))
        for task in readTasks:
            print('    readTask: %s' % (task.formatShort( ancDir),))
    return readTasks
#=====================================================================
def getLockPath():
    '''
    Returns the path of the schedMain lock file: the file named
    schedMisc.NM_LOCKFILE in the current user's home dir, as given by
    os.path.expanduser('~').
    '''
    return os.path.join( os.path.expanduser('~'), schedMisc.NM_LOCKFILE)
#=====================================================================
def readLockFile():
    '''
    Reads the lock file in the user's home dir (see getLockPath) and
    returns its parsed JSON content.

    writeLockFile stores a map from working-dir path to a dict describing
    the schedMain running there.  Returns an empty dict when the file
    does not exist.
    '''
    lockPath = getLockPath()
    if not os.path.exists( lockPath):
        return {}
    with open( lockPath) as fin:
        return json.load( fin)
#=====================================================================
def writeLockFile():
    '''
    Adds (or replaces) this process's entry in the lock file in the
    user's home dir, and returns the updated map.

    Re-reads the lock file, sets the entry keyed by the current dir to a
    dict with keys 'host', 'user', 'pid' and 'date' describing this
    process, and rewrites the whole file as JSON (indent 2, sorted keys).
    scheduleAll later passes the returned map to checkLockFile.
    '''
    entries = readLockFile()
    myInfo = dict()
    myInfo['host'] = socket.gethostname()
    myInfo['user'] = getpass.getuser()
    myInfo['pid'] = os.getpid()
    myInfo['date'] = schedMisc.formatDate( schedMisc.getCurDate())
    entries[os.getcwd()] = myInfo
    lockPath = getLockPath()
    with open( lockPath, 'w') as fout:
        json.dump( entries, fout, indent=2, sort_keys=True)
    return entries
#=====================================================================
def checkLockFile( lockMap):
    '''
    Verifies that the lock file in the user's home dir still holds the
    specified entry for the current dir.

    ``lockMap`` is the map returned by writeLockFile.  Re-reads the lock
    file and compares its entry for os.getcwd() with lockMap's.

    Raises Exception (via throwerr) if the entry has disappeared from the
    lock file or differs from the expected one.  Raises KeyError if
    ``lockMap`` itself has no entry for the current dir.
    '''
    cwd = os.getcwd()
    oldEntry = lockMap[cwd]
    lmap = readLockFile()
    # A missing entry used to surface as a bare KeyError; report it
    # clearly instead (e.g. the lock file was deleted or edited).
    if cwd not in lmap:
        throwerr('checkLockFile: lock entry missing.\n  cwd: %s\n  old: %s'
            % (cwd, oldEntry,))
    if oldEntry != lmap[cwd]:
        throwerr('checkLockFile: lockValue mismatch.\n  old: %s\n  new: %s'
            % (oldEntry, lmap[cwd],))
#=====================================================================
def removeLockEntry():
    '''
    Removes the current dir's entry from the lock file in the user's
    home dir.  The file itself is kept, as are entries for other dirs.

    If there is no entry for the current dir (e.g. the lock file was
    removed or edited by hand), there is nothing to remove and the file
    is left untouched.  Raising would be unhelpful here: scheduleAll
    calls this from its ``finally`` clause, where a KeyError would
    replace the scheduler's normal return code with a traceback.
    '''
    cwd = os.getcwd()
    lockMap = readLockFile()
    if cwd not in lockMap:
        return
    del lockMap[cwd]
    with open( getLockPath(), 'w') as fout:
        json.dump( lockMap, fout, indent=2, sort_keys=True)
#=====================================================================
def throwerr( msg):
    '''
    Raises a plain Exception whose message is ``msg`` prefixed with
    "schedMain.py: ".  Never returns.
    '''
    prefix = 'schedMain.py: '
    raise Exception( prefix + msg)
#=====================================================================
if __name__ == '__main__':
    # Propagate main's return code (0 ok, 1 error from scheduleAll) as
    # the process exit status so calling scripts can detect failure.
    sys.exit( main())