Source code for schedMain.schedMain

#!/usr/bin/env python

import copy, getpass, json, os, re, socket, sys, time, traceback

import schedMisc
import taskClass


def aaOverview():
  '''
  This is the main scheduling system.
  It reads the file ``initWork`` to create an initial work list.
  Then it repeatedly executes tasks that have satisfied their
  prerequisites, as those tasks add future tasks to the work list.

  For example, suppose the ``initWork`` file contains the line::

    alpha.sh  some/directory

  Then schedMain will look in ``some/directory`` for the file
  ``alpha.preWork``.  If present it contains prerequisites for alpha.sh.
  After all the prerequisites have completed with status OK,
  schedMain will look for the script ``alpha.sh``, first in
  ``some/directory``, then in the global script area, ``globalDir/cmd``.
  Then schedMain will run the script ``alpha.sh`` in ``some/directory``.

  If the work list specifies a script ending in ".pbs", like
  ``alpha.pbs``, instead of running the script schedMain will
  issue ``qsub alpha.pbs``.

  When a task ``alpha.sh`` completes, typically it writes two files::

    alpha.postOkWork       # list of tasks to start next
    alpha.status.ok        # indicates OK completion

  After detecting that the script wrote file ``alpha.status.ok``,
  schedMain adds any tasks listed in ``alpha.postOkWork`` to the
  work list.

  The lines in file ``initWork``, like all work lists
  (alpha.preWork, alpha.postOkWork, alpha.postErrorWork),
  have the format::

    scriptName  dirPath

  where scriptName may be ``*.sh``, ``*.py``, or ``*.pbs``.
  Each line must contain exactly these two whitespace-separated
  tokens (no trailing comment); blank lines and lines whose first
  non-blank character is ``#`` are ignored.  For example::

    alpha.sh  some/directory

  The **sname** is the base name of the script - for example,
  if we strip the suffix (".sh" or ".py" or ".pbs") off ``alpha.sh``
  we get the base sname, in this case ``alpha``.
  The sname is the root of a handful of files related to that
  script in that dirPath.  Using the alpha example::

    alpha.preWork          # Work list: prerequisites for this task
    alpha.postOkWork       # Work list: tasks to start after this task
                           #   writes file status.ok
    alpha.postErrorWork    # (rarely used) Work list: tasks to start after
                           #   this task writes file status.error
    alpha.status.init      # time this task was initialized
    alpha.status.submit    # time this task was submitted (like qsub)
    alpha.status.run       # time this task was started running
    alpha.status.ok        # time this task finished successfully
    alpha.status.error     # time and error message if finished unsuccessfully

  Sometimes some of the status files are omitted, but either
  status.ok or status.error MUST be written.

  To write a status file from a shell script alpha.sh use::

    date '+%Y-%m-%d_%H:%M:%S' > alpha.status.ok

    # Or:
    date '+%Y-%m-%d_%H:%M:%S' > alpha.status.error
    echo 'Some error message' >> alpha.status.error
    echo 'More error message' >> alpha.status.error

  To write a status file from a python script alpha.py use::

    import schedMisc
    schedMisc.setStatusOk( 'alpha', os.getcwd())

    # Or:
    import schedMisc
    schedMisc.setStatusError( 'alpha', os.getcwd(),
      'Some error message.\\nMore error message.')

    # Or:
    import datetime
    dateFmt = '%Y-%m-%d_%H:%M:%S'
    dateStg = datetime.datetime.now().strftime( dateFmt)
    fpath = os.path.join( os.getcwd(), 'alpha.status.ok')
    with open( fpath, 'w') as fout:
      print >> fout, dateStg

    # Or:
    fpath = os.path.join( os.getcwd(), 'alpha.status.error')
    with open( fpath, 'w') as fout:
      print >> fout, dateStg
      print >> fout, 'Some error message'
      print >> fout, 'More error message'
  '''

#=====================================================================
def badparms( msg):
  '''
  Reports a command line problem and terminates the program.

  Prints ``msg``, then the usage text (the docstring of :func:`main`),
  and exits the process with status 1.
  '''
  errorLine = '\nError: schedMain.py: %s' % (msg,)
  print(errorLine)
  print(main.__doc__)
  sys.exit(1)

#=====================================================================
# Gets the command line parameters and calls scheduleAll,
# the main scheduler loop.
def main():
  '''
  For an overview, see :func:`aaOverview`.

  Command line parameters are:

  =============   ========   ===================================================
  -bugLev         <int>      Debug level.  Typically 0, 1, or 5.
  -hostType       <string>   System type: hostLocal or peregrine or ...
  -globalDir      <string>   Dir containing global info, including subdir cmd
  -ancDir         <string>   An ancestor dir of all dirs to be processed
  -initWork       <string>   File containing the initial work list
  -delaySec       <float>    Schedule loop delay, seconds
  -redoAll        <bool>     n/y: on restart, redo all even if prior run was ok
  -useReadOnly    <bool>     n/y: only print status; do not start tasks
  =============   ========   ===================================================
  '''

  parmSpecMap = {
    # Name           Type      Default
    'bugLev':      [ 'int',    0,           ],
    'hostType':    [ 'str',    'hostLocal', ],
    'globalDir':   [ 'str',    None,        ],
    'ancDir':      [ 'str',    None,        ],
    'initWork':    [ 'str',    None,        ],
    'delaySec':    [ 'float',  None,        ],
    'redoAll':     [ 'bool',   None,        ],
    'useReadOnly': [ 'bool',   False,       ],
  }

  parmMap = schedMisc.getParmMap( __file__, os.getcwd(), parmSpecMap)
  # getParmMap reports a command line error by returning a message
  # string instead of a dict.
  if isinstance( parmMap, str):
    badparms( parmMap)

  # These values are used unconditionally below and in scheduleAll
  # (os.path.abspath / os.path.join fail on None), so reject a missing
  # value with a usage message instead of a traceback.
  for name in ('globalDir', 'ancDir', 'initWork'):
    if parmMap[name] is None:
      badparms('parameter not specified: -%s' % (name,))

  bugLev      = parmMap['bugLev']
  hostType    = parmMap['hostType']
  globalDir   = parmMap['globalDir']
  ancDir      = parmMap['ancDir']
  initWork    = parmMap['initWork']
  delaySec    = parmMap['delaySec']
  redoAll     = parmMap['redoAll']
  useReadOnly = parmMap['useReadOnly']

  globalDir = os.path.abspath( globalDir)
  ancDir = os.path.abspath( ancDir)

  # Do everything
  rc = scheduleAll( bugLev, hostType, globalDir, ancDir, initWork,
    delaySec, redoAll, useReadOnly)
  return rc

#=====================================================================
# Checks and writes the lock file (schedMisc.NM_LOCKFILE)
# in the user's home dir to ensure only one schedMain
# runs at a time in a given directory.
#
# Fills allTasks by reading the initWork work list.
# Loops repeatedly through all the allTasks list,
# until there is no more work to be done.
def scheduleAll(
  bugLev,          # Debug level.  Typically 0, 1 or 5
  hostType,        # System type: peregrine or stampede or ...
  globalDir,       # Dir containing global info, including subdir cmd
  ancDir,          # An ancestor dir of all dirs to be processed
  initWork,        # File containing the initial work list
  delaySec,        # Schedule loop delay, seconds
  redoAll,         # bool: on restart, redo all even if prior run was ok
  useReadOnly):    # bool: only print status; don't start tasks
  '''
  Runs the scheduler loop until no more work can be done.

  Returns 0 if the loop finished normally, or 1 if an exception was
  raised inside the loop (its traceback is printed to stdout).
  Errors raised before the loop starts (missing cmdDir, an existing
  lock entry for the current directory, a bad initial work list)
  propagate to the caller.
  '''
  cmdDir = os.path.join( globalDir, schedMisc.cmdDirName)
  if not os.path.isdir( cmdDir):
    throwerr('scheduleAll: cmdDir not found: %s' % (cmdDir,))

  # Use lockFile in user's home dir to make sure we don't have
  # multiple schedMains running for the same directory.
  lockMap = readLockFile()
  cwd = os.getcwd()
  if cwd in lockMap:
    throwerr( ('scheduleAll: another schedMain is running.\n'
      + ' cwd: %s\n prior schedMain: %s') % ( cwd, lockMap[cwd],))

  # Get pbsMap: jobId -> state, like schedMisc.ST_WAIT
  if hostType == 'hostLocal':
    pbsMap = {}
  else:
    pbsMap = schedMisc.getPbsIdMap( bugLev, hostType, getpass.getuser())

  # Get the initial work list from the current dir.
  wkPath = os.path.join( os.getcwd(), initWork)
  allTasks = []
  readWorkList(
    bugLev,          # Debug level.  Typically 0, 1 or 5
    globalDir,       # Dir containing global info, including subdir cmd
    ancDir,          # An ancestor dir of all dirs to be processed
    wkPath,          # Path of the work list to read
    os.getcwd(),     # curDir: Current working dir
    pbsMap,          # Results from showq.  jobId -> state, like ST_WAIT
    useReadOnly,     # bool: only print status; don't start tasks
    redoAll,         # redoAll: if true, force all items to run as new.
    allTasks)        # Updated: list of all known Tasks

  lockMap = writeLockFile()

  # Loop until all the tasks in allTasks are complete
  rc = 0
  try:
    schedDone = False      # assume schedMain has more work to do
    while not schedDone:
      checkLockFile( lockMap)

      # One scan through allTasks
      schedDone = scheduleTasks(
        bugLev,          # Debug level.  Typically 0, 1 or 5
        hostType,        # System type: peregrine or stampede or ...
        useReadOnly,     # bool: only print status; don't start tasks
        globalDir,       # Dir containing global info, including subdir cmd
        ancDir,          # An ancestor dir of all dirs to be processed
        initWork,        # File containing the initial work list
        allTasks)        # Updated: list of all known Tasks

      # Sleep only if another scan will follow.
      if not useReadOnly and not schedDone:
        time.sleep( delaySec)

  except Exception:
    print('\nscheduleAll: caught Exception:')
    traceback.print_exc( file=sys.stdout)
    rc = 1

  finally:
    if bugLev >= 1:
      print('scheduleAll: cleanup')
    # Remove our lock entry only if it is still the one we wrote.
    # If checkLockFile failed because another schedMain replaced the
    # entry, deleting it would unlock that still-running schedMain;
    # if the entry is already gone there is nothing to remove.
    if readLockFile().get( cwd) == lockMap.get( cwd):
      removeLockEntry()

  return rc

#=====================================================================
# Performs one scan through the allTasks list.
# Returns boolean schedDone: schedMain is done.
#
# For each task in allTasks:
#   updateStatus: Read any taskName.status.* files; get pbs scheduler info
#   if task status==init and prereqs done: runTask
#   if task status==ok: read postOkWork list and add to our work list
def scheduleTasks(
  bugLev,          # Debug level.  Typically 0, 1 or 5
  hostType,        # System type: peregrine or stampede or ...
  useReadOnly,     # bool: only print status; don't start tasks
  globalDir,       # Dir containing global info, including subdir cmd
  ancDir,          # An ancestor dir of all dirs to be processed
  initWork,        # File containing the initial work list (not used here)
  allTasks):       # Updated: list of all known Tasks
  '''
  Performs one scheduling pass over allTasks.

  First refreshes every task's status and merges the post-work list of
  each newly finished task into allTasks, rescanning until a pass finds
  no newly finished task.  Prints the task table.  Then, unless
  useReadOnly, starts every task in ST_INIT whose prerequisites are all OK.

  Returns True when no task is submitted, waiting or running and none
  was started by this pass (always True when useReadOnly).
  '''
  # Fetch scheduler state: jobId -> state, like schedMisc.ST_WAIT
  if hostType != 'hostLocal':
    pbsMap = schedMisc.getPbsIdMap( bugLev, hostType, getpass.getuser())
  else:
    pbsMap = {}

  # Rescan while any pass merged new work: tasks appended to allTasks
  # during the pass are visited by the same for loop, and finishing
  # them may add still more work.
  keepScanning = True
  while keepScanning:
    keepScanning = False
    for task in allTasks:
      task.updateStatus( ancDir, pbsMap)
      merged = updateWorkList( bugLev, task, globalDir, ancDir,
        useReadOnly, pbsMap, allTasks)
      if merged:
        keepScanning = True

  if bugLev >= 0:
    printTasks( ancDir, allTasks)

  # Start the tasks that are ready.
  schedDone = True       # assume nothing to do: schedMain is done
  if useReadOnly:
    return schedDone

  activeStates = [ schedMisc.ST_SUBMIT, schedMisc.ST_WAIT, schedMisc.ST_RUN]
  for task in allTasks:
    if bugLev >= 5:
      print(' scheduleTasks: loop hd. task: %s' % task.formatShort( ancDir))
    if task.status in activeStates:
      schedDone = False
    isReady = task.status == schedMisc.ST_INIT \
      and schedMisc.getNumNotOk( task.preWorks) == 0
    if isReady:
      if bugLev >= 0:
        print('scheduleTasks: start task: %-15s taskDir: %s'
          % (task.execName, schedMisc.shortName( ancDir, task.taskDir),))
      task.runTask( hostType, globalDir, ancDir, pbsMap)
      schedDone = False
  return schedDone

#=====================================================================
# Update the status of the given task.
# If it's done, read the POSTOKWORK or POSTERRORWORK file.
def updateWorkList(
  bugLev, task, globalDir, ancDir, useReadOnly, pbsMap, allTasks):
  '''
  Merges the post-work list of a task that has just finished.

  Parameters:
    bugLev       Debug level.  Typically 0, 1 or 5
    task         The task to check
    globalDir    Dir containing global info, including subdir cmd
    ancDir       An ancestor dir of all dirs to be processed
    useReadOnly  bool: only print status; don't start tasks
    pbsMap       Results from showq.  jobId -> state, like ST_WAIT
    allTasks     Updated: list of all known Tasks

  When task.status is ST_OK (or ST_ERROR) and its post-work list has
  not been handled yet, sets task.gotPosts and, if the file
  ``sname.NM_POSTOKWORK`` (or ``sname.NM_POSTERRORWORK``) exists in
  task.taskDir, reads it into allTasks.  The tasks read are forced to
  run as new when this task itself is new.

  Returns True if this call handled a newly finished task (whether or
  not its post-work file exists), else False.
  '''
  # Which post-work list applies to each finished status.
  postListByStatus = {
    schedMisc.ST_OK:    schedMisc.NM_POSTOKWORK,
    schedMisc.ST_ERROR: schedMisc.NM_POSTERRORWORK,
  }
  mergeName = postListByStatus.get( task.status)
  if mergeName is None or task.gotPosts:
    return False       # not finished yet, or already handled
  task.gotPosts = True

  # If the current task is new, force all postReqs to run also.
  forceNew = bool( task.isNew)

  # Read sname.postOkWork or sname.postErrorWork and add to allTasks.
  wkPath = os.path.join( task.taskDir, '%s.%s' % (task.sname, mergeName,))
  if os.path.exists( wkPath):
    readWorkList(
      bugLev, globalDir, ancDir, wkPath,
      task.taskDir,      # curDir: relative dirs in the list are under here
      pbsMap, useReadOnly, forceNew, allTasks)
  return True

#=====================================================================
# Prints a table of all tasks
def printTasks( ancDir, tasks):
  '''
  Prints a status summary of tasks to stdout.

  Prints the current date, the number of tasks in each status
  (nonzero counts only), and a table with one row per task as
  formatted by task.formatTable( ancDir).
  '''
  tally = [0] * schedMisc.ST_NUM_STAT
  for tsk in tasks:
    tally[tsk.status] += 1
  countText = ''.join(
    ' %s:%d' % (schedMisc.statusNames[ix], num,)
    for ix, num in enumerate( tally) if num > 0)

  print('')
  print('schedMain %s' % (schedMisc.formatDate( schedMisc.getCurDate()),))
  if countText:
    print('task counts: %s' % (countText,))

  # Coord with taskClass.py: format
  rowFmt = ' %-20s %-8s %-3s %-6s %4s %s'
  print(rowFmt % ('execName', 'jobId', 'new', 'status', 'npre', 'taskDir',))
  print(rowFmt % ('--------', '-----', '---', '------', '----', '-------',))
  for tsk in tasks:
    print(tsk.formatTable( ancDir))
  print('')

#=====================================================================
# Reads a file of work items.
# Updates allTasks and returns a list readTasks.
#
# In the file wkPath, each line has the format:
#   scriptName  dirPath
# We strip the suffix (".sh" or ".py" or ".pbs") off scriptName
# to get sname.
# Blank lines and lines whose first non-blank character is '#' are ignored.
def readWorkList(
  bugLev,          # Debug level.  Typically 0, 1 or 5
  globalDir,       # Dir containing global info, including subdir cmd
  ancDir,          # An ancestor dir of all dirs to be processed
  wkPath,          # Path of the work list to read
  curDir,          # Current working dir
  pbsMap,          # Results from showq.  jobId -> state, like ST_WAIT
  useReadOnly,     # bool: only print status; don't start tasks
  redoAll,         # bool.  If true, force all items to run as new.
  allTasks):       # Updated: list of all known Tasks
  '''
  Reads the work list in file wkPath.

  Each non-blank line whose first non-blank character is not ``#``
  must contain exactly two tokens: ``scriptName dirPath``.  A relative
  dirPath is taken relative to curDir, and the resulting directory
  must be ancDir itself or lie beneath it.  Each named task is looked
  up in allTasks; if absent a new taskClass.Task is created, and if
  present (and redoAll and not useReadOnly) its status is reset.

  Returns the list of Tasks named in the file, in file order.

  Raises Exception (via throwerr) if wkPath is not absolute or does
  not exist, a line is malformed, or a dirPath is outside ancDir.
  '''
  if bugLev >= 5:
    print(' readWorkList entry. wkPath: %s' \
      % (schedMisc.shortName( ancDir, wkPath),))
  if not os.path.isabs( wkPath):
    throwerr('readWorkList: wkPath not abs: %s' % (wkPath,))
  if not os.path.exists( wkPath):
    throwerr('readWorkList: wkPath not found: %s' % (wkPath,))

  # Read the whole file first so it is closed before Task() recursively
  # reads other work lists.
  with open( wkPath) as fin:
    lines = fin.readlines()

  readTasks = []
  for line in lines:
    line = line.strip()
    if len(line) == 0 or line.startswith('#'):
      continue                 # blank or comment line
    toks = line.split()
    if len(toks) != 2:
      throwerr('readWorkList: invalid line. wkPath: %s line: %s'
        % (wkPath, line,))
    (execName, dirPath) = toks
    if not os.path.isabs( dirPath):
      dirPath = os.path.join( curDir, dirPath)
    dirPath = os.path.abspath( dirPath)     # get rid of ending /
    # Compare whole path components: a plain startswith would accept
    # '/a/bc' as being under ancDir '/a/b'.
    if not _isUnderDir( dirPath, ancDir):
      throwerr(('readWorkList: worklist dir does not start with ancDir.'
        + '\n wkPath: %s line: %s') % (wkPath, line,))

    task = schedMisc.findTask( execName, dirPath, allTasks)
    if task is None:
      # Create Task and append to allTasks.
      # Recursive: Task() calls readWorkList for the
      # prereqs in dirPath / NM_PREWORK,
      # and readWorkList creates Tasks
      task = taskClass.Task( bugLev, globalDir, ancDir, execName, dirPath,
        pbsMap,
        useReadOnly,     # bool: only print status; don't start tasks
        redoAll,
        allTasks)        # updated
    elif redoAll and not useReadOnly:
      task.resetStatus()
      if bugLev >= 5:
        print(' readWorkList: reset task: %s' \
          % (task.formatShort( ancDir),))
    readTasks.append( task)

  if bugLev >= 5:
    print(' readWorkList exit. wkPath: %s' \
      % (schedMisc.shortName( ancDir, wkPath),))
    for task in readTasks:
      print(' readTask: %s' % (task.formatShort( ancDir),))
  return readTasks


def _isUnderDir( path, ancDir):
  '''
  Returns True if path is ancDir itself or lies beneath it.

  Matches whole path components, so '/a/bc' is NOT under '/a/b'.
  Both arguments are expected to be absolute, normalized paths
  (as produced by os.path.abspath).
  '''
  if path == ancDir:
    return True
  # abspath keeps a trailing separator only for the root ('/').
  prefix = ancDir if ancDir.endswith( os.sep) else ancDir + os.sep
  return path.startswith( prefix)

#=====================================================================
# Returns the path to the lock file in the user's home dir.
def getLockPath():
  '''
  Returns the path of the schedMain lock file, named
  schedMisc.NM_LOCKFILE, inside the current user's home directory.
  '''
  return os.path.join( os.path.expanduser('~'), schedMisc.NM_LOCKFILE)

#=====================================================================
# Reads the lockFile in the user's home dir
# and returns its content.
def readLockFile():
  '''
  Returns the lock map decoded from the JSON lock file, or an
  empty dict when the lock file does not exist.
  '''
  lockPath = getLockPath()
  if not os.path.exists( lockPath):
    return {}
  with open( lockPath) as fin:
    return json.load( fin)

#=====================================================================
# Writes the lockFile in the user's home dir.
def writeLockFile():
  '''
  Registers this schedMain in the lock file.

  Sets the entry for the current working directory to this process's
  host, user, pid and the current date, rewrites the whole lock file
  as indented, key-sorted JSON, and returns the updated map.
  '''
  lockMap = readLockFile()
  lockMap[os.getcwd()] = dict(
    host=socket.gethostname(),
    user=getpass.getuser(),
    pid=os.getpid(),
    date=schedMisc.formatDate( schedMisc.getCurDate()),
  )
  with open( getLockPath(), 'w') as fout:
    json.dump( lockMap, fout, indent=2, sort_keys=True)
  return lockMap

#=====================================================================
# Checks that the lockFile in the user's home dir
# has the specified value.
def checkLockFile( lockMap):
  '''
  Verifies that this schedMain still owns its lock entry.

  lockMap is the map returned by writeLockFile; it must contain an
  entry for the current directory.  Re-reads the lock file and raises
  Exception (via throwerr) if the file's entry for the current
  directory differs from lockMap's, including when the file no longer
  has an entry for it (reported as "new: None").
  '''
  cwd = os.getcwd()
  oldValue = lockMap[cwd]
  # .get: a removed entry is a mismatch, not a bare KeyError.
  newValue = readLockFile().get( cwd)
  if oldValue != newValue:
    throwerr('checkLockFile: lockValue mismatch.\n old: %s\n new: %s'
      % (oldValue, newValue,))

#=====================================================================
# Removes this directory's entry from the lockFile in the user's home dir.
def removeLockEntry():
  '''
  Removes the current directory's entry from the lock file.

  Does nothing when the lock file has no entry for the current
  directory.  This function is called from cleanup code
  (scheduleAll's ``finally``), where a KeyError would mask the
  original error.
  '''
  lockMap = readLockFile()
  cwd = os.getcwd()
  if cwd not in lockMap:
    return            # nothing to remove; leave the file untouched
  del lockMap[cwd]
  with open( getLockPath(), 'w') as fout:
    json.dump( lockMap, fout, indent=2, sort_keys=True)

#=====================================================================
# Raises an Exception.
def throwerr( msg):
  '''
  Raises a plain Exception whose message is msg prefixed
  with 'schedMain.py: '.
  '''
  raise Exception( 'schedMain.py: ' + msg)

#=====================================================================
if __name__ == '__main__':
  # Propagate main's return code (0 ok, 1 error) as the process exit status.
  sys.exit( main())