# Source code for schedMain.schedMisc

#!/usr/bin/env python

import copy, datetime, os, re, shutil, subprocess, sys, traceback

# Misc constants and utilities for schedMain.py

#=====================================================================

# Task status codes, used in taskClass.py.
# Each code doubles as the index of its name in statusNames.
(ST_UNKNOWN,
 ST_INIT,
 ST_SUBMIT,
 ST_WAIT,
 ST_RUN,
 ST_OK,
 ST_ERROR) = range(7)
ST_NUM_STAT = 7   # number of status levels

statusNames = ['unknown', 'init', 'submit', 'wait', 'run', 'ok', 'error']

# Various file names

NM_PREWORK       = "preWork"             # pre-requisite tasks
NM_POSTOKWORK    = "postOkWork"          # tasks to start after OK completion
NM_POSTERRORWORK = "postErrorWork"       # tasks to start after error completion
NM_STATUS        = "status"              # status file name
NM_LOCKFILE      = "schedMain.lockFile"  # global lockfile name

# Format used by formatDate / parseDate, e.g. 2014-02-23_04:34:03
dateFmt = '%Y-%m-%d_%H:%M:%S'

cmdDirName = 'cmd'                  # name of global subdir with scripts

#=====================================================================

def getPbsIdMap( bugLev, hostType, userId):
  """Returns a map jobId -> status for the batch jobs known to this host.

  A 'hostLocal' host has no batch system, so the map is always empty;
  for any other hostType the work is delegated to getPbsIdMapSub.
  """
  if hostType != 'hostLocal':
    return getPbsIdMapSub( bugLev, hostType, userId)
  return {}

#=====================================================================

# Get info from the "showq" Moab command.
# Returns pbsMap: jobId -> state, like schedmisc.ST_WAIT
#
# peregrine:
#   Torque for resource manager (qsub)
#     BatchHold, Hold, Idle, Running
#   Moab for workload manager
#
# stampede:
#   resource manager:
#     Simple Linux Utility for Resource Management (SLURM)
#     Waiting, Running, Complte
def getPbsIdMapSub( bugLev, hostType, userId):
  """Runs "showq" and maps each listed job id to a task status.

  bugLev and userId are accepted for interface compatibility but are not
  used here: the jobs of all users are returned.

  hostType selects the showq output layout; only 'peregrine' and
  'stampede' are supported.

  Returns pbsMap: jobId (str) -> ST_WAIT, ST_RUN or ST_OK.  A showq state
  this code does not recognize is reported on stdout and treated as
  ST_WAIT.
  Raises (via throwerr) for an unknown hostType or a nonzero showq rc.
  """
  # Per hostType: (number of tokens on a job line, index of the state
  # token, map: showq state -> task status).
  if hostType == 'peregrine':
    (numToks, stateIx, stateMap) = (9, 2, {
      'BatchHold':  ST_WAIT,
      'Hold':       ST_WAIT,
      'Idle':       ST_WAIT,
      'SystemHold': ST_WAIT,
      'UserHold':   ST_WAIT,
      'Running':    ST_RUN,
    })
  elif hostType == 'stampede':
    (numToks, stateIx, stateMap) = (10, 3, {
      'Waiting': ST_WAIT,
      'Running': ST_RUN,
      'Complte': ST_OK,      # sic: this is how showq spells it
    })
  else:
    throwerr('unknown hostType: %s' % (hostType,))

  cmdLine = 'showq'
  (qrc, qstdout, qstderr, qdeltaSec) = runSubprocess(
    os.getcwd(),                # wkDir
    cmdLine,                    # cmdLine
    None,                       # stdInput
    os.getcwd() + '/showq',     # outPrefix
    False)                      # showInfo
  if qrc != 0:
    throwerr('getPbsIdMap: showq err. rc: %d\n stderr: %s\n stdout: %s\n'
      % (qrc, qstderr, qstdout,))

  # At peregrine stdout has the format:
  #   434982 ssulliva Running 24 00:00:26 Fri Jan 24 12:49:42
  #   434981 ssulliva Idle 1 00:01:00 Fri Jan 24 12:48:12
  #   428796 slany BatchHold 32 5:00:00:00 Wed Jan 22 08:37:33
  #   462361 ssulliva Canceling 24 2:37:59 Tue Feb 11 11:34:12
  # where the fields are:
  #   jobId userId status procs timea timeb
  # if status==Running:
  #   timea = remaining, timeb = startTime
  # if status==Idle or BatchHold:
  #   timea = wcLimit, timeb = queueTime
  #
  # At stampede stdout has the format:
  #   ACTIVE JOBS--------------------
  #   JOBID JOBNAME USERNAME STATE CORE REMAINING STARTTIME
  #   ==================================================================
  #   2808140 ffs-rst-1 ahakbari Running 256 44:34:33 Sun Feb 23 04:34:03
  #   2808152 ffs-rst-2 ahakbari Running 256 44:34:33 Sun Feb 23 04:34:03
  #   2850511 qm.875 sylee78 Running 32 23:33:50 Sat Feb 22 07:33:20
  #   WAITING JOBS--------------------
  #   JOBID JOBNAME USERNAME STATE CORE WCLIMIT QUEUETIME
  #   ==================================================================
  #   2857287 mpipi tg457571 Waiting 32 0:05:00 Fri Feb 21 10:24:35
  #   2857289 mpipi rlong021 Waiting 32 0:05:00 Fri Feb 21 10:24:35
  #   2857303 mpipi hmoncada Waiting 32 0:05:00 Fri Feb 21 10:43:37
  #   BLOCKED JOBS--
  #   JOBID JOBNAME USERNAME STATE CORE WCLIMIT QUEUETIME
  #   ==================================================================
  #   2808167 ffs-rst-1 ahakbari Waiting 256 48:00:00 Sat Feb 15 14:08:31
  #   2808172 ffs-rst-2 ahakbari Waiting 256 48:00:00 Sat Feb 15 14:08:47
  #   2808183 ffs-rst-1 ahakbari Waiting 256 48:00:00 Sat Feb 15 14:09:49
  #   COMPLETING/ERRORED JOBS---------
  #   JOBID JOBNAME USERNAME STATE CORE WCLIMIT QUEUETIME
  #   ==================================================================
  #   2856612 3DRun80 rkashyap Complte 512 0:58:50 Fri Feb 21 09:08:22
  #   2868475 enzoSymm tg803229 Complte 32 7:58:34 Sun Feb 23 07:58:06

  jobIdPat = re.compile(r'^\d+$')
  pbsMap = {}         # jobId -> state, like ST_WAIT
  for qline in qstdout.split('\n'):
    qline = qline.strip()
    qtoks = qline.split()
    # Ignore headings, separators and summary lines: a job line has
    # exactly numToks tokens and starts with a numeric job id.
    if len(qtoks) != numToks or not jobIdPat.match( qtoks[0]):
      continue
    qjobId = qtoks[0]
    status = stateMap.get( qtoks[stateIx])
    if status is None:
      print('getPbsMap: unknown status: %s' % (qline,))
      status = ST_WAIT   # assume it's some variant of wait
    pbsMap[qjobId] = status
  return pbsMap

#=====================================================================

# Not used.
# Get info from the "showq" Moab command.
# Returns a map: jobName -> state
# where state is one of the Moab tags: BatchHold, Idle, Running, etc.
def getPbsNameMapUnused( bugLev, userId):
  """(Not used.)  Maps Moab job names to Moab job states.

  Runs "showq", then runs "checkjob <jobId>" for every job line and scans
  its output for the "AName:" and "State:" lines.  bugLev and userId are
  not used.

  Returns a map: jobName -> Moab state string, as printed by checkjob.
  Raises (via throwerr) if showq or any checkjob returns a nonzero rc.
  """
  # xxx hardcoded 'showq'
  (qrc, qstdout, qstderr, qdeltaSec) = runSubprocess(
    os.getcwd(),                # wkDir
    'showq',                    # cmdLine
    None,                       # stdInput
    os.getcwd() + '/showq',     # outPrefix
    False)                      # showInfo
  if qrc != 0:
    throwerr('getPbsNameMap: showq err. rc: %d\n stderr: %s\n stdout: %s\n'
      % (qrc, qstderr, qstdout,))

  # stdout has the format:
  #   434982 ssulliva Running 24 00:00:26 Fri Jan 24 12:49:42
  #   434981 ssulliva Idle 1 00:01:00 Fri Jan 24 12:48:12
  #   428796 slany BatchHold 32 5:00:00:00 Wed Jan 22 08:37:33
  # where the fields are:
  #   jobId userId status procs timea timeb
  # if status==Running:
  #   timea = remaining, timeb = startTime
  # if status==Idle or BatchHold:
  #   timea = wcLimit, timeb = queueTime

  jobIdPat = re.compile(r'^\d+$')
  pbsMap = {}         # jobName -> state
  for qline in qstdout.split('\n'):
    qtoks = qline.strip().split()
    # Ignore the headings: a job line has 9 tokens and a numeric job id
    # (same test as getPbsIdMapSub uses for peregrine).
    if len(qtoks) != 9 or not jobIdPat.match( qtoks[0]):
      continue
    qjobId = qtoks[0]

    # Issue 'checkjob' and scan for lines like
    #   AName: someJobName
    #   State: Running
    (crc, cstdout, cstderr, cdeltaSec) = runSubprocess(
      os.getcwd(),                  # wkDir
      'checkjob %s' % (qjobId,),    # cmdLine
      None,                         # stdInput
      os.getcwd() + '/checkjob',    # outPrefix
      False)                        # showInfo
    if crc != 0:
      throwerr(
        'getPbsNameMap: checkjob err. rc: %d\n stderr: %s\n stdout: %s\n'
        % (crc, cstderr, cstdout,))

    cname = None
    cstate = None
    for cline in cstdout.split('\n'):
      ctoks = cline.strip().split()
      if len(ctoks) == 2:
        ctag = ctoks[0].lower()
        # The old code read an undefined 'toks' here (NameError).
        if ctag == 'aname:':
          cname = ctoks[1]
        if ctag == 'state:':
          cstate = ctoks[1]
    if cname is not None and cstate is not None:
      pbsMap[cname] = cstate
  return pbsMap

#=====================================================================

# Call subprocess.Popen( cmd, shell=True)
# Returns (rc, stdout, stderr, deltaTimeSec).
def runSubprocess(
  wkDir,       # work dir to use for cmd
  cmd,         # command line
  stdInput,    # stdin to pass to command.  Usually None.
  outPrefix,   # prefix for 'x.stdout', 'x.stderr'
  showInfo):   # bool.  If True, print cmd, wkDir.
  """Runs cmd through the shell in wkDir and captures its output.

  The child's PYTHONPATH is the caller's PYTHONPATH (if any) with the
  directory of this module appended.  The captured stderr and stdout are
  written to outPrefix + '.stderr' and outPrefix + '.stdout'.  If showInfo
  is True, cmd, wkDir and any non-empty output are printed.

  Returns (rc, stdout, stderr, deltaTimeSec), where deltaTimeSec is the
  wall-clock run time in seconds and stdout/stderr are '' instead of None.

  cmd is handed to the shell verbatim (shell=True); callers must not build
  it from untrusted input.
  """
  timea = datetime.datetime.now()
  if showInfo:
    print('runSubprocess: cmd: %s' % (cmd,))
    print('runSubprocess: wkDir: %s' % (wkDir,))

  # Append the scheduler dir to PYTHONPATH.
  # dict(os.environ) is a plain, independent copy of the environment, and
  # PYTHONPATH may be absent from it (the old "+=" raised KeyError then).
  ourDir = os.path.dirname( __file__)
  subEnv = dict( os.environ)
  oldPath = subEnv.get('PYTHONPATH', '')
  if oldPath:
    subEnv['PYTHONPATH'] = oldPath + os.pathsep + ourDir
  else:
    subEnv['PYTHONPATH'] = ourDir

  pipe = subprocess.PIPE
  proc = subprocess.Popen( cmd, shell=True, cwd=wkDir,
    stdin=pipe, stdout=pipe, stderr=pipe,
    bufsize=10*1000*1000, env=subEnv)
  (stdout, stderr) = proc.communicate( stdInput)
  rc = proc.returncode
  timeb = datetime.datetime.now()
  if stdout is None: stdout = ''
  if stderr is None: stderr = ''

  # NOTE(review): confirm these logs are meant to be written on every run,
  # not only when rc != 0.
  with open( outPrefix + '.stderr', 'w') as fout:
    fout.write( stderr)
  with open( outPrefix + '.stdout', 'w') as fout:
    fout.write( stdout)

  if showInfo:
    if len(stderr) > 0:
      print('===== start stderr =====')
      sys.stdout.write( stderr)
      print('===== end stderr =====')
    if len(stdout) > 0:
      print('===== start stdout =====')
      sys.stdout.write( stdout)
      print('===== end stdout =====')

  return (rc, stdout, stderr, deltaTimeSec( timea, timeb))

#=====================================================================

# Returns the number of tasks not OK.
def getNumNotOk( tasks):
  """Counts the tasks whose .status is not ST_OK.

  tasks: an iterable of task objects, or None (counted as no tasks).
  """
  if tasks is None:
    return 0
  return sum( 1 for tk in tasks if tk.status != ST_OK)

#=====================================================================

# Returns the current date.
def getCurDate():
  """Returns the current local date and time (a naive datetime)."""
  curDate = datetime.datetime.now()
  return curDate

#=====================================================================

# Formats a date.
def formatDate( dt):
  """Formats dt with dateFmt; returns the string 'None' when dt is None."""
  return 'None' if dt is None else dt.strftime( dateFmt)

#=====================================================================

# Parses a date.
def parseDate( stg):
  """Parses a string in dateFmt (as written by formatDate) to a datetime.

  Raises (via throwerr) if stg is None or not exactly 19 characters long,
  which is the length of dateFmt output for a 4-digit year.
  """
  if stg is None or len(stg) != 19:
    throwerr('parseDate: invalid stg: "%s"' % (stg,))
  return datetime.datetime.strptime( stg, dateFmt)

#=====================================================================

# Returns the difference timeb - timea, in seconds.
def deltaTimeSec( timea, timeb):
  """Returns timeb - timea in seconds, as a float (negative if timeb < timea)."""
  diff = timeb - timea
  secPerDay = 24 * 3600
  return secPerDay * diff.days + diff.seconds + 1.e-6 * diff.microseconds

#=====================================================================

# Writes the file (sname).status.run,
# with the current date.
def setStatusRun( sname, dirPath ):
  """Creates (dirPath)/(sname).status.run holding the current date.

  See writeStatusFile; an existing file is left unchanged.
  """
  writeStatusFile( sname=sname, dirPath=dirPath, stIx=ST_RUN)

#=====================================================================

# Writes the file (sname).status.ok,
# with the current date.
def setStatusOk( sname, dirPath ):
  """Creates (dirPath)/(sname).status.ok holding the current date.

  See writeStatusFile; an existing file is left unchanged.
  """
  writeStatusFile( sname=sname, dirPath=dirPath, stIx=ST_OK)

#=====================================================================

# Writes the file (sname).status.error,
# with the current date, followed by the specified errMsg.
def setStatusError( sname, dirPath, errMsg):
  """Creates (dirPath)/(sname).status.error holding the current date
  followed by errMsg.

  See writeStatusFile; an existing file is left unchanged.
  """
  writeStatusFile( sname=sname, dirPath=dirPath, stIx=ST_ERROR, extra=errMsg)

#=====================================================================

# Writes the file (sname).status.(statusName),
# with the current date, followed by the specified extra info.
def writeStatusFile( sname, dirPath, stIx, extra=None):
  """Creates the status file (dirPath)/(sname).status.(statusNames[stIx]).

  The file gets the current date on its first line and, if extra is not
  None, str(extra) on the next line.  If the file already exists it is
  NOT overwritten: its old content and the would-be new date are printed
  to stdout instead.
  """
  fname = '%s.%s.%s' % (sname, NM_STATUS, statusNames[stIx],)
  spath = os.path.join( dirPath, fname)
  if not os.path.exists( spath):
    with open( spath, 'w') as fout:
      fout.write( '%s\n' % (formatDate( getCurDate()),))
      if extra is not None:
        fout.write( '%s\n' % (extra,))
    return

  # Never overwrite an existing status file; just report the collision.
  print('writeStatusFile: status file already exists: %s' % (spath,))
  with open( spath) as fin:
    print(' old content: %s\n' % (fin.read(),))
  print(' new date: %s' % (formatDate( getCurDate()),))

#=====================================================================

# Searches the allTasks list for a match on execName and taskDir.
# Returns the Task or None.
def findTask( execName, taskDir, allTasks):
  """Returns the first task in allTasks with matching sname and taskDir, or None.

  execName must have the form "sname.stype", where sname passes
  checkNiceName and stype is one of sh, py, pbs.  taskDir must be an
  absolute path without a trailing '/'.
  Raises (via throwerr) on a malformed execName or taskDir, or if the
  matching task was registered under a different execName.
  """
  parts = execName.split('.')
  if len(parts) != 2:
    throwerr('findTask: invalid execName: %s' % (execName,))
  (sname, stype) = parts
  checkNiceName( taskDir + ':' + execName, sname)
  if stype not in ('sh', 'py', 'pbs'):
    throwerr('findTask: invalid execName: %s' % (execName,))
  if not os.path.isabs( taskDir):
    throwerr('findTask: taskDir not absolute: %s' % (taskDir,))
  if taskDir.endswith('/'):
    throwerr('findTask: taskDir ends with /: %s' % (taskDir,))

  for cand in allTasks:
    if sname == cand.sname and taskDir == cand.taskDir:
      if execName != cand.execName:
        throwerr('findTask: duelling execNames for task: %s' % (cand,))
      return cand
  return None

#=====================================================================

# Removes an entire directory tree.
def rmTree(
  ancDir=None,      # Some ancestor dir of rmDir, for safety checking
  rmDir=None):      # The directory tree to remove
  """Removes rmDir (a directory tree, a file, or a symlink) after checking
  that it lies within ancDir.

  Both paths must be absolute.  They are normalized lexically
  (os.path.normpath) and containment is checked on whole path components,
  so ancDir '/a/b' does not admit the sibling '/a/bc' or '/a/b/../c'.
  rmDir equal to ancDir is still accepted.  The normalized path is what
  gets removed.  A symlink is removed itself, even when it points at a
  directory; its target is left alone.

  Raises (via throwerr) on any failed check, or if rmDir does not exist.
  """
  if ancDir is None or len(ancDir) < 2:
    throwerr('rmTree: ancDir is None or too short')
  if rmDir is None:
    throwerr('rmTree: rmDir == None')
  if not os.path.isabs( ancDir):
    throwerr('rmTree: ancDir is not abs')
  if not os.path.isabs( rmDir):
    throwerr('rmTree: rmDir is not abs')

  normAnc = os.path.normpath( ancDir).rstrip( os.sep)
  normRm = os.path.normpath( rmDir)
  if len(normAnc) < 2:        # e.g. '/./' normalizes to the root
    throwerr('rmTree: ancDir is None or too short')
  # A plain startswith would let '/a/b' admit '/a/bc'.
  if normRm != normAnc and not normRm.startswith( normAnc + os.sep):
    throwerr('rmTree: rmDir must start with ancDir')

  # Check for a link first: isdir() follows symlinks, and shutil.rmtree
  # refuses to operate on a symlink.
  if os.path.islink( normRm) or os.path.isfile( normRm):
    os.remove( normRm)
  elif os.path.isdir( normRm):
    shutil.rmtree( normRm)
  else:
    throwerr('rmTree: unknown file type: %s' % (rmDir,))

#=====================================================================

# Gets command line parameters from the command line,
# and overrides from globalDir/taskParms,
# and overrides from curDir/taskParms.
#
# The taskParms file has the format:
#   scriptNameRegex dirRegex key value
#
# Example of file line:
#   ^myScript.py$ someDir/alpha -bugLev 10
# For the script myScript.py, running in any directory
# whose absolute path contains the string 'someDir/alpha',
# set the parameter: -bugLev 10
def getParmMap( scriptName, curDir, specMap):
  """Builds the parameter map for a script from three layered sources.

  Sources, each overriding the previous one:
    1. the command line, sys.argv[1:], as "-key value" pairs;
    2. (globalDir)/taskParms, where globalDir is the globalDir parm
       given on the command line;
    3. (curDir)/taskParms.
  A taskParms file is read only if it exists and no error has been
  recorded so far.  Afterwards every key of specMap that is still unset
  gets its default.

  Parameters:
    scriptName: name of the calling script, matched against the
                scriptNameRegex column of taskParms lines.
    curDir:     current directory; its absolute path is matched against
                the dirRegex column of taskParms lines.
    specMap:    key -> [type, default], type one of 'str', 'int',
                'float', 'bool' (see getParmValue).  A default of None
                marks a required parm.  Must contain 'globalDir'.

  Returns the map key -> value on success.  On failure returns instead a
  str listing every problem found, one per line.
  Raises (via throwerr) if specMap lacks 'globalDir'.
  """
  if 'globalDir' not in specMap:
    throwerr('getParms: globalDir not in specMap. script: %s curDir: %s' \
      % (scriptName, curDir,))

  errMsgs = []
  rmap = {}          # final result map: keyword -> value

  _getArgvParms( specMap, rmap, errMsgs)

  if 'globalDir' not in rmap:
    errMsgs.append('globalDir not specified')
  else:
    # The directory list is fixed here, so a globalDir override read from
    # a taskParms file does not change which files are consulted.
    for dirName in [rmap['globalDir'], curDir]:
      fpath = os.path.join( dirName, 'taskParms')
      if len(errMsgs) == 0 and os.path.exists( fpath):
        _getFileParms( scriptName, curDir, specMap, fpath, rmap, errMsgs)

  # Fill in defaults for everything still unset.
  for key in sorted( specMap):
    if key not in rmap:
      defVal = specMap[key][1]      # default value
      if defVal is None:
        errMsgs.append('parm not specified: %s' % (key,))
      else:
        rmap[key] = defVal

  if len(errMsgs) == 0:
    return rmap
  errMsgs.append('script: %s' % (os.path.abspath( scriptName,)))
  errMsgs.append('curDir: %s' % (os.path.abspath( curDir,)))
  return '\n ' + '\n '.join( errMsgs) + '\n'


def _getArgvParms( specMap, rmap, errMsgs):
  """Parses sys.argv[1:] as "-key value" pairs into rmap; appends problems to errMsgs."""
  if len(sys.argv) % 2 != 1:
    errMsgs.append('parms must be key/value pairs.')
    return
  for iarg in range( 1, len(sys.argv), 2):
    key = sys.argv[iarg]
    value = sys.argv[iarg+1]
    if not key.startswith('-'):
      errMsgs.append('invalid key (no -): %s' % (key,))
      continue        # don't also report a mangled key as unknown
    key = key[1:]     # strip leading '-'
    rmap[key] = getParmValue( specMap, key, value)
    if rmap[key] is None:
      errMsgs.append('unknown parm: %s' % (key,))


def _getFileParms( scriptName, curDir, specMap, fpath, rmap, errMsgs):
  """Applies the matching lines of the taskParms file fpath to rmap.

  Each non-comment line has 4 tokens:  scriptNameRegex dirRegex key value.
  A line applies when scriptNameRegex matches scriptName and dirRegex
  matches the absolute path of curDir.  As on the command line, key may
  carry a leading '-'.  Problems are appended to errMsgs.
  """
  absCurDir = os.path.abspath( curDir)
  with open( fpath) as fin:
    flines = fin.readlines()
  for fline in flines:
    line = fline.strip()
    ix = line.find('#')
    if ix >= 0:
      line = line[:ix].strip()
    if len(line) == 0:
      continue
    toks = line.split()
    if len(toks) != 4:
      errMsgs.append('invalid line. file path: "%s"\n line: "%s"' \
        % (fpath, line))
      continue        # the old code fell through and could hit IndexError
    (scriptPat, dirPat, key, value) = toks
    # Check for scriptName and dirName regex matches
    if re.search( scriptPat, scriptName) and re.search( dirPat, absCurDir):
      if key.startswith('-'):
        key = key[1:]   # allow the documented "-bugLev 10" form
      rmap[key] = getParmValue( specMap, key, value)
      if rmap[key] is None:
        errMsgs.append('unknown parm. file path: "%s"\n line: "%s"' \
          % (fpath, line))

#=====================================================================

# Returns None on error
def getParmValue( specMap, key, value):
  """Converts the string value of parm key to the type given in specMap.

  specMap: key -> [type, default]; type is 'str', 'int', 'float' or 'bool'.
  A bool value must be exactly 'y' or 'n'.

  Returns the converted value, or None when key is not in specMap or the
  value cannot be converted.
  Raises (via throwerr) when specMap gives an unknown type for key.
  """
  specs = specMap.get( key)        # specs = [type, default]
  if specs is None:
    return None
  specTp = specs[0]
  if specTp == 'str':
    return value
  if specTp == 'int' or specTp == 'float':
    converter = int if specTp == 'int' else float
    try:
      return converter( value)
    except ValueError:
      return None
  if specTp == 'bool':
    if value == 'y':
      return True
    if value == 'n':
      return False
    return None
  throwerr('unknown specMap entry. key: %s specs: %s' % (key, specs,))

#=====================================================================

# Gets command line parameters from a variety of sources:
#   * lists, like sys.argv, with format:
#       [scriptName, key, value, key, value,...]
#   * maps, with format: keyword->value
#   * files, with lines having the format
#       scriptNameRegex dirRegex key value
#
# The last specified value wins.
#
# Example of file line:
#   ^myScript.py$ someDir/alpha -bugLev 10
# For the script myScript.py, running in any directory
# whose absolute path contains the string 'someDir/alpha',
# set the parameter: -bugLev 10
def getParmsOld( scriptName, curDir, *specs):
  """Old parm reader (superseded by getParmMap): merges parms from specs.

  Each spec may be:
    * a list like sys.argv: [scriptName, key, value, key, value, ...]
      (element 0 is skipped);
    * a dict: key -> value;
    * a str: path of a file whose lines have the format
        scriptNameRegex dirRegex key value
      A line applies when scriptNameRegex matches scriptName and dirRegex
      matches the absolute path of curDir.  '#' starts a comment.
  Later specs override earlier ones.  Keys and values are kept exactly as
  given (no '-' stripping, no type conversion).

  Returns the merged map key -> value.
  Raises (via throwerr) on malformed input or an unsupported spec type.
  """
  rmap = {}          # final result map: keyword -> value
  for spec in specs:
    if isinstance( spec, list):
      if len(spec) % 2 != 1:
        throwerr(
          'getParms: parms must be key/value pairs. script: %s curDir: %s' \
          % (scriptName, curDir,))
      for iarg in range( 1, len(spec), 2):
        rmap[spec[iarg]] = spec[iarg+1]

    elif isinstance( spec, dict):
      rmap.update( spec)       # the old code called dict.keys() (TypeError)

    elif isinstance( spec, str):     # file name
      with open( spec) as fin:
        flines = fin.readlines()
      absCurDir = os.path.abspath( curDir)
      for fline in flines:
        line = fline.strip()
        ix = line.find('#')
        if ix >= 0:
          line = line[:ix].strip()
        if len(line) == 0:
          continue
        toks = line.split()
        if len(toks) != 4:
          throwerr(
            'invalid parm. script: %s curDir: %s file: %s line: %s' \
            % (scriptName, curDir, spec, fline))
        if re.search( toks[0], scriptName) \
          and re.search( toks[1], absCurDir):
          rmap[toks[2]] = toks[3]   # the old code wrote to an undefined resMap

    else:
      # The old code silently ignored anything else.
      throwerr('getParmsOld: unknown spec type: %s' % (type(spec).__name__,))
  return rmap

#=====================================================================

# Parses a boolean.
def parseBool( stgParm):
  """Parses a boolean from a string, ignoring case and surrounding blanks.

  n / no / f / false -> False;  y / yes / t / true -> True.
  Raises (via throwerr) for anything else.  The old code fell through
  and raised UnboundLocalError instead.
  """
  stg = stgParm.strip().lower()
  if stg in ['n', 'no', 'f', 'false']:
    return False
  if stg in ['y', 'yes', 't', 'true']:
    return True
  throwerr('parseBool: invalid boolean: "%s"' % (stgParm,))

#=====================================================================

# Converts fpath to a shorter name for display purposes.
# Strips off the ancDir prefix.
def shortName(
  ancDir,     # Some ancestor dir of fpath
  fpath):     # The path to shorten
  """Returns fpath relative to ancDir, for display purposes.

  The ancDir prefix and at most one following path separator are removed.
  The old code always skipped one extra character, which dropped the
  first letter of the result when ancDir ended with '/'.
  Raises (via throwerr) if fpath does not start with ancDir.
  """
  if not fpath.startswith( ancDir):
    throwerr('shortName: fpath does not start with ancDir. fpath: %s' % (fpath,))
  tstg = fpath[len(ancDir):]
  if tstg.startswith( os.sep):
    tstg = tstg[len(os.sep):]
  return tstg

#=====================================================================

# Checks that name is alphaNumeric.
# Calls throwerr if not.
def checkNiceName(
  msg,       # Some informative message
  name):     # The name to check
  """Checks that name consists only of letters, digits and '_' (at least one).

  The old pattern had no end anchor and so validated only a prefix
  (e.g. 'ab-c!' passed).  \\Z is used instead of $ so that a trailing
  newline is rejected too.
  Calls throwerr if the check fails.
  """
  if not re.match( r'^[a-zA-Z0-9_]+\Z', name):
    throwerr('checkNiceName: invalid name. msg: %s name: "%s"' % (msg, name,))

#=====================================================================
def mkIndent( ix):
  """Returns a string of ix blanks."""
  return ' ' * ix

#=====================================================================

# Raises an Exception.
def throwerr( msg):
  """Raises a plain Exception whose message is 'schedmisc.py: ' + msg."""
  prefix = 'schedmisc.py: '
  raise Exception( prefix + msg)

#=====================================================================