From 0a46cc1c6b5d7b767c6b22d36f730a1ee224d08d Mon Sep 17 00:00:00 2001 From: Ignacio Coterillo <ignacio.coterillo.coz@cern.ch> Date: Fri, 18 Oct 2024 13:14:56 +0200 Subject: [PATCH 1/2] Implement Conversion type --- CHANGELOG.MD | 7 +++ pom.xml | 2 +- .../common/configuration/PropertiesCheck.java | 16 +++-- .../nile/common/configuration/StreamType.java | 5 +- .../properties/ConversionProperties.java | 15 +++++ .../cern/nile/common/json/JsonArraySerde.java | 61 +++++++++++++++++++ .../nile/common/streams/AbstractStream.java | 11 ++++ .../configuration/PropertiesCheckTest.java | 15 +++-- 8 files changed, 115 insertions(+), 17 deletions(-) create mode 100644 src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java create mode 100644 src/main/java/ch/cern/nile/common/json/JsonArraySerde.java diff --git a/CHANGELOG.MD b/CHANGELOG.MD index 045bd75..5864734 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -6,6 +6,13 @@ - The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Entry types: Added, Changed, Deprecated, Removed, Fixed, Security + +## [1.3.0] - 2024-10-18 +## Added +- Added new StreamType: `CONVERSION` +- Add DLQ support to AbstractStream +- Implement JsonArraySerde for objects containing JsonArray types + ## [1.2.0] - 2024-10-18 ## Added - Added new StreamType: `SCHEMA_TRANSFORMATION` diff --git a/pom.xml b/pom.xml index 5b98da7..69f61c4 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ <groupId>ch.cern.nile</groupId> <artifactId>nile-common</artifactId> - <version>1.2.0</version> + <version>1.3.0</version> <properties> <maven.compiler.release>21</maven.compiler.release> diff --git a/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java b/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java index dd62d28..325dcb4 100644 --- a/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java +++ b/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java @@ -4,13 +4,7 @@ import java.util.Objects; import java.util.Properties; import java.util.Set; -import ch.cern.nile.common.configuration.properties.ClientProperties; -import ch.cern.nile.common.configuration.properties.CommonProperties; -import ch.cern.nile.common.configuration.properties.DecodingProperties; -import ch.cern.nile.common.configuration.properties.EnrichmentProperties; -import ch.cern.nile.common.configuration.properties.PropertyEnum; -import ch.cern.nile.common.configuration.properties.RoutingProperties; -import ch.cern.nile.common.configuration.properties.SchemaTransformationProperties; +import ch.cern.nile.common.configuration.properties.*; import ch.cern.nile.common.exceptions.MissingPropertyException; import ch.cern.nile.common.exceptions.UnknownStreamTypeException; @@ -26,6 +20,7 @@ public final class PropertiesCheck { private static final Set<String> ENRICHMENT_PROPERTIES = PropertyEnum.getValues(EnrichmentProperties.class); private static final Set<String> SCHEMA_TRANSFORMATION_PROPERTIES = PropertyEnum.getValues( SchemaTransformationProperties.class); + private static final Set<String> CONVERSION_PROPERTIES = PropertyEnum.getValues(ConversionProperties.class); private PropertiesCheck() { } @@ -51,8 +46,8 @@ public final class PropertiesCheck { case DECODING: validateRequiredProperties(properties, DECODING_PROPERTIES); break; - case ROUTING: - validateRequiredProperties(properties, ROUTING_PROPERTIES); + case CONVERSION: + validateRequiredProperties(properties, CONVERSION_PROPERTIES); break; case ENRICHMENT: validateRequiredProperties(properties, ENRICHMENT_PROPERTIES); @@ -60,6 +55,9 @@ public final class PropertiesCheck { case SCHEMA_TRANSFORMATION: validateRequiredProperties(properties, SCHEMA_TRANSFORMATION_PROPERTIES); break; + case ROUTING: + validateRequiredProperties(properties, ROUTING_PROPERTIES); + break; default: // Cannot happen as the stream type is validated before this switch statement. throw new UnknownStreamTypeException(String.format("Stream type unknown: %s.", streamType)); diff --git a/src/main/java/ch/cern/nile/common/configuration/StreamType.java b/src/main/java/ch/cern/nile/common/configuration/StreamType.java index e396268..06f4051 100644 --- a/src/main/java/ch/cern/nile/common/configuration/StreamType.java +++ b/src/main/java/ch/cern/nile/common/configuration/StreamType.java @@ -5,9 +5,10 @@ package ch.cern.nile.common.configuration; */ public enum StreamType { - ROUTING, DECODING, ENRICHMENT, - SCHEMA_TRANSFORMATION + CONVERSION, + SCHEMA_TRANSFORMATION, + ROUTING } diff --git a/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java b/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java new file mode 100644 index 0000000..01965a2 --- /dev/null +++ b/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java @@ -0,0 +1,15 @@ +package ch.cern.nile.common.configuration.properties; + +import lombok.Getter; + +/** + * Enum representing properties specific to conversion clients" + */ +@Getter +public enum ConversionProperties implements PropertyEnum { + CLOCK_CHANNEL_NAME("clock.channel.name"); + + private final String value; + + ConversionProperties(final String value) { this.value = value; } +} diff --git a/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java b/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java new file mode 100644 index 0000000..227fc12 --- /dev/null +++ b/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java @@ -0,0 +1,61 @@ +package ch.cern.nile.common.json; + +import java.util.Map; + +import com.google.gson.JsonArray; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + + +public class JsonArraySerde implements Serde<JsonArray> { + + private final JsonPojoSerializer<JsonArray> jsonSerializer = new JsonPojoSerializer<>(); + private final JsonPojoDeserializer<JsonArray> jsonDeserializer = new JsonPojoDeserializer<>(JsonArray.class); + + /** + * Configures this Serde with the given properties. This method configures both the internal + * serializer and deserializer with the provided configuration settings. + * + * @param configs the properties from the consumer or producer configuration + * @param isKey indicates whether this Serde is being used for key serialization/deserialization + * This parameter is ignored in this implementation + */ + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + jsonSerializer.configure(configs, isKey); + jsonDeserializer.configure(configs, isKey); + } + + /** + * Closes this Serde. This method closes both the internal serializer and deserializer. + */ + @Override + public void close() { + jsonSerializer.close(); + jsonDeserializer.close(); + } + + /** + * Returns the serializer component of this Serde. + * + * @return The {@link JsonPojoSerializer} for serializing JSON objects + */ + @Override + public Serializer<JsonArray> serializer() { + return jsonSerializer; + } + + /** + * Returns the deserializer component of this Serde. + * + * @return The {@link JsonPojoDeserializer} for deserializing JSON objects + */ + @Override + public Deserializer<JsonArray> deserializer() { + return jsonDeserializer; + } + +} diff --git a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java index d84c8ac..f3b0476 100644 --- a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java +++ b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java @@ -1,5 +1,6 @@ package ch.cern.nile.common.streams; +import java.util.Arrays; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -7,6 +8,7 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +50,9 @@ public abstract class AbstractStream implements Streaming { @Getter(AccessLevel.PROTECTED) private String schemaRegistryUrl; + @Getter(AccessLevel.PROTECTED) + private String dlqTopic; + @Setter(AccessLevel.PROTECTED) private long lastReadOffset = -2; @@ -56,6 +61,11 @@ public abstract class AbstractStream implements Streaming { private Health health; private CountDownLatch latch; + public AbstractStream() { + //super(); + //this.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD; + } + /** * Configures the Kafka Streams application with provided settings. * @@ -112,6 +122,7 @@ public abstract class AbstractStream implements Streaming { final StreamsBuilder builder = new StreamsBuilder(); sourceTopic = properties.getProperty(ClientProperties.SOURCE_TOPIC.getValue()); sinkTopic = properties.getProperty(DecodingProperties.SINK_TOPIC.getValue()); + dlqTopic = String.join("-", Arrays.asList(sinkTopic, "dlq")); schemaRegistryUrl = properties.getProperty(SchemaTransformationProperties.SCHEMA_REGISTRY_URL.getValue()); createTopology(builder); final Topology topology = builder.build(); diff --git a/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java b/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java index 17421b8..9688323 100644 --- a/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java +++ b/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java @@ -5,13 +5,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.Properties; +import ch.cern.nile.common.configuration.properties.*; import org.junit.jupiter.api.Test; -import ch.cern.nile.common.configuration.properties.ClientProperties; -import ch.cern.nile.common.configuration.properties.CommonProperties; -import ch.cern.nile.common.configuration.properties.DecodingProperties; -import ch.cern.nile.common.configuration.properties.EnrichmentProperties; -import ch.cern.nile.common.configuration.properties.RoutingProperties; import ch.cern.nile.common.exceptions.MissingPropertyException; class PropertiesCheckTest { @@ -47,6 +43,15 @@ class PropertiesCheckTest { assertDoesNotThrow(() -> PropertiesCheck.validateProperties(properties, StreamType.ROUTING)); } + @Test + void givenValidConversionProperties_whenValidateProperties_thenPassesValidation() { + final Properties properties = new Properties(); + initClientAndCommonProperties(properties); + properties.put(ConversionProperties.CLOCK_CHANNEL_NAME.getValue(), ""); + + assertDoesNotThrow(() -> PropertiesCheck.validateProperties(properties, StreamType.CONVERSION)); + } + @Test void givenValidEnrichmentProperties_whenValidateProperties_thenPassesValidation() { final Properties properties = new Properties(); -- GitLab From 42de08ab90ed326bee211348725e205ef077df77 Mon Sep 17 00:00:00 2001 From: Ignacio Coterillo <ignacio.coterillo.coz@cern.ch> Date: Fri, 18 Oct 2024 16:32:54 +0200 Subject: [PATCH 2/2] wip: checkstyle --- .../cern/nile/common/configuration/PropertiesCheck.java | 9 ++++++++- .../configuration/properties/ConversionProperties.java | 2 +- .../java/ch/cern/nile/common/json/JsonArraySerde.java | 6 +++++- .../java/ch/cern/nile/common/streams/AbstractStream.java | 8 +++----- .../nile/common/configuration/PropertiesCheckTest.java | 8 +++++++- src/test/java/ch/cern/nile/common/probes/HealthTest.java | 2 +- 6 files changed, 25 insertions(+), 10 deletions(-) diff --git a/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java b/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java index 325dcb4..5686f10 100644 --- a/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java +++ b/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java @@ -4,7 +4,14 @@ import java.util.Objects; import java.util.Properties; import java.util.Set; -import ch.cern.nile.common.configuration.properties.*; +import ch.cern.nile.common.configuration.properties.ClientProperties; +import ch.cern.nile.common.configuration.properties.CommonProperties; +import ch.cern.nile.common.configuration.properties.ConversionProperties; +import ch.cern.nile.common.configuration.properties.DecodingProperties; +import ch.cern.nile.common.configuration.properties.EnrichmentProperties; +import ch.cern.nile.common.configuration.properties.PropertyEnum; +import ch.cern.nile.common.configuration.properties.RoutingProperties; +import ch.cern.nile.common.configuration.properties.SchemaTransformationProperties; import ch.cern.nile.common.exceptions.MissingPropertyException; import ch.cern.nile.common.exceptions.UnknownStreamTypeException; diff --git a/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java b/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java index 01965a2..867c180 100644 --- a/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java +++ b/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java @@ -3,7 +3,7 @@ package ch.cern.nile.common.configuration.properties; import lombok.Getter; /** - * Enum representing properties specific to conversion clients" + * Enum representing properties specific to conversion clients. */ @Getter public enum ConversionProperties implements PropertyEnum { diff --git a/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java b/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java index 227fc12..a8eb4fe 100644 --- a/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java +++ b/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java @@ -8,7 +8,11 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; - +/** JsonArray Serde implementation. + * @author icoteril + * @version 1.3.0 + * @since 1.3.0 + */ public class JsonArraySerde implements Serde<JsonArray> { private final JsonPojoSerializer<JsonArray> jsonSerializer = new JsonPojoSerializer<>(); diff --git a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java index f3b0476..e03a09a 100644 --- a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java +++ b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java @@ -61,11 +61,6 @@ public abstract class AbstractStream implements Streaming { private Health health; private CountDownLatch latch; - public AbstractStream() { - //super(); - //this.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD; - } - /** * Configures the Kafka Streams application with provided settings. * @@ -127,6 +122,9 @@ public abstract class AbstractStream implements Streaming { createTopology(builder); final Topology topology = builder.build(); streams = kafkaStreamsClient.create(topology); + streams.setUncaughtExceptionHandler( + (exception) -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD + ); health = new Health(streams); latch = new CountDownLatch(1); } diff --git a/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java b/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java index 9688323..045d28d 100644 --- a/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java +++ b/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java @@ -5,9 +5,15 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.Properties; -import ch.cern.nile.common.configuration.properties.*; import org.junit.jupiter.api.Test; + +import ch.cern.nile.common.configuration.properties.ClientProperties; +import ch.cern.nile.common.configuration.properties.CommonProperties; +import ch.cern.nile.common.configuration.properties.ConversionProperties; +import ch.cern.nile.common.configuration.properties.DecodingProperties; +import ch.cern.nile.common.configuration.properties.EnrichmentProperties; +import ch.cern.nile.common.configuration.properties.RoutingProperties; import ch.cern.nile.common.exceptions.MissingPropertyException; class PropertiesCheckTest { diff --git a/src/test/java/ch/cern/nile/common/probes/HealthTest.java b/src/test/java/ch/cern/nile/common/probes/HealthTest.java index f2f0d5c..4d5e2a1 100644 --- a/src/test/java/ch/cern/nile/common/probes/HealthTest.java +++ b/src/test/java/ch/cern/nile/common/probes/HealthTest.java @@ -97,6 +97,6 @@ class HealthTest { void givenHttpServerCreationFails_whenStart_thenThrowsRuntimeException() throws IOException { when(mockFactory.createHttpServer(any(InetSocketAddress.class), anyInt())).thenThrow(IOException.class); - assertThrows(RuntimeException.class, () -> health.start()); + assertThrows(RuntimeException.class, health::start); } } -- GitLab