Commit 8e189d3f authored by Marta Bertran Ferrer's avatar Marta Bertran Ferrer Committed by Costin Grigoras
Browse files

Used monitoring enum for both numeric and string states

parent f8e4a57a
......@@ -56,36 +56,6 @@ public class Monitor implements Runnable {
*/
private final String nodeName;
/**
* @param component
*/
Monitor(final String component) {
this.component = component;
this.modules = new HashSet<>();
final String clusterPrefix = MonitorFactory.getConfigString(component, "cluster_prefix", "ALIEN");
final String clusterSuffix = MonitorFactory.getConfigString(component, "cluster_suffix", "Nodes");
String cluster = "";
if (clusterPrefix != null && clusterPrefix.length() > 0)
cluster = clusterPrefix + "_";
cluster += component;
if (clusterSuffix != null && clusterSuffix.length() > 0)
cluster += "_" + clusterSuffix;
clusterName = MonitorFactory.getConfigString(component, "cluster_name", cluster);
final String pattern = MonitorFactory.getConfigString(component, "node_name",
component.startsWith("alien.site.") ? "${hostname}:${pid}" : "${hostname}");
String temp = Format.replace(pattern, "${hostname}", ConfigUtils.getLocalHostname());
nodeName = Format.replace(temp, "${pid}", String.valueOf(MonitorFactory.getSelfProcessID()));
}
/**
* @param component
* @param jobNumber
......@@ -110,11 +80,17 @@ public class Monitor implements Runnable {
clusterName = MonitorFactory.getConfigString(component, "cluster_name", cluster);
final String pattern = MonitorFactory.getConfigString(component, "node_name", "${hostname}:${pid}:${jobnumber}");
String temp = Format.replace(pattern, "${hostname}", ConfigUtils.getLocalHostname());
temp = Format.replace(temp, "${pid}", String.valueOf(MonitorFactory.getSelfProcessID()));
nodeName = Format.replace(temp, "${jobnumber}", String.valueOf(jobNumber));
if (jobNumber < 0) {
final String pattern = MonitorFactory.getConfigString(component, "node_name",
component.startsWith("alien.site.") ? "${hostname}:${pid}" : "${hostname}");
String temp = Format.replace(pattern, "${hostname}", ConfigUtils.getLocalHostname());
nodeName = Format.replace(temp, "${pid}", String.valueOf(MonitorFactory.getSelfProcessID()));
} else {
final String pattern = MonitorFactory.getConfigString(component, "node_name", "${hostname}:${pid}:${jobnumber}");
String temp = Format.replace(pattern, "${hostname}", ConfigUtils.getLocalHostname());
temp = Format.replace(temp, "${pid}", String.valueOf(MonitorFactory.getSelfProcessID()));
nodeName = Format.replace(temp, "${jobnumber}", String.valueOf(jobNumber));
}
}
/**
......
......@@ -100,32 +100,14 @@ public final class MonitorFactory {
* @return the monitor
*/
public static Monitor getMonitor(final String component) {
Monitor m;
synchronized (monitors) {
m = monitors.get(component);
if (m == null && getConfigBoolean(component, "enabled", true)) {
m = new Monitor(component);
final int interval = getConfigInt(component, "period", isJob() ? 120 : 60);
final ScheduledFuture<?> future = executor.scheduleAtFixedRate(m, random.nextInt(interval), interval, TimeUnit.SECONDS);
m.future = future;
m.interval = interval;
monitors.put(component, m);
}
}
return m;
return getMonitor(component, -1);
}
/**
* Get the monitor for this component
*
* @param component
* @param jobNumber
* @return the monitor
*/
public static Monitor getMonitor(final String component, final int jobNumber) {
......
......@@ -107,22 +107,25 @@ public class JobAgent implements Runnable {
private static float lhcbMarks = -1;
private enum jaStatus {
STARTING_JA(1), CHECKING_PARAMS(2), REQUESTING_JOB(3), MATCHED_JOB(4), JOB_STARTED(5), RUNNING_JOB(6), DONE_RUNNING_JOB(7), FINISHING_JA(8), ERROR_HC(-1), // error in getting host
ERROR_IP(-2), // error installing packages
ERROR_GET_JDL(-3), // error getting jdl
ERROR_JDL(-4), // incorrect jdl
ERROR_DIRS(-5), // error creating directories, not enough free space in workdir
ERROR_START(-6); // error forking to start job
STARTING_JA(0, "Starting running Job Agent"), REQUESTING_JOB(1, "Asking for a job"), INSTALLING_PKGS(2, "Found matching job"), JOB_STARTED(3, "Starting processing job's payload"), RUNNING_JOB(
4, "Running job's payload"), DONE(5, "Finished running job"), FINISHING_JA(6, "Job Agent finished"), ERROR_IP(-1, "Error getting AliEn jar path"), ERROR_GET_JDL(-2,
"Error getting jdl"), ERROR_DIRS(-3, "Error creating directories, not enough free space in workdir"), ERROR_START(-4, "Error launching Job Wrapper to start job");
private final int value;
private final String value_string;
jaStatus(final int value) {
jaStatus(final int value, final String value_string) {
this.value = value;
this.value_string = value_string;
}
public int getValue() {
return value;
}
public String getStringValue() {
return value_string;
}
}
/**
......@@ -190,8 +193,7 @@ public class JobAgent implements Runnable {
private Long reqCPU = Long.valueOf(0);
private Long reqDisk = Long.valueOf(0);
private String state_string;
private int state_numeric;
private jaStatus status;
/**
* Allow only one agent to request a job at a time
......@@ -215,20 +217,18 @@ public class JobAgent implements Runnable {
monitor = MonitorFactory.getMonitor(JobAgent.class.getCanonicalName(), jobNumber);
state_string = "JA number " + jobNumber + ". Creating and Starting Job Agent";
state_numeric = jaStatus.STARTING_JA.getValue();
monitor.addMonitoring("resource_status", (names, values) -> {
names.add("TTL_left");
values.add(Integer.valueOf(computeTimeLeft()));
names.add("state");
values.add(state_string);
names.add("ja_status_string");
values.add(status.getStringValue());
names.add("state_numeric");
values.add(Integer.valueOf(state_numeric));
names.add("ja_status");
values.add(Integer.valueOf(status.getValue()));
});
status = jaStatus.STARTING_JA;
monitorStatusChange();
logger = ConfigUtils.getLogger(JobAgent.class.getCanonicalName() + " " + jobNumber);
......@@ -332,6 +332,9 @@ public class JobAgent implements Runnable {
}
catch (final URISyntaxException e) {
logger.log(Level.SEVERE, "Could not obtain AliEn jar path: " + e.toString());
status = jaStatus.ERROR_IP;
monitorStatusChange();
}
}
......@@ -352,10 +355,8 @@ public class JobAgent implements Runnable {
}
monitor.sendParameter("TTL", siteMap.get("TTL"));
monitor.incrementCounter("requestedjobs");
state_string = "JA number " + jobNumber + ". Asking for job";
state_numeric = jaStatus.REQUESTING_JOB.getValue();
status = jaStatus.REQUESTING_JOB;
monitorStatusChange();
final GetMatchJob jobMatch = commander.q_api.getMatchJob(new HashMap<>(siteMap));
......@@ -367,6 +368,10 @@ public class JobAgent implements Runnable {
logger.log(Level.INFO,
"We didn't get anything back. Nothing to run right now.");
RUNNING_JOBAGENTS -= 1;
status = jaStatus.ERROR_GET_JDL;
monitorStatusChange();
throw new Exception();
}
......@@ -382,8 +387,7 @@ public class JobAgent implements Runnable {
monitor.sendParameter("job_id", Long.valueOf(queueId));
state_string = "JA number " + jobNumber + ". Found matching job (" + queueId + ")";
state_numeric = jaStatus.MATCHED_JOB.getValue();
status = jaStatus.INSTALLING_PKGS;
monitorStatusChange();
matchedJob.entrySet().forEach(entry -> {
......@@ -432,8 +436,7 @@ public class JobAgent implements Runnable {
logger.log(Level.INFO, username);
logger.log(Level.INFO, Long.toString(queueId));
state_string = "JA number " + jobNumber + ". Starting processing job's " + queueId + " payload";
state_numeric = jaStatus.JOB_STARTED.getValue();
status = jaStatus.JOB_STARTED;
monitorStatusChange();
// process payload
......@@ -451,6 +454,10 @@ public class JobAgent implements Runnable {
}
catch (final Exception e) {
logger.log(Level.INFO, "Error getting a matching job: ", e);
status = jaStatus.ERROR_GET_JDL;
monitorStatusChange();
if (RUNNING_CPU.equals(MAX_CPU))
retries.getAndIncrement();
// synchronized (requestSync) {
......@@ -458,8 +465,7 @@ public class JobAgent implements Runnable {
// }
}
state_string = "JA number " + jobNumber + ". Job Agent finished";
state_numeric = jaStatus.FINISHING_JA.getValue();
status = jaStatus.FINISHING_JA;
monitorStatusChange();
logger.log(Level.INFO, "JobAgent finished, id: " + jobAgentId + " totalJobs: " + totalJobs.get());
......@@ -472,6 +478,7 @@ public class JobAgent implements Runnable {
if (!createWorkDir()) {
// changeStatus(JobStatus.ERROR_IB);
logger.log(Level.INFO, "Error. Workdir for job could not be created");
return;
}
jobWrapperLogDir = jobWorkdir + "/" + jobWrapperLogName;
......@@ -502,10 +509,11 @@ public class JobAgent implements Runnable {
private void cleanup() {
logger.log(Level.INFO, "Sending monitoring values...");
state_string = "JA number " + jobNumber + ". Finished running job " + queueId;
state_numeric = jaStatus.DONE_RUNNING_JOB.getValue();
status = jaStatus.DONE;
monitorStatusChange();
monitor.sendParameter("job_id", Integer.valueOf(0));
logger.log(Level.INFO, "Cleaning up after execution...");
try {
......@@ -630,11 +638,6 @@ public class JobAgent implements Runnable {
final long jobAgentCurrentTime = System.currentTimeMillis();
final long jobAgentEndTime = jobAgentCurrentTime + timeleft;
state_string = "JA number " + jobNumber + ". Started to run in timestamp " + jobAgentCurrentTime +
" and will be allowed to run until timestamp " + jobAgentEndTime;
state_numeric = jaStatus.CHECKING_PARAMS.getValue();
monitorStatusChange();
if (checkParameters() == false)
return false;
......@@ -789,8 +792,7 @@ public class JobAgent implements Runnable {
pBuilder.redirectError(Redirect.INHERIT);
pBuilder.directory(tempDir);
state_string = "JA number " + jobNumber + ". Running job's " + queueId + " payload";
state_numeric = jaStatus.RUNNING_JOB.getValue();
status = jaStatus.RUNNING_JOB;
monitorStatusChange();
final Process p;
......@@ -828,6 +830,10 @@ public class JobAgent implements Runnable {
}
catch (final Exception ioe) {
logger.log(Level.SEVERE, "Exception running " + launchCommand + " : " + ioe.getMessage());
status = jaStatus.ERROR_START;
monitorStatusChange();
return 1;
}
......@@ -924,8 +930,8 @@ public class JobAgent implements Runnable {
}
private void monitorStatusChange() {
monitor.sendParameter("state", state_string);
monitor.sendParameter("state_numeric", Integer.valueOf(state_numeric));
monitor.sendParameter("ja_status_string", status.getStringValue());
monitor.sendParameter("ja_status", Integer.valueOf(status.getValue()));
}
private void sendProcessResources() {
......@@ -1051,6 +1057,10 @@ public class JobAgent implements Runnable {
final boolean created = tempDir.mkdirs();
if (!created) {
logger.log(Level.INFO, "Workdir does not exist and can't be created: " + jobWorkdir);
status = jaStatus.ERROR_DIRS;
monitorStatusChange();
return false;
}
final File jobTmpDir = new File(jobWorkdir + "/tmp");
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment