diff --git a/CHANGELOG.MD b/CHANGELOG.MD index 045bd75f2eacf30cb717596bdc40a8ef793732a9..5864734a33c1bc0b71dc7e049327be10c9f5465e 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -6,6 +6,13 @@ - The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Entry types: Added, Changed, Deprecated, Removed, Fixed, Security + +## [1.3.0] - 2024-10-18 +## Added +- Added new StreamType: `CONVERSION` +- Add DLQ support to AbstractStream +- Implement JsonArraySerde for objects containing JsonArray types + ## [1.2.0] - 2024-10-18 ## Added - Added new StreamType: `SCHEMA_TRANSFORMATION` diff --git a/pom.xml b/pom.xml index 5b98da729e992f04d86aef799d3c31a5d1487d97..69f61c4380036d73cdd8fc7a5d6c3d99c81e8423 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ <groupId>ch.cern.nile</groupId> <artifactId>nile-common</artifactId> - <version>1.2.0</version> + <version>1.3.0</version> <properties> <maven.compiler.release>21</maven.compiler.release> diff --git a/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java b/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java index dd62d28c7e418318fdd9f3e70c672a8ec1934d46..5686f10e35f6c9e5df44ffc7f1fddb8b8a808170 100644 --- a/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java +++ b/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java @@ -6,6 +6,7 @@ import java.util.Set; import ch.cern.nile.common.configuration.properties.ClientProperties; import ch.cern.nile.common.configuration.properties.CommonProperties; +import ch.cern.nile.common.configuration.properties.ConversionProperties; import ch.cern.nile.common.configuration.properties.DecodingProperties; import ch.cern.nile.common.configuration.properties.EnrichmentProperties; import ch.cern.nile.common.configuration.properties.PropertyEnum; @@ -26,6 +27,7 @@ public final class PropertiesCheck { private static final Set<String> ENRICHMENT_PROPERTIES = PropertyEnum.getValues(EnrichmentProperties.class); private static final Set<String> SCHEMA_TRANSFORMATION_PROPERTIES = PropertyEnum.getValues( SchemaTransformationProperties.class); + private static final Set<String> CONVERSION_PROPERTIES = PropertyEnum.getValues(ConversionProperties.class); private PropertiesCheck() { } @@ -51,8 +53,8 @@ public final class PropertiesCheck { case DECODING: validateRequiredProperties(properties, DECODING_PROPERTIES); break; - case ROUTING: - validateRequiredProperties(properties, ROUTING_PROPERTIES); + case CONVERSION: + validateRequiredProperties(properties, CONVERSION_PROPERTIES); break; case ENRICHMENT: validateRequiredProperties(properties, ENRICHMENT_PROPERTIES); @@ -60,6 +62,9 @@ public final class PropertiesCheck { case SCHEMA_TRANSFORMATION: validateRequiredProperties(properties, SCHEMA_TRANSFORMATION_PROPERTIES); break; + case ROUTING: + validateRequiredProperties(properties, ROUTING_PROPERTIES); + break; default: // Cannot happen as the stream type is validated before this switch statement. throw new UnknownStreamTypeException(String.format("Stream type unknown: %s.", streamType)); diff --git a/src/main/java/ch/cern/nile/common/configuration/StreamType.java b/src/main/java/ch/cern/nile/common/configuration/StreamType.java index e396268da7856fbbfbcba22c7caeedad123f56c0..06f4051c2b423f7aefdfebff37c7f639905cc9e1 100644 --- a/src/main/java/ch/cern/nile/common/configuration/StreamType.java +++ b/src/main/java/ch/cern/nile/common/configuration/StreamType.java @@ -5,9 +5,10 @@ package ch.cern.nile.common.configuration; */ public enum StreamType { - ROUTING, DECODING, ENRICHMENT, - SCHEMA_TRANSFORMATION + CONVERSION, + SCHEMA_TRANSFORMATION, + ROUTING } diff --git a/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java b/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java new file mode 100644 index 0000000000000000000000000000000000000000..867c180b3a709379a1f40d979fce7e0b976899ff --- /dev/null +++ b/src/main/java/ch/cern/nile/common/configuration/properties/ConversionProperties.java @@ -0,0 +1,15 @@ +package ch.cern.nile.common.configuration.properties; + +import lombok.Getter; + +/** + * Enum representing properties specific to conversion clients. + */ +@Getter +public enum ConversionProperties implements PropertyEnum { + CLOCK_CHANNEL_NAME("clock.channel.name"); + + private final String value; + + ConversionProperties(final String value) { this.value = value; } +} diff --git a/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java b/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java new file mode 100644 index 0000000000000000000000000000000000000000..a8eb4fe9d88a21132eebac7af8e1d8f45de51c0f --- /dev/null +++ b/src/main/java/ch/cern/nile/common/json/JsonArraySerde.java @@ -0,0 +1,65 @@ +package ch.cern.nile.common.json; + +import java.util.Map; + +import com.google.gson.JsonArray; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +/** JsonArray Serde implementation. + * @author icoteril + * @version 1.3.0 + * @since 1.3.0 + */ +public class JsonArraySerde implements Serde<JsonArray> { + + private final JsonPojoSerializer<JsonArray> jsonSerializer = new JsonPojoSerializer<>(); + private final JsonPojoDeserializer<JsonArray> jsonDeserializer = new JsonPojoDeserializer<>(JsonArray.class); + + /** + * Configures this Serde with the given properties. This method configures both the internal + * serializer and deserializer with the provided configuration settings. + * + * @param configs the properties from the consumer or producer configuration + * @param isKey indicates whether this Serde is being used for key serialization/deserialization + * This parameter is ignored in this implementation + */ + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + jsonSerializer.configure(configs, isKey); + jsonDeserializer.configure(configs, isKey); + } + + /** + * Closes this Serde. This method closes both the internal serializer and deserializer. + */ + @Override + public void close() { + jsonSerializer.close(); + jsonDeserializer.close(); + } + + /** + * Returns the serializer component of this Serde. + * + * @return The {@link JsonPojoSerializer} for serializing JSON objects + */ + @Override + public Serializer<JsonArray> serializer() { + return jsonSerializer; + } + + /** + * Returns the deserializer component of this Serde. + * + * @return The {@link JsonPojoDeserializer} for deserializing JSON objects + */ + @Override + public Deserializer<JsonArray> deserializer() { + return jsonDeserializer; + } + +} diff --git a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java index d84c8ac62774aef7ebc533f6d38df619de12771e..e03a09a85af2b7cd4f202d018114a4f95d75af63 100644 --- a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java +++ b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java @@ -1,5 +1,6 @@ package ch.cern.nile.common.streams; +import java.util.Arrays; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -7,6 +8,7 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +50,9 @@ public abstract class AbstractStream implements Streaming { @Getter(AccessLevel.PROTECTED) private String schemaRegistryUrl; + @Getter(AccessLevel.PROTECTED) + private String dlqTopic; + @Setter(AccessLevel.PROTECTED) private long lastReadOffset = -2; @@ -112,10 +117,14 @@ public abstract class AbstractStream implements Streaming { final StreamsBuilder builder = new StreamsBuilder(); sourceTopic = properties.getProperty(ClientProperties.SOURCE_TOPIC.getValue()); sinkTopic = properties.getProperty(DecodingProperties.SINK_TOPIC.getValue()); + dlqTopic = String.join("-", Arrays.asList(sinkTopic, "dlq")); schemaRegistryUrl = properties.getProperty(SchemaTransformationProperties.SCHEMA_REGISTRY_URL.getValue()); createTopology(builder); final Topology topology = builder.build(); streams = kafkaStreamsClient.create(topology); + streams.setUncaughtExceptionHandler( + (exception) -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD + ); health = new Health(streams); latch = new CountDownLatch(1); } diff --git a/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java b/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java index 17421b890fc189c4f78593ca419ce437e933eca3..045d28d3645bca19237c549ab78c764035912b6c 100644 --- a/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java +++ b/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java @@ -7,8 +7,10 @@ import java.util.Properties; import org.junit.jupiter.api.Test; + import ch.cern.nile.common.configuration.properties.ClientProperties; import ch.cern.nile.common.configuration.properties.CommonProperties; +import ch.cern.nile.common.configuration.properties.ConversionProperties; import ch.cern.nile.common.configuration.properties.DecodingProperties; import ch.cern.nile.common.configuration.properties.EnrichmentProperties; import ch.cern.nile.common.configuration.properties.RoutingProperties; @@ -47,6 +49,15 @@ class PropertiesCheckTest { assertDoesNotThrow(() -> PropertiesCheck.validateProperties(properties, StreamType.ROUTING)); } + @Test + void givenValidConversionProperties_whenValidateProperties_thenPassesValidation() { + final Properties properties = new Properties(); + initClientAndCommonProperties(properties); + properties.put(ConversionProperties.CLOCK_CHANNEL_NAME.getValue(), ""); + + assertDoesNotThrow(() -> PropertiesCheck.validateProperties(properties, StreamType.CONVERSION)); + } + @Test void givenValidEnrichmentProperties_whenValidateProperties_thenPassesValidation() { final Properties properties = new Properties(); diff --git a/src/test/java/ch/cern/nile/common/probes/HealthTest.java b/src/test/java/ch/cern/nile/common/probes/HealthTest.java index f2f0d5cedff76371b465ae4b0e93d1d58878bd58..4d5e2a14969601b394f607225f41c872657fccec 100644 --- a/src/test/java/ch/cern/nile/common/probes/HealthTest.java +++ b/src/test/java/ch/cern/nile/common/probes/HealthTest.java @@ -97,6 +97,6 @@ class HealthTest { void givenHttpServerCreationFails_whenStart_thenThrowsRuntimeException() throws IOException { when(mockFactory.createHttpServer(any(InetSocketAddress.class), anyInt())).thenThrow(IOException.class); - assertThrows(RuntimeException.class, () -> health.start()); + assertThrows(RuntimeException.class, health::start); } }