/*
 * Source file: maui/src/moab/MQueue.c
 *
 * Last change: commit 0d182829d2 by bchristiansen, 2008-10-03 21:02:55 +00:00
 *   FEATURE: PATCH: Eygene Ryabinkin - Fitting job into partition that works.
 *   Problem Ref:  [MAUIUSERS] PATCH FOR DEFAULT PARTITION HANDLING
 *   Organization: Eygene
 *   Contact:      Eygene Ryabinkin
 *
 * git-svn-id: svn://opensvn.adaptivecomputing.com/maui/trunk@99 3f5042e3-fb1d-0410-be18-d6ca2573e517
 *
 * 2052 lines, 45 KiB, C
 */

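/* HEADER */
/* Contains:                                        *
 *  int MQueueInitialize(Q,QName)                   *
 *  int MQueuePrioritizeJobs(Q,JobIndex)            *
 *  int MOQueueInitialize(Queue)                    *
 *  int MOQueueDestroy(Queue,ModifyStats)           *
 *  int MQueueBackFill(BFQueue,PLevel,P)            *
 *  int MQueueScheduleIJobs(Q,P)                    *
 *  int MQueueDiagnose(FullQ,NBJobList,PLevel,PS,Buffer,BufSize) *
 *  int MQueueScheduleSJobs(Q)                      *
 *  int MQueueCheckStatus()                         *
 *  int MQueueAddAJob(J)                            *
 *  int MQueueScheduleRJobs(Q)                      *
 *                                                  */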
#include "moab.h"
#include "msched-proto.h"
extern mlog_t mlog;
extern mjob_t *MJob[];
extern mnode_t *MNode[];
extern mpar_t MPar[];
extern mrm_t MRM[];
extern mstat_t MStat;
extern mattrlist_t MAList;
extern msched_t MSched;
extern int MAQ[];
extern const char *MHoldType[];
extern const char *MPolicyMode[];
extern const char *MDefReason[];
extern const char *MAllocRejType[];
extern const char *MJRType[];
extern const char *MBFPolicy[];
extern const char *MXO[];
extern const char *MJobDependType[];
/*
 * MQueueInitialize() - allocate the head record of a job queue.
 *
 * A zeroed mjob_t is allocated and stored in *Q, its Name is set to QName
 * and its Next/Prev links are pointed at itself, i.e. the queue starts out
 * as an empty circular list.
 *
 * Q     (I, modified) receives the newly allocated head record
 * QName (I)           name copied into the head record
 *
 * Returns SUCCESS, or FAILURE if an argument is NULL or allocation fails
 * (in the latter case *Q is set to NULL).
 */
int MQueueInitialize(

  mjob_t **Q,      /* I (modified) */
  char    *QName)  /* I */

{
  mjob_t *Head;

  if ((Q == NULL) || (QName == NULL))
    return(FAILURE);

  Head = (mjob_t *)calloc(1,sizeof(mjob_t));

  *Q = Head;

  if (Head == NULL)
  {
    /* QName is known to be non-NULL at this point */

    DBG(1,fSTRUCT) DPrint("ALERT: cannot allocate memory for queue %s, errno: %d (%s)\n",
      QName,
      errno,
      strerror(errno));

    return(FAILURE);
  }

  MUStrCpy(Head->Name,QName,sizeof(Head->Name));

  /* empty circular list: head links to itself in both directions */

  Head->Prev = Head;
  Head->Next = Head;

  return(SUCCESS);
}  /* END MQueueInitialize() */
/*
 * MQueuePrioritizeJobs() - recompute job priorities and build a job index
 * list sorted with MJobStartPrioComp.
 *
 * Two modes:
 *  - Q is NULL or describes an empty list: JobIndex is an INPUT list of job
 *    indexes terminated by -1; those jobs are prioritized and the list is
 *    sorted in place.
 *  - Q holds jobs: every job on the circular list headed by Q[0] is
 *    prioritized and, if JobIndex is non-NULL, its index is written to
 *    JobIndex before sorting.
 *
 * For each job the start priority is refreshed; starting/running jobs also
 * get a refreshed run priority, all other jobs contribute to the maximum
 * idle start priority.  Unless __MALWAYSPREEMPT is defined, active backfill
 * jobs flagged preemptee whose start priority is at least that maximum lose
 * the preemptee spec flag.
 *
 * Q        (I, optional) list of jobs to be prioritized
 * JobIndex (I/O)         -1 terminated array of job indexes, sorted on return
 *
 * Returns SUCCESS, or FAILURE when neither a queue nor an index list is
 * available (previously this case dereferenced a NULL JobIndex).
 */
int MQueuePrioritizeJobs(

  mjob_t **Q,         /* I: list of jobs to be prioritized (optional) */
  int     *JobIndex)  /* O: hi-low priority sorted array of job indexes */

{
  double tmpD;
  int    jindex = 0;
  long   MaxIdleStartPriority = 0;
  mjob_t *J;

  if ((Q == NULL) ||
      (Q[0] == NULL) ||
      (Q[0]->Next == NULL) ||
      (Q[0]->Next == Q[0]))
  {
    /* no queue specified, JobIndex supplies the jobs to process */

    if (JobIndex == NULL)
    {
      /* nothing to prioritize and nowhere to report results */

      return(FAILURE);
    }

    for (jindex = 0;JobIndex[jindex] != -1;jindex++)
    {
      J = MJob[JobIndex[jindex]];

      MJobGetStartPriority(J,0,&tmpD,0,NULL,NULL);

      J->StartPriority = (unsigned long)tmpD;

      if ((J->State == mjsStarting) || (J->State == mjsRunning))
      {
        MJobGetRunPriority(J,0,&tmpD,NULL);

        J->RunPriority = (unsigned long)tmpD;
      }
      else
      {
        MaxIdleStartPriority = MAX(MaxIdleStartPriority,J->StartPriority);
      }
    }  /* END for (jindex) */
  }
  else
  {
    jindex = 0;

    for (J = Q[0]->Next;J != Q[0];J = J->Next)
    {
      MJobGetStartPriority(J,0,&tmpD,0,NULL,NULL);

      J->StartPriority = (unsigned long)tmpD;

      if ((J->State == mjsStarting) || (J->State == mjsRunning))
      {
        MJobGetRunPriority(J,0,&tmpD,NULL);

        J->RunPriority = (unsigned long)tmpD;
      }
      else
      {
        MaxIdleStartPriority = MAX(MaxIdleStartPriority,J->StartPriority);
      }

      if (JobIndex != NULL)
        JobIndex[jindex++] = J->Index;
    }  /* END for (J) */
  }

  if (JobIndex != NULL)
  {
    /* jindex now holds the number of entries in JobIndex */

    if (jindex > 1)
    {
      qsort(
        (void *)&JobIndex[0],
        jindex,
        sizeof(int),
        (int(*)(const void *,const void *))MJobStartPrioComp);
    }

    JobIndex[jindex] = -1;

#if !defined(__MALWAYSPREEMPT)

    /* adjust preemption backfill status */

    for (jindex = 0;JobIndex[jindex] != -1;jindex++)
    {
      J = MJob[JobIndex[jindex]];

      if (((J->State == mjsStarting) || (J->State == mjsRunning)) &&
          (J->StartPriority >= MaxIdleStartPriority) &&
          (J->Flags & (1 << mjfBackfill)) &&
          (J->Flags & (1 << mjfPreemptee)))
      {
        /* clear (not toggle) the flag: a toggle would SET the spec flag */
        /* when the effective preemptee flag came from another source    */

        J->SpecFlags &= ~(1 << mjfPreemptee);

        MJobUpdateFlags(J);

        DBG(2,fCORE) DPrint("INFO: backfill job '%s' no longer preemptible (%ld > %ld)\n",
          J->Name,
          J->StartPriority,
          MaxIdleStartPriority);
      }
    }  /* END for (jindex) */

#endif /* !defined(__MALWAYSPREEMPT) */
  }  /* END if (JobIndex != NULL) */

  return(SUCCESS);
}  /* END MQueuePrioritizeJobs() */
/*
 * MOQueueInitialize() - mark a job index queue as empty.
 *
 * Queue (I, modified) array of job indexes; its first slot is set to the
 *                     -1 terminator
 *
 * Returns SUCCESS, or FAILURE if Queue is NULL.
 */
int MOQueueInitialize(

  int *Queue)  /* I */

{
  if (Queue != NULL)
  {
    Queue[0] = -1;

    return(SUCCESS);
  }

  return(FAILURE);
}  /* END MOQueueInitialize() */
/*
 * MOQueueDestroy() - empty a -1 terminated job index queue.
 *
 * When ModifyStats is TRUE every slot is visited: for each job found in
 * state mjsIdle the idle usage is decremented through MPolicyAdjustUsage()
 * (once with MPar[0].L.IP, once with a NULL policy argument) and
 * MStat.IdleJobs is decremented; each visited slot is then overwritten
 * with -1.  In all cases the first slot ends up as -1.
 *
 * Queue       (I, modified) job index queue
 * ModifyStats (I, boolean)  adjust idle job statistics while clearing
 *
 * Returns SUCCESS, or FAILURE if Queue is NULL.
 */
int MOQueueDestroy(

  int *Queue,        /* I (modified) */
  int  ModifyStats)  /* I (boolean) */

{
  int    *Slot;
  mjob_t *IdleJ;

  if (Queue == NULL)
    return(FAILURE);

  if (ModifyStats == TRUE)
  {
    /* adjust idle job stats */

    Slot = Queue;

    while (*Slot != -1)
    {
      IdleJ = MJob[*Slot];

      if ((IdleJ != NULL) && (IdleJ->State == mjsIdle))
      {
        /* remove idle job from object stats */

        MPolicyAdjustUsage(NULL,IdleJ,NULL,mlIdle,MPar[0].L.IP,-1,-1,NULL);
        MPolicyAdjustUsage(NULL,IdleJ,NULL,mlIdle,NULL,-1,-1,NULL);

        MStat.IdleJobs--;
      }

      *Slot = -1;

      Slot++;
    }  /* END while (*Slot != -1) */
  }    /* END if (ModifyStats == TRUE) */

  Queue[0] = -1;

  return(SUCCESS);
}  /* END MOQueueDestroy() */
/*
 * MQueueBackFill() - backfill jobs from BFQueue into partition P.
 *
 * Backfill windows are requested repeatedly from MBFGetWindow(), each call
 * being seeded with the duration of the previous window.  For every window
 * the candidate jobs are selected with MQueueSelectJobs(), jobs that
 * already hold a reservation or whose QOS carries mqfnobf are dropped, the
 * list is capped at MPar[0].BFDepth (when positive) and the remaining jobs
 * are handed to the routine matching MPar[0].BFPolicy.
 *
 * BFQueue (I) -1 terminated list of candidate job indexes
 * PLevel  (I) policy level, forwarded to job selection and backfill routines
 * P       (I) partition to backfill
 *
 * Returns FAILURE if P is NULL, otherwise SUCCESS (including the immediate
 * return taken when the backfill policy is bfNONE).
 */
int MQueueBackFill(

  int    *BFQueue,
  int     PLevel,
  mpar_t *P)

{
  int        BFNodeCount;
  int        BFProcCount;
  long       BFTime;
  long       OBFTime;
  nodelist_t BFNodeList;
  int        tmpQ[MAX_MJOB];
  mjob_t    *JList[MAX_MJOB];
  mjob_t    *J;
  int        jindex;
  int        index;
  long       AdjBFTime;
  double     tmpD;
  double     ScaledTime;
  mcres_t    DRes;

  const char *FName = "MQueueBackFill";

  DBG(1,fSCHED) DPrint("%s(BFQueue,%s,%s)\n",
    FName,
    MPolicyMode[PLevel],
    (P != NULL) ? P->Name : "NULL");

  if (P == NULL)
  {
    return(FAILURE);
  }

  /* windows are requested for a single-proc resource description */

  memset(&DRes,0,sizeof(DRes));

  DRes.Procs = 1;

  /* backfill partition */

#ifdef __MLONGESTBFWINDOWFIRST
  OBFTime = MAX_MTIME;
#else /* __MLONGESTBFWINDOWFIRST */
  OBFTime = 0;
#endif /* __MLONGESTBFWINDOWFIRST */

  /* process normal backfill */

  while (MBFGetWindow(
           &BFNodeCount,
           &BFProcCount,
           BFNodeList,
           &BFTime,
           OBFTime,
           P,
           ALL,
           ALL,
           ALL,
           0,
           0,
           1,
           &DRes,
           NULL,
           NULL,
           NULL,
           NULL) == SUCCESS)
  {
    DBG(3,fSCHED) DPrint("INFO: backfill window obtained [%d nodes/%d procs : %s]\n",
      BFNodeCount,
      BFProcCount,
      MULToTString(BFTime));

    /* next request is seeded with this window's duration */

    OBFTime = BFTime;

    /* default to a neutral speed factor in case the lookup sets nothing */

    tmpD = 1.0;

    MUNLGetMaxAVal((mnalloc_t *)BFNodeList,mnaSpeed,NULL,(void **)&tmpD);

    /* scale in floating point and clamp: the previous (int) cast */
    /* overflowed for long windows                                */

    ScaledTime = (double)BFTime * tmpD;

    if (ScaledTime >= (double)MAX_MTIME)
      AdjBFTime = MAX_MTIME;
    else
      AdjBFTime = (long)ScaledTime;

    MOQueueInitialize(tmpQ);

    if (MQueueSelectJobs(
          BFQueue,
          tmpQ,
          PLevel,
          BFNodeCount,
          BFProcCount,
          AdjBFTime,
          P->Index,
          NULL,
          FALSE,
          FALSE) == FAILURE)
    {
      DBG(5,fSCHED) DPrint("INFO: no jobs meet BF window criteria in partition %s\n",
        P->Name);

      MOQueueDestroy(tmpQ,FALSE);

      continue;
    }  /* END if (MQueueSelectJobs() == FAILURE) */

    /* remove ineligible jobs: reserved jobs, nobf QOS jobs, etc */

    if (tmpQ[0] == -1)
    {
      MOQueueDestroy(tmpQ,FALSE);

      continue;
    }

    /* compact tmpQ in place while building the parallel JList */

    index = 0;

    for (jindex = 0;tmpQ[jindex] != -1;jindex++)
    {
      J = MJob[tmpQ[jindex]];

      if (J == NULL)
      {
        /* stale index, no job record */

        continue;
      }

      if (J->R != NULL)
      {
        /* do not backfill reserved jobs */

        continue;
      }

      if ((J->Cred.Q != NULL) && (J->Cred.Q->Flags & (1 << mqfnobf)))
      {
        /* job QOS disables backfill */

        continue;
      }

      if (index != jindex)
        tmpQ[index] = tmpQ[jindex];

      JList[index] = J;

      index++;

      if ((MPar[0].BFDepth > 0) &&
          (index >= MPar[0].BFDepth))
      {
        break;
      }
    }  /* END for (jindex) */

    /* terminate queues */

    tmpQ[index]  = -1;
    JList[index] = NULL;

    switch (MPar[0].BFPolicy)
    {
      case bfPREEMPT:

        MBFPreempt(JList,PLevel,BFNodeList,BFTime,BFNodeCount,BFProcCount,P);

        break;

      case bfFIRSTFIT:

        MBFFirstFit(JList,PLevel,BFNodeList,BFTime,BFNodeCount,BFProcCount,P);

        break;

      case bfBESTFIT:

        MBFBestFit(JList,PLevel,BFNodeList,BFTime,BFNodeCount,BFProcCount,P);

        break;

      case bfGREEDY:

        MBFGreedy(JList,PLevel,BFNodeList,BFTime,BFNodeCount,BFProcCount,P);

        break;

      case bfNONE:

        return(SUCCESS);

        /*NOTREACHED*/

        break;

      default:

        /* unknown policy: reset to the default and use first fit */

        DBG(0,fSCHED) DPrint("ERROR: unexpected backfill policy %d (using %s)\n",
          MPar[0].BFPolicy,
          MBFPolicy[DEFAULT_BACKFILLPOLICY]);

        MPar[0].BFPolicy = DEFAULT_BACKFILLPOLICY;

        MBFFirstFit(JList,PLevel,BFNodeList,BFTime,BFNodeCount,BFProcCount,P);

        break;
    }  /* END switch (MPar[0].BFPolicy) */

    MOQueueDestroy(tmpQ,FALSE);
  }  /* END while (MBFGetWindow() == SUCCESS) */

  return(SUCCESS);
}  /* END MQueueBackFill() */
/*
 * MQueueScheduleIJobs() - try to start the jobs of a prioritized queue in
 * partition P, creating priority reservations for jobs that cannot start.
 *
 * MLocalQueueScheduleIJobs() is given the first chance; if it reports
 * SUCCESS nothing else is done.  Otherwise each job in Q is examined in
 * order:
 *  - jobs that are already starting/running/suspended are skipped,
 *  - jobs holding a reservation in another partition or in the future are
 *    only counted against their QOS reservation group (PResCount),
 *  - once a job could neither be started nor reserved (IdleJobFound), later
 *    jobs are only reserved unless their reservation is usable immediately,
 *  - for the remaining jobs every alternate task-count/walltime shape
 *    (TaskRequestList[1..]) is tried; the last shape for which resources
 *    are found is installed, nodes are allocated and the job is started.
 *
 * Q (I) -1 terminated list of job indexes in priority order
 * P (I) partition in which jobs are to be started
 *
 * Returns FAILURE if an argument is NULL or the queue is empty, otherwise
 * SUCCESS.
 */
int MQueueScheduleIJobs(

  int    *Q,  /* I */
  mpar_t *P)  /* I */

{
  int           jindex;
  mnodelist_t   MNodeList;
  mnodelist_t   tmpMNodeList;
  int           index;
  int           sindex;
  mjob_t       *J;
  mreq_t       *RQ;
  mbool_t       ResCountRej;
  int           rqindex;
  int           SchedCount;
  char          NodeMap[MAX_MNODE];
  int           PResCount[MAX_MQOS];
  int           BestSIndex;
  int           BestSValue;
  int           IdleJobFound;
  int           rindex;
  int           qindex;
  long          tmpL;
  int           AllowPreemption;
  unsigned long DefaultSpecWCLimit;
  int           DefaultTaskCount[MAX_MREQ_PER_JOB];
  int           NAPolicy;

  const char *FName = "MQueueScheduleIJobs";

  /* NOTE: "NULL" string, not a NULL pointer - NULL for %s is undefined */

  DBG(2,fSCHED) DPrint("%s(Q,%s)\n",
    FName,
    (P != NULL) ? P->Name : "NULL");

  if ((P == NULL) || (Q == NULL))
  {
    return(FAILURE);
  }

  if (MLocalQueueScheduleIJobs(Q,P) == SUCCESS)
  {
    return(SUCCESS);
  }

  /*
    NOTE:
    need a new job metric. must find schedule with best completion
    times for most highest priority jobs. NOTE: this will significantly
    affect current resource weightings. Do we weight by job size, ie
    V = P * TC * f(Tc)?
  */

  if (Q[0] == -1)
  {
    DBG(2,fSCHED) DPrint("INFO: no jobs in queue\n");

    return(FAILURE);
  }

  IdleJobFound = FALSE;
  SchedCount   = 0;

  memset(PResCount,0,sizeof(PResCount));

  /* attempt to start all feasible jobs */
  /* jobs listed in priority FIFO order */
  /* reserved jobs included in list */

  for (jindex = 0;Q[jindex] != -1;jindex++)
  {
    J = MJob[Q[jindex]];

    if (J == NULL)
    {
      /* stale index, same guard as MQueueScheduleRJobs() */

      continue;
    }

    MTRAPJOB(J,FName);

    DBG(7,fSCHED) DPrint("INFO: checking job '%s'\n",
      J->Name);

    if ((J->EState == mjsStarting) ||
        (J->State == mjsStarting) ||
        (J->EState == mjsRunning) ||
        (J->State == mjsRunning) ||
        (J->State == mjsSuspended))
    {
      /* continue if job started in MQueueScheduleRJobs() */

      DBG(7,fSCHED) DPrint("INFO: job '%s' not considered for prio scheduling (State/EState)\n",
        J->Name);

      continue;
    }

    if ((J->R != NULL) &&
        ((J->R->PtIndex != P->Index) || (J->R->StartTime > MSched.Time)))
    {
      /* job already has reservation */

      DBG(7,fSCHED) DPrint("INFO: job '%s' not considered for prio scheduling (existing reservation)\n",
        J->Name);

      /* locate QOS group */

      for (rindex = 0;rindex < MAX_MQOS;rindex++)
      {
        if (MPar[0].ResDepth[rindex] == 0)
          continue;

        for (qindex = 0;qindex < MAX_MQOS;qindex++)
        {
          if (MPar[0].ResQOSList[rindex][qindex] == NULL)
            break;

          if ((MPar[0].ResQOSList[rindex][qindex] == J->Cred.Q) ||
              (MPar[0].ResQOSList[rindex][qindex] == (mqos_t *)MAX_MQOS))
          {
            break;
          }
        }  /* END for (qindex) */

        /* qindex == MAX_MQOS means the row was scanned without a match; */
        /* do not index one past the scanned range                       */

        if ((qindex < MAX_MQOS) &&
            ((MPar[0].ResQOSList[rindex][qindex] == J->Cred.Q) ||
             (MPar[0].ResQOSList[rindex][qindex] == (mqos_t *)MAX_MQOS)))
        {
          break;
        }
      }  /* END for (rindex) */

      if (rindex != MAX_MQOS)
      {
        PResCount[rindex]++;
      }

      continue;
    }  /* END if ((J->R != NULL) && ...) */

    if (IdleJobFound == TRUE)
    {
      DBG(7,fSCHED) DPrint("INFO: job '%s' not considered for prio scheduling (Idle Job Reached)\n",
        J->Name);

      /* reserve job if eligible */

      if (MJobCheckLimits(
            J,
            ptSOFT,
            P,
            (1 << mlActive),
            NULL) == FAILURE)
      {
        /* continue if jobs started on this iteration */
        /* cause this job to now violate active policies */

        DBG(5,fSCHED) DPrint("INFO: skipping job '%s' (job now violates active policy)\n",
          J->Name);

        continue;
      }

      if ((MJobPReserve(J,P->Index,PResCount,NULL) != SUCCESS) ||
          (J->R == NULL) ||
          (J->R->StartTime > MSched.Time))
      {
        /* reservation made for idle job */

        continue;
      }

      /* job reservation created for immediate use (schedule job) */
    }  /* END if (IdleJobFound == TRUE) */

    MStat.JobsEvaluated++;

    BestSValue = 0;
    BestSIndex = -1;

    NAPolicy = (J->Req[0]->NAllocPolicy != NULL) ?
      J->Req[0]->NAllocPolicy->NAllocPolicy :
      MPar[J->Req[0]->PtIndex].NAllocPolicy;

    /* remember the default shape (slot 0) so it can be restored */

    DefaultSpecWCLimit = J->SpecWCLimit[0];

    for (rqindex = 0;J->Req[rqindex] != NULL;rqindex++)
    {
      DefaultTaskCount[rqindex] = J->Req[rqindex]->TaskRequestList[0];
    }  /* END for (rqindex) */

    /* try each alternate shape by temporarily installing it in slot 0 */

    for (sindex = 1;J->Req[0]->TaskRequestList[sindex] > 0;sindex++)
    {
      J->SpecWCLimit[0] = J->SpecWCLimit[sindex];

      J->Request.TC = 0;

      for (rqindex = 0;J->Req[rqindex] != NULL;rqindex++)
      {
        RQ = J->Req[rqindex];

        RQ->TaskRequestList[0] = RQ->TaskRequestList[sindex];
        RQ->TaskCount          = RQ->TaskRequestList[sindex];

        J->Request.TC += RQ->TaskRequestList[sindex];
      }  /* END for (rqindex) */

      /* is the best strategy widest fit? ie, local greedy? */

      MJobUpdateResourceCache(J,sindex);

      if (MJobCheckLimits(
            J,
            ptSOFT,
            P,
            (1 << mlActive),
            NULL) == FAILURE)
      {
        /* continue if jobs started on this iteration */
        /* cause this job to now violate active policies */

        DBG(5,fSCHED) DPrint("INFO: skipping job '%s' (job violates active policy')\n",
          J->Name);

        continue;
      }

      DBG(4,fSCHED) DPrint("INFO: checking job %s(%d) state: %s (ex: %s)\n",
        J->Name,
        sindex,
        MJobState[J->State],
        MJobState[J->EState]);

      /* select resources for job */

      if (MPar[0].BFPolicy == bfPREEMPT)
      {
        /* all priority jobs are preemptors */

        AllowPreemption = TRUE;
      }
      else
      {
        AllowPreemption = MBNOTSET;
      }

      if (MJobSelectMNL(
            J,
            P,
            NULL,
            MNodeList,
            NodeMap,
            AllowPreemption) == FAILURE)
      {
        DBG(6,fSCHED) DPrint("INFO: cannot locate %d tasks for job '%s' in partition %s\n",
          J->Request.TC,
          J->Name,
          P->Name);

        continue;
      }  /* END: if (MJobSelectMNL() == FAILURE) */

      /* later feasible shapes overwrite earlier ones */

      BestSValue = 1;
      BestSIndex = sindex;
    }  /* END for (sindex) */

    /* restore the default shape */

    J->SpecWCLimit[0] = DefaultSpecWCLimit;

    J->Request.TC = 0;

    for (rqindex = 0;J->Req[rqindex] != NULL;rqindex++)
    {
      RQ = J->Req[rqindex];

      RQ->TaskRequestList[0] = DefaultTaskCount[rqindex];
      RQ->TaskCount          = DefaultTaskCount[rqindex];

      J->Request.TC += DefaultTaskCount[rqindex];
    }  /* END for (rqindex) */

    MJobUpdateResourceCache(J,0);

    if (BestSValue <= 0)
    {
      /* no available slot located */

      if (MJobPReserve(J,P->Index,PResCount,&ResCountRej) == SUCCESS)
      {
        continue;
      }
      else
      {
        /* skip job */

        if (!(J->Flags & (1 << mjfAdvReservation)))
        {
          IdleJobFound = TRUE;
        }

        continue;
      }
    }  /* END: if (BestSValue <= 0) */

    /* adequate nodes found */

    /* install the chosen shape in slot 0 and park the default in its slot */

    J->SpecWCLimit[0] = J->SpecWCLimit[BestSIndex];

    J->Request.TC = 0;

    for (rqindex = 0;J->Req[rqindex] != NULL;rqindex++)
    {
      RQ = J->Req[rqindex];

      RQ->TaskRequestList[0] = RQ->TaskRequestList[BestSIndex];
      RQ->TaskCount          = RQ->TaskRequestList[BestSIndex];

      J->Request.TC += RQ->TaskRequestList[BestSIndex];

      RQ->TaskRequestList[BestSIndex] = DefaultTaskCount[rqindex];
    }

    J->SpecWCLimit[BestSIndex] = DefaultSpecWCLimit;

    if (BestSIndex > 1)
      MJobUpdateResourceCache(J,BestSIndex);

    /* NOTE(review): tmpMNodeList is filled here but not read again in */
    /* this function                                                    */

    if (MPar[0].NodeSetDelay > 0)
      memcpy(tmpMNodeList,MNodeList,sizeof(tmpMNodeList));

    if (MJobAllocMNL(
          J,
          MNodeList,
          NodeMap,
          NULL,
          NAPolicy,
          MSched.Time) == FAILURE)
    {
      DBG(1,fSCHED) DPrint("ERROR: cannot allocate nodes to job '%s' in partition %s\n",
        J->Name,
        P->Name);

      /* push the earliest start time just past 'now' before reserving */

      tmpL = MSched.Time + 1;

      MJobSetAttr(J,mjaSysSMinTime,(void **)&tmpL,0,mAdd);

      if ((MJobPReserve(J,P->Index,PResCount,&ResCountRej) == SUCCESS) ||
          (ResCountRej == FALSE))
      {
        continue;
      }
      else
      {
        /* skip job */

        IdleJobFound = TRUE;

        continue;
      }
    }  /* END if (NodesAllocated == FALSE) */

    if (MJobStart(J) == FAILURE)
    {
      DBG(1,fSCHED) DPrint("ERROR: cannot start job '%s' in partition %s\n",
        J->Name,
        P->Name);

      tmpL = MSched.Time + 1;

      MJobSetAttr(J,mjaSysSMinTime,(void **)&tmpL,0,mAdd);

      if (MJobPReserve(J,P->Index,PResCount,&ResCountRej) == SUCCESS)
      {
        continue;
      }
      else
      {
        /* skip job */

        IdleJobFound = TRUE;

        continue;
      }
    }  /* END if (MJobStart() == FAILURE) */

    if ((MPar[0].BFPolicy == bfPREEMPT) && (IdleJobFound == TRUE))
    {
      /* job was backfilled */

      J->SpecFlags |= (1 << mjfPreemptee);

      MJobUpdateFlags(J);
    }

    SchedCount++;
  }  /* END for jindex */

  if (SchedCount != 0)
  {
    DBG(2,fSCHED) DPrint("INFO: %d jobs started on iteration %d\n",
      SchedCount,
      MSched.Iteration);
  }

  DBG(2,fSCHED)
  {
    fprintf(mlog.logfp,"Active Jobs------\n");

    for (index = 0;MAQ[index] != -1;index++)
      MJobShow(MJob[MAQ[index]],0,NULL);

    fprintf(mlog.logfp,"------------------\n");
  }

  DBG(2,fSCHED) DPrint("INFO: resources available after scheduling: N: %d P: %d\n",
    MPar[0].IdleNodes,
    MPar[0].ARes.Procs);

  return(SUCCESS);
}  /* END MQueueScheduleIJobs() */
/*
 * MQueueDiagAppend() - append Str to the NUL-terminated text in Buffer
 * without writing past BufSize bytes; text that does not fit is truncated.
 */
static void MQueueDiagAppend(

  char       *Buffer,   /* I/O: NUL-terminated text */
  int         BufSize,  /* I:   total capacity of Buffer in bytes */
  const char *Str)      /* I:   text to append */

{
  size_t Used;

  if ((Buffer == NULL) || (Str == NULL) || (BufSize <= 0))
    return;

  Used = strlen(Buffer);

  if (Used + 1 >= (size_t)BufSize)
    return;

  strncat(Buffer,Str,(size_t)BufSize - Used - 1);
}  /* END MQueueDiagAppend() */

/*
 * MQueueDiagnose() - append a human readable explanation of why jobs are
 * blocked to Buffer.
 *
 * Every job on the circular list headed by FullQ[0] is examined unless it
 * is starting/running, is outside partition PS (when PS->Index > 0), or is
 * listed in NBJobList.  For the remaining jobs one line is appended per
 * blocking condition found: job state, holds, expected state, start date,
 * dependencies, local fairness policy and, for each configured partition,
 * class availability, partition access, job policies and fairshare caps.
 * Output stops with a "list truncated" note when fewer than 256 bytes
 * remain in Buffer.
 *
 * All text is formatted into a local line buffer and appended with a
 * bounded copy.  The previous sprintf(Buffer,"%s...",Buffer,...) form
 * copied between overlapping objects (undefined behaviour) and could write
 * past BufSize.
 *
 * FullQ     (I)   job list; FullQ[0] is the list head
 * NBJobList (I)   -1 terminated list of non-blocked job indexes (NULL is
 *                 treated as empty)
 * PLevel    (I)   policy level used for the checks
 * PS        (I)   partition the report is restricted to
 * Buffer    (I/O) NUL-terminated text the report is appended to
 * BufSize   (I)   capacity of Buffer in bytes
 *
 * Returns FAILURE on invalid arguments or when the job list is empty,
 * otherwise SUCCESS.
 */
int MQueueDiagnose(

  mjob_t **FullQ,      /* I */
  int     *NBJobList,
  int      PLevel,
  mpar_t  *PS,
  char    *Buffer,
  int      BufSize)

{
  int      pindex;
  int      index;
  int      jindex;
  mjob_t  *J;
  char     DValue[MAX_MNAME];
  enum MJobDependEnum DType;
  char     Message[MAX_MLINE];
  char     Line[MAX_MLINE];
  int      RIndex;
  mbool_t  IsBlocked;
  mbool_t  IsListed;
  mpar_t  *P;
  mpar_t  *GP;
  mreq_t  *RQ;

  const char *FName = "MQueueDiagnose";

  if ((FullQ == NULL) ||
      (Buffer == NULL) ||
      (PS == NULL) ||
      (BufSize <= 0))
  {
    return(FAILURE);
  }

  /* NOTE: must be synchronized with MQueueSelectJobs() and MJobCheckPolicies() */

  snprintf(Line,sizeof(Line),"Diagnosing blocked jobs (policylevel %s partition %s)\n\n",
    MPolicyMode[PLevel],
    PS->Name);

  MQueueDiagAppend(Buffer,BufSize,Line);

  if ((FullQ[0] == NULL) ||
      (FullQ[0]->Next == NULL) ||
      (FullQ[0]->Next == FullQ[0]))
  {
    DBG(4,fUI) DPrint("no blocked jobs found in %s()\n",
      FName);

    /* NOTE(review): as before, this replaces the buffer contents rather */
    /* than appending - confirm that is what callers expect              */

    snprintf(Buffer,(size_t)BufSize,"no blocked jobs found\n");

    return(FAILURE);
  }

  MLocalCheckFairnessPolicy(NULL,MSched.Time,NULL);

  for (J = FullQ[0]->Next;J != FullQ[0];J = J->Next)
  {
    /* stop while there is still room for the truncation notice */

    if (strlen(Buffer) + 256 >= (size_t)BufSize)
      break;

    if ((PS->Index > 0) && (MUBMCheck(PS->Index,J->PAL) == FAILURE))
      continue;

    RQ = J->Req[0];  /* FIXME */

    if ((J->State == mjsStarting) || (J->State == mjsRunning))
    {
      continue;
    }

    DBG(4,fUI) DPrint("INFO: checking job '%s' QOS: %s (QFlags: %lu)\n",
      J->Name,
      (J->Cred.Q != NULL) ? J->Cred.Q->Name : "NULL",
      (J->Cred.Q != NULL) ? J->Cred.Q->Flags : 0);

    /* look for job in NBJobList */

    IsListed = FALSE;

    if (NBJobList != NULL)
    {
      for (jindex = 0;jindex < MAX_MJOB + MAX_MHBUF;jindex++)
      {
        if (NBJobList[jindex] == -1)
          break;

        if (J->Index == NBJobList[jindex])
        {
          /* recorded with a flag so nothing is read past the scan limit */

          IsListed = TRUE;

          break;
        }
      }  /* END for (jindex) */
    }

    if (IsListed == TRUE)
    {
      /* job is not blocked */

      continue;
    }

    IsBlocked = FALSE;

    /* check job state */

    if ((J->State != mjsIdle) && (J->State != mjsHold))
    {
      snprintf(Line,sizeof(Line),"job %-20s has non-idle state (state: '%s')\n",
        J->Name,
        MJobState[J->State]);

      MQueueDiagAppend(Buffer,BufSize,Line);

      continue;
    }

    /* check user/system/batch hold */

    if (J->Hold != 0)
    {
      snprintf(Line,sizeof(Line),"job %-20s has the following hold(s) in place: ",
        J->Name);

      MQueueDiagAppend(Buffer,BufSize,Line);

      for (index = 0;MHoldType[index] != NULL;index++)
      {
        if (J->Hold & (1 << index))
        {
          MQueueDiagAppend(Buffer,BufSize," ");
          MQueueDiagAppend(Buffer,BufSize,MHoldType[index]);
        }
      }  /* END for (index) */

      MQueueDiagAppend(Buffer,BufSize,"\n");

      IsBlocked = TRUE;
    }

    /* check if job has been previously scheduled or deferred */

    if (J->EState != mjsIdle)
    {
      snprintf(Line,sizeof(Line),"job %-20s has non-idle expected state (expected state: %s)\n",
        J->Name,
        MJobState[J->EState]);

      MQueueDiagAppend(Buffer,BufSize,Line);

      IsBlocked = TRUE;
    }

    /* check release time */

    if (J->SMinTime > MSched.Time)
    {
      snprintf(Line,sizeof(Line),"job %-20s has not reached its start date (%s to startdate)\n",
        J->Name,
        MULToTString(J->SMinTime - MSched.Time));

      MQueueDiagAppend(Buffer,BufSize,Line);

      IsBlocked = TRUE;
    }

    /* check job dependencies */

    if (MJobCheckDependency(J,&DType,DValue) == FAILURE)
    {
      snprintf(Line,sizeof(Line),"job %s requires %s of job '%s')\n",
        J->Name,
        MJobDependType[DType],
        DValue);

      MQueueDiagAppend(Buffer,BufSize,Line);

      IsBlocked = TRUE;
    }  /* END if (MJobCheckDependency(J,&DType,DValue) == FAILURE) */

    if (MLocalCheckFairnessPolicy(J,MSched.Time,NULL) == FAILURE)
    {
      DBG(6,fSCHED) DPrint("INFO: job %s rejected, partition %s (violates local fairness policy)\n",
        J->Name,
        PS->Name);

      snprintf(Line,sizeof(Line),"job %-20s would violate 'local' fairness policies\n",
        J->Name);

      MQueueDiagAppend(Buffer,BufSize,Line);

      IsBlocked = TRUE;
    }

    GP = &MPar[0];

    /* determine all partitions in which job can run */

    for (pindex = 1;pindex < MAX_MPAR;pindex++)
    {
      P = &MPar[pindex];

      if (P->ConfigNodes == 0)
        continue;

      if (MUNumListGetCount(
            J->StartPriority,
            RQ->DRes.PSlot,
            P->CRes.PSlot,
            0,
            NULL) == FAILURE)
      {
        /* required classes not configured in partition */

        snprintf(Line,sizeof(Line),"job %-20s requires classes not configured in partition %s (%s)\n",
          J->Name,
          P->Name,
          MUCAListToString(RQ->DRes.PSlot,P->CRes.PSlot,NULL));

        MQueueDiagAppend(Buffer,BufSize,Line);

        IsBlocked = TRUE;

        continue;
      }

      if (MUBMCheck(P->Index,J->PAL) == FAILURE)
      {
        snprintf(Line,sizeof(Line),"job %-20s not allowed to run in partition %s (partitions allowed: %s)\n",
          J->Name,
          P->Name,
          MUListAttrs(ePartition,J->PAL[0]));

        MQueueDiagAppend(Buffer,BufSize,Line);

        IsBlocked = TRUE;

        continue;
      }

      /* check job limits and other policies */

      /* start from an empty message in case the check reports none */

      Message[0] = '\0';

      if (MJobCheckPolicies(
            J,
            PLevel,
            0,
            P,
            &RIndex,
            Message,
            MSched.Time) == FAILURE)
      {
        MQueueDiagAppend(Buffer,BufSize,Message);

        IsBlocked = TRUE;

        continue;
      }

      if (GP->FSC.FSPolicy != fspNONE)
      {
        int OIndex;

        if (MFSCheckCap(NULL,J,P,&OIndex) == FAILURE)
        {
          DBG(5,fSCHED) DPrint("INFO: job '%s' exceeds %s FS cap\n",
            J->Name,
            MXO[OIndex]);

          snprintf(Line,sizeof(Line),"job %-20s would violate %s FS cap in partition %s\n",
            J->Name,
            MXO[OIndex],
            P->Name);

          MQueueDiagAppend(Buffer,BufSize,Line);

          IsBlocked = TRUE;

          continue;
        }
      }  /* END if (GP->FSC.FSPolicy != fspNONE) */
    }    /* END for (pindex) */

    if (IsBlocked == FALSE)
    {
      switch (J->BlockReason)
      {
        case mjneIdlePolicy:

          snprintf(Line,sizeof(Line),"job %-20s is blocked by idle job policy\n",
            J->Name);

          MQueueDiagAppend(Buffer,BufSize,Line);

          break;

        default:

          if ((PLevel == ptHARD) || (PLevel == ptOFF))
          {
            snprintf(Line,sizeof(Line),"job %-20s is not blocked at this policy level\n",
              J->Name);

            MQueueDiagAppend(Buffer,BufSize,Line);
          }

          break;
      }  /* END switch(J->BlockReason) */
    }
  }  /* END for (J) */

  if (J != FullQ[0])
  {
    /* loop ended early because the buffer is nearly full */

    MQueueDiagAppend(Buffer,BufSize,"\nlist truncated\n");
  }

  return(SUCCESS);
}  /* END MQueueDiagnose() */
/*
 * MQueueScheduleSJobs() - resume suspended jobs whose nodes are available.
 *
 * Each job in Q that is in state mjsSuspended and whose RMinTime is not in
 * the future is checked with MJobSelectMNL() against its own node list
 * (J->NodeList) in the partition of its first requirement, with preemption
 * disabled.  If the check does not fail the job is resumed with
 * MJobResume().
 *
 * Q (I) -1 terminated list of job indexes
 *
 * Returns SUCCESS, or FAILURE if Q is NULL.
 */
int MQueueScheduleSJobs(

  int *Q)  /* I */

{
  int         *QPtr;
  mjob_t      *SJ;
  mnodelist_t  MNodeList;
  char         NodeMap[MAX_MNODE];

  /* suspended jobs are never allowed to preempt in order to resume */

  const int AllowPreemption = FALSE;

  const char *FName = "MQueueScheduleSJobs";

  DBG(5,fSCHED) DPrint("%s(Q)\n",
    FName);

  if (Q == NULL)
    return(FAILURE);

  /* locate suspended jobs */

  for (QPtr = Q;*QPtr != -1;QPtr++)
  {
    SJ = MJob[*QPtr];

    if ((SJ->State != mjsSuspended) || (SJ->RMinTime > MSched.Time))
      continue;

    if (MJobSelectMNL(
          SJ,
          &MPar[SJ->Req[0]->PtIndex],
          SJ->NodeList,
          MNodeList,
          NodeMap,
          AllowPreemption) != FAILURE)
    {
      /* all nodes support job */

      MJobResume(SJ,NULL,NULL);
    }

    /* otherwise job nodes not available, try again on a later pass */
  }  /* END for (QPtr) */

  return(SUCCESS);
}  /* END MQueueScheduleSJobs() */
/*
 * MQueueCheckStatus() - purge jobs that are no longer reported and cancel
 * queued jobs flagged mjfNoQueue.
 *
 * The global job list (headed by MJob[0]) is walked.  Jobs whose resource
 * manager was not updated during the current iteration are skipped.  A job
 * whose ATime lies further in the past than
 * MAX(15, RMPollInterval, JobPurgeTime) is purged: its allocation
 * reservation is cancelled, active jobs are processed as completed and the
 * job is removed.  Jobs that are idle, deferred or held and carry the
 * mjfNoQueue flag are cancelled through the resource manager with a
 * message describing why they could not run.
 *
 * Returns SUCCESS.
 */
int MQueueCheckStatus()

{
  int     index;
  int     ReasonList[MAX_MREJREASON];
  int     SrcQ[2];
  int     tmpQ[2];
  enum MHoldReasonEnum Reason;
  mjob_t *J;
  mjob_t *JNext;
  char    DeferMessage[MAX_MLINE];
  long    Delta;

  const char *FName = "MQueueCheckStatus";

  DBG(3,fSTRUCT) DPrint("%s()\n",
    FName);

  /* purge expired jobs */

  for (J = MJob[0]->Next;(J != NULL) && (J != MJob[0]);J = JNext)
  {
    /* saved first: J may be removed from the list below */

    JNext = J->Next;

    DBG(7,fSTRUCT) DPrint("INFO: checking purge criteria for job '%s'\n",
      J->Name);

    if ((J->RM != NULL) && (J->RM->WorkloadUpdateIteration != MSched.Iteration))
    {
      continue;
    }

    Delta = (long)MSched.Time - J->ATime;

    if (Delta > MAX(MAX(15,MSched.RMPollInterval),MSched.JobPurgeTime))
    {
      DBG(2,fSTRUCT) DPrint("WARNING: job '%s' no longer detected (%ld > %ld)\n",
        J->Name,
        Delta,
        MAX(MAX(15,MSched.RMPollInterval),MSched.JobPurgeTime));

      DBG(2,fSTRUCT) DPrint("ALERT: purging job '%s'\n",
        J->Name);

      if (((J->State == mjsStarting) || (J->State == mjsRunning)) &&
          (J->CompletionTime <= 0))
      {
        J->CompletionTime = MSched.Time;
      }

      if (J->Cred.A != NULL)
      {
        if (MAMAllocResCancel(
              J->Cred.A->Name,
              J->Name,
              "job purged",
              NULL,
              &Reason) == FAILURE)
        {
          DBG(1,fSCHED) DPrint("ERROR: cannot cancel allocation reservation for job '%s'\n",
            J->Name);
        }
      }

      /* remove job from job list */

      if ((J->State == mjsStarting) || (J->State == mjsRunning))
      {
        MJobProcessCompleted(J);
      }

      MJobRemove(J);

      /* J has been removed: do not inspect it any further */

      continue;
    }  /* END if ((MSched.Time - J->ATime) > ...) */

    if ((J->State == mjsIdle) || (J->State == mjsDeferred) || (J->State == mjsHold))
    {
      if (J->Flags & (1 << mjfNoQueue))
      {
        DBG(2,fSTRUCT) DPrint("INFO: cancelling Non-Queue job '%s'\n",
          J->Name);

        if (J->Cred.A != NULL)
        {
          if (MAMAllocResCancel(J->Cred.A->Name,J->Name,"interactive job purged",NULL,&Reason) == FAILURE)
          {
            DBG(1,fSCHED) DPrint("ERROR: cannot cancel allocation reservation for job '%s'\n",
              J->Name);
          }
        }

        /* cancel job */

        if (J->EState == mjsDeferred)
        {
          sprintf(DeferMessage,"SCHED_INFO: job cannot run. Reason: %s\n",
            MDefReason[J->HoldReason]);

          MRMJobCancel(J,DeferMessage,NULL);
        }
        else
        {
          /* run the job alone through job selection to learn why it */
          /* cannot start                                            */

          MOQueueInitialize(SrcQ);

          SrcQ[0] = J->Index;
          SrcQ[1] = -1;

          memset(ReasonList,0,sizeof(ReasonList));

          if (MQueueSelectJobs(
                SrcQ,
                tmpQ,
                ptHARD,
                MAX_MNODE_PER_JOB,
                MAX_MTASK,
                MAX_MTIME,
                -1,
                ReasonList,
                FALSE,
                FALSE) == FAILURE)
          {
            strcpy(DeferMessage,"SCHED_INFO: job cannot run. Reason: cannot select job\n");
          }
          else if (tmpQ[0] == -1)
          {
            /* job was rejected: report the first recorded reason */

            for (index = 0;index < MAX_MREJREASON;index++)
            {
              if (ReasonList[index] != 0)
                break;
            }

            if (index != MAX_MREJREASON)
            {
              sprintf(DeferMessage,"SCHED_INFO: job cannot run. Reason: %s\n",
                MAllocRejType[index]);
            }
            else
            {
              strcpy(DeferMessage,"SCHED_INFO: job cannot run. Reason: policy violation\n");
            }
          }
          else
          {
            strcpy(DeferMessage,"SCHED_INFO: insufficient resources to run job\n");
          }

          MRMJobCancel(J,DeferMessage,NULL);
        }  /* END else (J->EState == mjsDeferred) */
      }    /* END if (J->Flags & (1 << mjfNoQueue)) */
    }      /* END if ((J->State == mjsIdle)... */
  }        /* END for (J) */

  /* purge subjobs */

  /* NYI */

  return(SUCCESS);
}  /* END MQueueCheckStatus() */
/*
 * MQueueAddAJob() - add a job to the active job queue MAQ.
 *
 * If the job index is already present nothing is changed.  Otherwise the
 * index is appended, MAQ is re-sorted with MJobCTimeComp, active usage
 * statistics are updated and, for a job whose expected state is idle or
 * whose CTime equals the current scheduler time, the expected state is
 * set: mjsRunning in simulation mode, otherwise mjsStarting for a job
 * still reported idle or the reported state for any other job.
 *
 * J (I) job to add; the routine relies on J->StartTime being set
 *
 * Returns SUCCESS, or FAILURE if J is NULL.
 */
int MQueueAddAJob(

  mjob_t *J)  /* I */

{
  int     Slot;
  int     Pos;
  mjob_t *AJ;

  const char *FName = "MQueueAddAJob";

  DBG(4,fSCHED) DPrint("%s(%s)\n",
    FName,
    (J != NULL) ? J->Name : "NULL");

  if (J == NULL)
    return(FAILURE);

  /* MQueueAddAJob() requires J->StartTime */

  /* find open MAQ slot */

  Slot = 0;

  while (MAQ[Slot] != -1)
  {
    if (MAQ[Slot] == J->Index)
    {
      /* job previously added */

      return(SUCCESS);
    }

    Slot++;
  }  /* END while (MAQ[Slot] != -1) */

  MAQ[Slot]     = J->Index;
  MAQ[Slot + 1] = -1;

  /* sort active jobs in earliest completion time first order */

  qsort(
    (void *)&MAQ[0],
    (Slot + 1),
    sizeof(int),
    (int(*)(const void *,const void *))MJobCTimeComp);

  MStatUpdateActiveJobUsage(J);

  /* if job is started by scheduler or is found already running */

  if ((J->EState != mjsIdle) && (J->CTime != MSched.Time))
  {
    DBG(6,fSCHED) DPrint("INFO: previously detected active Job[%03d] '%s' added to MAQ\n",
      J->Index,
      J->Name);
  }
  else if (MSched.Mode == msmSim)
  {
    /* new job detected */

    J->EState = mjsRunning;
  }
  else
  {
    /* new job detected */

    /* if job started by scheduler set EState to Starting. */
    /* if job discovered in active state, set to whatever is detected. */

    J->EState = (J->State == mjsIdle) ? mjsStarting : J->State;
  }

  DBG(5,fSCHED)
  {
    DBG(5,fSCHED) DPrint("INFO: job '%s' added to MAQ at slot %d\n",
      J->Name,
      Slot);

    DBG(5,fSCHED) DPrint("INFO: MAQ: ");

    for (Pos = 0;MAQ[Pos] != -1;Pos++)
    {
      AJ = MJob[MAQ[Pos]];

      fprintf(mlog.logfp,"[%d : %s : %ld]",
        MAQ[Pos],
        AJ->Name,
        AJ->StartTime + AJ->WCLimit - MSched.Time);
    }  /* END for (Pos) */

    fprintf(mlog.logfp,"\n");
  }

  return(SUCCESS);
}  /* END MQueueAddAJob() */
/*
 * MQueueScheduleRJobs() - refresh reservations of idle jobs and start the
 * jobs whose reservation start time has arrived.
 *
 * For each idle job in Q:
 *  - a job naming a reservation (J->ResName) that exists is re-reserved
 *    as a user reservation,
 *  - a job with a QOS flagged mqfreserved, or with a completion deadline
 *    (CMaxTime) that has not yet passed, gets a fresh reservation of the
 *    matching type,
 *  - a job with a priority reservation is counted against its QOS
 *    reservation group and its reservation is released or re-created
 *    according to the reservation policy,
 *  - if the job still holds a reservation whose start time has been
 *    reached, limits are checked, nodes are selected and allocated and
 *    the job is started.  When node selection fails the reservation is
 *    either retried (within MPar[0].ResRetryTime, when nodes look busy or
 *    lack the 'hps_user' adapter) or released and the job deferred.
 *
 * Q (I) -1 terminated prioritized list of feasible job indexes
 *
 * Returns SUCCESS, or FAILURE if Q is NULL.
 */
int MQueueScheduleRJobs(

  int *Q)  /* I: prioritized list of feasible jobs */

{
  int          index;
  int          jindex;
  mnodelist_t  MNodeList;
  int          SchedCount;
  int          PIndex;
  char         NodeMap[MAX_MNODE];
  int          HPSAdapterCheck;
  int          RMNodeStateCheck;
  int          HPSAdapterDelayCount;
  int          RMNodeStateDelayCount;
  mjob_t      *J;
  mnode_t     *N;
  mreq_t      *RQ;
  mres_t      *R;
  int          rqindex;
  int          rindex;
  int          qindex;
  int          HighPriority[MAX_MPAR][MAX_MQOS];
  int          GResPolicy;
  int          ResPolicy;
  long         tmpL;
  int          NAPolicy;

  const char *FName = "MQueueScheduleRJobs";

  DBG(2,fSCHED) DPrint("%s(Q)\n",
    FName);

  if (Q == NULL)
  {
    return(FAILURE);
  }

  memset(HighPriority,0,sizeof(HighPriority));

  /* global reservation policy, used when a partition has none of its own */

  GResPolicy = (MPar[0].ResPolicy == resDefault) ?
    DEFAULT_RESERVATIONMODE :
    MPar[0].ResPolicy;

  /* check all feasible jobs */

  SchedCount = 0;

  for (jindex = 0;Q[jindex] != -1;jindex++)
  {
    J = MJob[Q[jindex]];

    if ((J == NULL) || (J->State != mjsIdle))
      continue;

    MTRAPJOB(J,FName);

    DBG(5,fSCHED) DPrint("INFO: checking job %s in %s()\n",
      J->Name,
      FName);

    /* bind job to specified reservation */

    if (J->ResName[0] != '\0')
    {
      if (MResFind(J->ResName,&R) == SUCCESS)
      {
        /* non-preemptible */

        if (J->R != NULL)
          MResDestroy(&J->R);

        if (MJobReserve(J,mjrUser) == SUCCESS)
        {
          J->R->Priority = J->StartPriority;
        }
      }
    }

    PIndex = 0;

    if (J->R != NULL)
    {
      PIndex = J->R->PtIndex;

      ResPolicy = (MPar[PIndex].ResPolicy != resDefault) ?
        MPar[PIndex].ResPolicy :
        GResPolicy;
    }
    else
    {
      ResPolicy = GResPolicy;
    }

    /* create new reservations */

    /* J->Cred.Q is treated as optional elsewhere in this file, so it is */
    /* checked before being dereferenced                                 */

    if ((J->Cred.Q != NULL) && (J->Cred.Q->Flags & (1 << mqfreserved)))
    {
      if (J->R != NULL)
        MResDestroy(&J->R);

      /* non-preemptible */

      if (MJobReserve(J,mjrQOSReserved) == FAILURE)
      {
        DBG(1,fSCHED) DPrint("ALERT: cannot create %s reservation for job %s\n",
          MJRType[mjrQOSReserved],
          J->Name);

        continue;
      }
    }
    else if ((J->CMaxTime != MAX_MTIME) && (J->CMaxTime >= MSched.Time))
    {
      if (J->R != NULL)
        MResDestroy(&J->R);

      /* non-preemptible */

      if (MJobReserve(J,mjrDeadline) == FAILURE)
      {
        DBG(1,fSCHED) DPrint("ALERT: cannot create %s reservation for job %s\n",
          MJRType[mjrDeadline],
          J->Name);

        continue;
      }
    }
    else if (J->RType == mjrPriority)
    {
      /* locate QOS group */

      for (rindex = 0;rindex < MAX_MQOS;rindex++)
      {
        if (MPar[0].ResDepth[rindex] == 0)
          continue;

        for (qindex = 0;qindex < MAX_MQOS;qindex++)
        {
          if (MPar[0].ResQOSList[rindex][qindex] == NULL)
            break;

          if ((MPar[0].ResQOSList[rindex][qindex] == J->Cred.Q) ||
              (MPar[0].ResQOSList[rindex][qindex] == (mqos_t *)MAX_MQOS))
          {
            break;
          }
        }  /* END for (qindex) */

        /* qindex == MAX_MQOS means the row was scanned without a match; */
        /* do not index one past the scanned range                       */

        if ((qindex < MAX_MQOS) &&
            ((MPar[0].ResQOSList[rindex][qindex] == J->Cred.Q) ||
             (MPar[0].ResQOSList[rindex][qindex] == (mqos_t *)MAX_MQOS)))
        {
          break;
        }
      }  /* END for (rindex) */

      if (rindex == MAX_MQOS)
      {
        /* ERROR: cannot locate qos reservation group */

        DBG(2,fSCHED) DPrint("ALERT: cannot locate QOS group for reserved job %s (QOS: %s) in %s()\n",
          J->Name,
          (J->Cred.Q != NULL) ? J->Cred.Q->Name : "NULL",
          FName);

        if (J->R != NULL)
          MResDestroy(&J->R);

        continue;
      }

      HighPriority[PIndex][rindex]++;

      /* release all 'current highest' reservations */
      /* attempt 'slide-forward' on other priority reservations */

      if (J->R != NULL)
      {
        switch (ResPolicy)
        {
          case resCurrentHighest:

            /* release all reservations */

            DBG(6,fSCHED) DPrint("INFO: releasing reservation '%s' for res policy CurrentHighest\n",
              J->R->Name);

            MResDestroy(&J->R);

            /* recreate top 'ResDepth' reservations */

            /* appropriate reservations will be re created in MQueueScheduleIJobs() */

            /*
            if (HighPriority[PIndex][rindex] <=
                MPar[0].ResDepth[rindex])
              {
              if (MJobReserve(J,mjrPriority) == SUCCESS)
                {
                J->R->Priority = J->StartPriority;
                J->R->Flags |= (1 << mrfPreemptible);
                }
              }
            */

            break;

          case resHighest:

            /* all 'Highest' reservations should be slid forward regardless */
            /* of ResDepth */

            if (MJobReserve(J,mjrPriority) == SUCCESS)
            {
              J->R->Priority = J->StartPriority;

              J->R->Flags |= (1 << mrfPreemptible);
            }

            break;

          case resNever:
          default:

            MResDestroy(&J->R);

            break;
        }  /* END switch (ResPolicy) */
      }    /* END if (J->R != NULL) */
    }      /* END if (J->RType == mjrPriority) */

    /* continue if no reservation */

    if (J->R == NULL)
    {
      continue;
    }

    /* if the time to schedule has arrived... */

    if ((MSched.Time >= J->R->StartTime) && (J->State == mjsIdle))
    {
      DBG(2,fSCHED) DPrint("INFO: located job '%s' reserved to start %30s",
        J->Name,
        MULToDString((mulong *)&J->R->StartTime));

      /* get reservation partition information */

      PIndex = J->R->PtIndex;

      if (MJobCheckLimits(
            J,
            ptSOFT,
            &MPar[PIndex],
            (1 << mlActive),
            NULL) == FAILURE)
      {
        /* keep a reservation the job was explicitly bound to by name */

        if (strcmp(J->ResName,J->R->Name) != 0)
          MResDestroy(&J->R);

        if (MSched.Mode != msmTest)
        {
          if (MSched.Mode == msmNormal)
          {
            MOSSyslog(LOG_NOTICE,"cannot run reserved job '%s'. job violates active policy",
              J->Name);
          }

          DBG(1,fSCHED) DPrint("ALERT: cannot run reserved job '%s'. (job violates active policy)\n",
            J->Name);

          MJobSetHold(
            J,
            (1 << mhDefer),
            MSched.DeferTime,
            mhrPolicyViolation,
            "reserved job violates active policy");
        }

        continue;
      }  /* END if (MJobCheckLimits() == FAILURE) */

      if (MJobSelectMNL(
            J,
            &MPar[PIndex],
            NULL,
            MNodeList,
            NodeMap,
            TRUE) == FAILURE)
      {
        /* not enough nodes to run job */

        DBG(2,fSCHED) DPrint("ALERT: insufficient nodes to run reserved job '%s' on iteration %d\n",
          J->Name,
          MSched.Iteration);

        /* check if LL node state/adapter is source of problem */

        HPSAdapterCheck  = FALSE;
        RMNodeStateCheck = TRUE;

        for (rqindex = 0;J->Req[rqindex] != NULL;rqindex++)
        {
          RQ = J->Req[rqindex];

          if (MRM[RQ->RMIndex].Type == mrmtLL)
          {
            if (RQ->Network == MUMAGetIndex(eNetwork,"hps_user",mVerify))
            {
              HPSAdapterCheck = TRUE;

              break;
            }
          }
        }  /* END for (rqindex) */

        /* count nodes that look like transient blockers */

        HPSAdapterDelayCount  = 0;
        RMNodeStateDelayCount = 0;

        for (index = 0;index < MAX_MNODE;index++)
        {
          N = MNode[index];

          if ((N == NULL) || (N->Name[0] == '\0'))
            break;

          if (N->Name[0] == '\1')
            continue;

          if ((N->PtIndex != PIndex) &&
              (PIndex != 0) &&
              !(J->Flags & (1 << mjfSpan)))
          {
            continue;
          }

          if ((N->EState == mnsIdle) ||
              (N->EState == mnsActive))
          {
            if (((N->RM->Type == mrmtLL) ||
                 (N->RM->Type == mrmtPBS)) &&
                (RMNodeStateCheck == TRUE))
            {
              /* add dedicated check for running jobs */

              if (N->State == mnsBusy)
              {
                DBG(6,fSCHED) DPrint("INFO: node '%s' is not available for scheduling. (state: %s exp: %s)\n",
                  N->Name,
                  MAList[eNodeState][N->State],
                  MAList[eNodeState][N->EState]);

                RMNodeStateDelayCount++;
              }
            }
          }

          if ((N->RM->Type == mrmtLL) &&
              (HPSAdapterCheck == TRUE))
          {
            if (!(N->Network & MUMAGetBM(eNetwork,"hps_user",mVerify)))
            {
              DBG(6,fSCHED) DPrint("INFO: LL node '%s' is not available for scheduling. (missing 'hps_user' adapter)\n",
                N->Name);

              HPSAdapterDelayCount++;
            }
          }
        }  /* END for (index = 0;MNode[index].Name[0];index++) */

        if (((MSched.Time - J->R->StartTime) <
              MPar[0].ResRetryTime) &&
            ((HPSAdapterDelayCount > 0) ||
             (RMNodeStateDelayCount > 0)))
        {
          /* need to extend reservation properly */

          if (MSched.Mode == msmNormal)
          {
            MOSSyslog(LOG_NOTICE,"extending reservation for job %s, A: %d PS: %d",
              J->Name,
              HPSAdapterDelayCount,
              RMNodeStateDelayCount);
          }

          /* push the earliest start time just past 'now' before retrying */

          tmpL = MSched.Time + 1;

          MJobSetAttr(J,mjaSysSMinTime,(void **)&tmpL,0,mAdd);

          DBG(4,fSCHED) DPrint("INFO: reservation for job '%s' blocked by RM race condition. (retrying reservation)\n",
            J->Name);

          if (MJobReserve(J,J->RType) == SUCCESS)
          {
            DBG(4,fSCHED) DPrint("INFO: reservation for job '%s' recreated for time %s\n",
              J->Name,
              MULToTString(J->R->StartTime - MSched.Time));
          }
          else
          {
            DBG(4,fSCHED) DPrint("ALERT: cannot create slide-back reservation for job '%s'\n",
              J->Name);
          }
        }  /* END if (MSched.Time...) */
        else
        {
          DBG(2,fSCHED) DPrint("ALERT: insufficient nodes to run reserved job '%s' (reservation released)\n",
            J->Name);

          if (strcmp(J->ResName,J->R->Name) != 0)
            MResDestroy(&J->R);

          if (MSched.Mode != msmTest)
          {
            if (MSched.Mode == msmNormal)
            {
              MOSSyslog(LOG_NOTICE,"reserved job %s cannot run. deferring",
                J->Name);
            }

            MJobSetHold(
              J,
              (1 << mhDefer),
              MSched.DeferTime,
              mhrNoResources,
              "insufficient resources to start reserved job");
          }
        }  /* END else (MSched.Time - J->R->StartTime) */

        continue;
      }  /* END if (MJobSelectMNL() == FAILURE) */

      RQ = J->Req[0];

      NAPolicy = (RQ->NAllocPolicy != NULL) ?
        RQ->NAllocPolicy->NAllocPolicy :
        MPar[RQ->PtIndex].NAllocPolicy;

      if (MJobAllocMNL(
            J,
            MNodeList,
            NodeMap,
            NULL,
            NAPolicy,
            MSched.Time) == SUCCESS)
      {
        if (MJobStart(J) == SUCCESS)
        {
          SchedCount++;

          DBG(2,fSCHED) DPrint("INFO: reserved job '%s' started\n",
            J->Name);
        }
        else
        {
          DBG(1,fSCHED) DPrint("ALERT: cannot run reserved job '%s'\n",
            J->Name);
        }
      }
      else
      {
        DBG(1,fSCHED) DPrint("ERROR: cannot allocate nodes for job '%s'\n",
          J->Name);
      }
    }  /* END if (if (MSched.Time >= J->R->StartTime)) */
  }    /* END for (jindex) */

  /* increment bypass value */

  if (SchedCount != 0)
  {
    DBG(2,fSCHED) DPrint("INFO: reserved jobs scheduled: %d\n",
      SchedCount);
  }

  return(SUCCESS);
}  /* END MQueueScheduleRJobs() */
/* END MQueue.c */