Commit 9659db27 authored by Costin Grigoras's avatar Costin Grigoras
Browse files

Limit the number of concurrent queries to the QUEUE table / setStatus

parent 64028509
......@@ -28,6 +28,7 @@ import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
......@@ -670,6 +671,8 @@ public class TaskQueueUtils {
private static ApMon centralMLService = null;
private static final Semaphore limiter = new Semaphore(ConfigUtils.getConfig().geti("alien.taskQueue.TaskQueueUtils.setJobStatus_semaphores", 10), true);
/**
* @param job
* @param newStatus
......@@ -705,13 +708,25 @@ public class TaskQueueUtils {
final int queryRetriesMax = 3;
for (int retries = 0;; retries++) {
if (!db.query(q, false, Long.valueOf(job))) {
logger.log(Level.SEVERE, "Error executing the select query from QUEUE");
if (retries == queryRetriesMax - 1)
return false;
try {
limiter.acquire();
}
catch (@SuppressWarnings("unused") final InterruptedException e) {
return false;
}
try {
if (!db.query(q, false, Long.valueOf(job))) {
logger.log(Level.SEVERE, "Error executing the select query from QUEUE");
if (retries == queryRetriesMax - 1)
return false;
}
else
break;
}
finally {
limiter.release();
}
else
break;
}
if (!db.moveNext()) {
......@@ -760,12 +775,24 @@ public class TaskQueueUtils {
db.setQueryTimeout(600);
for (int retries = 0;; retries++) {
if (!db.query(q, false, newstatus, Long.valueOf(job))) {
if (retries == queryRetriesMax - 1)
return false;
try {
limiter.acquire();
}
catch (@SuppressWarnings("unused") final InterruptedException e) {
return false;
}
try {
if (!db.query(q, false, newstatus, Long.valueOf(job))) {
if (retries == queryRetriesMax - 1)
return false;
}
else
break;
}
finally {
limiter.release();
}
else
break;
}
final boolean updated = db.getUpdateCount() != 0;
......@@ -2726,7 +2753,7 @@ public class TaskQueueUtils {
/**
* @param key
*
*
*/
public LookupTable(final String key) {
query = "SELECT " + key.toLowerCase() + "id FROM QUEUE_" + key + " WHERE " + key.toLowerCase() + "=?";
......@@ -2739,7 +2766,7 @@ public class TaskQueueUtils {
}
@Override
protected Integer resolve(String key) {
protected Integer resolve(final String key) {
try (DBFunctions db = getQueueDB()) {
if (db == null)
return null;
......@@ -2772,9 +2799,9 @@ public class TaskQueueUtils {
* @return value for this key
*/
public static int getOrInsertFromLookupTable(final String key, final String value) {
LookupTable cache = lookupTable.computeIfAbsent(key.toUpperCase(), (k) -> new LookupTable(k));
final LookupTable cache = lookupTable.computeIfAbsent(key.toUpperCase(), (k) -> new LookupTable(k));
Integer i = cache.get(value);
final Integer i = cache.get(value);
if (i != null)
return i.intValue();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment