diff --git a/src/main/java/ch/cern/nile/common/StreamingApplication.java b/src/main/java/ch/cern/nile/common/StreamingApplication.java index cc719bec0d63e8d304056b6581b3645a290cd972..98d67b04ba2c4b40dfa98374d1f0d7c623e58945 100644 --- a/src/main/java/ch/cern/nile/common/StreamingApplication.java +++ b/src/main/java/ch/cern/nile/common/StreamingApplication.java @@ -1,12 +1,12 @@ package ch.cern.nile.common; - import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Properties; +import org.apache.kafka.streams.errors.StreamsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,7 +16,6 @@ import ch.cern.nile.common.clients.KafkaStreamsClient; import ch.cern.nile.common.configuration.PropertiesCheck; import ch.cern.nile.common.configuration.StreamType; import ch.cern.nile.common.configuration.properties.CommonProperties; -import ch.cern.nile.common.exceptions.StreamingException; import ch.cern.nile.common.streams.Streaming; /** @@ -27,6 +26,7 @@ import ch.cern.nile.common.streams.Streaming; public final class StreamingApplication { private static final Logger LOGGER = LoggerFactory.getLogger(StreamingApplication.class); + private static final int MIN_ARGS_LENGTH = 1; private StreamingApplication() { @@ -39,7 +39,7 @@ public final class StreamingApplication { * * @param args command-line arguments, expecting the path to the properties file as the first argument * @throws IllegalArgumentException if the properties file path is not provided - * @throws StreamingException if there are issues with loading the properties file, + * @throws StreamsException if there are issues with loading the properties file, * validating properties, or starting the streaming process */ @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "This method is only used internally") @@ -55,11 +55,10 @@ public final class StreamingApplication { } catch (IOException e) { final String message = "Error while loading the properties file"; LOGGER.error(message, e); - throw new StreamingException(message, e); + throw new StreamsException(message, e); } - final StreamType sType = - StreamType.valueOf(configs.getProperty(CommonProperties.STREAM_TYPE.getValue(), null)); + final StreamType sType = StreamType.valueOf(configs.getProperty(CommonProperties.STREAM_TYPE.getValue(), null)); PropertiesCheck.validateProperties(configs, sType); @@ -67,8 +66,7 @@ public final class StreamingApplication { client.configure(configs); try { - final Class<?> clazz = - Class.forName(configs.getProperty(CommonProperties.STREAM_CLASS.getValue())); + final Class<?> clazz = Class.forName(configs.getProperty(CommonProperties.STREAM_CLASS.getValue())); final Streaming streaming; streaming = (Streaming) clazz.getDeclaredConstructor().newInstance(); streaming.configure(configs); @@ -77,7 +75,7 @@ public final class StreamingApplication { | InvocationTargetException | NoSuchMethodException e) { final String message = "Error while starting the stream"; LOGGER.error(message, e); - throw new StreamingException(message, e); + throw new StreamsException(message, e); } } diff --git a/src/main/java/ch/cern/nile/common/clients/KafkaStreamsClient.java b/src/main/java/ch/cern/nile/common/clients/KafkaStreamsClient.java index 5ec6f0519060a1fe628532243890513f3600a338..a07914e6cd79a8b65d26a60e7e9d3e3cceb5eb28 100644 --- a/src/main/java/ch/cern/nile/common/clients/KafkaStreamsClient.java +++ b/src/main/java/ch/cern/nile/common/clients/KafkaStreamsClient.java @@ -12,6 +12,9 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; +import org.apache.kafka.streams.errors.StreamsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import ch.cern.nile.common.configuration.Configure; import ch.cern.nile.common.configuration.properties.ClientProperties; @@ -23,6 +26,12 @@ import ch.cern.nile.common.json.JsonSerde; */ public class KafkaStreamsClient implements Configure { + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsClient.class); + + private static final String SECURITY_PROTOCOL = "SASL_SSL"; + private static final String SASL_MECHANISM = "GSSAPI"; + private static final String SASL_KERBEROS_SERVICE_NAME = "kafka"; + private static final String TEST_CLUSTER_NAME = "test"; private Properties properties; @@ -38,28 +47,28 @@ public class KafkaStreamsClient implements Configure { @Override public void configure(final Properties props) { final String clientId = props.getProperty(ClientProperties.CLIENT_ID.getValue()); - this.properties = new Properties(); - this.properties.put(StreamsConfig.APPLICATION_ID_CONFIG, clientId); - this.properties.put(StreamsConfig.CLIENT_ID_CONFIG, clientId); + properties = new Properties(); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, clientId); + properties.put(StreamsConfig.CLIENT_ID_CONFIG, clientId); final String kafkaCluster = props.getProperty(ClientProperties.KAFKA_CLUSTER.getValue()); if (TEST_CLUSTER_NAME.equals(kafkaCluster)) { - this.properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("bootstrap.servers")); + properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("bootstrap.servers")); } else { - this.properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.reverseDnsLookup(kafkaCluster)); - this.properties.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); - this.properties.put(SaslConfigs.SASL_MECHANISM, "GSSAPI"); - this.properties.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka"); - this.properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, + properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.reverseDnsLookup(kafkaCluster)); + properties.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, SECURITY_PROTOCOL); + properties.put(SaslConfigs.SASL_MECHANISM, SASL_MECHANISM); + properties.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, SASL_KERBEROS_SERVICE_NAME); + properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, props.getProperty(ClientProperties.TRUSTSTORE_LOCATION.getValue())); } - this.properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - this.properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - this.properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, + properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName()); - this.properties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, + properties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, DefaultProductionExceptionHandler.class.getName()); } @@ -70,7 +79,14 @@ public class KafkaStreamsClient implements Configure { * @return a configured KafkaStreams instance */ public KafkaStreams create(final Topology topology) { - return new KafkaStreams(topology, properties); + try { + return new KafkaStreams(topology, properties); + } catch (StreamsException e) { + final String message = + "Failed to create KafkaStreams instance with properties: " + properties; + LOGGER.error(message, e); + throw new StreamsException(message, e); + } } /** @@ -78,14 +94,14 @@ public class KafkaStreamsClient implements Configure { * * @param kafkaCluster the domain of the Kafka cluster * @return a comma-separated list of hostnames with port 9093 - * @throws RuntimeException if the hostname resolution fails */ private String reverseDnsLookup(final String kafkaCluster) { try { return performDnsLookup(kafkaCluster); } catch (UnknownHostException e) { - throw new ReverseDnsLookupException( - "Failed to perform reverse DNS lookup for the Kafka cluster: " + kafkaCluster, e); + final String message = "Failed to perform reverse DNS lookup for the Kafka cluster: " + kafkaCluster; + LOGGER.error(message, e); + throw new ReverseDnsLookupException(message, e); } } diff --git a/src/main/java/ch/cern/nile/common/exceptions/StreamingException.java b/src/main/java/ch/cern/nile/common/exceptions/StreamingException.java deleted file mode 100644 index de716e6c231167c75b8fa568285566edf2193206..0000000000000000000000000000000000000000 --- a/src/main/java/ch/cern/nile/common/exceptions/StreamingException.java +++ /dev/null @@ -1,17 +0,0 @@ -package ch.cern.nile.common.exceptions; - -import java.io.Serial; - -public class StreamingException extends RuntimeException { - - @Serial - private static final long serialVersionUID = 1L; - - public StreamingException(final Throwable cause) { - super(cause); - } - - public StreamingException(final String message, final Throwable cause) { - super(message, cause); - } -} diff --git a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java index 6a77da1046132e861cd075df815bcbafba7c67cb..0fd0abd4ed5e5dc57cea3ca3cbd6bd915f8c08bc 100644 --- a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java +++ b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java @@ -6,6 +6,7 @@ import java.util.concurrent.CountDownLatch; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.StreamsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,7 +19,6 @@ import lombok.Setter; import ch.cern.nile.common.clients.KafkaStreamsClient; import ch.cern.nile.common.configuration.properties.ClientProperties; import ch.cern.nile.common.configuration.properties.DecodingProperties; -import ch.cern.nile.common.exceptions.StreamingException; import ch.cern.nile.common.probes.Health; /** @@ -70,7 +70,6 @@ public abstract class AbstractStream implements Streaming { * to the JVM and terminates the JVM upon completion of the stream. * * @param kafkaStreamsClient the client used to create and manage the Kafka Streams instance - * @throws StreamingException if an error occurs during streaming */ @Override @SuppressWarnings("PMD.DoNotTerminateVM") @@ -124,7 +123,7 @@ public abstract class AbstractStream implements Streaming { latch.await(); } catch (InterruptedException e) { LOGGER.error("Error while waiting for latch", e); - throw new StreamingException("Error while waiting for latch", e); + throw new StreamsException("Error while waiting for latch", e); } }