# Source code for schedMain.taskClass

#!/usr/bin/env python

import os, re, sys, traceback

import schedMisc
import schedMain

#=====================================================================

# Represents one Task, which means one script to be executed in one dir.

class Task:
    """One Task: one script (execName) to be executed in one dir (taskDir).

    The constructor reads the task's (sname).status.* files and the pbsMap,
    decides whether the task must be (re)run, appends itself to allTasks,
    and recursively creates Tasks for the prerequisites listed in
    taskDir/(sname).(schedMisc.NM_PREWORK).

    Status values are indices into schedMisc.statusNames; the code compares
    them with <, >= and max(), so their numeric ordering is significant.
    """

    # Seconds a job may be absent from pbsMap (after last being seen there)
    # while its status file still says wait/run, before we assume it died.
    # On Lustre a file written on node A may take a while to appear on
    # node B, hence the grace period.
    PBS_GRACE_SEC = 60

    #====================
    # Creates Task and appends to allTasks.

    def __init__(
        self,
        bugLev,        # Debug level
        globalDir,     # Dir containing global info, including cmd
        ancDir,        # Ancestor of all dirs to be processed
        execName,      # Script name, like 'alpha.py'
        taskDir,       # Dir in which to run the script
        pbsMap,        # Results from showq. jobId -> state, like ST_WAIT
        useReadOnly,   # bool: only print status; don't start tasks
        redoAll,       # If True, rm all status.* files and set status = ST_INIT
        allTasks):     # List of all known tasks

        self.bugLev = bugLev
        self.execName = execName
        self.taskDir = taskDir
        self.isNew = False
        self.status = schedMisc.ST_INIT
        self.jobId = None
        self.gotPosts = False
        self.lastPbsDate = None
        # Pre-set the remaining fields so __str__ (and hence throwerr)
        # works even if validation below fails before they are computed.
        self.sname = None
        self.stype = None
        self.execPath = None
        self.preWorks = None

        cmdDir = os.path.join( globalDir, schedMisc.cmdDirName)
        if not os.path.isdir( cmdDir):
            self.throwerr('const: cmdDir not found: %s' % (cmdDir,))

        # Split execName, like 'alpha.sh', into sname='alpha', stype='sh'.
        toks = execName.split('.')
        if len(toks) != 2:
            self.throwerr('const: invalid execName: %s' % (execName,))
        self.sname = toks[0]
        self.stype = toks[1]
        schedMisc.checkNiceName( taskDir + ':' + execName, self.sname)
        if self.stype not in ['sh', 'py', 'pbs']:
            self.throwerr('const: invalid execName: %s' % (execName,))

        # Insure taskDir is absolute and has no trailing slash.
        if not os.path.isabs( taskDir):
            self.throwerr('const: taskDir not absolute: %s' % (taskDir,))
        if taskDir.endswith('/'):
            self.throwerr('const: taskDir ends with /: %s' % (taskDir,))

        # Set execPath = search for execName in taskDir, then in cmdDir.
        self.execPath = os.path.join( taskDir, execName)
        if not os.path.exists( self.execPath):
            self.execPath = os.path.join( cmdDir, execName)
            if not os.path.exists( self.execPath):
                self.throwerr('const: execPath not found: %s' % (self.execPath,))

        # Read status.* files and check pbsMap to get jobId, if any.
        self.updateStatus( ancDir, pbsMap)

        if useReadOnly:
            self.isNew = False
            if self.bugLev >= 5:
                print('Task.const: readOnly: %s %s'
                    % (self.execName, schedMisc.shortName( ancDir, self.taskDir),))

        # If an old run of this Task is OK, we're done.
        elif (not redoAll) and self.status == schedMisc.ST_OK:
            self.isNew = False
            if self.bugLev >= 5:
                print('Task.const: old ok: %s %s'
                    % (self.execName, schedMisc.shortName( ancDir, self.taskDir),))

        # Else we must run the task.
        else:
            self.isNew = True
            pbsState = pbsMap.get( self.jobId)
            if self.status >= schedMisc.ST_SUBMIT \
                    and pbsState is not None and pbsState < schedMisc.ST_OK:
                # The HPC job from an earlier run is still queued/running.
                # Keep tracking its status instead of re-initializing:
                # runTask requires ST_INIT, so resetting here would let
                # the scheduler submit a duplicate job into the same dir.
                print('taskClass: HPC job is still running for task:\n%s' % (self,))
            elif self.status >= schedMisc.ST_SUBMIT:
                # Removes all status files, clears jobId, sets ST_INIT.
                self.resetStatus()
            else:
                self.setStatusInit()
            if self.bugLev >= 5:
                print('Task.const: NEW: %s %s'
                    % (self.execName, schedMisc.shortName( ancDir, self.taskDir),))

        # Add Task to allTasks
        if schedMisc.findTask( execName, taskDir, allTasks) is not None:
            self.throwerr('const: duplicate task')
        allTasks.append( self)

        # Recursive: Task() calls readWorkList for the
        # prereqs in taskDir / (sname).NM_PREWORK,
        # and readWorkList creates Tasks.
        # Read the pre-requisites for this Task.
        wkPath = os.path.join( self.taskDir,
            '%s.%s' % (self.sname, schedMisc.NM_PREWORK,))
        if os.path.exists( wkPath):
            self.preWorks = schedMain.readWorkList(
                self.bugLev,    # Debug level. Typically 0, 1 or 5
                globalDir,      # Dir containing global info, including subdir cmd
                ancDir,         # An ancestor dir of all dirs to be processed
                wkPath,         # Path of the work list to read
                self.taskDir,   # curDir: Current working dir
                pbsMap,         # Results from showq. jobId -> state, like ST_WAIT
                useReadOnly,    # bool: only print status; don't start tasks
                False,          # redoAll: If true, force all items to run as new.
                allTasks)       # Updated: list of all known Tasks
        else:
            self.preWorks = None

    #====================
    # Formats this Task, one field per line.

    def __str__( self):
        # getattr with a default keeps this from raising on a partly
        # constructed Task; throwerr embeds this text in its message.
        def field( name):
            return getattr( self, name, None)

        status = field('status')
        if status is None:
            statusName = None
        else:
            statusName = self.getStatusName()
        parts = [
            ' sname: %s\n' % (field('sname'),),
            ' execName: %s\n' % (field('execName'),),
            ' execPath: %s\n' % (field('execPath'),),
            ' taskDir: %s\n' % (field('taskDir'),),
            ' status: %s %s\n' % (status, statusName,),
            ' jobId: %s\n' % (field('jobId'),),
            ' gotPosts: %s\n' % (field('gotPosts'),),
            ' lastPbsDate: %s\n' % (field('lastPbsDate'),),
        ]
        return ''.join( parts)

    #====================
    # Returns the name corresponding to self.status.

    def getStatusName( self):
        stg = schedMisc.statusNames[self.status]
        return stg

    #====================
    # Returns the file name (relative to taskDir) of a status file.
    # For example, for sname==alpha and stIx==ST_OK,
    # the name would be 'alpha.status.ok'.

    def getStatusPath( self, stIx):
        res = '%s.%s.%s' \
            % (self.sname, schedMisc.NM_STATUS, schedMisc.statusNames[stIx],)
        return res

    #====================
    # Removes every (sname).status.* file in taskDir that exists.

    def _removeStatusFiles( self):
        for stIx in range( 0, len( schedMisc.statusNames)):
            spath = os.path.join( self.taskDir, self.getStatusPath( stIx))
            if os.path.exists( spath):
                os.remove( spath)

    #====================
    # Removes all status files, clears jobId,
    # and sets status = ST_INIT, isNew = True.

    def resetStatus( self):
        self._removeStatusFiles()
        self.isNew = True
        self.jobId = None
        self.setStatusInit()

    #====================
    # Formats this task in one line.

    def formatShort(
        self,
        ancDir):       # Ancestor of all dirs to be processed
        if self.isNew:
            rtStg = 'new'
        else:
            rtStg = 'old'
        res = '%s %s st: %s %s' \
            % (self.execName, schedMisc.shortName( ancDir, self.taskDir),
            self.getStatusName(), rtStg,)
        return res

    #====================
    # Returns one line of table format. Used by schedMain.py:printTasks.
    # A '*' before the prereq count marks a task in ST_INIT whose
    # prerequisites are all OK (schedMisc.getNumNotOk returned 0).

    def formatTable(
        self,
        ancDir):       # Ancestor of all dirs to be processed
        # Coord with schedMain.py: printTasks
        if self.isNew:
            newStg = 'new'
        else:
            newStg = ''
        numPre = schedMisc.getNumNotOk( self.preWorks)
        if self.status == schedMisc.ST_INIT and numPre == 0:
            preStg = '*'
        else:
            preStg = ' '
        preStg += '%3d' % (numPre,)
        res = ' %-20s %-8s %-3s %-6s %4s %s' \
            % (self.execName, self.jobId, newStg, self.getStatusName(),
            preStg, schedMisc.shortName( ancDir, self.taskDir),)
        return res

    #====================
    # Writes cmdLine to taskDir/(sname).cmdLine, in case we want to review it.

    def _writeCmdLine( self, cmdLine):
        fpath = os.path.join( self.taskDir, self.sname + '.cmdLine')
        with open( fpath, 'w') as fout:
            fout.write( cmdLine + '\n')

    #====================
    # Extracts the job id (as a string) from the stdout of the
    # submit command for the given hostType.  Raises via throwerr
    # if hostType is unknown or stdout does not have the expected form.

    def _parseJobId( self, hostType, stdout):
        if hostType == 'peregrine':
            # stdout is like:
            #   439613.admin2
            mat = re.match(r'^(\d+)\.admin\d$', stdout.strip())
        elif hostType == 'stampede':
            # stdout is a banner of "--> Verifying ...OK" lines whose
            # last line is like:
            #   Submitted batch job 2869448
            lines = stdout.strip().split('\n')
            mat = re.match(r'^Submitted batch job (\d+)$', lines[-1])
        else:
            self.throwerr('unknown hostType: %s' % (hostType,))
        if not mat:
            self.throwerr('unknown stdout from submit:\n%s\n' % (stdout,))
        return mat.group(1)      # keep as a string

    #====================
    # Runs this Task.  Requires status == ST_INIT.
    #
    # If execPath ends with .sh or .py, run it immediately
    # (with /bin/bash or python), then read the status file it wrote.
    #
    # If execPath ends with .pbs, submit it with qsub (peregrine)
    # or sbatch (stampede) and record the returned jobId with
    # setStatusSubmit.  If hostType == 'hostLocal', the pbs file is
    # instead run directly with bash.

    def runTask(
        self,
        hostType,      # System type: peregrine or stampede or hostLocal
        globalDir,     # Dir containing global info, including cmd
        ancDir,        # Ancestor of all dirs to be processed
        pbsMap):       # Results from showq. jobId -> state, like ST_WAIT

        if self.status != schedMisc.ST_INIT:
            self.throwerr('runTask: wrong status: %s' % (self.status))
        if not os.path.isdir( self.taskDir):
            os.mkdir( self.taskDir)

        # Remove any old status files
        self._removeStatusFiles()

        if self.execPath.endswith(('.sh', '.py')):
            self.setStatusRun()
            if self.execPath.endswith('.sh'):
                interpretor = '/bin/bash'
            else:
                interpretor = 'python'
            cmdLine = '%s %s -bugLev %d -globalDir %s -hostType %s' \
                % (interpretor, self.execPath, self.bugLev, globalDir, hostType,)
            self._writeCmdLine( cmdLine)

            (rc, stdout, stderr, deltaSec) = schedMisc.runSubprocess(
                self.taskDir,                       # wkDir
                cmdLine,                            # cmdLine
                None,                               # stdInput
                self.taskDir + '/' + self.sname,    # outPrefix
                False)                              # showInfo

            # The script is expected to write its own status file.
            self.updateStatus( ancDir, pbsMap)
            if rc != 0 and self.status != schedMisc.ST_ERROR:
                self.setStatusError(
                    'run non-zero rc: %d\n stderr: %s\n stdout: %s\n'
                    % (rc, stderr, stdout,))
            if self.status not in [schedMisc.ST_OK, schedMisc.ST_ERROR]:
                self.setStatusError('process never wrote status file')

        #xxx change suffix to mpi or qsub or ?
        elif self.execPath.endswith('.pbs'):
            submitCmds = {
                'hostLocal': 'bash',
                'peregrine': 'qsub',
                'stampede': 'sbatch',
            }
            submitCmd = submitCmds.get( hostType)
            if submitCmd is None:
                self.throwerr('unknown hostType: %s' % (hostType,))
            cmdLine = '%s %s' % (submitCmd, self.execPath,)
            self._writeCmdLine( cmdLine)

            (rc, stdout, stderr, deltaSec) = schedMisc.runSubprocess(
                self.taskDir,                       # wkDir
                cmdLine,                            # cmdLine
                None,                               # stdInput
                self.taskDir + '/' + self.sname,    # outPrefix
                True)                               # showInfo

            if rc != 0:
                self.setStatusError(
                    'submit non-zero rc: %d\n stderr: %s\n stdout: %s\n'
                    % (rc, stderr, stdout,))
            elif hostType != 'hostLocal':
                self.setStatusSubmit( self._parseJobId( hostType, stdout))

        else:
            self.setStatusError('invalid execPath: %s task:\n%s'
                % (self.execPath, self,))

    #====================
    # Checks for status files of the form (sname).status.(status),
    # and updates our self.status.
    #
    # Also checks the pbsMap (info from Moab).
    # If it has new info, writes the appropriate file,
    # (sname).status.(status), and updates our self.status.
    # Raises via throwerr if the new status would be lower than the old.

    def updateStatus(
        self,
        ancDir,        # Ancestor of all dirs to be processed
        pbsMap):       # Results from showq. jobId -> state, like ST_WAIT

        if self.bugLev >= 10:
            print('updateStatus entry: execName: %s taskDir: %s status: %s'
                % (self.execName, schedMisc.shortName( ancDir, self.taskDir),
                self.getStatusName(),))

        # Try to read the files sname.status.init, sname.status.submit, ...
        # to get fileStatus: the highest-index file found, except that a
        # status.error file stops the scan.
        fileStatus = schedMisc.ST_INIT
        if os.path.isdir( self.taskDir):
            for stIx in range( 0, len( schedMisc.statusNames)):
                spath = os.path.join( self.taskDir, self.getStatusPath( stIx))
                found = os.path.exists( spath)
                if self.bugLev >= 10:
                    print(' test stIx: %d spath: %s result: %s'
                        % (stIx, spath, found,))
                if not found:
                    continue
                fileStatus = stIx
                with open( spath) as fin:
                    lines = fin.readlines()
                if self.bugLev >= 10:
                    print(' updateStatus: found spath: %s'
                        % (schedMisc.shortName( ancDir, spath),))
                # The submit file's second line holds the jobId.
                if stIx == schedMisc.ST_SUBMIT and len(lines) >= 2:
                    self.jobId = lines[1].strip()
                # If anyone called setStatusError, ignore all else
                if stIx == schedMisc.ST_ERROR:
                    break

        if self.bugLev >= 10:
            print(' updateStatus: exec: %s fileStatus: %s jobId: %s pbs: %s'
                % (self.execName, schedMisc.statusNames[ fileStatus],
                self.jobId, pbsMap.get( self.jobId),))

        # Check pbsMap to get pbsStatus.
        pbsStatus = schedMisc.ST_INIT
        if self.jobId is not None:
            if self.jobId in pbsMap:
                pbsStatus = pbsMap[ self.jobId]
                self.lastPbsDate = schedMisc.getCurDate()
            elif fileStatus in [ schedMisc.ST_WAIT, schedMisc.ST_RUN] \
                    and self.lastPbsDate is not None:
                # Not found in the qsub system.
                # If the last note from the qsub system occurred long ago,
                # assume the job died.  The grace period allows for slow
                # file visibility on Lustre (see PBS_GRACE_SEC).
                dsec = schedMisc.deltaTimeSec(
                    self.lastPbsDate, schedMisc.getCurDate())
                if self.bugLev >= 5:
                    print(('updateStatus: testLate: execName: %s'
                        + ' taskDir: %s status: %s dsec: %g')
                        % (self.execName, schedMisc.shortName( ancDir, self.taskDir),
                        schedMisc.statusNames[self.status], dsec,))
                if dsec > self.PBS_GRACE_SEC:
                    spath = os.path.join( self.taskDir,
                        self.getStatusPath( schedMisc.ST_ERROR),)
                    if not os.path.exists( spath):
                        self.setStatusError('job quit without writing status file')
                    pbsStatus = schedMisc.ST_ERROR

        if pbsStatus == schedMisc.ST_OK and fileStatus < schedMisc.ST_OK:
            # The pbsMap says the task is done, but the task never wrote
            # the ending status.ok file.  So the task must have died.
            self.setStatusError('task ended without writing status.ok file')
            newStatus = schedMisc.ST_ERROR
        else:
            newStatus = max( fileStatus, pbsStatus)

        if newStatus < self.status:
            self.throwerr('updateStatus: new status < old. newStatus: %s'
                % (newStatus,))
        if newStatus != self.status:
            spath = os.path.join( self.taskDir, self.getStatusPath( newStatus))
            if os.path.exists( spath):
                self.status = newStatus
            else:
                self.writeStatusFile( newStatus, extra=None)
            self.gotPosts = False

        if self.bugLev >= 10:
            print('updateStatus exit: execName: %s taskDir: %s status: %s'
                % (self.execName, schedMisc.shortName( ancDir, self.taskDir),
                self.getStatusName(),))

    #====================
    # Writes the file (sname).status.init,
    # and sets our self.status = ST_INIT.

    def setStatusInit( self):
        self.writeStatusFile( schedMisc.ST_INIT, extra=None)

    #====================
    # Writes the file (sname).status.submit, passing jobId as the
    # extra info, and sets our self.status = ST_SUBMIT.

    def setStatusSubmit( self, jobId):
        self.writeStatusFile( schedMisc.ST_SUBMIT, extra=jobId)

    #====================
    # Writes the file (sname).status.wait,
    # and sets our self.status = ST_WAIT.

    def setStatusWait( self):
        self.writeStatusFile( schedMisc.ST_WAIT, extra=None)

    #====================
    # Writes the file (sname).status.run,
    # and sets our self.status = ST_RUN.

    def setStatusRun( self):
        self.writeStatusFile( schedMisc.ST_RUN, extra=None)

    #====================
    # Writes the file (sname).status.ok,
    # sets our self.status = ST_OK, and clears gotPosts.

    def setStatusOk( self):
        self.writeStatusFile( schedMisc.ST_OK, extra=None)
        self.gotPosts = False

    #====================
    # Writes the file (sname).status.error with errMsg as extra info,
    # sets our self.status = ST_ERROR, and clears gotPosts.

    def setStatusError( self, errMsg):
        self.writeStatusFile( schedMisc.ST_ERROR, extra=errMsg)
        self.gotPosts = False

    #====================
    # Sets our self.status = stIx, then has schedMisc.writeStatusFile
    # write the file (sname).status.(statusName) in taskDir.

    def writeStatusFile( self, stIx, extra=None):
        self.status = stIx
        schedMisc.writeStatusFile( self.sname, self.taskDir, stIx, extra)

    #====================
    # Raises an Exception whose message includes msg and this Task.

    def throwerr( self, msg):
        fullMsg = 'taskClass.py: %s\ntask:\n%s' % (msg, self)
        raise Exception( fullMsg)

#=====================================================================