Commit 01ef363b authored by Marta Bertran Ferrer's avatar Marta Bertran Ferrer Committed by Costin Grigoras
Browse files

Added monitoring for reporting the status of Job Agents

parent 791c0e53
......@@ -79,12 +79,44 @@ public class Monitor implements Runnable {
clusterName = MonitorFactory.getConfigString(component, "cluster_name", cluster);
final String pattern = MonitorFactory.getConfigString(component, "node_name", component.startsWith("alien.site.") ? "${hostname}:${pid}" : "${hostname}");
final String pattern = MonitorFactory.getConfigString(component, "node_name",
component.startsWith("alien.site.") ? "${hostname}:${pid}" : "${hostname}");
String temp = Format.replace(pattern, "${hostname}", ConfigUtils.getLocalHostname());
nodeName = Format.replace(temp, "${pid}", String.valueOf(MonitorFactory.getSelfProcessID()));
}
/**
* @param component
* @param jobNumber
*/
Monitor(final String component, final int jobNumber) {
this.component = component;
this.modules = new HashSet<>();
final String clusterPrefix = MonitorFactory.getConfigString(component, "cluster_prefix", "ALIEN");
final String clusterSuffix = MonitorFactory.getConfigString(component, "cluster_suffix", "Nodes");
String cluster = "";
if (clusterPrefix != null && clusterPrefix.length() > 0)
cluster = clusterPrefix + "_";
cluster += component;
if (clusterSuffix != null && clusterSuffix.length() > 0)
cluster += "_" + clusterSuffix;
clusterName = MonitorFactory.getConfigString(component, "cluster_name", cluster);
final String pattern = MonitorFactory.getConfigString(component, "node_name", "${hostname}:${pid}:${jobnumber}");
String temp = Format.replace(pattern, "${hostname}", ConfigUtils.getLocalHostname());
temp = Format.replace(temp, "${pid}", String.valueOf(MonitorFactory.getSelfProcessID()));
nodeName = Format.replace(temp, "${jobnumber}", String.valueOf(jobNumber));
}
/**
* Get the ML cluster name
*
......@@ -169,11 +201,11 @@ public class Monitor implements Runnable {
}
/**
* Add a measurement value. This can be the time (recommended in seconds) that took a command to executed, a file size (in bytes) and so on.
* Add a measurement value. This can be the time (recommended in seconds) that
* took a command to executed, a file size (in bytes) and so on.
*
* @param key
* @param quantity
* how much to add to the previous value
* @param quantity how much to add to the previous value
* @return accumulated so far, or <code>-1</code> if there was any error
*/
public double addMeasurement(final String key, final double quantity) {
......@@ -200,7 +232,8 @@ public class Monitor implements Runnable {
* Get the CacheMonitor for this key.
*
* @param key
* @return the existing, or newly created, object, or <code>null</code> if a different type of object was already associated to this key
* @return the existing, or newly created, object, or <code>null</code> if a
* different type of object was already associated to this key
*/
public CacheMonitor getCacheMonitor(final String key) {
final MonitoringObject mo = monitoringObjects.computeIfAbsent(key, (k) -> new CacheMonitor(k));
......@@ -334,17 +367,16 @@ public class Monitor implements Runnable {
/**
* Send these parameters
*
* @param paramNames
* the names
* @param paramValues
* values associated to the names, Strings or Numbers
* @param paramNames the names
* @param paramValues values associated to the names, Strings or Numbers
*/
public void sendParameters(final Vector<String> paramNames, final Vector<Object> paramValues) {
if (paramNames == null || paramValues == null || (paramNames.size() == 0 && paramValues.size() == 0))
return;
if (paramValues.size() != paramNames.size()) {
logger.log(Level.WARNING, "The names and the values arrays have different sizes (" + paramNames.size() + " vs " + paramValues.size() + ")");
logger.log(Level.WARNING, "The names and the values arrays have different sizes (" + paramNames.size()
+ " vs " + paramValues.size() + ")");
return;
}
......@@ -352,12 +384,12 @@ public class Monitor implements Runnable {
}
/**
* Send only one parameter. This method of sending is less efficient than {@link #sendParameters(Vector, Vector)} and so it should only be used when there is exactly one parameter to be sent.
* Send only one parameter. This method of sending is less efficient than
* {@link #sendParameters(Vector, Vector)} and so it should only be used when
* there is exactly one parameter to be sent.
*
* @param parameterName
* parameter name
* @param parameterValue
* the value, should be either a String or a Number
* @param parameterName parameter name
* @param parameterValue the value, should be either a String or a Number
* @see #sendParameters(Vector, Vector)
*/
public void sendParameter(final String parameterName, final Object parameterValue) {
......
......@@ -122,6 +122,35 @@ public final class MonitorFactory {
return m;
}
/**
* Get the monitor for this component
*
* @param component
* @return the monitor
*/
public static Monitor getMonitor(final String component, final int jobNumber) {
Monitor m;
synchronized (monitors) {
m = monitors.get(component + "_" + jobNumber);
if (m == null && getConfigBoolean(component, "enabled", true)) {
m = new Monitor(component, jobNumber);
final int interval = getConfigInt(component, "period", isJob() ? 120 : 60);
final ScheduledFuture<?> future = executor.scheduleAtFixedRate(m, random.nextInt(interval), interval, TimeUnit.SECONDS);
m.future = future;
m.interval = interval;
monitors.put(component + "_" + jobNumber, m);
}
}
return m;
}
/**
* Enable periodic sending of background host monitoring
*/
......
......@@ -133,7 +133,7 @@ public class JobAgent implements Runnable {
/**
* ML monitor object
*/
private static final Monitor monitor = MonitorFactory.getMonitor(JobAgent.class.getCanonicalName());
private Monitor monitor;
/**
* ApMon sender
......@@ -209,7 +209,13 @@ public class JobAgent implements Runnable {
ce = env.get("CE");
jobNumber = totalJobs.incrementAndGet();
monitor = MonitorFactory.getMonitor(JobAgent.class.getCanonicalName(), jobNumber);
monitor.sendParameter("state", "JA number " + jobNumber + ". Starting Job Agent");
monitor.sendParameter("statenumeric", Long.valueOf(1));
logger = ConfigUtils.getLogger(JobAgent.class.getCanonicalName() + " " + jobNumber);
FileHandler handler = null;
try {
handler = new FileHandler("job-agent-" + jobNumber + ".log");
......@@ -317,6 +323,7 @@ public class JobAgent implements Runnable {
logger.log(Level.INFO, "Starting JobAgent " + jobNumber + " in " + hostName);
logger.log(Level.INFO, siteMap.toString());
try {
logger.log(Level.INFO, "Resources available CPU DISK: " + RUNNING_CPU + " " + RUNNING_DISK);
synchronized (requestSync) {
......@@ -330,6 +337,9 @@ public class JobAgent implements Runnable {
monitor.sendParameter("TTL", siteMap.get("TTL"));
monitor.incrementCounter("requestedjobs");
monitor.sendParameter("state", "JA number " + jobNumber + ". Asking for a job");
monitor.sendParameter("statenumeric", Long.valueOf(3));
final GetMatchJob jobMatch = commander.q_api.getMatchJob(new HashMap<>(siteMap));
matchedJob = jobMatch.getMatchJob();
......@@ -398,11 +408,17 @@ public class JobAgent implements Runnable {
logger.log(Level.INFO, username);
logger.log(Level.INFO, Long.toString(queueId));
monitor.sendParameter("state", "JA number " + jobNumber + ". Starting running job " + queueId);
monitor.sendParameter("statenumeric", Long.valueOf(4));
// process payload
handleJob();
cleanup();
monitor.sendParameter("state", "JA number " + jobNumber + ". Finished running job " + queueId);
monitor.sendParameter("statenumeric", Long.valueOf(5));
synchronized (requestSync) {
RUNNING_CPU += reqCPU;
RUNNING_DISK += reqDisk;
......@@ -420,6 +436,9 @@ public class JobAgent implements Runnable {
// }
}
monitor.sendParameter("state", "JA number " + jobNumber + ". Finished running Job Agent");
monitor.sendParameter("statenumeric", Long.valueOf(6));
logger.log(Level.INFO, "JobAgent finished, id: " + jobAgentId + " totalJobs: " + totalJobs.get());
}
......@@ -584,6 +603,12 @@ public class JobAgent implements Runnable {
// ttl recalculation
int timeleft = computeTimeLeft();
final long jobAgentCurrentTime = System.currentTimeMillis();
final long jobAgentEndTime = jobAgentCurrentTime + timeleft;
monitor.sendParameter("state", "JA number " + jobNumber + ". Started to run in timestamp " + jobAgentCurrentTime +
" and will be allowed to run until timestamp " + jobAgentEndTime);
monitor.sendParameter("statenumeric", Long.valueOf(2));
if (checkParameters() == false)
return false;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment