diff --git a/CHANGELOG.MD b/CHANGELOG.MD
index 045bd75f2eacf30cb717596bdc40a8ef793732a9..5864734a33c1bc0b71dc7e049327be10c9f5465e 100644
--- a/CHANGELOG.MD
+++ b/CHANGELOG.MD
@@ -6,6 +6,13 @@
 - The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 - Entry types: Added, Changed, Deprecated, Removed, Fixed, Security
 
+
+## [1.3.0] - 2024-10-18
+## Added
+- Added new StreamType: `CONVERSION`
+- Add DLQ support to AbstractStream
+- Implement JsonArraySerde for objects containing JsonArray types
+
 ## [1.2.0] - 2024-10-18
 ## Added
 - Added new StreamType: `SCHEMA_TRANSFORMATION`
diff --git a/pom.xml b/pom.xml
index 5b98da729e992f04d86aef799d3c31a5d1487d97..69f61c4380036d73cdd8fc7a5d6c3d99c81e8423 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
 
     <groupId>ch.cern.nile</groupId>
     <artifactId>nile-common</artifactId>
-    <version>1.2.0</version>
+    <version>1.3.0</version>
 
     <properties>
         <maven.compiler.release>21</maven.compiler.release>
diff --git a/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java b/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java
index dd62d28c7e418318fdd9f3e70c672a8ec1934d46..5686f10e35f6c9e5df44ffc7f1fddb8b8a808170 100644
--- a/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java
+++ b/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java
@@ -6,6 +6,7 @@ import java.util.Set;
 
 import ch.cern.nile.common.configuration.properties.ClientProperties;
 import ch.cern.nile.common.configuration.properties.CommonProperties;
+import ch.cern.nile.common.configuration.properties.ConversionProperties;
 import ch.cern.nile.common.configuration.properties.DecodingProperties;
 import ch.cern.nile.common.configuration.properties.EnrichmentProperties;
 import ch.cern.nile.common.configuration.properties.PropertyEnum;
@@ -26,6 +27,7 @@ public final class PropertiesCheck {
     private static final Set<String> ENRICHMENT_PROPERTIES = PropertyEnum.getValues(EnrichmentProperties.class);
     private static final Set<String> SCHEMA_TRANSFORMATION_PROPERTIES = PropertyEnum.getValues(
             SchemaTransformationProperties.class);
+    private static final Set<String> CONVERSION_PROPERTIES = PropertyEnum.getValues(ConversionProperties.class);
 
     private PropertiesCheck() {
     }
@@ -51,8 +53,8 @@ public final class PropertiesCheck {
             case DECODING:
                 validateRequiredProperties(properties, DECODING_PROPERTIES);
                 break;
-            case ROUTING:
-                validateRequiredProperties(properties, ROUTING_PROPERTIES);
+            case CONVERSION:
+                validateRequiredProperties(properties, CONVERSION_PROPERTIES);
                 break;
             case ENRICHMENT:
                 validateRequiredProperties(properties, ENRICHMENT_PROPERTIES);
@@ -60,6 +62,9 @@ public final class PropertiesCheck {
             case SCHEMA_TRANSFORMATION:
                 validateRequiredProperties(properties, SCHEMA_TRANSFORMATION_PROPERTIES);
                 break;
+            case ROUTING:
+                validateRequiredProperties(properties, ROUTING_PROPERTIES);
+                break;
             default:
                 // Cannot happen as the stream type is validated before this switch statement.
                 throw new UnknownStreamTypeException(String.format("Stream type unknown: %s.", streamType));
diff --git a/src/main/java/ch/cern/nile/common/configuration/StreamType.java b/src/main/java/ch/cern/nile/common/configuration/StreamType.java
index e396268da7856fbbfbcba22c7caeedad123f56c0..06f4051c2b423f7aefdfebff37c7f639905cc9e1 100644
--- a/src/main/java/ch/cern/nile/common/configuration/StreamType.java
+++ b/src/main/java/ch/cern/nile/common/configuration/StreamType.java
@@ -5,9 +5,10 @@ package ch.cern.nile.common.configuration;
  */
 public enum StreamType {
 
-    ROUTING,
     DECODING,
     ENRICHMENT,
-    SCHEMA_TRANSFORMATION
+    CONVERSION,
+    SCHEMA_TRANSFORMATION,
+    ROUTING
 
 }
diff --git a/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java b/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java
new file mode 100644
index 0000000000000000000000000000000000000000..867c180b3a709379a1f40d979fce7e0b976899ff
--- /dev/null
+++ b/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java
@@ -0,0 +1,15 @@
+package ch.cern.nile.common.configuration.properties;
+
+import lombok.Getter;
+
+/**
+ * Enum representing properties specific to conversion clients.
+ */
+@Getter
+public enum ConversionProperties implements PropertyEnum {
+    CLOCK_CHANNEL_NAME("clock.channel.name");
+
+    private final String value;
+
+    ConversionProperties(final String value) { this.value = value; }
+}
diff --git a/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java b/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java
new file mode 100644
index 0000000000000000000000000000000000000000..a8eb4fe9d88a21132eebac7af8e1d8f45de51c0f
--- /dev/null
+++ b/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java
@@ -0,0 +1,65 @@
+package ch.cern.nile.common.json;
+
+import java.util.Map;
+
+import com.google.gson.JsonArray;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+/** JsonArray Serde implementation.
+ * @author icoteril
+ * @version 1.3.0
+ * @since 1.3.0
+ */
+public class JsonArraySerde implements Serde<JsonArray>  {
+
+    private final JsonPojoSerializer<JsonArray> jsonSerializer = new JsonPojoSerializer<>();
+    private final JsonPojoDeserializer<JsonArray> jsonDeserializer = new JsonPojoDeserializer<>(JsonArray.class);
+
+    /**
+     * Configures this Serde with the given properties. This method configures both the internal
+     * serializer and deserializer with the provided configuration settings.
+     *
+     * @param configs the properties from the consumer or producer configuration
+     * @param isKey   indicates whether this Serde is being used for key serialization/deserialization
+     *                This parameter is ignored in this implementation
+     */
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        jsonSerializer.configure(configs, isKey);
+        jsonDeserializer.configure(configs, isKey);
+    }
+
+    /**
+     * Closes this Serde. This method closes both the internal serializer and deserializer.
+     */
+    @Override
+    public void close() {
+        jsonSerializer.close();
+        jsonDeserializer.close();
+    }
+
+    /**
+     * Returns the serializer component of this Serde.
+     *
+     * @return The {@link JsonPojoSerializer} for serializing JSON objects
+     */
+    @Override
+    public Serializer<JsonArray> serializer() {
+        return jsonSerializer;
+    }
+
+    /**
+     * Returns the deserializer component of this Serde.
+     *
+     * @return The {@link JsonPojoDeserializer} for deserializing JSON objects
+     */
+    @Override
+    public Deserializer<JsonArray> deserializer() {
+        return jsonDeserializer;
+    }
+
+}
diff --git a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java
index d84c8ac62774aef7ebc533f6d38df619de12771e..e03a09a85af2b7cd4f202d018114a4f95d75af63 100644
--- a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java
+++ b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java
@@ -1,5 +1,6 @@
 package ch.cern.nile.common.streams;
 
+import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 
@@ -7,6 +8,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +50,9 @@ public abstract class AbstractStream implements Streaming {
     @Getter(AccessLevel.PROTECTED)
     private String schemaRegistryUrl;
 
+    @Getter(AccessLevel.PROTECTED)
+    private String dlqTopic;
+
     @Setter(AccessLevel.PROTECTED)
     private long lastReadOffset = -2;
 
@@ -112,10 +117,14 @@ public abstract class AbstractStream implements Streaming {
         final StreamsBuilder builder = new StreamsBuilder();
         sourceTopic = properties.getProperty(ClientProperties.SOURCE_TOPIC.getValue());
         sinkTopic = properties.getProperty(DecodingProperties.SINK_TOPIC.getValue());
+        dlqTopic = String.join("-", Arrays.asList(sinkTopic, "dlq"));
         schemaRegistryUrl = properties.getProperty(SchemaTransformationProperties.SCHEMA_REGISTRY_URL.getValue());
         createTopology(builder);
         final Topology topology = builder.build();
         streams = kafkaStreamsClient.create(topology);
+        streams.setUncaughtExceptionHandler(
+                (exception) -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
+        );
         health = new Health(streams);
         latch = new CountDownLatch(1);
     }
diff --git a/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java b/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java
index 17421b890fc189c4f78593ca419ce437e933eca3..045d28d3645bca19237c549ab78c764035912b6c 100644
--- a/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java
+++ b/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java
@@ -7,8 +7,10 @@ import java.util.Properties;
 
 import org.junit.jupiter.api.Test;
 
+
 import ch.cern.nile.common.configuration.properties.ClientProperties;
 import ch.cern.nile.common.configuration.properties.CommonProperties;
+import ch.cern.nile.common.configuration.properties.ConversionProperties;
 import ch.cern.nile.common.configuration.properties.DecodingProperties;
 import ch.cern.nile.common.configuration.properties.EnrichmentProperties;
 import ch.cern.nile.common.configuration.properties.RoutingProperties;
@@ -47,6 +49,15 @@ class PropertiesCheckTest {
         assertDoesNotThrow(() -> PropertiesCheck.validateProperties(properties, StreamType.ROUTING));
     }
 
+    @Test
+    void givenValidConversionProperties_whenValidateProperties_thenPassesValidation() {
+        final Properties properties = new Properties();
+        initClientAndCommonProperties(properties);
+        properties.put(ConversionProperties.CLOCK_CHANNEL_NAME.getValue(), "");
+
+        assertDoesNotThrow(() -> PropertiesCheck.validateProperties(properties, StreamType.CONVERSION));
+    }
+
     @Test
     void givenValidEnrichmentProperties_whenValidateProperties_thenPassesValidation() {
         final Properties properties = new Properties();
diff --git a/src/test/java/ch/cern/nile/common/probes/HealthTest.java b/src/test/java/ch/cern/nile/common/probes/HealthTest.java
index f2f0d5cedff76371b465ae4b0e93d1d58878bd58..4d5e2a14969601b394f607225f41c872657fccec 100644
--- a/src/test/java/ch/cern/nile/common/probes/HealthTest.java
+++ b/src/test/java/ch/cern/nile/common/probes/HealthTest.java
@@ -97,6 +97,6 @@ class HealthTest {
     void givenHttpServerCreationFails_whenStart_thenThrowsRuntimeException() throws IOException {
         when(mockFactory.createHttpServer(any(InetSocketAddress.class), anyInt())).thenThrow(IOException.class);
 
-        assertThrows(RuntimeException.class, () -> health.start());
+        assertThrows(RuntimeException.class, health::start);
     }
 }