From 0a46cc1c6b5d7b767c6b22d36f730a1ee224d08d Mon Sep 17 00:00:00 2001
From: Ignacio Coterillo <ignacio.coterillo.coz@cern.ch>
Date: Fri, 18 Oct 2024 13:14:56 +0200
Subject: [PATCH 1/2] Implement Conversion type

---
 CHANGELOG.MD                                  |  7 +++
 pom.xml                                       |  2 +-
 .../common/configuration/PropertiesCheck.java | 16 +++--
 .../nile/common/configuration/StreamType.java |  5 +-
 .../properties/ConversionProperties.java      | 15 +++++
 .../cern/nile/common/json/JsonArraySerde.java | 61 +++++++++++++++++++
 .../nile/common/streams/AbstractStream.java   | 11 ++++
 .../configuration/PropertiesCheckTest.java    | 15 +++--
 8 files changed, 115 insertions(+), 17 deletions(-)
 create mode 100644 src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java
 create mode 100644 src/main/java/ch/cern/nile/common/json/JsonArraySerde.java

diff --git a/CHANGELOG.MD b/CHANGELOG.MD
index 045bd75..5864734 100644
--- a/CHANGELOG.MD
+++ b/CHANGELOG.MD
@@ -6,6 +6,13 @@
 - The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 - Entry types: Added, Changed, Deprecated, Removed, Fixed, Security
 
+
+## [1.3.0] - 2024-10-18
+## Added
+- Added new StreamType: `CONVERSION`
+- Add DLQ support to AbstractStream
+- Implement JsonArraySerde for objects containing JsonArray types
+
 ## [1.2.0] - 2024-10-18
 ## Added
 - Added new StreamType: `SCHEMA_TRANSFORMATION`
diff --git a/pom.xml b/pom.xml
index 5b98da7..69f61c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
 
     <groupId>ch.cern.nile</groupId>
     <artifactId>nile-common</artifactId>
-    <version>1.2.0</version>
+    <version>1.3.0</version>
 
     <properties>
         <maven.compiler.release>21</maven.compiler.release>
diff --git a/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java b/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java
index dd62d28..325dcb4 100644
--- a/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java
+++ b/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java
@@ -4,13 +4,7 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 
-import ch.cern.nile.common.configuration.properties.ClientProperties;
-import ch.cern.nile.common.configuration.properties.CommonProperties;
-import ch.cern.nile.common.configuration.properties.DecodingProperties;
-import ch.cern.nile.common.configuration.properties.EnrichmentProperties;
-import ch.cern.nile.common.configuration.properties.PropertyEnum;
-import ch.cern.nile.common.configuration.properties.RoutingProperties;
-import ch.cern.nile.common.configuration.properties.SchemaTransformationProperties;
+import ch.cern.nile.common.configuration.properties.*;
 import ch.cern.nile.common.exceptions.MissingPropertyException;
 import ch.cern.nile.common.exceptions.UnknownStreamTypeException;
 
@@ -26,6 +20,7 @@ public final class PropertiesCheck {
     private static final Set<String> ENRICHMENT_PROPERTIES = PropertyEnum.getValues(EnrichmentProperties.class);
     private static final Set<String> SCHEMA_TRANSFORMATION_PROPERTIES = PropertyEnum.getValues(
             SchemaTransformationProperties.class);
+    private static final Set<String> CONVERSION_PROPERTIES = PropertyEnum.getValues(ConversionProperties.class);
 
     private PropertiesCheck() {
     }
@@ -51,8 +46,8 @@ public final class PropertiesCheck {
             case DECODING:
                 validateRequiredProperties(properties, DECODING_PROPERTIES);
                 break;
-            case ROUTING:
-                validateRequiredProperties(properties, ROUTING_PROPERTIES);
+            case CONVERSION:
+                validateRequiredProperties(properties, CONVERSION_PROPERTIES);
                 break;
             case ENRICHMENT:
                 validateRequiredProperties(properties, ENRICHMENT_PROPERTIES);
@@ -60,6 +55,9 @@ public final class PropertiesCheck {
             case SCHEMA_TRANSFORMATION:
                 validateRequiredProperties(properties, SCHEMA_TRANSFORMATION_PROPERTIES);
                 break;
+            case ROUTING:
+                validateRequiredProperties(properties, ROUTING_PROPERTIES);
+                break;
             default:
                 // Cannot happen as the stream type is validated before this switch statement.
                 throw new UnknownStreamTypeException(String.format("Stream type unknown: %s.", streamType));
diff --git a/src/main/java/ch/cern/nile/common/configuration/StreamType.java b/src/main/java/ch/cern/nile/common/configuration/StreamType.java
index e396268..06f4051 100644
--- a/src/main/java/ch/cern/nile/common/configuration/StreamType.java
+++ b/src/main/java/ch/cern/nile/common/configuration/StreamType.java
@@ -5,9 +5,10 @@ package ch.cern.nile.common.configuration;
  */
 public enum StreamType {
 
-    ROUTING,
     DECODING,
     ENRICHMENT,
-    SCHEMA_TRANSFORMATION
+    CONVERSION,
+    SCHEMA_TRANSFORMATION,
+    ROUTING
 
 }
diff --git a/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java b/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java
new file mode 100644
index 0000000..01965a2
--- /dev/null
+++ b/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java
@@ -0,0 +1,15 @@
+package ch.cern.nile.common.configuration.properties;
+
+import lombok.Getter;
+
+/**
+ * Enum representing properties specific to conversion clients"
+ */
+@Getter
+public enum ConversionProperties implements PropertyEnum {
+    CLOCK_CHANNEL_NAME("clock.channel.name");
+
+    private final String value;
+
+    ConversionProperties(final String value) { this.value = value; }
+}
diff --git a/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java b/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java
new file mode 100644
index 0000000..227fc12
--- /dev/null
+++ b/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java
@@ -0,0 +1,61 @@
+package ch.cern.nile.common.json;
+
+import java.util.Map;
+
+import com.google.gson.JsonArray;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+
+public class JsonArraySerde implements Serde<JsonArray>  {
+
+    private final JsonPojoSerializer<JsonArray> jsonSerializer = new JsonPojoSerializer<>();
+    private final JsonPojoDeserializer<JsonArray> jsonDeserializer = new JsonPojoDeserializer<>(JsonArray.class);
+
+    /**
+     * Configures this Serde with the given properties. This method configures both the internal
+     * serializer and deserializer with the provided configuration settings.
+     *
+     * @param configs the properties from the consumer or producer configuration
+     * @param isKey   indicates whether this Serde is being used for key serialization/deserialization
+     *                This parameter is ignored in this implementation
+     */
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        jsonSerializer.configure(configs, isKey);
+        jsonDeserializer.configure(configs, isKey);
+    }
+
+    /**
+     * Closes this Serde. This method closes both the internal serializer and deserializer.
+     */
+    @Override
+    public void close() {
+        jsonSerializer.close();
+        jsonDeserializer.close();
+    }
+
+    /**
+     * Returns the serializer component of this Serde.
+     *
+     * @return The {@link JsonPojoSerializer} for serializing JSON objects
+     */
+    @Override
+    public Serializer<JsonArray> serializer() {
+        return jsonSerializer;
+    }
+
+    /**
+     * Returns the deserializer component of this Serde.
+     *
+     * @return The {@link JsonPojoDeserializer} for deserializing JSON objects
+     */
+    @Override
+    public Deserializer<JsonArray> deserializer() {
+        return jsonDeserializer;
+    }
+
+}
diff --git a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java
index d84c8ac..f3b0476 100644
--- a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java
+++ b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java
@@ -1,5 +1,6 @@
 package ch.cern.nile.common.streams;
 
+import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 
@@ -7,6 +8,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +50,9 @@ public abstract class AbstractStream implements Streaming {
     @Getter(AccessLevel.PROTECTED)
     private String schemaRegistryUrl;
 
+    @Getter(AccessLevel.PROTECTED)
+    private String dlqTopic;
+
     @Setter(AccessLevel.PROTECTED)
     private long lastReadOffset = -2;
 
@@ -56,6 +61,11 @@ public abstract class AbstractStream implements Streaming {
     private Health health;
     private CountDownLatch latch;
 
+    public AbstractStream() {
+	//super();
+        //this.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
+    }
+
     /**
      * Configures the Kafka Streams application with provided settings.
      *
@@ -112,6 +122,7 @@ public abstract class AbstractStream implements Streaming {
         final StreamsBuilder builder = new StreamsBuilder();
         sourceTopic = properties.getProperty(ClientProperties.SOURCE_TOPIC.getValue());
         sinkTopic = properties.getProperty(DecodingProperties.SINK_TOPIC.getValue());
+        dlqTopic = String.join("-", Arrays.asList(sinkTopic, "dlq"));
         schemaRegistryUrl = properties.getProperty(SchemaTransformationProperties.SCHEMA_REGISTRY_URL.getValue());
         createTopology(builder);
         final Topology topology = builder.build();
diff --git a/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java b/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java
index 17421b8..9688323 100644
--- a/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java
+++ b/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java
@@ -5,13 +5,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.Properties;
 
+import ch.cern.nile.common.configuration.properties.*;
 import org.junit.jupiter.api.Test;
 
-import ch.cern.nile.common.configuration.properties.ClientProperties;
-import ch.cern.nile.common.configuration.properties.CommonProperties;
-import ch.cern.nile.common.configuration.properties.DecodingProperties;
-import ch.cern.nile.common.configuration.properties.EnrichmentProperties;
-import ch.cern.nile.common.configuration.properties.RoutingProperties;
 import ch.cern.nile.common.exceptions.MissingPropertyException;
 
 class PropertiesCheckTest {
@@ -47,6 +43,15 @@ class PropertiesCheckTest {
         assertDoesNotThrow(() -> PropertiesCheck.validateProperties(properties, StreamType.ROUTING));
     }
 
+    @Test
+    void givenValidConversionProperties_whenValidateProperties_thenPassesValidation() {
+        final Properties properties = new Properties();
+        initClientAndCommonProperties(properties);
+        properties.put(ConversionProperties.CLOCK_CHANNEL_NAME.getValue(), "");
+
+        assertDoesNotThrow(() -> PropertiesCheck.validateProperties(properties, StreamType.CONVERSION));
+    }
+
     @Test
     void givenValidEnrichmentProperties_whenValidateProperties_thenPassesValidation() {
         final Properties properties = new Properties();
-- 
GitLab


From 42de08ab90ed326bee211348725e205ef077df77 Mon Sep 17 00:00:00 2001
From: Ignacio Coterillo <ignacio.coterillo.coz@cern.ch>
Date: Fri, 18 Oct 2024 16:32:54 +0200
Subject: [PATCH 2/2] wip: checkstyle

---
 .../cern/nile/common/configuration/PropertiesCheck.java  | 9 ++++++++-
 .../configuration/properties/ConversionProperties.java   | 2 +-
 .../java/ch/cern/nile/common/json/JsonArraySerde.java    | 6 +++++-
 .../java/ch/cern/nile/common/streams/AbstractStream.java | 8 +++-----
 .../nile/common/configuration/PropertiesCheckTest.java   | 8 +++++++-
 src/test/java/ch/cern/nile/common/probes/HealthTest.java | 2 +-
 6 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java b/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java
index 325dcb4..5686f10 100644
--- a/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java
+++ b/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java
@@ -4,7 +4,14 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 
-import ch.cern.nile.common.configuration.properties.*;
+import ch.cern.nile.common.configuration.properties.ClientProperties;
+import ch.cern.nile.common.configuration.properties.CommonProperties;
+import ch.cern.nile.common.configuration.properties.ConversionProperties;
+import ch.cern.nile.common.configuration.properties.DecodingProperties;
+import ch.cern.nile.common.configuration.properties.EnrichmentProperties;
+import ch.cern.nile.common.configuration.properties.PropertyEnum;
+import ch.cern.nile.common.configuration.properties.RoutingProperties;
+import ch.cern.nile.common.configuration.properties.SchemaTransformationProperties;
 import ch.cern.nile.common.exceptions.MissingPropertyException;
 import ch.cern.nile.common.exceptions.UnknownStreamTypeException;
 
diff --git a/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java b/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java
index 01965a2..867c180 100644
--- a/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java
+++ b/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java
@@ -3,7 +3,7 @@ package ch.cern.nile.common.configuration.properties;
 import lombok.Getter;
 
 /**
- * Enum representing properties specific to conversion clients"
+ * Enum representing properties specific to conversion clients.
  */
 @Getter
 public enum ConversionProperties implements PropertyEnum {
diff --git a/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java b/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java
index 227fc12..a8eb4fe 100644
--- a/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java
+++ b/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java
@@ -8,7 +8,11 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
 
-
+/** JsonArray Serde implementation.
+ * @author icoteril
+ * @version 1.3.0
+ * @since 1.3.0
+ */
 public class JsonArraySerde implements Serde<JsonArray>  {
 
     private final JsonPojoSerializer<JsonArray> jsonSerializer = new JsonPojoSerializer<>();
diff --git a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java
index f3b0476..e03a09a 100644
--- a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java
+++ b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java
@@ -61,11 +61,6 @@ public abstract class AbstractStream implements Streaming {
     private Health health;
     private CountDownLatch latch;
 
-    public AbstractStream() {
-	//super();
-        //this.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
-    }
-
     /**
      * Configures the Kafka Streams application with provided settings.
      *
@@ -127,6 +122,9 @@ public abstract class AbstractStream implements Streaming {
         createTopology(builder);
         final Topology topology = builder.build();
         streams = kafkaStreamsClient.create(topology);
+        streams.setUncaughtExceptionHandler(
+                (exception) -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
+        );
         health = new Health(streams);
         latch = new CountDownLatch(1);
     }
diff --git a/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java b/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java
index 9688323..045d28d 100644
--- a/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java
+++ b/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java
@@ -5,9 +5,15 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.Properties;
 
-import ch.cern.nile.common.configuration.properties.*;
 import org.junit.jupiter.api.Test;
 
+
+import ch.cern.nile.common.configuration.properties.ClientProperties;
+import ch.cern.nile.common.configuration.properties.CommonProperties;
+import ch.cern.nile.common.configuration.properties.ConversionProperties;
+import ch.cern.nile.common.configuration.properties.DecodingProperties;
+import ch.cern.nile.common.configuration.properties.EnrichmentProperties;
+import ch.cern.nile.common.configuration.properties.RoutingProperties;
 import ch.cern.nile.common.exceptions.MissingPropertyException;
 
 class PropertiesCheckTest {
diff --git a/src/test/java/ch/cern/nile/common/probes/HealthTest.java b/src/test/java/ch/cern/nile/common/probes/HealthTest.java
index f2f0d5c..4d5e2a1 100644
--- a/src/test/java/ch/cern/nile/common/probes/HealthTest.java
+++ b/src/test/java/ch/cern/nile/common/probes/HealthTest.java
@@ -97,6 +97,6 @@ class HealthTest {
     void givenHttpServerCreationFails_whenStart_thenThrowsRuntimeException() throws IOException {
         when(mockFactory.createHttpServer(any(InetSocketAddress.class), anyInt())).thenThrow(IOException.class);
 
-        assertThrows(RuntimeException.class, () -> health.start());
+        assertThrows(RuntimeException.class, health::start);
     }
 }
-- 
GitLab