Skip to content
Snippets Groups Projects
Commit 96fc0d56 authored by Andrei Kazarov's avatar Andrei Kazarov
Browse files

fixes for reload (use of RW mutex)

parent ae5a835c
Branches master
No related tags found
No related merge requests found
Pipeline #
......@@ -7,12 +7,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.RuntimeErrorException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
......@@ -25,8 +22,7 @@ import ch.cern.tdaq.cassandra.injector.Injectors;
import ch.cern.tdaq.cassandra.reader.ConfigReader;
import ch.cern.tdaq.cassandra.directive.Schema;
import ch.cern.tdaq.cassandra.modification.TestCallback;
import ch.cern.tdaq.cassandra.event.TestEvent;
import ch.cern.tdaq.cassandra.modification.UpdateCallback;
import static java.util.concurrent.TimeUnit.*;
......@@ -99,8 +95,14 @@ public final class AALEngine {
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// multithreading protection for dynamic reloading of EPL statements and sending events from different threads
private final Lock read = readWriteLock.readLock();
private final Lock write = readWriteLock.writeLock();
public final void readLock() { read.lock() ; }
public final void readUnlock() { read.unlock() ; }
public final void writeLock() { write.lock() ; }
public final void writeUnlock() { write.unlock() ; }
/**
* Static method to get the handle to the unique instance of the engine
......@@ -110,6 +112,14 @@ public final class AALEngine {
return INSTANCE;
}
/**
* Static method to get the handle to the unique instance of the Esper
* @return
*/
public final CEPProvider getEsper() {
return esper;
}
private AALEngine() {
configuration = ch.cern.tdaq.cassandra.config.Configuration.getInstance();
......@@ -249,7 +259,7 @@ public final class AALEngine {
// Add a subscription
if (configuration.getRDBConfig().contains("rdbconfig")){
config.Subscription c = new config.Subscription(new TestCallback(db, esper), "Changing in database!");
config.Subscription c = new config.Subscription(new UpdateCallback(db, this), "Changing in database!");
db.subscribe(c);
}
......@@ -345,7 +355,6 @@ public final class AALEngine {
injectorCallbackPool.shutdown();
//Stopping periodic thread pool
periodicScheduler.shutdown();
// close database connections
......@@ -365,19 +374,15 @@ public final class AALEngine {
Runnable task = new Runnable() {
@Override
public void run() {
read.lock();
public void run() {
readLock();
try {
esper.sendEvent(event);
} catch (Exception a) {
log.error("Very bad! Unexpected exception thrown "+ a);
} finally {
read.unlock();
}
} finally {
readUnlock();
}
}
};
try {
......@@ -393,8 +398,6 @@ public final class AALEngine {
}
}
public void addInjector(String name, Injector inj){
injectors.put(name, inj);
}
......
......@@ -73,14 +73,10 @@ public final class CEPProvider {
//---------------------------------------------------------------------------------
// Methods for loading, removing and reloading statements and directives
public void registerStatement(String statement) {
epAdministrator.createEPL(statement);
}
public void registerStatement(String statement, String statementID) {
epAdministrator.createEPL(statement, statementID);
public void registerStatement(String statement, String statementID) {
epAdministrator.createEPL(statement, statementID); // throws EPException
// initial statemets do not have listeners
log.info("CREATED INITSTMT: "+statementID);
}
......@@ -91,7 +87,7 @@ public final class CEPProvider {
for(SAInitialStatement i : configurationStatements) {
try {
registerStatement(i.get_code(), i.UID());
} catch (EPStatementSyntaxException e) {
} catch (EPException e) {
log.error("Syntax Error in statement: "+e);
}
}
......@@ -102,14 +98,15 @@ public final class CEPProvider {
try {
for (SAInitialStatement initepl : d.get_initialstatements()) {
epAdministrator.createEPL(initepl.get_code(), initepl.UID(), d);
log.info("CREATED INIT EPLSTMT: " + initepl.UID() + " for directive " + d.UID());
}
EPStatement stmt = epAdministrator.createEPL(d.get_epl().get_code(), d.get_epl().UID(), d);
log.info("CREATED EPLSTMT: "+d.get_epl().UID());
log.info("CREATED EPLSTMT: "+d.get_epl().UID() + " for directive " + d.UID());
List<AlertListener> alist = AlertListener.fromListToAlertList(d.get_listeners(), d.UID());
for(AlertListener l: alist)
stmt.addListener(l);
} catch (EPException e) {
log.error(d.UID()+"Invalid Directive found: "+e);
log.error(d.UID()+": invalid directive: "+e);
throw new InvalidDirectiveException(e.getMessage());
}
}
......@@ -155,7 +152,7 @@ public final class CEPProvider {
try {
registerStatement(newStatement, statementID);
} catch (EPStatementSyntaxException e) {
} catch (EPException e) {
log.error("Syntax Error in statement: "+e);
}
......
package ch.cern.tdaq.cassandra.modification;
import ch.cern.tdaq.cassandra.AALEngine;
import ch.cern.tdaq.cassandra.CEPProvider;
......@@ -8,12 +9,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Arrays;
import com.espertech.esper.client.EPStatementSyntaxException;
import com.espertech.esper.client.EPException;
import java.lang.String;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
......@@ -31,24 +30,20 @@ import ch.cern.tdaq.cassandra.schema.SAConfig;
import ch.cern.tdaq.cassandra.schema.SAConfig_Helper;
public class TestCallback implements config.Callback {
public class UpdateCallback implements config.Callback {
private static final Log log = LogFactory.getLog(TestCallback.class);
private static final Log log = LogFactory.getLog(UpdateCallback.class);
private final CEPProvider esper;
private final AALEngine engine ;
private config.Configuration db;
public TestCallback(config.Configuration d, CEPProvider e) {
public UpdateCallback(config.Configuration d, AALEngine e) {
this.db = d;
this.esper = e;
this.esper = e.getEsper();
this.engine = e ;
}
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock write = readWriteLock.writeLock();
public void process_changes(config.Change[] changes, java.lang.Object parameter) {
......@@ -71,13 +66,10 @@ public class TestCallback implements config.Callback {
classesChanged.put(change.get_class_name(), change);
}
SAConfig updatedConfig = SAConfig_Helper.get(db, "main");
// esper.getEpServiceProvider().getEngineInstanceWideLock().writeLock().lock();
write.lock();
// lock sending new events to CEP engine
engine.writeLock();
// Start atomic management unit.
// Any events concurrently being processed by other threads must complete before the code completes obtaining the lock.
// Any events sent in by other threads will await the release of the lock.
......@@ -88,26 +80,19 @@ public class TestCallback implements config.Callback {
finally {
// Complete atomic management unit.
// Any events sent in by other threads will now continue processing against the changed set of statements.
write.unlock();
// esper.getEpServiceProvider().getEngineInstanceWideLock().writeLock().unlock();
engine.writeUnlock();
}
}
public void reload(Map<String, Change> classesChanged, SAConfig updatedConfig) {
for (SAInitialStatement initstmt : Schema.parseInitialStatements()) {
if (! Arrays.asList(esper.getEpAdministrator().getStatementNames()).contains(initstmt.UID())) {
try {
esper.registerStatement(initstmt.get_code(), initstmt.UID());
log.info("CREATED INITIAL STATEMENT "+initstmt.UID());
} catch (EPStatementSyntaxException e) {
} catch (EPException e) {
log.error("Syntax Error in statement: "+e);
}
......@@ -120,8 +105,8 @@ public class TestCallback implements config.Callback {
log.info("REMOVED INITSTMT "+initstmt.UID());
esper.registerStatement(initstmt.get_code(), initstmt.UID());
log.info("CREATED INITSTMT "+initstmt.UID());
} catch (EPStatementSyntaxException e) {
log.error("Syntax Error in statement: "+e);
} catch (EPException e) {
log.error("Syntax Error in statement: "+e);
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment