#! /usr/bin/python
# This script looks at the currently running jobs in the cluster and finds
# any compute node that has too many jobs running. If there are too many
# jobs running it must be because the priority queue has been filled and
# there are other low-priority jobs running on the node. In that case the
# low priority jobs are suspended. Once the high priority jobs have finished
# this script will unsuspend the suspended jobs.
#
# Note that QLOGIN sessions are ignored.
#
# Usage: pass DEBUG as the first argument to print the qmod commands that
# would be issued instead of running them.
#
# The code deliberately sticks to constructs that work on both old Python 2
# interpreters and Python 3 (single-argument print(...), no f-strings).

import os
import re
import subprocess
import sys

# Environment the SGE command line tools need in order to reach the qmaster.
SGE_ENVIRONMENT = {
    "SGE_ROOT": "/opt/gridengine",
    "SGE_CELL": "default",
    "SGE_ARCH": "lx26-amd64",
    "SGE_EXECD_PORT": "537",
    "SGE_QMASTER_PORT": "536",
}

# Jobs in these queues are never suspended; they only consume slots.
PRIORITY_QUEUES = ("cva-batch.q", "cva-qlogin.q", "mckeown-batch.q")

QMOD_COMMAND = "/opt/gridengine/bin/lx26-amd64/qmod"
QSTAT_COMMAND = "/opt/gridengine/bin/lx26-amd64/qstat"
QHOST_COMMAND = "/opt/gridengine/bin/lx26-amd64/qhost"

SLOTS_PER_NODE = 2

# A qhost line describing a compute node starts with the node name.
NODE_LINE_PATTERN = re.compile(r"(compute-[0-9]+-[0-9]+)")

# One qstat line for a job that is placed on a compute node. Groups:
#   1 job id, 2 priority, 3 name, 4 user, 5 state, 6 date, 7 time,
#   8 queue, 9 compute node, 10 slots, 11 array task id (may be empty).
# The line is anchored with optional leading blanks so that job ids wide
# enough to fill the whole column are still recognised.
JOB_LINE_PATTERN = re.compile(
    r"^\s*([0-9]+)\s+(0\.[0-9]+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)"
    r"\s+(\S+)@(compute-[0-9]+-[0-9]+)[.a-z]+\s+([0-9]+)\s*([0-9]*)"
)


def run_command(args):
    """Run a command without a shell and return its standard output as text.

    args is the argument list (program first). Raises OSError when the
    program cannot be started. The exit status is not inspected.
    """
    process = subprocess.Popen(
        args, stdout=subprocess.PIPE, universal_newlines=True
    )
    # communicate() drains the pipe and waits for the child, so no pipe or
    # zombie process is left behind.
    return process.communicate()[0]


def list_compute_nodes(qhost_output):
    """Return the compute node names found in qhost output, in order.

    Only lines that begin with a compute-N-N name count, which makes an
    external "grep compute" unnecessary. Duplicates are reported once.
    """
    nodes = []
    for line in qhost_output.splitlines():
        match = NODE_LINE_PATTERN.match(line)
        if match and match.group(1) not in nodes:
            nodes.append(match.group(1))
    return nodes


def parse_job_line(line):
    """Parse one qstat line into (node, job), or return None.

    job is a dict with the keys id, name, user, state, queue (strings) and
    slots (int). For array jobs the id is "<job id>.<task id>". Header
    lines and jobs that are not placed on a compute node yield None.
    """
    match = JOB_LINE_PATTERN.match(line)
    if not match:
        return None
    job_id = match.group(1)
    if match.group(11) != "":
        job_id = job_id + "." + match.group(11)
    job = {
        "id": job_id,
        "name": match.group(3),
        "user": match.group(4),
        "state": match.group(5),
        "queue": match.group(8),
        "slots": int(match.group(10)),
    }
    return match.group(9), job


def group_jobs_by_node(qstat_output, nodes):
    """Return a dict mapping every name in nodes to its list of jobs.

    QLOGIN sessions are left out. Jobs on nodes that are not in nodes are
    skipped instead of raising KeyError. The two qstat header lines need no
    special handling because they do not parse as job lines.
    """
    jobs_by_node = {}
    for node in nodes:
        jobs_by_node[node] = []
    for line in qstat_output.splitlines():
        parsed = parse_job_line(line)
        if parsed is None:
            continue
        node, job = parsed
        if job["name"] == "QLOGIN":
            continue
        if node in jobs_by_node:
            jobs_by_node[node].append(job)
    return jobs_by_node


def job_sort_key(job):
    """Return a numeric sort key so that job 100 orders after job 99."""
    return tuple(int(part) for part in job["id"].split("."))


def tally_node_jobs(jobs, priority_queues=PRIORITY_QUEUES):
    """Split one node's jobs into priority slots and low-priority jobs.

    Returns (priority_slots, low_running, low_suspended): the slot count
    taken by jobs in priority_queues, and the lists of other jobs whose
    state is exactly "r" and whose state is anything else.
    """
    priority_slots = 0
    low_running = []
    low_suspended = []
    for job in jobs:
        if job["queue"] in priority_queues:
            priority_slots += job["slots"]
        elif job["state"] == "r":
            low_running.append(job)
        else:
            # NOTE(review): every state other than "r" is treated as
            # suspended; confirm that is intended for transient states.
            low_suspended.append(job)
    return priority_slots, low_running, low_suspended


def select_jobs_to_suspend(low_running, priority_slots,
                           slots_per_node=SLOTS_PER_NODE):
    """Return the ids of low-priority jobs to suspend, newest first.

    Jobs are taken in descending numeric id order until the slots still in
    use fit into slots_per_node. Each job frees its own slot count, so a
    multi-slot job is counted once rather than indexed past the list end.
    """
    excess = priority_slots - slots_per_node
    for job in low_running:
        excess += job["slots"]
    chosen = []
    for job in sorted(low_running, key=job_sort_key, reverse=True):
        if excess <= 0:
            break
        chosen.append(job["id"])
        excess -= job["slots"]
    return chosen


def select_jobs_to_resume(low_suspended, used_slots,
                          slots_per_node=SLOTS_PER_NODE):
    """Return the ids of suspended jobs to resume, oldest first.

    Jobs are taken in ascending numeric id order for as long as they fit
    into the free slots; a job needing more slots than remain is skipped.
    Having more free slots than suspended jobs resumes all of them.
    """
    free = slots_per_node - used_slots
    chosen = []
    for job in sorted(low_suspended, key=job_sort_key):
        if free <= 0:
            break
        if job["slots"] > free:
            continue
        chosen.append(job["id"])
        free -= job["slots"]
    return chosen


def main(argv=None):
    """Suspend or resume low-priority jobs on every compute node."""
    if argv is None:
        argv = sys.argv
    debug = len(argv) > 1 and argv[1] == "DEBUG"

    os.environ.update(SGE_ENVIRONMENT)

    nodes = list_compute_nodes(run_command([QHOST_COMMAND]))
    jobs_by_node = group_jobs_by_node(run_command([QSTAT_COMMAND]), nodes)

    for node in nodes:
        jobs = jobs_by_node[node]
        priority_slots, low_running, low_suspended = tally_node_jobs(jobs)
        running_slots = sum([job["slots"] for job in low_running])
        suspended_slots = sum([job["slots"] for job in low_suspended])

        if debug:
            print(node + ":")
            for job in jobs:
                print("\tFound: (" + job["id"] + ", " + job["name"] + ", "
                      + job["user"] + ", " + job["state"] + ", "
                      + job["queue"] + " jobSlots: " + str(job["slots"])
                      + ")")
            print("\t" + str(priority_slots) + " priority jobs, "
                  + str(running_slots) + " low priority running jobs, and "
                  + str(suspended_slots)
                  + " low priority suspended jobs.")

        to_suspend = select_jobs_to_suspend(low_running, priority_slots)
        if debug and to_suspend:
            print(">> Number to suspend: " + str(len(to_suspend)))
        for job_id in to_suspend:
            args = [QMOD_COMMAND, "-sj", job_id]
            if debug:
                print(">> Suspend: " + " ".join(args))
            else:
                run_command(args)

        # When anything had to be suspended the node is full, so this
        # selection is empty and the two steps never fight each other.
        to_resume = select_jobs_to_resume(
            low_suspended, priority_slots + running_slots
        )
        if debug and to_resume:
            print(">> Number to resume: " + str(len(to_resume)))
        for job_id in to_resume:
            args = [QMOD_COMMAND, "-usj", job_id]
            if debug:
                print(">> Resume: " + " ".join(args))
            else:
                run_command(args)


if __name__ == "__main__":
    main()