#!/usr/bin/env python
import os, re, sys, traceback
import schedMisc
import schedMain
#=====================================================================
# Represents one Task, which means one script to be executed in one dir.
class Task:
    """One Task: one script (execName) to be executed in one dir (taskDir).

    Constructing a Task:
      - validates the script name and locates the script;
      - reads any existing (sname).status.* files and checks pbsMap;
      - decides whether the task must be (re)run (self.isNew);
      - appends itself to allTasks;
      - recursively builds the Tasks listed in its prework file.
    """

    # Seconds a job in ST_WAIT or ST_RUN may be missing from pbsMap (after
    # it was last seen there) before updateStatus presumes it died.
    # The delay exists because on Lustre a file written on one node may
    # take several seconds to become visible on another.
    # Override on the class or on an instance if needed.
    lostJobSec = 1 * 60

    #====================
    def __init__(
        self,
        bugLev,         # Debug level
        globalDir,      # Dir containing global info, including cmd
        ancDir,         # Ancestor of all dirs to be processed
        execName,       # Script name, like 'alpha.py'
        taskDir,        # Dir in which to run the script
        pbsMap,         # Results from showq. jobId -> state, like ST_WAIT
        useReadOnly,    # bool: only print status; don't start tasks
        redoAll,        # If True, rm all status.* files and set status = ST_INIT
        allTasks):      # List of all known tasks
        """Creates the Task and appends it to allTasks.

        Raises Exception (via throwerr) in these cases:
          - globalDir/cmdDirName is not a dir;
          - execName is not of the form 'name.sh', 'name.py' or 'name.pbs';
          - taskDir is not absolute, or ends with '/';
          - the script is found in neither taskDir nor cmdDir;
          - an equal task (per schedMisc.findTask) is already in allTasks.
        """
        self.bugLev = bugLev
        self.execName = execName
        self.taskDir = taskDir
        self.isNew = False
        self.status = schedMisc.ST_INIT
        self.jobId = None
        self.gotPosts = False
        self.lastPbsDate = None
        # Pre-set every attribute that __str__ reads. throwerr formats
        # self, so without these the validation errors below would surface
        # as an AttributeError instead of the intended message.
        self.sname = None
        self.stype = None
        self.execPath = None
        self.preWorks = None

        cmdDir = os.path.join(globalDir, schedMisc.cmdDirName)
        if not os.path.isdir(cmdDir):
            self.throwerr('const: cmdDir not found: %s' % (cmdDir,))

        # Split execName, like 'alpha.sh', into sname='alpha', stype='sh'.
        toks = execName.split('.')
        if len(toks) != 2:
            self.throwerr('const: invalid execName: %s' % (execName,))
        self.sname, self.stype = toks
        schedMisc.checkNiceName(taskDir + ':' + execName, self.sname)
        if self.stype not in ['sh', 'py', 'pbs']:
            self.throwerr('const: invalid execName: %s' % (execName,))

        # Insure taskDir is absolute and has no trailing '/'.
        if not os.path.isabs(taskDir):
            self.throwerr('const: taskDir not absolute: %s' % (taskDir,))
        if taskDir.endswith('/'):
            self.throwerr('const: taskDir ends with /: %s' % (taskDir,))

        # Set execPath: search for execName in taskDir, then in cmdDir.
        self.execPath = os.path.join(taskDir, execName)
        if not os.path.exists(self.execPath):
            self.execPath = os.path.join(cmdDir, execName)
            if not os.path.exists(self.execPath):
                self.throwerr(
                    'const: execPath not found: %s' % (self.execPath,))

        # Read status.* files and check pbsMap to get jobId, if any.
        self.updateStatus(ancDir, pbsMap)

        if useReadOnly:
            # Only report status; never start anything.
            self.isNew = False
            self._printConst('readOnly', ancDir)
        elif (not redoAll) and self.status == schedMisc.ST_OK:
            # An old run of this Task is OK, so we're done.
            self.isNew = False
            self._printConst('old ok', ancDir)
        else:
            # Else we must run the task.
            if self.status >= schedMisc.ST_SUBMIT:
                # xxx if status == ST_SUBMIT and the submit happened only
                # recently (say < 2 minutes ago, per
                # schedMisc.deltaTimeSec), perhaps assume the job is
                # coming?
                if self.jobId in pbsMap:
                    # NOTE(review): the task is still reset to ST_INIT and
                    # marked new below, so it may be started again while
                    # this HPC job is live. Confirm this is intended.
                    print('taskClass: HPC job is still running for task:\n%s'
                        % (self,))
                else:
                    self.resetStatus()
            self.setStatusInit()
            self.isNew = True
            self._printConst('NEW', ancDir)

        # Add Task to allTasks
        if schedMisc.findTask(execName, taskDir, allTasks) is not None:
            self.throwerr('const: duplicate task')
        allTasks.append(self)

        # Recursive: read the pre-requisites for this Task from
        # taskDir/(sname).(NM_PREWORK).
        # readWorkList creates their Tasks, which may in turn read theirs.
        wkPath = os.path.join(
            self.taskDir, '%s.%s' % (self.sname, schedMisc.NM_PREWORK,))
        if os.path.exists(wkPath):
            self.preWorks = schedMain.readWorkList(
                self.bugLev,    # Debug level. Typically 0, 1 or 5
                globalDir,      # Dir containing global info, including subdir cmd
                ancDir,         # An ancestor dir of all dirs to be processed
                wkPath,         # Path of the work list to read
                self.taskDir,   # curDir: Current working dir
                pbsMap,         # Results from showq. jobId -> state, like ST_WAIT
                useReadOnly,    # bool: only print status; don't start tasks
                False,          # redoAll: not propagated to pre-requisites
                allTasks)       # Updated: list of all known Tasks
        else:
            self.preWorks = None

    #====================
    def _printConst(self, label, ancDir):
        """Prints a 'Task.const: <label>: ...' trace line if bugLev >= 5."""
        if self.bugLev >= 5:
            print('Task.const: %s: %s %s'
                % (label, self.execName,
                   schedMisc.shortName(ancDir, self.taskDir),))

    #====================
    def __str__(self):
        """Returns a multi-line description of this Task."""
        lines = [
            ' sname: %s\n' % (self.sname,),
            ' execName: %s\n' % (self.execName,),
            ' execPath: %s\n' % (self.execPath,),
            ' taskDir: %s\n' % (self.taskDir,),
            ' status: %s %s\n' % (self.status, self.getStatusName(),),
            ' jobId: %s\n' % (self.jobId,),
            ' gotPosts: %s\n' % (self.gotPosts,),
            ' lastPbsDate: %s\n' % (self.lastPbsDate,),
        ]
        return ''.join(lines)

    #====================
    def getStatusName(self):
        """Returns schedMisc.statusNames[self.status]."""
        return schedMisc.statusNames[self.status]

    #====================
    def getStatusPath(self, stIx):
        """Returns the status-file name for status index stIx.

        The result is a bare file name, relative to taskDir; callers join
        it with self.taskDir. For sname == 'alpha' and stIx == ST_OK, the
        name would be 'alpha.status.ok'.
        """
        return '%s.%s.%s' \
            % (self.sname, schedMisc.NM_STATUS, schedMisc.statusNames[stIx],)

    #====================
    def _removeStatusFiles(self):
        """Removes each (sname).status.* file that exists in taskDir."""
        for stIx in range(len(schedMisc.statusNames)):
            spath = os.path.join(self.taskDir, self.getStatusPath(stIx))
            if os.path.exists(spath):
                os.remove(spath)

    #====================
    def resetStatus(self):
        """Removes all status files and resets the in-memory state.

        Sets isNew = True and jobId = None.
        Writes a fresh status.init file, making status = ST_INIT.
        """
        self._removeStatusFiles()
        self.isNew = True
        self.jobId = None
        self.setStatusInit()

    #====================
    def runTask(
        self,
        hostType,       # System type: peregrine or stampede or ...
        globalDir,      # Dir containing global info, including cmd
        ancDir,         # Ancestor of all dirs to be processed
        pbsMap):        # Results from showq. jobId -> state, like ST_WAIT
        """Runs or submits this Task, which must have status ST_INIT.

        First creates taskDir if missing and removes old status files.

        A .sh or .py script:
          - is run through schedMisc.runSubprocess, passing -bugLev,
            -globalDir and -hostType;
          - afterwards the status must be ST_OK or ST_ERROR, otherwise an
            error status is recorded.

        A .pbs script:
          - is run with bash on 'hostLocal';
          - is submitted with qsub on 'peregrine' or sbatch on 'stampede';
          - a successful submit records the parsed job id via
            setStatusSubmit.

        Raises Exception (via throwerr) in these cases:
          - the status is not ST_INIT;
          - hostType is unknown for a .pbs script;
          - the submit output cannot be parsed.
        """
        if self.status != schedMisc.ST_INIT:
            self.throwerr('runTask: wrong status: %s' % (self.status,))
        if not os.path.isdir(self.taskDir):
            os.mkdir(self.taskDir)
        # Remove any old status files
        self._removeStatusFiles()

        if self.execPath.endswith('.sh') or self.execPath.endswith('.py'):
            self._runScript(hostType, globalDir, ancDir, pbsMap)
        # xxx change suffix to mpi or qsub or ?
        elif self.execPath.endswith('.pbs'):
            self._submitPbs(hostType)
        else:
            self.setStatusError('invalid execPath: %s task:\n%s'
                % (self.execPath, self,))

    #====================
    def _runCmdLine(self, cmdLine, showInfo):
        """Saves cmdLine to taskDir/(sname).cmdLine, then runs it in taskDir.

        Returns the (rc, stdout, stderr, deltaSec) tuple from
        schedMisc.runSubprocess.
        """
        # Write the cmdLine to taskDir, in case we want to review it.
        fpath = os.path.join(self.taskDir, self.sname + '.cmdLine')
        with open(fpath, 'w') as fout:
            fout.write(cmdLine + '\n')
        return schedMisc.runSubprocess(
            self.taskDir,                       # wkDir
            cmdLine,                            # cmdLine
            None,                               # stdInput
            self.taskDir + '/' + self.sname,    # outPrefix
            showInfo)                           # showInfo

    #====================
    def _runScript(self, hostType, globalDir, ancDir, pbsMap):
        """Runs a .sh or .py script and records its final status."""
        self.setStatusRun()
        if self.execPath.endswith('.sh'):
            interpreter = '/bin/bash'
        elif self.execPath.endswith('.py'):
            interpreter = 'python'
        else:
            self.throwerr(
                'runTask: invalid self.execPath: %s' % (self.execPath,))
        cmdLine = '%s %s -bugLev %d -globalDir %s -hostType %s' \
            % (interpreter, self.execPath, self.bugLev, globalDir, hostType,)
        (rc, stdout, stderr, _) = self._runCmdLine(cmdLine, False)
        # The script is expected to write its own status file.
        self.updateStatus(ancDir, pbsMap)
        if rc != 0 and self.status != schedMisc.ST_ERROR:
            self.setStatusError(
                'run non-zero rc: %d\n stderr: %s\n stdout: %s\n'
                % (rc, stderr, stdout,))
        if self.status not in [schedMisc.ST_OK, schedMisc.ST_ERROR]:
            self.setStatusError('process never wrote status file')

    #====================
    def _submitPbs(self, hostType):
        """Submits a .pbs script, or runs it with bash on 'hostLocal'.

        On a successful non-local submit, records the job id via
        setStatusSubmit. On 'hostLocal' no status is set here.
        Presumably the script writes its own status files, which a later
        updateStatus picks up.
        """
        if hostType == 'hostLocal':
            submitCmd = 'bash'
        elif hostType == 'peregrine':
            submitCmd = 'qsub'
        elif hostType == 'stampede':
            submitCmd = 'sbatch'
        else:
            self.throwerr('unknown hostType: %s' % (hostType,))
        cmdLine = '%s %s' % (submitCmd, self.execPath,)
        (rc, stdout, stderr, _) = self._runCmdLine(cmdLine, True)
        if rc != 0:
            self.setStatusError(
                'submit non-zero rc: %d\n stderr: %s\n stdout: %s\n'
                % (rc, stderr, stdout,))
        elif hostType != 'hostLocal':
            self.setStatusSubmit(self._parseJobId(hostType, stdout))

    #====================
    def _parseJobId(self, hostType, stdout):
        """Returns the job id, as a string, parsed from the submit stdout."""
        if hostType == 'peregrine':
            # qsub stdout is like:
            #   439613.admin2
            mat = re.match(r'^(\d+)\.admin\d$', stdout.strip())
        elif hostType == 'stampede':
            # sbatch stdout is a banner of lines like
            #   --> Verifying valid jobname...OK
            # whose last line is like:
            #   Submitted batch job 2869448
            lastLine = stdout.strip().split('\n')[-1]
            mat = re.match(r'^Submitted batch job (\d+)$', lastLine)
        else:
            self.throwerr('unknown hostType: %s' % (hostType,))
        if not mat:
            self.throwerr('unknown stdout from submit:\n%s\n' % (stdout,))
        return mat.group(1)     # keep as a string

    #====================
    def updateStatus(
        self,
        ancDir,         # Ancestor of all dirs to be processed
        pbsMap):        # Results from showq. jobId -> state, like ST_WAIT
        """Recomputes self.status from the status files and pbsMap.

        Steps:
          1. The file status comes from the (sname).status.(status) files
             in taskDir.
          2. If the job id is in pbsMap, its pbs state is used too.
          3. If the task is in ST_WAIT or ST_RUN but has been missing from
             pbsMap for more than lostJobSec seconds, an error status is
             recorded.
          4. If pbsMap says ST_OK but no status.ok (or later) file exists,
             an error status is recorded.
          5. Otherwise the new status is the larger of the file and pbs
             statuses. If it has no status file yet, one is written.

        Raises Exception (via throwerr) if the new status is smaller than
        the current one.
        """
        if self.bugLev >= 10:
            print('updateStatus entry: execName: %s taskDir: %s status: %s'
                % (self.execName,
                   schedMisc.shortName(ancDir, self.taskDir),
                   self.getStatusName(),))

        fileStatus = self._readFileStatus(ancDir)
        if self.bugLev >= 10:
            print(' updateStatus: exec: %s fileStatus: %s jobId: %s pbs: %s'
                % (self.execName,
                   schedMisc.statusNames[fileStatus],
                   self.jobId,
                   pbsMap.get(self.jobId),))

        # Check pbsMap to get pbsStatus.
        pbsStatus = schedMisc.ST_INIT
        if self.jobId is not None:
            if self.jobId in pbsMap:
                pbsStatus = pbsMap[self.jobId]
                self.lastPbsDate = schedMisc.getCurDate()
            elif fileStatus in [schedMisc.ST_WAIT, schedMisc.ST_RUN] \
                    and self.lastPbsDate is not None:
                # Not found in the qsub system. If the last note from the
                # qsub system occurred long ago, assume the job died.
                # Caution: on Lustre a file written on node A may take
                # several seconds to be visible on node B, hence the delay.
                dsec = schedMisc.deltaTimeSec(
                    self.lastPbsDate, schedMisc.getCurDate())
                if self.bugLev >= 5:
                    print(('updateStatus: testLate: execName: %s'
                        + ' taskDir: %s status: %s dsec: %g')
                        % (self.execName,
                           schedMisc.shortName(ancDir, self.taskDir),
                           schedMisc.statusNames[self.status],
                           dsec,))
                if dsec > self.lostJobSec:
                    spath = os.path.join(
                        self.taskDir,
                        self.getStatusPath(schedMisc.ST_ERROR),)
                    if not os.path.exists(spath):
                        self.setStatusError(
                            'job quit without writing status file')
                    pbsStatus = schedMisc.ST_ERROR

        if pbsStatus == schedMisc.ST_OK and fileStatus < schedMisc.ST_OK:
            # The pbsMap says the task is done, but the task never wrote
            # the ending status.ok file. So the task must have died.
            self.setStatusError('task ended without writing status.ok file')
            newStatus = schedMisc.ST_ERROR
        else:
            newStatus = max(fileStatus, pbsStatus)

        # Status is only allowed to move forward.
        if newStatus < self.status:
            self.throwerr('updateStatus: new status < old. newStatus: %s'
                % (newStatus,))
        if newStatus != self.status:
            spath = os.path.join(self.taskDir, self.getStatusPath(newStatus))
            if os.path.exists(spath):
                self.status = newStatus
            else:
                # No file for this status yet (e.g. the status came from
                # pbsMap): record it on disk too.
                self.writeStatusFile(newStatus, extra=None)
            self.gotPosts = False

        if self.bugLev >= 10:
            print('updateStatus exit: execName: %s taskDir: %s status: %s'
                % (self.execName,
                   schedMisc.shortName(ancDir, self.taskDir),
                   self.getStatusName(),))

    #====================
    def _readFileStatus(self, ancDir):
        """Returns the status implied by the status files in taskDir.

        Files are scanned in increasing status index, so the highest
        existing one wins. An error file stops the scan. A submit file
        with at least two lines also sets self.jobId from its second line.
        Returns ST_INIT if taskDir is missing or holds no status files.
        """
        fileStatus = schedMisc.ST_INIT
        if not os.path.isdir(self.taskDir):
            return fileStatus
        for stIx in range(len(schedMisc.statusNames)):
            spath = os.path.join(self.taskDir, self.getStatusPath(stIx))
            if self.bugLev >= 10:
                print(' test stIx: %d spath: %s result: %s'
                    % (stIx, spath, os.path.exists(spath),))
            if not os.path.exists(spath):
                continue
            fileStatus = stIx
            with open(spath) as fin:
                lines = fin.readlines()
            if self.bugLev >= 10:
                print(' updateStatus: found spath: %s'
                    % (schedMisc.shortName(ancDir, spath),))
            if stIx == schedMisc.ST_SUBMIT and len(lines) >= 2:
                self.jobId = lines[1].strip()
            # If anyone called setStatusError, ignore all else.
            if stIx == schedMisc.ST_ERROR:
                break
        return fileStatus

    #====================
    def setStatusInit(self):
        """Writes (sname).status.init and sets self.status = ST_INIT."""
        self.writeStatusFile(schedMisc.ST_INIT, extra=None)

    #====================
    def setStatusSubmit(self, jobId):
        """Writes (sname).status.submit and sets self.status = ST_SUBMIT.

        The jobId is passed to the status file as extra data; updateStatus
        reads it back from the file's second line. It is also kept in
        self.jobId so the in-memory state matches the file immediately.
        """
        self.jobId = jobId
        self.writeStatusFile(schedMisc.ST_SUBMIT, extra=jobId)

    #====================
    def setStatusWait(self):
        """Writes (sname).status.wait and sets self.status = ST_WAIT."""
        self.writeStatusFile(schedMisc.ST_WAIT, extra=None)

    #====================
    def setStatusRun(self):
        """Writes (sname).status.run and sets self.status = ST_RUN."""
        self.writeStatusFile(schedMisc.ST_RUN, extra=None)

    #====================
    def setStatusOk(self):
        """Writes (sname).status.ok and sets self.status = ST_OK.

        Also clears gotPosts.
        """
        self.writeStatusFile(schedMisc.ST_OK, extra=None)
        self.gotPosts = False

    #====================
    def setStatusError(self, errMsg):
        """Writes (sname).status.error and sets self.status = ST_ERROR.

        errMsg is passed to the status file as extra data.
        Also clears gotPosts.
        """
        self.writeStatusFile(schedMisc.ST_ERROR, extra=errMsg)
        self.gotPosts = False

    #====================
    def writeStatusFile(self, stIx, extra=None):
        """Sets self.status = stIx and writes (sname).status.(statusName).

        The file is written by schedMisc.writeStatusFile, with the optional
        extra data.
        """
        self.status = stIx
        schedMisc.writeStatusFile(self.sname, self.taskDir, stIx, extra)

    #====================
    def throwerr(self, msg):
        """Raises an Exception whose message includes msg and this Task."""
        fullMsg = 'taskClass.py: %s\ntask:\n%s' % (msg, self)
        raise Exception(fullMsg)
#=====================================================================