Commit 59493223 authored by Costin Grigoras
Browse files

Move running sitesonar from JR to JA to report cpu cores and site name

parent 9d311f4c
......@@ -60,6 +60,7 @@ import apmon.BkThread;
import apmon.MonitoredJob;
import lazyj.ExtProperties;
import lia.util.process.ExternalProcesses;
import utils.ProcessWithTimeout;
/**
* Gets matched jobs, and launches JobWrapper for executing them
......@@ -107,9 +108,10 @@ public class JobAgent implements Runnable {
private static float lhcbMarks = -1;
private enum jaStatus {
STARTING_JA(0, "Starting running Job Agent"), REQUESTING_JOB(1, "Asking for a job"), INSTALLING_PKGS(2, "Found a matching job"), JOB_STARTED(3, "Starting processing job's payload"), RUNNING_JOB(
4, "Running job's payload"), DONE(5, "Finished running job"), FINISHING_JA(6, "Finished running Job Agent"), ERROR_IP(-1, "Error getting AliEn jar path"), ERROR_GET_JDL(-2,
"Error getting jdl"), ERROR_DIRS(-3, "Error creating working directories"), ERROR_START(-4, "Error launching Job Wrapper to start job");
STARTING_JA(0, "Starting running Job Agent"), REQUESTING_JOB(1, "Asking for a job"), INSTALLING_PKGS(2, "Found a matching job"), JOB_STARTED(3,
"Starting processing job's payload"), RUNNING_JOB(
4, "Running job's payload"), DONE(5, "Finished running job"), FINISHING_JA(6, "Finished running Job Agent"), ERROR_IP(-1, "Error getting AliEn jar path"), ERROR_GET_JDL(-2,
"Error getting jdl"), ERROR_DIRS(-3, "Error creating working directories"), ERROR_START(-4, "Error launching Job Wrapper to start job");
private final int value;
private final String value_string;
......@@ -235,10 +237,10 @@ public class JobAgent implements Runnable {
try {
handler = new FileHandler("job-agent-" + jobNumber + ".log");
handler.setFormatter(new SimpleFormatter() {
private String format = "%1$tb %1$td, %1$tY %1$tl:%1$tM:%1$tS %1$Tp %2$s JobNumber: " + jobNumber + "%n%4$s: %5$s%n";
private final String format = "%1$tb %1$td, %1$tY %1$tl:%1$tM:%1$tS %1$Tp %2$s JobNumber: " + jobNumber + "%n%4$s: %5$s%n";
@Override
public synchronized String format(LogRecord record) {
public synchronized String format(final LogRecord record) {
String source;
if (record.getSourceClassName() != null) {
source = record.getSourceClassName();
......@@ -249,10 +251,10 @@ public class JobAgent implements Runnable {
else {
source = record.getLoggerName();
}
String message = formatMessage(record);
final String message = formatMessage(record);
String throwable = "";
if (record.getThrown() != null) {
StringWriter sw = new StringWriter();
final StringWriter sw = new StringWriter();
try (PrintWriter pw = new PrintWriter(sw)) {
pw.println();
record.getThrown().printStackTrace(pw);
......@@ -272,7 +274,7 @@ public class JobAgent implements Runnable {
logger.addHandler(handler);
}
catch (IOException ie) {
catch (final IOException ie) {
logger.log(Level.WARNING, "Problem with getting logger: " + ie.toString());
ie.printStackTrace();
}
......@@ -290,6 +292,8 @@ public class JobAgent implements Runnable {
RUNNING_DISK = Long.valueOf(((Number) siteMap.getOrDefault("Disk", Integer.valueOf(10 * 1024))).longValue());
origTtl = ((Integer) siteMap.get("TTL")).intValue();
RUNNING_JOBAGENTS = 0;
collectSystemInformation();
}
}
......@@ -335,6 +339,34 @@ public class JobAgent implements Runnable {
}
}
private static final String SITESONAR = "/cvmfs/alice.cern.ch/sitesonar/sitesonar.sh";
/**
 * Runs the site sonar script ({@link #SITESONAR}), when it exists and is executable,
 * exporting the CPU core count and the site name to it through the environment
 * variables {@code ALIEN_JDL_CPUCORES} and {@code ALIEN_SITE}.
 *
 * The call is best effort: the script is given at most 5 minutes, and any failure is
 * only logged, never propagated to the caller.
 */
private void collectSystemInformation() {
    final File f = new File(SITESONAR);

    if (!f.exists() || !f.canExecute())
        return;

    final ProcessBuilder pBuilder = new ProcessBuilder(SITESONAR);

    // Fall back to a single core when the slot size is not known
    pBuilder.environment().put("ALIEN_JDL_CPUCORES", MAX_CPU != null ? MAX_CPU.toString() : "1");

    // getOrDefault() only covers a missing key; guard against a key mapped to null as well
    final Object site = siteMap.get("Site");
    pBuilder.environment().put("ALIEN_SITE", site != null ? site.toString() : "UNKNOWN");

    try {
        final Process p = pBuilder.start();

        if (p != null) {
            final ProcessWithTimeout ptimeout = new ProcessWithTimeout(p, pBuilder);
            ptimeout.waitFor(5, TimeUnit.MINUTES);

            if (!ptimeout.exitedOk())
                logger.log(Level.WARNING, "Sitesonar didn't finish in due time");
        }
    }
    catch (final IOException e) {
        // Not fatal for the agent, but leave a trace of why the probe did not run
        logger.log(Level.WARNING, "Could not run " + SITESONAR, e);
    }
    catch (final InterruptedException e) {
        // Keep the interrupt visible to the code that called us instead of swallowing it
        Thread.currentThread().interrupt();
        logger.log(Level.WARNING, "Interrupted while waiting for " + SITESONAR, e);
    }
}
@SuppressWarnings("boxing")
@Override
public void run() {
......@@ -393,8 +425,8 @@ public class JobAgent implements Runnable {
sendBatchInfo();
reqCPU = ((Number) jdl.getLong("CPUCores")).longValue();
reqDisk = 10 * 1024l;
String workdirMaxSize = jdl.gets("Workdirectorysize");
reqDisk = 10 * 1024L;
final String workdirMaxSize = jdl.gets("Workdirectorysize");
final Pattern p = Pattern.compile("\\p{L}");
if (workdirMaxSize != null) {
......@@ -574,16 +606,19 @@ public class JobAgent implements Runnable {
* @return false if we can't run because of current conditions, true if positive
*/
public boolean checkParameters() {
int timeleft = computeTimeLeft();
final int timeleft = computeTimeLeft();
if (timeleft <= 0)
return false;
if (RUNNING_DISK.longValue() <= 10 * 1024) {
logger.log(Level.INFO, "There is not enough space left: " + RUNNING_DISK);
if (!System.getenv().containsKey("JALIEN_IGNORE_STORAGE")) {
logger.log(Level.WARNING, "There is not enough space left: " + RUNNING_DISK);
return false;
}
return false;
logger.log(Level.INFO, "Ignoring the reported local disk space of " + RUNNING_DISK);
}
if (RUNNING_CPU.longValue() <= 0)
return false;
......@@ -621,7 +656,7 @@ public class JobAgent implements Runnable {
logger.log(Level.INFO, "Updating dynamic parameters of jobAgent map");
// ttl recalculation
int timeleft = computeTimeLeft();
final int timeleft = computeTimeLeft();
if (checkParameters() == false)
return false;
......@@ -900,11 +935,11 @@ public class JobAgent implements Runnable {
if (code != 0) {
// Looks like something went wrong. Let's check the last reported status
final String lastStatus = readWrapperStatus();
if (lastStatus.equals("STARTED") || lastStatus.equals("RUNNING")) {
if ("STARTED".equals(lastStatus) || "RUNNING".equals(lastStatus)) {
commander.q_api.putJobLog(queueId, "trace", "ERROR: The JobWrapper was killed before job could complete");
changeJobStatus(JobStatus.ERROR_E, null); // JobWrapper was killed before the job could be completed
}
else if (lastStatus.equals("SAVING")) {
else if ("SAVING".equals(lastStatus)) {
commander.q_api.putJobLog(queueId, "trace", "ERROR: The JobWrapper was killed during saving");
changeJobStatus(JobStatus.ERROR_SV, null); // JobWrapper was killed during saving
}
......@@ -912,7 +947,7 @@ public class JobAgent implements Runnable {
}
}
private void setStatus(jaStatus new_status) {
private void setStatus(final jaStatus new_status) {
status = new_status;
monitor.sendParameter("ja_status_string", status.getStringValue());
monitor.sendParameter("ja_status", Integer.valueOf(status.getValue()));
......@@ -1107,8 +1142,6 @@ public class JobAgent implements Runnable {
catch (final Exception e) {
logger.log(Level.WARNING, "JA cannot update ML of the job status change", e);
}
return;
}
private String readWrapperStatus() {
......@@ -1256,7 +1289,7 @@ public class JobAgent implements Runnable {
private void sendBatchInfo() {
for (final String var : batchSystemVars) {
if (env.containsKey(var)) {
if (var.equals("_CONDOR_JOB_AD")) {
if ("_CONDOR_JOB_AD".equals(var)) {
try {
final List<String> lines = Files.readAllLines(Paths.get(env.get(var)));
for (final String line : lines) {
......
package alien.site;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import alien.config.ConfigUtils;
import alien.monitoring.Monitor;
import alien.monitoring.MonitorFactory;
import utils.ProcessWithTimeout;
/**
* @author sweisz
......@@ -56,8 +52,6 @@ public class JobRunner extends JobAgent {
final int maxRetries = Integer.parseInt(System.getenv().getOrDefault("MAX_RETRIES", "5"));
collectSystemInformation();
while (timestamp < ttlEnd) {
synchronized (JobAgent.requestSync) {
try {
......@@ -101,31 +95,6 @@ public class JobRunner extends JobAgent {
System.out.println("JobRunner Exiting");
}
private static final String SITESONAR = "/cvmfs/alice.cern.ch/sitesonar/sitesonar.sh";
/**
 * Runs the site sonar script ({@link #SITESONAR}) when it exists and is executable.
 *
 * The call is best effort: the script is given at most 5 minutes, and any failure is
 * only logged, never propagated to the caller.
 */
private static void collectSystemInformation() {
    final File f = new File(SITESONAR);

    if (!f.exists() || !f.canExecute())
        return;

    final ProcessBuilder pBuilder = new ProcessBuilder(SITESONAR);

    try {
        final Process p = pBuilder.start();

        if (p != null) {
            final ProcessWithTimeout ptimeout = new ProcessWithTimeout(p, pBuilder);
            ptimeout.waitFor(5, TimeUnit.MINUTES);

            if (!ptimeout.exitedOk())
                logger.log(Level.WARNING, "Sitesonar didn't finish in due time");
        }
    }
    catch (final IOException e) {
        // Not fatal for the runner, but leave a trace of why the probe did not run
        logger.log(Level.WARNING, "Could not run " + SITESONAR, e);
    }
    catch (final InterruptedException e) {
        // Keep the interrupt visible to the code that called us instead of swallowing it
        Thread.currentThread().interrupt();
        logger.log(Level.WARNING, "Interrupted while waiting for " + SITESONAR, e);
    }
}
public static void main(final String[] args) {
ConfigUtils.setApplicationName("JobRunner");
ConfigUtils.switchToForkProcessLaunching();
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment