diff --git a/src/main/java/ch/cern/nile/common/StreamingApplication.java b/src/main/java/ch/cern/nile/common/StreamingApplication.java
index cc719bec0d63e8d304056b6581b3645a290cd972..98d67b04ba2c4b40dfa98374d1f0d7c623e58945 100644
--- a/src/main/java/ch/cern/nile/common/StreamingApplication.java
+++ b/src/main/java/ch/cern/nile/common/StreamingApplication.java
@@ -1,12 +1,12 @@
 package ch.cern.nile.common;
 
-
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Properties;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -16,7 +16,6 @@ import ch.cern.nile.common.clients.KafkaStreamsClient;
 import ch.cern.nile.common.configuration.PropertiesCheck;
 import ch.cern.nile.common.configuration.StreamType;
 import ch.cern.nile.common.configuration.properties.CommonProperties;
-import ch.cern.nile.common.exceptions.StreamingException;
 import ch.cern.nile.common.streams.Streaming;
 
 /**
@@ -27,6 +26,7 @@ import ch.cern.nile.common.streams.Streaming;
 public final class StreamingApplication {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(StreamingApplication.class);
+
     private static final int MIN_ARGS_LENGTH = 1;
 
     private StreamingApplication() {
@@ -39,7 +39,7 @@ public final class StreamingApplication {
      *
      * @param args command-line arguments, expecting the path to the properties file as the first argument
      * @throws IllegalArgumentException if the properties file path is not provided
-     * @throws StreamingException       if there are issues with loading the properties file,
+     * @throws StreamsException         if there are issues with loading the properties file,
      *                                  validating properties, or starting the streaming process
      */
     @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "This method is only used internally")
@@ -55,11 +55,10 @@ public final class StreamingApplication {
         } catch (IOException e) {
             final String message = "Error while loading the properties file";
             LOGGER.error(message, e);
-            throw new StreamingException(message, e);
+            throw new StreamsException(message, e);
         }
 
-        final StreamType sType =
-                StreamType.valueOf(configs.getProperty(CommonProperties.STREAM_TYPE.getValue(), null));
+        final StreamType sType = StreamType.valueOf(configs.getProperty(CommonProperties.STREAM_TYPE.getValue(), null));
 
         PropertiesCheck.validateProperties(configs, sType);
 
@@ -67,8 +66,7 @@ public final class StreamingApplication {
         client.configure(configs);
 
         try {
-            final Class<?> clazz =
-                    Class.forName(configs.getProperty(CommonProperties.STREAM_CLASS.getValue()));
+            final Class<?> clazz = Class.forName(configs.getProperty(CommonProperties.STREAM_CLASS.getValue()));
             final Streaming streaming;
             streaming = (Streaming) clazz.getDeclaredConstructor().newInstance();
             streaming.configure(configs);
@@ -77,7 +75,7 @@ public final class StreamingApplication {
                  | InvocationTargetException | NoSuchMethodException e) {
             final String message = "Error while starting the stream";
             LOGGER.error(message, e);
-            throw new StreamingException(message, e);
+            throw new StreamsException(message, e);
         }
     }
 
diff --git a/src/main/java/ch/cern/nile/common/clients/KafkaStreamsClient.java b/src/main/java/ch/cern/nile/common/clients/KafkaStreamsClient.java
index 5ec6f0519060a1fe628532243890513f3600a338..a07914e6cd79a8b65d26a60e7e9d3e3cceb5eb28 100644
--- a/src/main/java/ch/cern/nile/common/clients/KafkaStreamsClient.java
+++ b/src/main/java/ch/cern/nile/common/clients/KafkaStreamsClient.java
@@ -12,6 +12,9 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import ch.cern.nile.common.configuration.Configure;
 import ch.cern.nile.common.configuration.properties.ClientProperties;
@@ -23,6 +26,12 @@ import ch.cern.nile.common.json.JsonSerde;
  */
 public class KafkaStreamsClient implements Configure {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamsClient.class);
+
+    private static final String SECURITY_PROTOCOL = "SASL_SSL";
+    private static final String SASL_MECHANISM = "GSSAPI";
+    private static final String SASL_KERBEROS_SERVICE_NAME = "kafka";
+
     private static final String TEST_CLUSTER_NAME = "test";
 
     private Properties properties;
@@ -38,28 +47,28 @@ public class KafkaStreamsClient implements Configure {
     @Override
     public void configure(final Properties props) {
         final String clientId = props.getProperty(ClientProperties.CLIENT_ID.getValue());
-        this.properties = new Properties();
-        this.properties.put(StreamsConfig.APPLICATION_ID_CONFIG, clientId);
-        this.properties.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
+        properties = new Properties();
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, clientId);
+        properties.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
 
         final String kafkaCluster = props.getProperty(ClientProperties.KAFKA_CLUSTER.getValue());
 
         if (TEST_CLUSTER_NAME.equals(kafkaCluster)) {
-            this.properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("bootstrap.servers"));
+            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("bootstrap.servers"));
         } else {
-            this.properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.reverseDnsLookup(kafkaCluster));
-            this.properties.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
-            this.properties.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
-            this.properties.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
-            this.properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
+            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.reverseDnsLookup(kafkaCluster));
+            properties.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, SECURITY_PROTOCOL);
+            properties.put(SaslConfigs.SASL_MECHANISM, SASL_MECHANISM);
+            properties.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, SASL_KERBEROS_SERVICE_NAME);
+            properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                     props.getProperty(ClientProperties.TRUSTSTORE_LOCATION.getValue()));
         }
 
-        this.properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        this.properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-        this.properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+        properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                 LogAndContinueExceptionHandler.class.getName());
-        this.properties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
+        properties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                 DefaultProductionExceptionHandler.class.getName());
     }
 
@@ -70,7 +79,14 @@ public class KafkaStreamsClient implements Configure {
      * @return a configured KafkaStreams instance
      */
     public KafkaStreams create(final Topology topology) {
-        return new KafkaStreams(topology, properties);
+        try {
+            return new KafkaStreams(topology, properties);
+        } catch (StreamsException e) {
+            final String message = "Failed to create KafkaStreams instance for application ID: "
+                    + properties.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
+            LOGGER.error(message, e);
+            throw new StreamsException(message, e);
+        }
     }
 
     /**
@@ -78,14 +94,15 @@
      *
      * @param kafkaCluster the domain of the Kafka cluster
      * @return a comma-separated list of hostnames with port 9093
-     * @throws RuntimeException if the hostname resolution fails
+     * @throws ReverseDnsLookupException if the hostname resolution fails
      */
     private String reverseDnsLookup(final String kafkaCluster) {
         try {
             return performDnsLookup(kafkaCluster);
         } catch (UnknownHostException e) {
-            throw new ReverseDnsLookupException(
-                    "Failed to perform reverse DNS lookup for the Kafka cluster: " + kafkaCluster, e);
+            final String message = "Failed to perform reverse DNS lookup for the Kafka cluster: " + kafkaCluster;
+            LOGGER.error(message, e);
+            throw new ReverseDnsLookupException(message, e);
         }
     }
 
diff --git a/src/main/java/ch/cern/nile/common/exceptions/StreamingException.java b/src/main/java/ch/cern/nile/common/exceptions/StreamingException.java
deleted file mode 100644
index de716e6c231167c75b8fa568285566edf2193206..0000000000000000000000000000000000000000
--- a/src/main/java/ch/cern/nile/common/exceptions/StreamingException.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package ch.cern.nile.common.exceptions;
-
-import java.io.Serial;
-
-public class StreamingException extends RuntimeException {
-
-    @Serial
-    private static final long serialVersionUID = 1L;
-
-    public StreamingException(final Throwable cause) {
-        super(cause);
-    }
-
-    public StreamingException(final String message, final Throwable cause) {
-        super(message, cause);
-    }
-}
diff --git a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java
index 6a77da1046132e861cd075df815bcbafba7c67cb..0fd0abd4ed5e5dc57cea3ca3cbd6bd915f8c08bc 100644
--- a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java
+++ b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java
@@ -6,6 +6,7 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -18,7 +19,6 @@ import lombok.Setter;
 import ch.cern.nile.common.clients.KafkaStreamsClient;
 import ch.cern.nile.common.configuration.properties.ClientProperties;
 import ch.cern.nile.common.configuration.properties.DecodingProperties;
-import ch.cern.nile.common.exceptions.StreamingException;
 import ch.cern.nile.common.probes.Health;
 
 /**
@@ -70,7 +70,6 @@ public abstract class AbstractStream implements Streaming {
      * to the JVM and terminates the JVM upon completion of the stream.
      *
      * @param kafkaStreamsClient the client used to create and manage the Kafka Streams instance
-     * @throws StreamingException if an error occurs during streaming
      */
     @Override
     @SuppressWarnings("PMD.DoNotTerminateVM")
@@ -124,7 +123,9 @@
             latch.await();
         } catch (InterruptedException e) {
             LOGGER.error("Error while waiting for latch", e);
+            // Restore the interrupt flag before propagating, per standard InterruptedException handling.
+            Thread.currentThread().interrupt();
+            throw new StreamsException("Error while waiting for latch", e);
-            throw new StreamingException("Error while waiting for latch", e);
         }
 
     }