#!/usr/bin/env python
import copy, datetime, os, re, shutil, subprocess, sys, traceback
# Misc constants and utilities for schedMain.py
#=====================================================================
# Task status codes, used in taskClass.py.
# The codes are the consecutive integers 0 .. ST_NUM_STAT-1, and each
# code indexes its printable name in statusNames.
statusNames = ['unknown', 'init', 'submit', 'wait', 'run', 'ok', 'error']
(ST_UNKNOWN,
 ST_INIT,
 ST_SUBMIT,
 ST_WAIT,
 ST_RUN,
 ST_OK,
 ST_ERROR) = range(len(statusNames))
ST_NUM_STAT = len(statusNames)          # number of status levels

# File names used by the scheduler.
NM_PREWORK = "preWork"                  # pre-requisite tasks
NM_POSTOKWORK = "postOkWork"            # tasks to start after OK completion
NM_POSTERRORWORK = "postErrorWork"      # tasks to start after error completion
NM_STATUS = "status"                    # status file name
NM_LOCKFILE = "schedMain.lockFile"      # global lockfile name

# Timestamp format; it always yields 19-character strings.
dateFmt = '%Y-%m-%d_%H:%M:%S'
cmdDirName = 'cmd'                      # name of global subdir with scripts
#=====================================================================
def getPbsIdMap( bugLev, hostType, userId):
    """Return a map jobId -> status (ST_WAIT, ST_RUN, ...) of batch jobs.

    On 'hostLocal' there is no batch queue, so the map is always empty;
    every other hostType is delegated to getPbsIdMapSub.
    """
    if hostType != 'hostLocal':
        return getPbsIdMapSub( bugLev, hostType, userId)
    return {}
#=====================================================================
# Get info from the "showq" Moab command.
# Returns pbsMap: jobId -> state, like schedmisc.ST_WAIT
#
# peregrine:
# Torque for resource manager(qsub)
# BatchHold, Hold, Idle, Running
# Moab for workload manager
#
# stampede:
# resource manager:
# Simple Linux Utility for Resource Management (SLURM)
# Waiting, Running, Complte
def getPbsIdMapSub( bugLev, hostType, userId):
    """Run "showq" and map every listed job id to a scheduler status.

    Parameters:
      bugLev:   debug level (not used here).
      hostType: 'peregrine' or 'stampede'.
      userId:   user name (not used here; jobs of all users are mapped).

    Returns:
      dict jobId (string of digits) -> ST_WAIT, ST_RUN or ST_OK.
      A showq state that is not recognized is printed and mapped to
      ST_WAIT.

    Raises:
      Exception (via throwerr) for an unknown hostType, or when showq
      exits with a non-zero return code.
    """
    # Per host: the number of tokens on a job line, the index of the
    # state token, and the showq state -> status table.
    if hostType == 'peregrine':
        # Torque resource manager (qsub), Moab workload manager.
        # stdout has the format:
        #   434982 ssulliva Running 24 00:00:26 Fri Jan 24 12:49:42
        #   434981 ssulliva Idle 1 00:01:00 Fri Jan 24 12:48:12
        #   428796 slany BatchHold 32 5:00:00:00 Wed Jan 22 08:37:33
        #   462361 ssulliva Canceling 24 2:37:59 Tue Feb 11 11:34:12
        # where the fields are:
        #   jobId userId status procs timea timeb
        # if status==Running:
        #   timea = remaining, timeb = startTime
        # if status==Idle or BatchHold:
        #   timea = wcLimit, timeb = queueTime
        numToks = 9
        stateIx = 2
        stateMap = {
            'BatchHold': ST_WAIT,
            'Hold': ST_WAIT,
            'Idle': ST_WAIT,
            'SystemHold': ST_WAIT,
            'UserHold': ST_WAIT,
            'Running': ST_RUN,
        }
    elif hostType == 'stampede':
        # SLURM resource manager. stdout has sections like:
        #   ACTIVE JOBS--------------------
        #   JOBID JOBNAME USERNAME STATE CORE REMAINING STARTTIME
        #   ==================================================
        #   2808140 ffs-rst-1 ahakbari Running 256 44:34:33 Sun Feb 23 04:34:03
        #   WAITING JOBS--------------------
        #   JOBID JOBNAME USERNAME STATE CORE WCLIMIT QUEUETIME
        #   2857287 mpipi tg457571 Waiting 32 0:05:00 Fri Feb 21 10:24:35
        #   BLOCKED JOBS--
        #   2808167 ffs-rst-1 ahakbari Waiting 256 48:00:00 Sat Feb 15 14:08:31
        #   COMPLETING/ERRORED JOBS---------
        #   2856612 3DRun80 rkashyap Complte 512 0:58:50 Fri Feb 21 09:08:22
        # ('Complte' is showq's own spelling.)
        numToks = 10
        stateIx = 3
        stateMap = {
            'Waiting': ST_WAIT,
            'Running': ST_RUN,
            'Complte': ST_OK,
        }
    else:
        throwerr('unknown hostType: %s' % (hostType,))
    cmdLine = 'showq'

    (qrc, qstdout, qstderr, qdeltaSec) = runSubprocess(
        os.getcwd(),                 # wkDir
        cmdLine,                     # cmdLine
        None,                        # stdInput
        os.getcwd() + '/showq',      # outPrefix
        False)                       # showInfo
    if qrc != 0:
        throwerr('getPbsIdMap: showq err. rc: %d\n stderr: %s\n stdout: %s\n'
            % (qrc, qstderr, qstdout,))

    jobIdRe = re.compile(r'^\d+$')   # raw string: '\d' is not a str escape
    pbsMap = {}                      # jobId -> state, like ST_WAIT
    for qline in qstdout.split('\n'):
        qline = qline.strip()
        qtoks = qline.split()
        # Job lines have a fixed token count and start with a numeric
        # job id; this skips headings, separators and blank lines.
        if len(qtoks) != numToks or not jobIdRe.match( qtoks[0]):
            continue
        qstate = qtoks[stateIx]
        if qstate in stateMap:
            status = stateMap[qstate]
        else:
            print('getPbsMap: unknown status: %s' % (qline,))
            status = ST_WAIT         # assume it's some variant of wait
        pbsMap[qtoks[0]] = status
    return pbsMap
#=====================================================================
# Not used.
# Get info from the "showq" Moab command.
# Returns a map: jobName -> state
# where state is one of the Moab tags: BatchHold, Idle, Running, etc.
def getPbsNameMapUnused( bugLev, userId):
    """(Not used.) Map job names to Moab states using showq + checkjob.

    Runs "showq". Then, for every job line (peregrine layout:
    jobId userId status procs timea timeb, 9 tokens), it runs
    "checkjob <jobId>" and scans that output for lines like
        AName: someJobName
        State: Running

    Returns:
      dict jobName -> raw Moab state string (e.g. 'Running', 'Idle',
      'BatchHold'). Jobs whose checkjob output lacks either line are
      omitted.

    Raises:
      Exception (via throwerr) if showq or any checkjob call fails.
    """
    # xxx hardcoded 'showq'
    (qrc, qstdout, qstderr, qdeltaSec) = runSubprocess(
        os.getcwd(),                 # wkDir
        'showq',                     # cmdLine
        None,                        # stdInput
        os.getcwd() + '/showq',      # outPrefix
        False)                       # showInfo
    if qrc != 0:
        throwerr('getPbsNameMap: showq err. rc: %d\n stderr: %s\n stdout: %s\n'
            % (qrc, qstderr, qstdout,))

    pbsMap = {}                      # jobName -> Moab state string
    for qline in qstdout.split('\n'):
        qtoks = qline.strip().split()
        # Skip headings. The numeric-id check also keeps arbitrary text
        # out of the shell command line built below.
        if len(qtoks) != 9 or not re.match(r'^\d+$', qtoks[0]):
            continue
        qjobId = qtoks[0]
        (crc, cstdout, cstderr, cdeltaSec) = runSubprocess(
            os.getcwd(),                   # wkDir
            'checkjob %s' % (qjobId,),     # cmdLine
            None,                          # stdInput
            os.getcwd() + '/checkjob',     # outPrefix
            False)                         # showInfo
        if crc != 0:
            throwerr(
                'getPbsNameMap: checkjob err. rc: %d\n stderr: %s\n stdout: %s\n'
                % (crc, cstderr, cstdout,))
        cname = None
        cstate = None
        for cline in cstdout.split('\n'):
            ctoks = cline.strip().split()
            if len(ctoks) == 2:
                ctag = ctoks[0].lower()
                # These previously read the undefined name `toks`.
                if ctag == 'aname:': cname = ctoks[1]
                if ctag == 'state:': cstate = ctoks[1]
        if cname is not None and cstate is not None:
            pbsMap[cname] = cstate
    return pbsMap
#=====================================================================
# Call subprocess.Popen( cmd, shell=True)
# Returns (rc, stdout, stderr, deltaTimeSec).
def runSubprocess(
        wkDir,        # work dir to use for cmd
        cmd,          # command line
        stdInput,     # stdin to pass to command. Usually None.
        outPrefix,    # prefix for 'x.stdout', 'x.stderr'
        showInfo):    # bool. If True, print cmd, wkDir.
    """Run cmd through subprocess.Popen( cmd, shell=True) in wkDir.

    The directory of this module is appended to the child's PYTHONPATH.
    The captured stderr and stdout are written to outPrefix + '.stderr'
    and outPrefix + '.stdout'. A non-zero return code is not an error
    here; callers inspect rc.

    Returns:
      (rc, stdout, stderr, deltaTimeSec). stdout and stderr are never
      None, and deltaTimeSec is the elapsed wall-clock time in seconds.
    """
    timea = datetime.datetime.now()
    if showInfo:
        print('runSubprocess: cmd: %s' % (cmd,))
        print('runSubprocess: wkDir: %s' % (wkDir,))
    pipe = subprocess.PIPE

    # Append the scheduler dir to PYTHONPATH.
    # - abspath: the child runs in wkDir, so a relative (or empty)
    #   dirname( __file__) would name the wrong directory there.
    # - dict(): use a plain copy. Assigning into a copy of the os.environ
    #   object may call putenv() on this process.
    # - PYTHONPATH may be unset; the old "+=" raised KeyError then.
    ourDir = os.path.dirname( os.path.abspath( __file__))
    subEnv = dict( os.environ)
    oldPath = subEnv.get('PYTHONPATH', '')
    if oldPath:
        subEnv['PYTHONPATH'] = oldPath + ':' + ourDir
    else:
        subEnv['PYTHONPATH'] = ourDir

    proc = subprocess.Popen(
        cmd,
        shell=True,
        cwd=wkDir,
        stdin=pipe, stdout=pipe, stderr=pipe,
        bufsize=10*1000*1000,
        env=subEnv)
    (stdout, stderr) = proc.communicate( stdInput)
    rc = proc.returncode
    timeb = datetime.datetime.now()
    if stdout is None: stdout = ''
    if stderr is None: stderr = ''

    # NOTE(review): both streams are saved on every call, whatever rc is;
    # the scraped original had no indentation, so confirm this placement.
    with open( outPrefix + '.stderr', 'w') as fout:
        fout.write( stderr)
    with open( outPrefix + '.stdout', 'w') as fout:
        fout.write( stdout)

    if showInfo:
        if len(stderr) > 0:
            print('===== start stderr =====')
            sys.stdout.write( stderr)
            print('===== end stderr =====')
        if len(stdout) > 0:
            print('===== start stdout =====')
            sys.stdout.write( stdout)
            print('===== end stdout =====')
    return (rc, stdout, stderr, deltaTimeSec( timea, timeb))
#=====================================================================
# Returns the number of tasks not OK.
def getNumNotOk( tasks):
    """Count the tasks whose status is anything other than ST_OK.

    tasks may be None, which counts as having no tasks.
    """
    if tasks is None:
        return 0
    return sum(1 for task in tasks if task.status != ST_OK)
#=====================================================================
# Returns the current date.
def getCurDate():
    """Return the current local date and time as a naive datetime."""
    now = datetime.datetime.now()
    return now
#=====================================================================
# Parses a date string written in dateFmt format.
def parseDate( stg):
    """Parse a timestamp written in dateFmt, e.g. '2014-01-24_12:49:42'.

    Returns:
      A naive datetime.datetime.

    Raises:
      Exception (via throwerr) if stg is None or is not exactly 19
      characters long. datetime.strptime raises ValueError for a
      19-character string that does not match dateFmt.
    """
    lengthOk = stg is not None and len(stg) == 19
    if not lengthOk:
        throwerr('parseDate: invalid stg: "%s"' % (stg,))
    return datetime.datetime.strptime( stg, dateFmt)
#=====================================================================
# Returns the difference timeb - timea, in seconds.
def deltaTimeSec( timea, timeb):
    """Return timeb - timea as a float number of seconds.

    The result is negative when timeb is earlier than timea.
    """
    diff = timeb - timea
    wholeSec = diff.days * 24 * 3600 + diff.seconds
    return wholeSec + diff.microseconds * 1.e-6
#=====================================================================
# Writes the file (sname).status.run,
# with the current date.
def setStatusRun( sname, dirPath ):
    """Create (sname).status.run in dirPath, holding the current date.

    An already existing status file is left unchanged (see writeStatusFile).
    """
    writeStatusFile( sname, dirPath, stIx=ST_RUN)
#=====================================================================
# Writes the file (sname).status.ok,
# with the current date.
def setStatusOk( sname, dirPath ):
    """Create (sname).status.ok in dirPath, holding the current date.

    An already existing status file is left unchanged (see writeStatusFile).
    """
    writeStatusFile( sname, dirPath, stIx=ST_OK)
#=====================================================================
# Writes the file (sname).status.error,
# with the current date, followed by the specified errMsg.
def setStatusError( sname, dirPath, errMsg):
    """Create (sname).status.error in dirPath.

    The file holds the current date, then errMsg on the next line.
    An already existing status file is left unchanged (see writeStatusFile).
    """
    writeStatusFile( sname, dirPath, stIx=ST_ERROR, extra=errMsg)
#=====================================================================
# Writes the file (sname).status.(statusName),
# with the current date, followed by the specified extra info.
def writeStatusFile( sname, dirPath, stIx, extra=None):
    """Create dirPath/(sname).(NM_STATUS).(statusNames[stIx]).

    The new file holds formatDate( getCurDate()) on its first line and,
    when extra is not None, extra on the second line.
    An existing file is never overwritten. Its old content and the date
    that would have been written are printed to stdout instead.
    """
    # NOTE(review): formatDate is not defined in the visible part of this
    # module; confirm it exists elsewhere.
    fileName = '%s.%s.%s' % (sname, NM_STATUS, statusNames[stIx],)
    spath = os.path.join( dirPath, fileName)
    if not os.path.exists( spath):
        with open( spath, 'w') as fout:
            fout.write( '%s\n' % (formatDate( getCurDate()),))
            if extra is not None:
                fout.write( '%s\n' % (extra,))
        return
    print('writeStatusFile: status file already exists: %s' % (spath,))
    with open( spath) as fin:
        print(' old content: %s\n' % (fin.read(),))
    print(' new date: %s' % (formatDate( getCurDate()),))
#=====================================================================
# Searches the allTasks list for a match on execName and taskDir.
# Returns the Task or None.
def findTask( execName, taskDir, allTasks):
    """Look up the task for script execName in directory taskDir.

    Parameters:
      execName: script file name 'sname.stype', with stype one of
                'sh', 'py', 'pbs'.
      taskDir:  absolute task directory, without a trailing '/'.
      allTasks: iterable of task objects having sname, taskDir and
                execName attributes.

    Returns:
      The first task whose sname and taskDir both match, or None.

    Raises:
      Exception (via throwerr) for a malformed execName or taskDir, or
      when the matching task was registered under a different execName.
    """
    parts = execName.split('.')
    if len(parts) != 2:
        throwerr('findTask: invalid execName: %s' % (execName,))
    (sname, stype) = parts
    checkNiceName( taskDir + ':' + execName, sname)
    if stype not in ['sh', 'py', 'pbs']:
        throwerr('findTask: invalid execName: %s' % (execName,))
    if not os.path.isabs( taskDir):
        throwerr('findTask: taskDir not absolute: %s' % (taskDir,))
    if taskDir.endswith('/'):
        throwerr('findTask: taskDir ends with /: %s' % (taskDir,))

    for task in allTasks:
        if sname != task.sname or taskDir != task.taskDir:
            continue
        if execName != task.execName:
            throwerr('findTask: duelling execNames for task: %s' % (task,))
        return task
    return None
#=====================================================================
# Removes an entire directory tree.
def rmTree(
        ancDir=None,    # Some ancestor dir of rmDir, for safety checking
        rmDir=None):    # The directory tree (or file/symlink) to remove
    """Remove rmDir, which must lie inside ancDir.

    Safety checks (each failure raises Exception via throwerr):
      * both paths are given and absolute;
      * ancDir is not a filesystem root;
      * after os.path.normpath, rmDir is ancDir itself or a path below it.
    Directories are removed recursively. Files are removed. A symlink is
    removed itself and its target is left alone. Symlinked directories
    inside rmDir's path are not resolved.
    """
    if ancDir is None or len(ancDir) < 2:
        throwerr('rmTree: ancDir is None or too short')
    if rmDir is None: throwerr('rmTree: rmDir == None')
    if not os.path.isabs( ancDir): throwerr('rmTree: ancDir is not abs')
    if not os.path.isabs( rmDir): throwerr('rmTree: rmDir is not abs')

    # Compare normalized paths component-wise. A plain string prefix test
    # accepts siblings ('/x/ab' for ancDir '/x/a') and '..' escapes
    # ('/x/a/../../etc').
    ancNorm = os.path.normpath( ancDir)
    rmNorm = os.path.normpath( rmDir)
    if os.path.dirname( ancNorm) == ancNorm:      # e.g. '/./' or '//'
        throwerr('rmTree: ancDir is a filesystem root: %s' % (ancDir,))
    if rmNorm != ancNorm and not rmNorm.startswith( ancNorm + os.sep):
        throwerr('rmTree: rmDir must start with ancDir')

    # Check islink first: isdir() follows links, and shutil.rmtree
    # refuses to operate on a symlink.
    if os.path.islink( rmNorm):
        os.remove( rmNorm)          # remove the link only
    elif os.path.isdir( rmNorm):
        shutil.rmtree( rmNorm)
    elif os.path.isfile( rmNorm):
        os.remove( rmNorm)          # remove ordinary file
    else: throwerr('rmTree: unknown file type: %s' % (rmDir,))
#=====================================================================
# Gets command line parameters from the command line,
# and overrides from globalDir/taskParms,
# and overrides from curDir/taskParms.
#
# The taskParms file has the format:
# scriptNameRegex dirRegex key value
#
# Example of file line:
# ^myScript.py$ someDir/alpha -bugLev 10
# For the script myScript.py, running in any directory
# whose absolute path contains the string 'someDir/alpha',
# set the parameter: -bugLev 10
def getParmMap(
        scriptName,
        curDir,
        specMap):
    """Build the parameter map for a script.

    Sources, each later one overriding earlier ones:
      1. the command line, sys.argv[1:], as '-key value' pairs;
      2. matching lines of <globalDir>/taskParms;
      3. matching lines of <curDir>/taskParms;
      4. for keys still unset, the default given in specMap.
    A taskParms line 'scriptNameRegex dirRegex key value' applies when
    scriptNameRegex matches scriptName and dirRegex matches the absolute
    path of curDir.

    Parameters:
      scriptName: name of the calling script.
      curDir:     current directory of the script.
      specMap:    key -> [type, default]. The type is handled by
                  getParmValue. A default of None makes the parm
                  required. Must contain 'globalDir'.

    Returns:
      The dict key -> value on success. If any problem was found, it
      returns instead a single string listing every error message.

    Raises:
      Exception (via throwerr) if specMap has no 'globalDir' entry,
      or (via getParmValue) for an unsupported spec type.
    """
    if 'globalDir' not in specMap:
        throwerr('getParms: globalDir not in specMap. script: %s curDir: %s' \
            % (scriptName, curDir,))
    errMsgs = []
    rmap = {}          # final result map: keyword -> value

    # Get command line parms
    if len(sys.argv) % 2 != 1:
        errMsgs.append('parms must be key/value pairs.')
    if len(errMsgs) == 0:
        for iarg in range( 1, len(sys.argv), 2):
            key = sys.argv[iarg]
            value = sys.argv[iarg+1]
            if not key.startswith('-'):
                errMsgs.append('invalid key (no -): %s' % (key,))
            key = key[1:]                  # strip leading '-'
            rmap[key] = getParmValue( specMap, key, value)
            if rmap[key] is None: errMsgs.append('unknown parm: %s' % (key,))

    if 'globalDir' not in rmap:
        errMsgs.append('globalDir not specified')
    else:
        # Get overrides from globalDir/taskParms, then from curDir/taskParms.
        absCurDir = os.path.abspath( curDir)
        for dirName in [ rmap['globalDir'], curDir]:
            fpath = os.path.join( dirName, 'taskParms')
            if len(errMsgs) != 0 or not os.path.exists( fpath):
                continue
            with open( fpath) as fin:
                flines = fin.readlines()
            for fline in flines:
                line = fline.strip()
                ix = line.find('#')
                if ix >= 0: line = line[:ix].strip()
                if len(line) == 0: continue
                toks = line.split()
                if len(toks) != 4:
                    errMsgs.append('invalid line. file path: "%s"\n line: "%s"' \
                        % (fpath, line))
                    # Skip the line: indexing toks below would raise
                    # IndexError on a short line.
                    continue
                # Check for scriptName and dirName regex matches
                if re.search( toks[0], scriptName) \
                        and re.search( toks[1], absCurDir):
                    key = toks[2]
                    value = toks[3]
                    rmap[key] = getParmValue( specMap, key, value)
                    if rmap[key] is None:
                        errMsgs.append('unknown parm. file path: "%s"\n line: "%s"' \
                            % (fpath, line))

    # Fill in defaults; a None default means the parm is required.
    for key in sorted( specMap.keys()):
        if key not in rmap:
            defVal = specMap[key][1]       # default value
            if defVal is None:
                errMsgs.append('parm not specified: %s' % (key,))
            else: rmap[key] = defVal

    if len(errMsgs) == 0:
        return rmap
    errMsgs.append('script: %s' % (os.path.abspath( scriptName,)))
    errMsgs.append('curDir: %s' % (os.path.abspath( curDir,)))
    return '\n ' + '\n '.join( errMsgs) + '\n'
#=====================================================================
# Returns None on error
def getParmValue(
        specMap,
        key,
        value):
    """Convert the string value of parameter key as specified by specMap.

    specMap[key] is [type, default], with type one of 'str', 'int',
    'float', 'bool'. A bool value must be exactly 'y' or 'n'.

    Returns:
      The converted value, or None on error: key is not in specMap, or
      value cannot be converted to the spec type.

    Raises:
      Exception (via throwerr) if the spec type is not recognized.
    """
    specs = specMap.get( key, None)    # specs = [type, default]
    if specs is None:
        return None                    # unknown key
    specTp = specs[0]
    vres = None                        # default = error
    if specTp == 'str':
        vres = value
    elif specTp == 'int':
        # 'except X:' instead of the Python-2-only 'except X, exc:'
        try: vres = int( value)
        except ValueError: pass
    elif specTp == 'float':
        try: vres = float( value)
        except ValueError: pass
    elif specTp == 'bool':
        if value == 'n': vres = False
        elif value == 'y': vres = True
        # Else leave vres as None
    else:
        throwerr('unknown specMap entry. key: %s specs: %s' % (key, specs,))
    return vres
#=====================================================================
# Gets command line parameters from a variety of sources:
# * lists, like sys.argv, with format: [key, value, key, value,...]
# * maps, with format: keyword->value
# * files, with lines having the format
# scriptNameRegex dirRegex key value
#
# The last specified value wins.
#
# Example of file line:
# ^myScript.py$ someDir/alpha -bugLev 10
# For the script myScript.py, running in any directory
# whose absolute path contains the string 'someDir/alpha',
# set the parameter: -bugLev 10
def getParmsOld(
        scriptName,
        curDir,
        *specs):
    """Merge parameters from lists, dicts and taskParms-style files.

    Each element of specs is one of:
      * a list shaped like sys.argv. Element 0 is skipped and the rest
        is key, value, key, value, ...
      * a dict key -> value, copied as is.
      * a str naming a file. Each non-comment line of the file has the
        format 'scriptNameRegex dirRegex key value' and applies when
        scriptNameRegex matches scriptName and dirRegex matches the
        absolute path of curDir.
    Other element types are ignored. The last specified value wins. Keys
    are kept exactly as given, so '-bugLev' keeps its '-'. Values are
    not converted.

    Returns:
      dict key -> value.

    Raises:
      Exception (via throwerr) for a list whose key/value pairs do not
      line up, or for a file line without exactly 4 tokens.
      IOError/OSError if a named file cannot be opened.
    """
    rmap = {}          # final result map: keyword -> value
    absCurDir = os.path.abspath( curDir)
    for spec in specs:
        if isinstance( spec, list):
            if len(spec) % 2 != 1:
                throwerr(
                    'getParms: parms must be key/value pairs. script: %s curDir: %s' \
                    % (scriptName, curDir,))
            for iarg in range( 1, len(spec), 2):
                rmap[spec[iarg]] = spec[iarg+1]
        elif isinstance( spec, dict):
            # Was dict.keys(): the unbound method, which raised TypeError.
            for key in spec:
                rmap[key] = spec[key]
        elif isinstance( spec, str):          # file name
            with open( spec) as fin:
                flines = fin.readlines()
            for fline in flines:
                line = fline.strip()
                ix = line.find('#')
                if ix >= 0: line = line[:ix].strip()
                if len(line) == 0: continue
                toks = line.split()
                if len(toks) != 4:
                    throwerr(
                        'invalid parm. script: %s curDir: %s file: %s line: %s' \
                        % (scriptName, curDir, spec, fline))
                if re.search( toks[0], scriptName) \
                        and re.search( toks[1], absCurDir):
                    rmap[toks[2]] = toks[3]   # was the undefined resMap
    return rmap                               # was the undefined resMap
#=====================================================================
# Parses a boolean.
def parseBool( stgParm):
    """Parse a yes/no string, case-insensitively, into a bool.

    'n', 'no', 'f', 'false' give False; 'y', 'yes', 't', 'true' give True.

    Raises:
      Exception (via throwerr) for any other string. Before this change
      an unrecognized string surfaced as an UnboundLocalError.
    """
    stg = stgParm.lower()
    if stg in ['n', 'no', 'f', 'false']: return False
    if stg in ['y', 'yes', 't', 'true']: return True
    throwerr('parseBool: invalid boolean: "%s"' % (stgParm,))
#=====================================================================
# Converts fpath to a shorter name for display purposes.
# Strips off the ancDir prefix.
def shortName(
        ancDir,   # Some ancestor dir of fpath
        fpath):   # The path to shorten
    """Return fpath relative to ancDir, for display purposes.

    Example: shortName('/a/b', '/a/b/c/d') returns 'c/d'.
    A trailing '/' on ancDir is ignored, and fpath equal to ancDir
    gives ''.

    Raises:
      Exception (via throwerr) if fpath is neither ancDir nor inside it.
    """
    base = ancDir.rstrip('/')
    if fpath == ancDir or fpath == base:
        return ''
    # Match whole path components. A bare startswith( ancDir) accepts a
    # sibling such as '/a/bc' for ancDir '/a/b', and with a trailing '/'
    # on ancDir the old slice dropped the first character of the result.
    prefix = base + '/'
    if not fpath.startswith( prefix):
        throwerr('shortName: fpath does not start with ancDir. fpath: %s'
            % (fpath,))
    return fpath[len(prefix):]
#=====================================================================
# Checks that name is alphaNumeric.
# Calls throwerr if not.
def checkNiceName(
        msg,      # Some informative message
        name):    # The name to check
    """Check that name is a non-empty run of letters, digits and '_'.

    Raises:
      Exception (via throwerr) if it is not.
    """
    # Anchor at both ends. The old pattern had no end anchor, so only the
    # first character was checked and names like 'a-b' passed.
    if not re.match( r'^[a-zA-Z0-9_]+\Z', name):
        throwerr('checkNiceName: invalid name. msg: %s name: "%s"'
            % (msg, name,))
#=====================================================================
def mkIndent( ix):
    """Return a string made of ix blank characters."""
    return ' ' * ix
#=====================================================================
# Raises an Exception.
def throwerr( msg):
    """Raise a plain Exception whose text is 'schedmisc.py: ' + msg."""
    prefix = 'schedmisc.py: '
    raise Exception( prefix + msg)
#=====================================================================