Commit f8e4a57a authored by Marta Bertran Ferrer's avatar Marta Bertran Ferrer Committed by Costin Grigoras
Browse files

Added periodic monitoring of status and TTL

parent 01ef363b
......@@ -107,7 +107,7 @@ public class JobAgent implements Runnable {
private static float lhcbMarks = -1;
private enum jaStatus {
REQUESTING_JOB(1), INSTALLING_PKGS(2), JOB_STARTED(3), RUNNING_JOB(4), DONE(5), ERROR_HC(-1), // error in getting host
STARTING_JA(1), CHECKING_PARAMS(2), REQUESTING_JOB(3), MATCHED_JOB(4), JOB_STARTED(5), RUNNING_JOB(6), DONE_RUNNING_JOB(7), FINISHING_JA(8), ERROR_HC(-1), // error in getting host
ERROR_IP(-2), // error installing packages
ERROR_GET_JDL(-3), // error getting jdl
ERROR_JDL(-4), // incorrect jdl
......@@ -190,6 +190,9 @@ public class JobAgent implements Runnable {
private Long reqCPU = Long.valueOf(0);
private Long reqDisk = Long.valueOf(0);
private String state_string;
private int state_numeric;
/**
* Allow only one agent to request a job at a time
*/
......@@ -211,8 +214,22 @@ public class JobAgent implements Runnable {
jobNumber = totalJobs.incrementAndGet();
monitor = MonitorFactory.getMonitor(JobAgent.class.getCanonicalName(), jobNumber);
monitor.sendParameter("state", "JA number " + jobNumber + ". Starting Job Agent");
monitor.sendParameter("statenumeric", Long.valueOf(1));
state_string = "JA number " + jobNumber + ". Creating and Starting Job Agent";
state_numeric = jaStatus.STARTING_JA.getValue();
monitor.addMonitoring("resource_status", (names, values) -> {
names.add("TTL_left");
values.add(Integer.valueOf(computeTimeLeft()));
names.add("state");
values.add(state_string);
names.add("state_numeric");
values.add(Integer.valueOf(state_numeric));
});
monitorStatusChange();
logger = ConfigUtils.getLogger(JobAgent.class.getCanonicalName() + " " + jobNumber);
......@@ -255,6 +272,7 @@ public class JobAgent implements Runnable {
});
logger.addHandler(handler);
}
catch (IOException ie) {
logger.log(Level.WARNING, "Problem with getting logger: " + ie.toString());
......@@ -333,12 +351,12 @@ public class JobAgent implements Runnable {
return;
}
monitor.sendParameter("ja_status", Integer.valueOf(jaStatus.REQUESTING_JOB.getValue()));
monitor.sendParameter("TTL", siteMap.get("TTL"));
monitor.incrementCounter("requestedjobs");
monitor.sendParameter("state", "JA number " + jobNumber + ". Asking for a job");
monitor.sendParameter("statenumeric", Long.valueOf(3));
state_string = "JA number " + jobNumber + ". Asking for job";
state_numeric = jaStatus.REQUESTING_JOB.getValue();
monitorStatusChange();
final GetMatchJob jobMatch = commander.q_api.getMatchJob(new HashMap<>(siteMap));
......@@ -362,6 +380,12 @@ public class JobAgent implements Runnable {
tokenKey = (String) matchedJob.get("TokenKey");
legacyToken = (String) matchedJob.get("LegacyToken");
monitor.sendParameter("job_id", Long.valueOf(queueId));
state_string = "JA number " + jobNumber + ". Found matching job (" + queueId + ")";
state_numeric = jaStatus.MATCHED_JOB.getValue();
monitorStatusChange();
matchedJob.entrySet().forEach(entry -> {
logger.log(Level.INFO, entry.getKey() + " " + entry.getValue());
});
......@@ -408,17 +432,15 @@ public class JobAgent implements Runnable {
logger.log(Level.INFO, username);
logger.log(Level.INFO, Long.toString(queueId));
monitor.sendParameter("state", "JA number " + jobNumber + ". Starting running job " + queueId);
monitor.sendParameter("statenumeric", Long.valueOf(4));
state_string = "JA number " + jobNumber + ". Starting processing job's " + queueId + " payload";
state_numeric = jaStatus.JOB_STARTED.getValue();
monitorStatusChange();
// process payload
handleJob();
cleanup();
monitor.sendParameter("state", "JA number " + jobNumber + ". Finished running job " + queueId);
monitor.sendParameter("statenumeric", Long.valueOf(5));
synchronized (requestSync) {
RUNNING_CPU += reqCPU;
RUNNING_DISK += reqDisk;
......@@ -436,8 +458,9 @@ public class JobAgent implements Runnable {
// }
}
monitor.sendParameter("state", "JA number " + jobNumber + ". Finished running Job Agent");
monitor.sendParameter("statenumeric", Long.valueOf(6));
state_string = "JA number " + jobNumber + ". Job Agent finished";
state_numeric = jaStatus.FINISHING_JA.getValue();
monitorStatusChange();
logger.log(Level.INFO, "JobAgent finished, id: " + jobAgentId + " totalJobs: " + totalJobs.get());
}
......@@ -479,8 +502,9 @@ public class JobAgent implements Runnable {
private void cleanup() {
logger.log(Level.INFO, "Sending monitoring values...");
monitor.sendParameter("job_id", Integer.valueOf(0));
monitor.sendParameter("ja_status", Integer.valueOf(jaStatus.DONE.getValue()));
state_string = "JA number " + jobNumber + ". Finished running job " + queueId;
state_numeric = jaStatus.DONE_RUNNING_JOB.getValue();
monitorStatusChange();
logger.log(Level.INFO, "Cleaning up after execution...");
......@@ -605,9 +629,11 @@ public class JobAgent implements Runnable {
final long jobAgentCurrentTime = System.currentTimeMillis();
final long jobAgentEndTime = jobAgentCurrentTime + timeleft;
monitor.sendParameter("state", "JA number " + jobNumber + ". Started to run in timestamp " + jobAgentCurrentTime +
" and will be allowed to run until timestamp " + jobAgentEndTime);
monitor.sendParameter("statenumeric", Long.valueOf(2));
state_string = "JA number " + jobNumber + ". Started to run in timestamp " + jobAgentCurrentTime +
" and will be allowed to run until timestamp " + jobAgentEndTime;
state_numeric = jaStatus.CHECKING_PARAMS.getValue();
monitorStatusChange();
if (checkParameters() == false)
return false;
......@@ -763,6 +789,10 @@ public class JobAgent implements Runnable {
pBuilder.redirectError(Redirect.INHERIT);
pBuilder.directory(tempDir);
state_string = "JA number " + jobNumber + ". Running job's " + queueId + " payload";
state_numeric = jaStatus.RUNNING_JOB.getValue();
monitorStatusChange();
final Process p;
// stdin from the viewpoint of the wrapper
......@@ -893,6 +923,11 @@ public class JobAgent implements Runnable {
}
}
private void monitorStatusChange() {
monitor.sendParameter("state", state_string);
monitor.sendParameter("state_numeric", Integer.valueOf(state_numeric));
}
private void sendProcessResources() {
// runtime(date formatted) start cpu(%) mem cputime rsz vsize ncpu
// cpufamily cpuspeed resourcecost maxrss maxvss ksi2k
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment