diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index ab61b350d650b10c740cf3e94bd0f7b264c33a9c..f80b94e8d22a876c6610ca1d402607a090faf46d 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,45 +1,4 @@ -variables: - MAVEN_OPTS: >- - -Dhttps.protocols=TLSv1.2 - -Dmaven.repo.local=$CI_PROJECT_DIR/.m2/repository - -Dorg.slf4j.simpleLogger.showDateTime=true - -Djava.awt.headless=true - - MAVEN_CLI_OPTS: >- - --batch-mode - --errors - --fail-at-end - --show-version - --no-transfer-progress - -DinstallAtEnd=true - -DdeployAtEnd=true - -image: maven:3.8-openjdk-11 - -cache: - paths: - - .m2/repository - -before_script: - - VERSION=$(mvn -q -Dexec.executable=echo -Dexec.args='${project.version}' --non-recursive exec:exec) - - if [ "$CI_COMMIT_BRANCH" == "qa" ]; then - export VERSION="${VERSION}-QA"; - elif [ "$CI_COMMIT_BRANCH" != "master" ]; then - export VERSION="${VERSION}-${CI_COMMIT_BRANCH}-SNAPSHOT"; - fi - - mvn versions:set -DnewVersion=$VERSION - -.verify: - stage: test - script: - - 'mvn $MAVEN_CLI_OPTS test' - - if [ ! -f ci_settings.xml ]; then - echo "CI settings missing! Please create ci_settings.xml file."; - exit 1; - fi - -deploy:jdk11: - stage: deploy - script: - - 'mvn $MAVEN_CLI_OPTS deploy --settings ci_settings.xml' - when: on_success +include: + - project: nile/java-build-tools + ref: master + file: common-lib-ci.yml diff --git a/CHANGELOG.MD b/CHANGELOG.MD index a9545d547ba8547112b82406c566c497415314e6..7fda23190884bfe04416c449e867f9c669ddc6fa 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -1,4 +1,5 @@ # Changelog of nile-common + - Product: nile/streams/libs/nile-common - Origin: Refactored from `kafka-streams` version `2.7.2`. - All notable changes to this project will be documented in this file. @@ -6,178 +7,254 @@ - Entry types: Added, Changed, Deprecated, Removed, Fixed, Security ## [1.0.1] - 2023-12-07 + ## Added - - Added Main class from `kafka-streams` version `2.7.2` - - Temporarily removed checkstyle -- will be added back in a future release + +- Added Main class from `kafka-streams` version `2.7.2` +- Temporarily removed checkstyle -- will be added back in a future release ## [1.0.0] - 2023-10-04 + ## Added - - Initial release of nile-common - - Extracted from `kafka-streams` version `2.7.2`: - - clients - - configs - - json - - models - - probes - - streams/AbstractStream - - streams/Streaming - - resources/log4j.properties - - schema (excluding `schema/db`) + +- Initial release of nile-common +- Extracted from `kafka-streams` version `2.7.2`: + - clients + - configs + - json + - models + - probes + - streams/AbstractStream + - streams/Streaming + - resources/log4j.properties + - schema (excluding `schema/db`) ________________________________________________________________________________ # Old Changelog of kafka-streams ## [2.7.2] - 2023-09-21 + ### Fixed - - Fixed GenerateTableCreationCommandMain DEVEUI type + +- Fixed GenerateTableCreationCommandMain DEVEUI type ## [2.7.1] - 2023-08-09 + ### Fixed - - [NILE-946] SchemaInjector should match upper & lowe case for the timestamp field + +- [NILE-946] SchemaInjector should match upper & lowe case for the timestamp field ## [2.7.0] - 2023-08-09 + ### Added - - [NILE-964] New ConnectSchemaToTableCreationCommandGenerator to generate SQL commands for creating tables - - [NILE-964] New schema injector to inject schemas in streaming records (if required) - - [NILE-964] Containerized tests for the SchemaInjector with the Aiven JDBC connector and Oracle + +- [NILE-964] New ConnectSchemaToTableCreationCommandGenerator to generate SQL commands for creating tables +- [NILE-964] New schema injector to inject schemas in streaming records (if required) +- [NILE-964] Containerized tests for the SchemaInjector with the Aiven JDBC connector and Oracle + ### Changed - - [NILE-946] LoraEnAccessControlDecode now uses the new schema injector - - [NILE-946] New Kaitai struct for LoraEnAccessControlDecode - - [NILE-946] Added new main class for table creation command generation + +- [NILE-946] LoraEnAccessControlDecode now uses the new schema injector +- [NILE-946] New Kaitai struct for LoraEnAccessControlDecode +- [NILE-946] Added new main class for table creation command generation ## [2.6.9] - 2023-07-12 + ### Fixed - - [NILE-948] DifferentialPressure should be signed instead of unsigned + +- [NILE-948] DifferentialPressure should be signed instead of unsigned ## [2.6.8] - 2023-07-12 + ### Fixed - - [NILE-948] Ignore unsupported frames for LoraItComputerCenterTempCayenneLpp + +- [NILE-948] Ignore unsupported frames for LoraItComputerCenterTempCayenneLpp ## [2.6.7] - 2023-07-12 + ### Added - - [NILE-948] New decoder and streaming for LoraItComputerCenterTempCayenneLpp sensors + +- [NILE-948] New decoder and streaming for LoraItComputerCenterTempCayenneLpp sensors ## [2.6.6] - 2023-07-07 + ### Fixed - - Fixed division by zero in LoraHumTempBatmon + +- Fixed division by zero in LoraHumTempBatmon ## [2.6.5] - 2023-06-29 + ### Added - - [NILE-977] Add StatusFlag in LoraHumTempBatmon + +- [NILE-977] Add StatusFlag in LoraHumTempBatmon ## [2.6.4] - 2023-05-12 + ### Fixed - - LoraHumTempBatmon fix (changed temperature_gain_adc_conv calculation formula) + +- LoraHumTempBatmon fix (changed temperature_gain_adc_conv calculation formula) ## [2.6.3] - 2023-05-12 + ### Fixed - - LoraHumTempBatmon fix (changed field from int to double) + +- LoraHumTempBatmon fix (changed field from int to double) ## [2.6.2] - 2023-05-11 + ### Fixed - - LoraHumTempBatmon fix (removed deviceName) + +- LoraHumTempBatmon fix (removed deviceName) ## [2.6.1] - 2023-05-11 + ### Fixed - - LoraRisingHfDecoder fix (rssi should be float to match the InfluxDb measurements) + +- LoraRisingHfDecoder fix (rssi should be float to match the InfluxDb measurements) ## [2.6.0] - 2023-05-11 + ### Added - - New abstract test class for Kaitai Structs - - New generic streams decoder for Kaitai Structs - - New Lora Humidity Temperature Batmon sensor routing & decoding + +- New abstract test class for Kaitai Structs +- New generic streams decoder for Kaitai Structs +- New Lora Humidity Temperature Batmon sensor routing & decoding + ### Changed - - Updated LoraRouting to use the new generic Kaitai Struct Decoder - - Updated tests to use the new generic test class - - Schema injection for LoraEnAccessControl - - artifactId from cerndb-nile-kafka-streams to nile-kafka-streams - - deployment image path from gitlab-registry.cern.ch/db/kafka-streams to gitlab-registry.cern.ch/nile/kafka-streams - - QA lora-routing from lora-mqtt-qa to lora-mqtt - - QA topic names that didn't fit the convention production-topic-name-qa -### Fixed - - Access Control sensors decoding + +- Updated LoraRouting to use the new generic Kaitai Struct Decoder +- Updated tests to use the new generic test class +- Schema injection for LoraEnAccessControl +- artifactId from cerndb-nile-kafka-streams to nile-kafka-streams +- deployment image path from gitlab-registry.cern.ch/db/kafka-streams to gitlab-registry.cern.ch/nile/kafka-streams +- QA lora-routing from lora-mqtt-qa to lora-mqtt +- QA topic names that didn't fit the convention production-topic-name-qa + +### Fixed + +- Access Control sensors decoding + ### Removed - - Custom decoders: LoraBatmonDecoder + +- Custom decoders: LoraBatmonDecoder ## [2.5.0] - 2023-04-05 + ### Added - - New generic Kaitai Struct Decoder - - New abstract test class for Kaitai Structs - - New generic streams decoder for Kaitai Structs - - New LoraEnAccessControl routing & decoding - - New LoraEnParkingControl routing & decoding + +- New generic Kaitai Struct Decoder +- New abstract test class for Kaitai Structs +- New generic streams decoder for Kaitai Structs +- New LoraEnAccessControl routing & decoding +- New LoraEnParkingControl routing & decoding + ### Changed - - Updated already existing Kaitai Structs to do the decoding themselves - - Updated LoraRouting to use the new generic Kaitai Struct Decoder - - Updated tests to use the new generic test class - - RpCalibration now uses Kaitai Structs and the new generic streams decoder + +- Updated already existing Kaitai Structs to do the decoding themselves +- Updated LoraRouting to use the new generic Kaitai Struct Decoder +- Updated tests to use the new generic test class +- RpCalibration now uses Kaitai Structs and the new generic streams decoder + ### Removed - - Custom decoders: RpCalibration, LoraBatmonDecoder, LoraLoraBeItTempRisingHfDecoder, LoraCrackSensorsDecoder + +- Custom decoders: RpCalibration, LoraBatmonDecoder, LoraLoraBeItTempRisingHfDecoder, LoraCrackSensorsDecoder ## [2.4.6] - 2023-01-26 + ### Fixed - - Bugfix: LoraBatmonDecode: maxSnr first value set to Double.MIN_VALUE instead of 0.0 + +- Bugfix: LoraBatmonDecode: maxSnr first value set to Double.MIN_VALUE instead of 0.0 ## [2.4.6]-[2.4.7] - 2023-01-26 + ### Fixed - - Bugfix: LoraBatmonDecode: maxSnr first value set to Double.MIN_VALUE instead of 0.0 + +- Bugfix: LoraBatmonDecode: maxSnr first value set to Double.MIN_VALUE instead of 0.0 ## [2.4.5] - 2022-11-30 + ### Added - - [NILE-926] Added GM ASG's crack sensors routing & decoding + +- [NILE-926] Added GM ASG's crack sensors routing & decoding + ### Fixed - - Bugfix: [NILE-913] Stream applications shouldn't return null on failure + +- Bugfix: [NILE-913] Stream applications shouldn't return null on failure ## [2.4.4] - 2022-09-22 + ### Fixed - - Fixed GeolocationDecode null pointer exception bug (missing offset injection) + +- Fixed GeolocationDecode null pointer exception bug (missing offset injection) ## [2.4.3] - 2022-09-21 + ### Fixed - - Fixed null pointer exception in LoraContact + +- Fixed null pointer exception in LoraContact ## [2.4.2] - 2022-09-21 + ### Fixed - - Fixed constructors access modifier + +- Fixed constructors access modifier ## [2.4.1] - 2022-09-21 + ### Fixed - - Added empty constructor in streams classes that had one with arguments -- keeps Main.java:49 happy + +- Added empty constructor in streams classes that had one with arguments -- keeps Main.java:49 happy ## [2.4.0] - 2022-09-21 + ### Added - - [NILE-885] Added offset logging when there is a failure - + +- [NILE-885] Added offset logging when there is a failure + ## [2.3.1] - 2022-07-18 + ### Added - - Added more information in Batmon streams application + +- Added more information in Batmon streams application ## [2.3.0] - 2022-05-24 + ### Added - - [NILE-887] Added Batmon routing & decoding + +- [NILE-887] Added Batmon routing & decoding ## [2.2.0] - 2022-06-04 + ### Added + - Routing for new applications (lora-SY-temp-humi-isolde & BE-it-temp) - [NILE-861] Added LoRaWAN environmental sensors decoding + ### Removed - - Removed Cranes project + +- Removed Cranes project ## [1.0.0] - 2021-11-25 + ### Added - - [NILE-846] Cranes project in routing - - [NILE-846] Cranes decoder and CranesDecode - - [NILE-692] - - JUnit5, kafka-streams-test-utils test & mockito dependencies, - - CHANGELOG.MD, - - Tests & test resources for LoraRouting topology - - Tests for Health - - Tests for configs - - Overload NewDecoder#decode(JsonObject, long) -> NewDecoder#decode(JsonObject) + +- [NILE-846] Cranes project in routing +- [NILE-846] Cranes decoder and CranesDecode +- [NILE-692] + - JUnit5, kafka-streams-test-utils test & mockito dependencies, + - CHANGELOG.MD, + - Tests & test resources for LoraRouting topology + - Tests for Health + - Tests for configs + - Overload NewDecoder#decode(JsonObject, long) -> NewDecoder#decode(JsonObject) + ### Changed - - [NILE-692] - - Updated LoraRouting & Utils, to route using the applicationName instead of deviceName, - - Updated some already existing tests with JUnit + +- [NILE-692] + - Updated LoraRouting & Utils, to route using the applicationName instead of deviceName, + - Updated some already existing tests with JUnit + ### Removed - - Unused decoders (Decoder.java, NewDecoder.java) and the corresponding streams + +- Unused decoders (Decoder.java, NewDecoder.java) and the corresponding streams diff --git a/README.md b/README.md index ba2fe537b628ac880fcb172d554c9d5651c2f8a0..fdbb7bada412f729f18f66e16aa69fbead855c2f 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,94 @@ # Nile Common -Nile Common is the common core library for the Nile streaming ecosystem. -It contains all the common classes and utilities that are used by the other Nile streaming applications. +**Nile Common** is a Java library designed for building Kafka streaming applications. This library +encapsulates a range of functionalities including stream processing, schema management, serialization & deserialization, +and Health checks, thus providing a robust foundation for developing data streaming solutions. + +## Getting Started + +### Prerequisites + +- Java 11 or higher + +### Adding Dependency + +Currently, the library is available as a maven artifact. To add the dependency to your project, add the following to +your `pom.xml` file: + +```xml + +<dependency> + <groupId>ch.cern.nile</groupId> + <artifactId>nile-common</artifactId> + <version>1.0.0</version> +</dependency> +``` + +Since this library is not yet available on Maven Central, you will need to also set up the registry in your `pom.xml` +file: + +```xml +<repositories> + <repository> + <id>gitlab-maven</id> + <url>https://gitlab.cern.ch/api/v4/projects/170995/packages/maven</url> + </repository> +</repositories> + +<distributionManagement> +<repository> + <id>gitlab-maven</id> + <url>https://gitlab.cern.ch/api/v4/projects/170995/packages/maven</url> +</repository> + +<snapshotRepository> + <id>gitlab-maven</id> + <url>https://gitlab.cern.ch/api/v4/projects/170995/packages/maven</url> +</snapshotRepository> +</distributionManagement> +``` + +## Basic Usage + +### Extending AbstractStream + +Extend the AbstractStream class to implement your custom streaming logic. This involves defining the stream processing +steps within the createTopology method. + +```java +package com.example.streams; + +import ch.cern.nile.common.streams.AbstractStream; + +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; + +public class MyCustomStream extends AbstractStream { + + public MyCustomStream(String sourceTopic, String sinkTopic) { + super(sourceTopic, sinkTopic); + } + + @Override + public void createTopology(StreamsBuilder builder) { + // Define your custom stream processing logic + builder.stream(sourceTopic, Consumed.with(Serdes.String(), new JsonSerde())) + .filter(StreamUtils::filterRecord) + .transformValues(InjectOffsetTransformer::new) + .mapValues(value -> { /* your transformation logic */ }) + .filter(StreamUtils::filterNull).to(sinkTopic); + } + + + @Override + public Map<String, Object> enrichCustomFunction(Map<String, Object> map, JsonObject value) { + // Optional: Define your custom enrichment logic + // In this example, we are adding usage of the schema injector + return SchemaInjector.inject(map); + } +} +``` + +## Support & Contact + +For support, questions, or feedback, please contact [Nile Support](mailto:nile-support.cern.ch). \ No newline at end of file diff --git a/ci_settings.xml b/ci_settings.xml deleted file mode 100644 index 57951e92f75c86876e49eca1e23eaeb10eb80e62..0000000000000000000000000000000000000000 --- a/ci_settings.xml +++ /dev/null @@ -1,16 +0,0 @@ -<settings xmlns="http://maven.apache.org/SETTINGS/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.1.0 http://maven.apache.org/xsd/settings-1.1.0.xsd"> - <servers> - <server> - <id>gitlab-maven</id> - <configuration> - <httpHeaders> - <property> - <name>Job-Token</name> - <value>${CI_JOB_TOKEN}</value> - </property> - </httpHeaders> - </configuration> - </server> - </servers> -</settings> diff --git a/pom.xml b/pom.xml index f8c16daaf2d5affb96c3276a59e6ae9509dd5ecb..8a5ffbb9ec97d3f1d92320fb9e3d4bbbcb042fc2 100644 --- a/pom.xml +++ b/pom.xml @@ -6,27 +6,12 @@ <groupId>ch.cern.nile</groupId> <artifactId>nile-common</artifactId> - <version>1.0.1</version> - - <repositories> - <repository> - <id>gitlab-maven</id> - <url>${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/maven</url> - </repository> - </repositories> - <distributionManagement> - <repository> - <id>gitlab-maven</id> - <url>${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/maven</url> - </repository> - <snapshotRepository> - <id>gitlab-maven</id> - <url>${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/maven</url> - </snapshotRepository> - </distributionManagement> + <version>1.0.0</version> <properties> <maven.compiler.release>11</maven.compiler.release> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <kafka.version>2.0.0</kafka.version> </properties> @@ -79,15 +64,96 @@ <version>${kafka.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>5.2.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.github.spotbugs</groupId> + <artifactId>spotbugs-annotations</artifactId> + <version>4.8.1</version> + </dependency> </dependencies> <build> <plugins> + <!-- Checkstyle plugin --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>3.3.1</version> + <configuration> + <configLocation> + https://gitlab.cern.ch/nile/java-build-tools/-/raw/master/src/main/resources/checkstyle.xml?ref_type=heads + </configLocation> + <suppressionsLocation> + https://gitlab.cern.ch/nile/java-build-tools/-/raw/master/src/main/resources/checkstyle-suppressions.xml?ref_type=heads + </suppressionsLocation> + <consoleOutput>true</consoleOutput> + <failsOnError>true</failsOnError> + <linkXRef>false</linkXRef> + </configuration> + </plugin> + + <!-- PMD plugin --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-pmd-plugin</artifactId> + <version>3.21.1-pmd-7.0.0-SNAPSHOT</version> + <dependencies> + <dependency> + <groupId>net.sourceforge.pmd</groupId> + <artifactId>pmd-core</artifactId> + <version>7.0.0-rc4</version> + </dependency> + <dependency> + <groupId>net.sourceforge.pmd</groupId> + <artifactId>pmd-java</artifactId> + <version>7.0.0-rc4</version> + </dependency> + </dependencies> + <configuration> + <linkXRef>false</linkXRef> + <rulesets> + <ruleset> + https://gitlab.cern.ch/nile/java-build-tools/-/raw/master/src/main/resources/pmd_java_ruleset.xml?ref_type=heads + </ruleset> + </rulesets> + <includeTests>true</includeTests> + <failOnViolation>true</failOnViolation> + <printFailingErrors>true</printFailingErrors> + </configuration> + </plugin> + + <!-- SpotBugs plugin --> + <plugin> + <groupId>com.github.spotbugs</groupId> + <artifactId>spotbugs-maven-plugin</artifactId> + <version>4.8.1.0</version> + <configuration> + <effort>Max</effort> + <xmlOutput>false</xmlOutput> + <htmlOutput>true</htmlOutput> + <plugins> + <plugin> + <groupId>com.h3xstream.findsecbugs</groupId> + <artifactId>findsecbugs-plugin</artifactId> + <version>1.12.0</version> + </plugin> + </plugins> + </configuration> + </plugin> + + <!-- Compiler plugin --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> </plugin> + + <!-- Surefire plugin for testing --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> @@ -95,4 +161,38 @@ </plugin> </plugins> </build> + + + <repositories> + <repository> + <id>gitlab-maven</id> + <url>${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/maven</url> + </repository> + </repositories> + + <pluginRepositories> + <pluginRepository> + <id>apache.snapshots</id> + <name>Apache Snapshot Repository</name> + <url>https://repository.apache.org/snapshots</url> + <releases> + <enabled>false</enabled> + </releases> + <snapshots> + <enabled>true</enabled> + </snapshots> + </pluginRepository> + </pluginRepositories> + + <distributionManagement> + <repository> + <id>gitlab-maven</id> + <url>${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/maven</url> + </repository> + <snapshotRepository> + <id>gitlab-maven</id> + <url>${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/maven</url> + </snapshotRepository> + </distributionManagement> + </project> diff --git a/src/main/java/ch/cern/nile/common/Main.java b/src/main/java/ch/cern/nile/common/Main.java deleted file mode 100644 index 6724ee65e28b8498e5ac8ed1f6f7899e54928aab..0000000000000000000000000000000000000000 --- a/src/main/java/ch/cern/nile/common/Main.java +++ /dev/null @@ -1,58 +0,0 @@ -package ch.cern.nile.common; - - -import ch.cern.nile.common.clients.KafkaStreamsClient; -import ch.cern.nile.common.configs.PropertiesCheck; -import ch.cern.nile.common.configs.StreamConfig; -import ch.cern.nile.common.configs.StreamType; -import ch.cern.nile.common.streams.Streaming; - -import java.io.FileInputStream; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.util.Properties; - -public class Main { - - /** - * Main method. - * - * @param args the properties files - */ - public static void main(String[] args) { - // Check if properties file was passed - if (args.length < 1) { - throw new RuntimeException("Expecting args[0] to be the path to the configuration file"); - } - - // Loading properties file - String configsPath = args[0]; - final Properties configs = new Properties(); - try { - configs.load(new FileInputStream(configsPath)); - } catch (IOException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - - StreamType sType = StreamType.valueOf(configs.getProperty(StreamConfig.CommonProperties.STREAM_TYPE.getValue(), null)); - - PropertiesCheck.validateProperties(configs, sType); - - // Initialize Kafka Client - final KafkaStreamsClient client = new KafkaStreamsClient(); - client.configure(configs); - - // Start Streaming - try { - Class<?> clazz = Class.forName(configs.getProperty(StreamConfig.CommonProperties.STREAM_CLASS.getValue())); - final Streaming streaming; - streaming = (Streaming) clazz.getDeclaredConstructor().newInstance(); - streaming.configure(configs); - streaming.stream(client); - } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | ClassCastException - | InvocationTargetException | NoSuchMethodException e) { - e.printStackTrace(); - } - } -} diff --git a/src/main/java/ch/cern/nile/common/StreamingApplication.java b/src/main/java/ch/cern/nile/common/StreamingApplication.java new file mode 100644 index 0000000000000000000000000000000000000000..b18289ee5a9f2746c7f578342335bb52a951b2ec --- /dev/null +++ b/src/main/java/ch/cern/nile/common/StreamingApplication.java @@ -0,0 +1,84 @@ +package ch.cern.nile.common; + + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ch.cern.nile.common.clients.KafkaStreamsClient; +import ch.cern.nile.common.configuration.PropertiesCheck; +import ch.cern.nile.common.configuration.StreamType; +import ch.cern.nile.common.configuration.properties.CommonProperties; +import ch.cern.nile.common.exceptions.StreamingException; +import ch.cern.nile.common.streams.Streaming; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +/** + * {@link StreamingApplication} is the entry point for initializing and starting a Kafka Streams application. + * This class provides the main method to load configuration properties, perform necessary validations, + * and bootstrap the streaming process using a specified streaming implementation. + */ +public final class StreamingApplication { + + private static final Logger LOGGER = LoggerFactory.getLogger(StreamingApplication.class); + private static final int MIN_ARGS_LENGTH = 1; + + private StreamingApplication() { + } + + /** + * The main method for the StreamingApplication, serving as the entry point of the application. + * It loads configuration properties from a provided file path, validates these properties, + * and initializes the streaming process using a dynamically loaded Streaming implementation. + * + * @param args command-line arguments, expecting the path to the properties file as the first argument + * @throws IllegalArgumentException if the properties file path is not provided + * @throws StreamingException if there are issues with loading the properties file, + * validating properties, or starting the streaming process + */ + @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "This method is only used internally") + public static void main(final String[] args) { + if (args.length < MIN_ARGS_LENGTH) { + throw new IllegalArgumentException("Properties file not passed"); + } + + final String configPath = args[0]; + final Properties configs = new Properties(); + try { + configs.load(Files.newInputStream(Paths.get(configPath))); + } catch (IOException e) { + final String message = "Error while loading the properties file"; + LOGGER.error(message, e); + throw new StreamingException(message, e); + } + + final StreamType sType = + StreamType.valueOf(configs.getProperty(CommonProperties.STREAM_TYPE.getValue(), null)); + + PropertiesCheck.validateProperties(configs, sType); + + final KafkaStreamsClient client = new KafkaStreamsClient(); + client.configure(configs); + + try { + final Class<?> clazz = + Class.forName(configs.getProperty(CommonProperties.STREAM_CLASS.getValue())); + final Streaming streaming; + streaming = (Streaming) clazz.getDeclaredConstructor().newInstance(); + streaming.configure(configs); + streaming.stream(client); + } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | ClassCastException + | InvocationTargetException | NoSuchMethodException e) { + final String message = "Error while starting the stream"; + LOGGER.error(message, e); + throw new StreamingException(message, e); + } + } + +} diff --git a/src/main/java/ch/cern/nile/common/clients/KafkaStreamsClient.java b/src/main/java/ch/cern/nile/common/clients/KafkaStreamsClient.java index 74b490dc5eccac6f4c5c7c63be5d3583e20d5cd0..5ec6f0519060a1fe628532243890513f3600a338 100644 --- a/src/main/java/ch/cern/nile/common/clients/KafkaStreamsClient.java +++ b/src/main/java/ch/cern/nile/common/clients/KafkaStreamsClient.java @@ -4,8 +4,6 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Properties; -import ch.cern.nile.common.configs.Configure; -import ch.cern.nile.common.exceptions.ReverseDnsLookupException; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.serialization.Serdes; @@ -14,72 +12,99 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; -import ch.cern.nile.common.configs.StreamConfig; + +import ch.cern.nile.common.configuration.Configure; +import ch.cern.nile.common.configuration.properties.ClientProperties; +import ch.cern.nile.common.exceptions.ReverseDnsLookupException; import ch.cern.nile.common.json.JsonSerde; /** - * This class is responsible for creating and configuring KafkaStreams instances. + * A client for creating and configuring KafkaStreams instances. */ public class KafkaStreamsClient implements Configure { - private Properties properties; + private static final String TEST_CLUSTER_NAME = "test"; + + private Properties properties; - @Override - public void configure(Properties configs) { - final String clientId = configs.getProperty(StreamConfig.ClientProperties.CLIENT_ID.getValue()); - properties = new Properties(); - properties.put(StreamsConfig.APPLICATION_ID_CONFIG, clientId); - properties.put(StreamsConfig.CLIENT_ID_CONFIG, clientId); + /** + * Configures the KafkaStreams instance using the provided properties. This method sets up various + * configuration options such as application ID, client ID, bootstrap servers, security protocols, + * and serialization/deserialization settings based on the properties provided. + * + * @param props the properties to be used for the configuration. Expected properties include + * client ID, Kafka cluster information, and security settings. + */ + @Override + public void configure(final Properties props) { + final String clientId = props.getProperty(ClientProperties.CLIENT_ID.getValue()); + this.properties = new Properties(); + this.properties.put(StreamsConfig.APPLICATION_ID_CONFIG, clientId); + this.properties.put(StreamsConfig.CLIENT_ID_CONFIG, clientId); - String kafkaCluster = configs.getProperty(StreamConfig.ClientProperties.KAFKA_CLUSTER.getValue()); + final String kafkaCluster = props.getProperty(ClientProperties.KAFKA_CLUSTER.getValue()); - if (!kafkaCluster.equals("test")) { - properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.reverseDnsLookup(kafkaCluster)); - properties.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); - properties.put(SaslConfigs.SASL_MECHANISM, "GSSAPI"); - properties.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka"); - properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, configs.getProperty(StreamConfig.ClientProperties.TRUSTSTORE_LOCATION.getValue())); - } else { - properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, configs.getProperty("bootstrap.servers")); + if (TEST_CLUSTER_NAME.equals(kafkaCluster)) { + this.properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("bootstrap.servers")); + } else { + this.properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.reverseDnsLookup(kafkaCluster)); + this.properties.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); + this.properties.put(SaslConfigs.SASL_MECHANISM, "GSSAPI"); + this.properties.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka"); + this.properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, + props.getProperty(ClientProperties.TRUSTSTORE_LOCATION.getValue())); + } + + this.properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + this.properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + this.properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, + LogAndContinueExceptionHandler.class.getName()); + this.properties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, + DefaultProductionExceptionHandler.class.getName()); } - properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName()); - properties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, DefaultProductionExceptionHandler.class.getName()); - } + /** + * Creates a KafkaStreams instance using the provided topology. + * + * @param topology the topology to be used for the KafkaStreams instance + * @return a configured KafkaStreams instance + */ + public KafkaStreams create(final Topology topology) { + return new KafkaStreams(topology, properties); + } - /** - * Creates a KafkaStreams instance using the provided topology. - * - * @param topology the topology to be used for the KafkaStreams instance - * @return a configured KafkaStreams instance - */ - public KafkaStreams create(Topology topology) { - return new KafkaStreams(topology, properties); - } + /** + * Resolves the provided Kafka cluster domain to a comma-separated list of hostnames with port 9093. + * + * @param kafkaCluster the domain of the Kafka cluster + * @return a comma-separated list of hostnames with port 9093 + * @throws RuntimeException if the hostname resolution fails + */ + private String reverseDnsLookup(final String kafkaCluster) { + try { + return performDnsLookup(kafkaCluster); + } catch (UnknownHostException e) { + throw new ReverseDnsLookupException( + "Failed to perform reverse DNS lookup for the Kafka cluster: " + kafkaCluster, e); + } + } - /** - * Resolves the provided Kafka cluster domain to a comma-separated list of - * hostnames with port 9093. - * - * @param kafkaCluster the domain of the Kafka cluster - * @return a comma-separated list of hostnames with port 9093 - * @throws RuntimeException if the hostname resolution fails - */ - private String reverseDnsLookup(String kafkaCluster) { - try { - StringBuilder sb = new StringBuilder(); - InetAddress[] address = InetAddress.getAllByName(kafkaCluster); - for (InetAddress host : address) { - final String hostName = InetAddress.getByName(host.getHostAddress()).getHostName(); - // FIXME: add configuration for the port - sb.append(hostName).append(":9093,"); - } - sb.deleteCharAt(sb.length() - 1); - return sb.toString(); - } catch (UnknownHostException e) { - throw new ReverseDnsLookupException("Failed to perform reverse DNS lookup for the Kafka cluster: " + kafkaCluster, e); + /** + * Resolves the provided Kafka cluster domain to a comma-separated list of hostnames with port 9093. + * This method performs a reverse DNS lookup and is used internally for setting up Kafka connections. + * + * @param kafkaCluster the domain of the Kafka cluster + * @return a comma-separated list of hostnames with port 9093 + * @throws ReverseDnsLookupException if the hostname resolution fails + */ + protected String performDnsLookup(final String kafkaCluster) throws UnknownHostException { + final StringBuilder stringBuilder = new StringBuilder(); + final InetAddress[] address = InetAddress.getAllByName(kafkaCluster); + for (final InetAddress host : address) { + final String hostName = InetAddress.getByName(host.getHostAddress()).getHostName(); + stringBuilder.append(hostName).append(":9093,"); + } + stringBuilder.deleteCharAt(stringBuilder.length() - 1); + return stringBuilder.toString(); } - } } diff --git a/src/main/java/ch/cern/nile/common/configs/Configure.java b/src/main/java/ch/cern/nile/common/configs/Configure.java deleted file mode 100644 index b0b3c4db0c965f004194736065406f3723b3b634..0000000000000000000000000000000000000000 --- a/src/main/java/ch/cern/nile/common/configs/Configure.java +++ /dev/null @@ -1,10 +0,0 @@ -package ch.cern.nile.common.configs; - -import java.util.Properties; - -/** - * Interface for classes that can be configured with a Properties object. - */ -public interface Configure { - void configure(Properties configs); -} diff --git a/src/main/java/ch/cern/nile/common/configs/PropertiesCheck.java b/src/main/java/ch/cern/nile/common/configs/PropertiesCheck.java deleted file mode 100644 index bb95779c67cda0c09186abdd21a65e966f85898c..0000000000000000000000000000000000000000 --- a/src/main/java/ch/cern/nile/common/configs/PropertiesCheck.java +++ /dev/null @@ -1,71 +0,0 @@ -package ch.cern.nile.common.configs; - -import java.util.Objects; -import java.util.Properties; -import java.util.Set; - -import ch.cern.nile.common.exceptions.MissingPropertyException; -import ch.cern.nile.common.exceptions.UnknownStreamTypeException; - -/** - * A utility class to validate properties based on the type of stream. - */ -public final class PropertiesCheck { - - private PropertiesCheck(){} - - private static final Set<String> CLIENT_PROPERTIES = StreamConfig.ClientProperties.getValues(); - private static final Set<String> COMMON_PROPERTIES = StreamConfig.CommonProperties.getValues(); - private static final Set<String> DECODING_PROPERTIES = StreamConfig.DecodingProperties.getValues(); - private static final Set<String> ROUTING_PROPERTIES = StreamConfig.RoutingProperties.getValues(); - private static final Set<String> ENRICHMENT_PROPERTIES = StreamConfig.EnrichmentProperties.getValues(); - - /** - * Validates the properties file based on the type of stream. - * - * @param properties - properties already loaded from file into java.util.Properties object. - * @param streamType - type of stream defined in the properties file. - * @throws MissingPropertyException if a required property is missing from the properties object. - * @throws UnknownStreamTypeException if the stream type is unknown. - */ - public static void validateProperties(Properties properties, StreamType streamType) { - Objects.requireNonNull(properties, "Properties object cannot be null"); - Objects.requireNonNull(streamType, "Properties file is missing stream.type property"); - - validateRequiredProperties(properties, CLIENT_PROPERTIES); - validateRequiredProperties(properties, COMMON_PROPERTIES); - - switch (streamType) { - case DECODING: - validateRequiredProperties(properties, DECODING_PROPERTIES); - break; - case ROUTING: - validateRequiredProperties(properties, ROUTING_PROPERTIES); - break; - case ENRICHMENT: - validateRequiredProperties(properties, ENRICHMENT_PROPERTIES); - break; - default: - throw new UnknownStreamTypeException(String.format("Stream type unknown: %s.", streamType)); - } - } - - /** - * Validates the required properties within the given properties object. - * - * @param props - properties object to check for required properties. - * @param propsToCheck - set of required property keys. - * @throws MissingPropertyException if a required property is missing from the properties object. - */ - private static void validateRequiredProperties(Properties props, Set<String> propsToCheck) { - Objects.requireNonNull(props, "Properties object cannot be null"); - Objects.requireNonNull(propsToCheck, "Properties to check cannot be null"); - - for (String prop : propsToCheck) { - if (!props.containsKey(prop)) { - throw new MissingPropertyException(String.format("Properties file is missing: %s property.", prop)); - } - } - } - -} diff --git a/src/main/java/ch/cern/nile/common/configs/StreamConfig.java b/src/main/java/ch/cern/nile/common/configs/StreamConfig.java deleted file mode 100644 index e0312f9290a5ec40a0982ca1fcbe744cc8f27434..0000000000000000000000000000000000000000 --- a/src/main/java/ch/cern/nile/common/configs/StreamConfig.java +++ /dev/null @@ -1,109 +0,0 @@ -package ch.cern.nile.common.configs; - -import java.util.Arrays; -import java.util.Set; -import java.util.stream.Collectors; - -/** - * A class containing enums representing various stream configuration property categories. - */ -public class StreamConfig { - - public enum ClientProperties { - SOURCE_TOPIC("source.topic"), - KAFKA_CLUSTER("kafka.cluster"), - CLIENT_ID("client.id"), - TRUSTSTORE_LOCATION("truststore.location"); - - private final String value; - - ClientProperties(String value) { - this.value = value; - } - - public static Set<String> getValues() { - return Arrays.stream(values()).map(o -> o.value).collect(Collectors.toSet()); - } - - public String getValue() { - return value; - } - } - - public enum CommonProperties { - STREAM_TYPE("stream.type"), - STREAM_CLASS("stream.class"); - - private final String value; - - CommonProperties(String value) { - this.value = value; - } - - public static Set<String> getValues() { - return Arrays.stream(values()).map(o -> o.value).collect(Collectors.toSet()); - } - - public String getValue() { - return value; - } - } - - - public enum DecodingProperties { - SINK_TOPIC("sink.topic"); - - private final String value; - - DecodingProperties(String value) { - this.value = value; - } - - public static Set<String> getValues() { - return Arrays.stream(values()).map(o -> o.value).collect(Collectors.toSet()); - } - - public String getValue() { - return value; - } - } - - public enum RoutingProperties { - ROUTING_CONFIG_PATH("routing.config.path"), - DLQ_TOPIC("dlq.topic"); - - private final String value; - - RoutingProperties(String value) { - this.value = value; - } - - public static Set<String> getValues() { - return Arrays.stream(values()).map(o -> o.value).collect(Collectors.toSet()); - } - - public String getValue() { - return value; - } - } - - public enum EnrichmentProperties { - ENRICHMENT_CONFIG_PATH("enrichment.config.path"), - SINK_TOPIC("sink.topic"); - - private final String value; - - EnrichmentProperties(String value) { - this.value = value; - } - - public static Set<String> getValues() { - return Arrays.stream(values()).map(o -> o.value).collect(Collectors.toSet()); - } - - public String getValue() { - return value; - } - } - -} diff --git a/src/main/java/ch/cern/nile/common/configuration/Configure.java b/src/main/java/ch/cern/nile/common/configuration/Configure.java new file mode 100644 index 0000000000000000000000000000000000000000..fa9e8eeb9f9f6de05b175592e4af4e0bdfd4bce1 --- /dev/null +++ b/src/main/java/ch/cern/nile/common/configuration/Configure.java @@ -0,0 +1,16 @@ +package ch.cern.nile.common.configuration; + +import java.util.Properties; + +/** + * Interface for classes that can be configured with a Properties object. + */ +public interface Configure { + + /** + * Configure this class. + * + * @param properties the properties to use for configuration + */ + void configure(Properties properties); +} diff --git a/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java b/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java new file mode 100644 index 0000000000000000000000000000000000000000..6563b5d003ccac0a49d067498d7f999e4636c9b9 --- /dev/null +++ b/src/main/java/ch/cern/nile/common/configuration/PropertiesCheck.java @@ -0,0 +1,81 @@ +package ch.cern.nile.common.configuration; + +import java.util.Objects; +import java.util.Properties; +import java.util.Set; + +import ch.cern.nile.common.configuration.properties.ClientProperties; +import ch.cern.nile.common.configuration.properties.CommonProperties; +import ch.cern.nile.common.configuration.properties.DecodingProperties; +import ch.cern.nile.common.configuration.properties.EnrichmentProperties; +import ch.cern.nile.common.configuration.properties.PropertyEnum; +import ch.cern.nile.common.configuration.properties.RoutingProperties; +import ch.cern.nile.common.exceptions.MissingPropertyException; +import ch.cern.nile.common.exceptions.UnknownStreamTypeException; + +/** + * A utility class to validate properties based on the type of stream. + */ +public final class PropertiesCheck { + + private static final Set<String> CLIENT_PROPERTIES = PropertyEnum.getValues(ClientProperties.class); + private static final Set<String> COMMON_PROPERTIES = PropertyEnum.getValues(CommonProperties.class); + private static final Set<String> DECODING_PROPERTIES = PropertyEnum.getValues(DecodingProperties.class); + private static final Set<String> ROUTING_PROPERTIES = PropertyEnum.getValues(RoutingProperties.class); + private static final Set<String> ENRICHMENT_PROPERTIES = PropertyEnum.getValues(EnrichmentProperties.class); + + private PropertiesCheck() { + } + + /** + * Validates the properties file based on the type of stream (DECODING, ROUTING, or ENRICHMENT). + * This method checks if all required properties for the specified stream type are present in the + * properties object, throwing exceptions if any are missing. + * + * @param properties the properties already loaded from file into java.util.Properties object + * @param streamType the type of stream defined in the properties file + * @throws MissingPropertyException if a required property is missing from the properties object + * @throws UnknownStreamTypeException if the stream type is unknown + */ + public static void validateProperties(final Properties properties, final StreamType streamType) { + Objects.requireNonNull(properties, "Properties object cannot be null"); + Objects.requireNonNull(streamType, "Properties file is missing stream.type property"); + + validateRequiredProperties(properties, CLIENT_PROPERTIES); + validateRequiredProperties(properties, COMMON_PROPERTIES); + + switch (streamType) { + case DECODING: + validateRequiredProperties(properties, DECODING_PROPERTIES); + break; + case ROUTING: + validateRequiredProperties(properties, ROUTING_PROPERTIES); + break; + case ENRICHMENT: + validateRequiredProperties(properties, ENRICHMENT_PROPERTIES); + break; + default: + // Cannot happen as the stream type is validated before this switch statement. + throw new UnknownStreamTypeException(String.format("Stream type unknown: %s.", streamType)); + } + } + + /** + * Validates the required properties within the given properties object. + * + * @param props the properties object to check for required properties. + * @param propsToCheck the set of required property keys. + * @throws MissingPropertyException if a required property is missing from the properties object. + */ + private static void validateRequiredProperties(final Properties props, final Set<String> propsToCheck) { + Objects.requireNonNull(props, "Properties object cannot be null"); + Objects.requireNonNull(propsToCheck, "Properties to check cannot be null"); + + for (final String prop : propsToCheck) { + if (!props.containsKey(prop)) { + throw new MissingPropertyException(String.format("Properties file is missing: %s property.", prop)); + } + } + } + +} diff --git a/src/main/java/ch/cern/nile/common/configs/StreamType.java b/src/main/java/ch/cern/nile/common/configuration/StreamType.java similarity index 56% rename from src/main/java/ch/cern/nile/common/configs/StreamType.java rename to src/main/java/ch/cern/nile/common/configuration/StreamType.java index b3b3565fe06e28e6d655c01c6d8d3bd8ddbbbb75..5b14d010e9bf8d66d5ab312d62366567e5e46daa 100644 --- a/src/main/java/ch/cern/nile/common/configs/StreamType.java +++ b/src/main/java/ch/cern/nile/common/configuration/StreamType.java @@ -1,12 +1,12 @@ -package ch.cern.nile.common.configs; +package ch.cern.nile.common.configuration; /** * Enum representing the types of streams supported by the application. */ public enum StreamType { - ROUTING, - DECODING, - ENRICHMENT + ROUTING, + DECODING, + ENRICHMENT } diff --git a/src/main/java/ch/cern/nile/common/configuration/properties/ClientProperties.java b/src/main/java/ch/cern/nile/common/configuration/properties/ClientProperties.java new file mode 100644 index 0000000000000000000000000000000000000000..99efbaf2c3e52c86a69ef1e350ef4b8ef1ac8aa7 --- /dev/null +++ b/src/main/java/ch/cern/nile/common/configuration/properties/ClientProperties.java @@ -0,0 +1,21 @@ +package ch.cern.nile.common.configuration.properties; + +import lombok.Getter; + +/** + * Enum representing various properties specific to clients in the application. + */ +@Getter +public enum ClientProperties implements PropertyEnum { + SOURCE_TOPIC("source.topic"), + KAFKA_CLUSTER("kafka.cluster"), + CLIENT_ID("client.id"), + TRUSTSTORE_LOCATION("truststore.location"); + + private final String value; + + ClientProperties(final String value) { + this.value = value; + } + +} diff --git a/src/main/java/ch/cern/nile/common/configuration/properties/CommonProperties.java b/src/main/java/ch/cern/nile/common/configuration/properties/CommonProperties.java new file mode 100644 index 0000000000000000000000000000000000000000..bacde5e1c023a5f37b8b3003ee7581325bf1f24a --- /dev/null +++ b/src/main/java/ch/cern/nile/common/configuration/properties/CommonProperties.java @@ -0,0 +1,19 @@ +package ch.cern.nile.common.configuration.properties; + +import lombok.Getter; + +/** + * Enum representing common properties used throughout the application. + */ +@Getter +public enum CommonProperties implements PropertyEnum { + STREAM_TYPE("stream.type"), + STREAM_CLASS("stream.class"); + + private final String value; + + CommonProperties(final String value) { + this.value = value; + } + +} diff --git a/src/main/java/ch/cern/nile/common/configuration/properties/DecodingProperties.java b/src/main/java/ch/cern/nile/common/configuration/properties/DecodingProperties.java new file mode 100644 index 0000000000000000000000000000000000000000..5f79c32fc8fa4d945cb82e5ffd67468ef7c4604e --- /dev/null +++ b/src/main/java/ch/cern/nile/common/configuration/properties/DecodingProperties.java @@ -0,0 +1,18 @@ +package ch.cern.nile.common.configuration.properties; + +import lombok.Getter; + +/** + * Enum representing properties related to the decoding process in the application. + */ +@Getter +public enum DecodingProperties implements PropertyEnum { + SINK_TOPIC("sink.topic"); + + private final String value; + + DecodingProperties(final String value) { + this.value = value; + } + +} diff --git a/src/main/java/ch/cern/nile/common/configuration/properties/EnrichmentProperties.java b/src/main/java/ch/cern/nile/common/configuration/properties/EnrichmentProperties.java new file mode 100644 index 0000000000000000000000000000000000000000..61d3094aab4975fae076a68fcc91fc971a382db0 --- /dev/null +++ b/src/main/java/ch/cern/nile/common/configuration/properties/EnrichmentProperties.java @@ -0,0 +1,20 @@ +package ch.cern.nile.common.configuration.properties; + +import lombok.Getter; + + +/** + * Enum representing properties related to data enrichment in the application. + */ +@Getter +public enum EnrichmentProperties implements PropertyEnum { + ENRICHMENT_CONFIG_PATH("enrichment.config.path"), + SINK_TOPIC("sink.topic"); + + private final String value; + + EnrichmentProperties(final String value) { + this.value = value; + } + +} diff --git a/src/main/java/ch/cern/nile/common/configuration/properties/PropertyEnum.java b/src/main/java/ch/cern/nile/common/configuration/properties/PropertyEnum.java new file mode 100644 index 0000000000000000000000000000000000000000..8b411b0d5e4492baf93435afc0d2986dfa41cd6d --- /dev/null +++ b/src/main/java/ch/cern/nile/common/configuration/properties/PropertyEnum.java @@ -0,0 +1,38 @@ +package ch.cern.nile.common.configuration.properties; + +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Interface representing an enumeration of property keys. Enums implementing this interface + * can be used to define sets of configuration properties. + * Each enum constant in an implementing class represents a specific property key. + * <p> + * This interface provides a method to retrieve the string values associated with each enum constant. + * <p> + * Implementing enums should define a private final field to store the property key and use the + * {@link lombok.Getter} annotation to automatically generate the required getter method. + */ +public interface PropertyEnum { + + /** + * Retrieves the string value associated with this enum constant. + * It's suggested to use {@link lombok.Getter} to generate this method. + * + * @return the string value associated with this enum constant + */ + String getValue(); + + /** + * Retrieves the string values associated with each enum constant of a given enum type that + * implements PropertyEnum. + * + * @param enumClass the class object of the enum type. + * @param <E> the type of the enum class that implements PropertyEnum + * @return a set containing the string values of all the enum constants in the specified enum + */ + static <E extends Enum<E> & PropertyEnum> Set<String> getValues(final Class<E> enumClass) { + return Arrays.stream(enumClass.getEnumConstants()).map(PropertyEnum::getValue).collect(Collectors.toSet()); + } +} diff --git a/src/main/java/ch/cern/nile/common/configuration/properties/RoutingProperties.java b/src/main/java/ch/cern/nile/common/configuration/properties/RoutingProperties.java new file mode 100644 index 0000000000000000000000000000000000000000..e97016c77a213e70f0060a11ee1826365a0c5794 --- /dev/null +++ b/src/main/java/ch/cern/nile/common/configuration/properties/RoutingProperties.java @@ -0,0 +1,20 @@ +package ch.cern.nile.common.configuration.properties; + +import lombok.Getter; + + +/** + * Enum representing properties related to message routing within the application. + */ +@Getter +public enum RoutingProperties implements PropertyEnum { + ROUTING_CONFIG_PATH("routing.config.path"), + DLQ_TOPIC("dlq.topic"); + + private final String value; + + RoutingProperties(final String value) { + this.value = value; + } + +} diff --git a/src/main/java/ch/cern/nile/common/exceptions/DecodingException.java b/src/main/java/ch/cern/nile/common/exceptions/DecodingException.java index 7b02130f1aa1fd86f33345708e5ca21ccd23882c..628c86ab37c02c9e20e9ffd87d97d896659f6082 100644 --- a/src/main/java/ch/cern/nile/common/exceptions/DecodingException.java +++ b/src/main/java/ch/cern/nile/common/exceptions/DecodingException.java @@ -2,12 +2,14 @@ package ch.cern.nile.common.exceptions; public class DecodingException extends RuntimeException { - public DecodingException(String message, Throwable err) { - super(message, err); - } + private static final long serialVersionUID = 1L; - public DecodingException(String message) { - super(message); - } + public DecodingException(final String message, final Throwable cause) { + super(message, cause); + } + + public DecodingException(final String message) { + super(message); + } } diff --git a/src/main/java/ch/cern/nile/common/exceptions/HealthProbeException.java b/src/main/java/ch/cern/nile/common/exceptions/HealthProbeException.java new file mode 100644 index 0000000000000000000000000000000000000000..7cca9b20565e3173cf59573cde5296feb514e6db --- /dev/null +++ b/src/main/java/ch/cern/nile/common/exceptions/HealthProbeException.java @@ -0,0 +1,11 @@ +package ch.cern.nile.common.exceptions; + +public class HealthProbeException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public HealthProbeException(final String message, final Throwable cause) { + super(message, cause); + } + +} diff --git a/src/main/java/ch/cern/nile/common/exceptions/InvalidStreamTypeException.java b/src/main/java/ch/cern/nile/common/exceptions/InvalidStreamTypeException.java deleted file mode 100644 index 47c56519b0e49c0b69677c6fb1ca9e9771d1c281..0000000000000000000000000000000000000000 --- a/src/main/java/ch/cern/nile/common/exceptions/InvalidStreamTypeException.java +++ /dev/null @@ -1,9 +0,0 @@ -package ch.cern.nile.common.exceptions; - -public class InvalidStreamTypeException extends IllegalArgumentException { - - public InvalidStreamTypeException(String message) { - super(message); - } - -} \ No newline at end of file diff --git a/src/main/java/ch/cern/nile/common/exceptions/MissingPropertyException.java b/src/main/java/ch/cern/nile/common/exceptions/MissingPropertyException.java index 17651663fcee7adb683c13e56af98552cdb53a43..2c1a2cc945b7cd04973bb697a730efdde1ae8358 100644 --- a/src/main/java/ch/cern/nile/common/exceptions/MissingPropertyException.java +++ b/src/main/java/ch/cern/nile/common/exceptions/MissingPropertyException.java @@ -2,8 +2,14 @@ package ch.cern.nile.common.exceptions; public class MissingPropertyException extends RuntimeException { - public MissingPropertyException(String message) { - super(message); - } + private static final long serialVersionUID = 1L; -} \ No newline at end of file + public MissingPropertyException(final String message) { + super(message); + } + + public MissingPropertyException(final String message, final Throwable cause) { + super(message, cause); + } + +} diff --git a/src/main/java/ch/cern/nile/common/exceptions/ReverseDnsLookupException.java b/src/main/java/ch/cern/nile/common/exceptions/ReverseDnsLookupException.java index e97b759a3055ff097cb0e87f715601c05a7c7865..fd31391959ec5105abd68e2c4a1d98ac2eb2d253 100644 --- a/src/main/java/ch/cern/nile/common/exceptions/ReverseDnsLookupException.java +++ b/src/main/java/ch/cern/nile/common/exceptions/ReverseDnsLookupException.java @@ -2,8 +2,10 @@ package ch.cern.nile.common.exceptions; public class ReverseDnsLookupException extends RuntimeException { - public ReverseDnsLookupException(String message, Throwable cause) { - super(message, cause); - } + private static final long serialVersionUID = 1L; -} \ No newline at end of file + public ReverseDnsLookupException(final String message, final Throwable cause) { + super(message, cause); + } + +} diff --git a/src/main/java/ch/cern/nile/common/exceptions/StreamingException.java b/src/main/java/ch/cern/nile/common/exceptions/StreamingException.java new file mode 100644 index 0000000000000000000000000000000000000000..0bc33ae6e340cee5fd00e43a8bdea5d25de0ec2a --- /dev/null +++ b/src/main/java/ch/cern/nile/common/exceptions/StreamingException.java @@ -0,0 +1,14 @@ +package ch.cern.nile.common.exceptions; + +public class StreamingException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public StreamingException(final Throwable cause) { + super(cause); + } + + public StreamingException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/ch/cern/nile/common/exceptions/UnknownStreamTypeException.java b/src/main/java/ch/cern/nile/common/exceptions/UnknownStreamTypeException.java index 4de99513d998b893643451441bcf9647e7f05346..6fb32f5888d5e097227c59f69de396cb812551bc 100644 --- a/src/main/java/ch/cern/nile/common/exceptions/UnknownStreamTypeException.java +++ b/src/main/java/ch/cern/nile/common/exceptions/UnknownStreamTypeException.java @@ -2,8 +2,10 @@ package ch.cern.nile.common.exceptions; public class UnknownStreamTypeException extends RuntimeException { - public UnknownStreamTypeException(String message) { - super(message); - } + private static final long serialVersionUID = 1L; -} \ No newline at end of file + public UnknownStreamTypeException(final String message) { + super(message); + } + +} diff --git a/src/main/java/ch/cern/nile/common/json/JsonPojoDeserializer.java b/src/main/java/ch/cern/nile/common/json/JsonPojoDeserializer.java index d54fe36efd2e9d575d55e145788c27638c86fb11..36b6638c86c62255b135338d6d34ecfb256f60f0 100644 --- a/src/main/java/ch/cern/nile/common/json/JsonPojoDeserializer.java +++ b/src/main/java/ch/cern/nile/common/json/JsonPojoDeserializer.java @@ -1,50 +1,74 @@ package ch.cern.nile.common.json; -import com.google.gson.Gson; import java.nio.charset.StandardCharsets; import java.util.Map; + +import com.google.gson.Gson; + import org.apache.kafka.common.serialization.Deserializer; +/** + * A deserializer for JSON POJOs using Gson. This class implements the Deserializer interface + * from Apache Kafka and provides a mechanism to convert JSON byte data back into Java objects (POJOs) + * of a specified type. + * + * @param <T> The type of the POJO to be deserialized. + */ public class JsonPojoDeserializer<T> implements Deserializer<T> { - private static final Gson gson = new Gson(); - Class<T> tClass; + private static final Gson GSON = new Gson(); - /** - * Default constructor needed by Kafka. - */ - public JsonPojoDeserializer() { - } + /** + * The class type for the deserialization. + */ + private Class<T> tClass; - JsonPojoDeserializer(Class<T> clazz) { - this.tClass = clazz; - } + /** + * Constructs a new JsonPojoDeserializer with the given class type for deserialization. + * + * @param clazz the class type for the deserialization + */ + JsonPojoDeserializer(final Class<T> clazz) { + this.tClass = clazz; + } - @Override - @SuppressWarnings("unchecked") - public void configure(Map<String, ?> props, boolean isKey) { - if (tClass == null) { - tClass = (Class<T>) props.get("JsonPOJOClass"); + /** + * Configures this class with the given properties. + * + * @param props the properties from the consumer configuration + * @param isKey is ignored in this implementation + */ + @Override + @SuppressWarnings("unchecked") + public void configure(final Map<String, ?> props, final boolean isKey) { + if (tClass == null) { + tClass = (Class<T>) props.get("JsonPOJOClass"); + } } - } - - /** - * Deserialize the provided byte array into an object of type T. - * - * @param topic The topic associated with the data. - * @param bytes The byte array to be deserialized. - * @return The deserialized object of type T or null if the byte array is null. - */ - @Override - public T deserialize(String topic, byte[] bytes) { - if (bytes == null) { - return null; + + /** + * Deserializes the provided byte array into an object of type T. + * + * @param topic the topic associated with the data + * @param bytes the byte array to be deserialized + * @return the deserialized object of type T or null if the byte array is null + */ + @Override + public T deserialize(final String topic, final byte[] bytes) { + T deserializedData = null; + if (bytes != null) { + deserializedData = GSON.fromJson(new String(bytes, StandardCharsets.UTF_8), tClass); + } + return deserializedData; } - return gson.fromJson(new String(bytes, StandardCharsets.UTF_8), tClass); - } - @Override - public void close() { - } + /** + * Closes this deserializer. + * This method is required by the Serializer interface but does nothing in this implementation. + */ + @Override + public void close() { + // Nothing to do + } -} \ No newline at end of file +} diff --git a/src/main/java/ch/cern/nile/common/json/JsonPojoSerializer.java b/src/main/java/ch/cern/nile/common/json/JsonPojoSerializer.java index b11221e77ad0d07a8aed8789bc48f0b4ac9fbcf9..384a57ddb85bcbfe5dda5e9516a8b01ef0c831a2 100644 --- a/src/main/java/ch/cern/nile/common/json/JsonPojoSerializer.java +++ b/src/main/java/ch/cern/nile/common/json/JsonPojoSerializer.java @@ -1,41 +1,58 @@ package ch.cern.nile.common.json; -import com.google.gson.Gson; import java.nio.charset.StandardCharsets; import java.util.Map; + +import com.google.gson.Gson; + import org.apache.kafka.common.serialization.Serializer; +/** + * A serializer for JSON POJOs using Gson. This class implements the Serializer interface + * from Apache Kafka and provides a mechanism to convert Java objects (POJOs) of a specified type + * into JSON byte data. + * + * @param <T> The type of the POJO to be serialized. + */ public class JsonPojoSerializer<T> implements Serializer<T> { - private static final Gson gson = new Gson(); - - /** - * Default constructor needed by Kafka. - */ - public JsonPojoSerializer() { - } - - @Override - public void configure(Map<String, ?> props, boolean isKey) { - } - - /** - * Serialize the provided data as a JSON string and convert it to bytes. - * - * @param topic The topic associated with the data. - * @param data The data to be serialized. - * @return The serialized data as bytes or null if the data is null. - */ - @Override - public byte[] serialize(String topic, T data) { - if (data == null) { - return null; + private static final Gson GSON = new Gson(); + + /** + * Configures this serializer. This method is part of the Serializer interface but is not used + * in this implementation. + * + * @param props is ignored in this implementation + * @param isKey is ignored in this implementation + */ + @Override + public void configure(final Map<String, ?> props, final boolean isKey) { + // Nothing to do } - return gson.toJson(data).getBytes(StandardCharsets.UTF_8); - } - @Override - public void close() { - } + /** + * Serializes the provided data into a JSON string using Gson and converts it to a byte array. + * + * @param topic the topic associated with the data. This is not used in the serialization process + * @param data the POJO to be serialized + * @return the serialized data as bytes, or null if the data is null + */ + @Override + public byte[] serialize(final String topic, final T data) { + byte[] serializedData = null; + if (data != null) { + serializedData = GSON.toJson(data).getBytes(StandardCharsets.UTF_8); + } + return serializedData; + } + + /** + * Closes this serializer. + * This method is required by the Serializer interface but does nothing in this implementation. + */ + @Override + public void close() { + // Nothing to do + } } diff --git a/src/main/java/ch/cern/nile/common/json/JsonSerde.java b/src/main/java/ch/cern/nile/common/json/JsonSerde.java index 1afd5838baa17247718b553bc35cd5fc190efcaf..a5b518cf6a816a52300e394a531c120313b20d2a 100644 --- a/src/main/java/ch/cern/nile/common/json/JsonSerde.java +++ b/src/main/java/ch/cern/nile/common/json/JsonSerde.java @@ -1,34 +1,63 @@ package ch.cern.nile.common.json; -import com.google.gson.JsonObject; import java.util.Map; + +import com.google.gson.JsonObject; + import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; +/** + * A Serde (Serializer/Deserializer) implementation for JSON objects using Gson. This class + * provides both serialization and deserialization capabilities for Kafka streams to handle + * JSON objects represented by the {@link JsonObject} class. + */ public class JsonSerde implements Serde<JsonObject> { - final JsonPojoSerializer<JsonObject> serializer = new JsonPojoSerializer<>(); - final JsonPojoDeserializer<JsonObject> deserializer = new JsonPojoDeserializer<>(JsonObject.class); - - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - serializer.configure(configs, isKey); - deserializer.configure(configs, isKey); - } - - @Override - public void close() { - serializer.close(); - deserializer.close(); - } - - @Override - public Serializer<JsonObject> serializer() { - return serializer; - } - - @Override - public Deserializer<JsonObject> deserializer() { - return deserializer; - } + + private final JsonPojoSerializer<JsonObject> jsonSerializer = new JsonPojoSerializer<>(); + private final JsonPojoDeserializer<JsonObject> jsonDeserializer = new JsonPojoDeserializer<>(JsonObject.class); + + /** + * Configures this Serde with the given properties. This method configures both the internal + * serializer and deserializer with the provided configuration settings. + * + * @param configs the properties from the consumer or producer configuration + * @param isKey indicates whether this Serde is being used for key serialization/deserialization + * This parameter is ignored in this implementation + */ + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + jsonSerializer.configure(configs, isKey); + jsonDeserializer.configure(configs, isKey); + } + + /** + * Closes this Serde. This method closes both the internal serializer and deserializer. + */ + @Override + public void close() { + jsonSerializer.close(); + jsonDeserializer.close(); + } + + /** + * Returns the serializer component of this Serde. + * + * @return The {@link JsonPojoSerializer} for serializing JSON objects + */ + @Override + public Serializer<JsonObject> serializer() { + return jsonSerializer; + } + + /** + * Returns the deserializer component of this Serde. + * + * @return The {@link JsonPojoDeserializer} for deserializing JSON objects + */ + @Override + public Deserializer<JsonObject> deserializer() { + return jsonDeserializer; + } } diff --git a/src/main/java/ch/cern/nile/common/models/Application.java b/src/main/java/ch/cern/nile/common/models/Application.java index b91c0afb721c9651170f4f34a8a228cab28a7f3d..8f0136991627f580134869057722aa3e266bb539 100644 --- a/src/main/java/ch/cern/nile/common/models/Application.java +++ b/src/main/java/ch/cern/nile/common/models/Application.java @@ -1,17 +1,24 @@ package ch.cern.nile.common.models; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; +/** + * Model representing an application with its name and associated topic. + * Primarily used in serialization and deserialization processes. + */ @NoArgsConstructor @Getter @Setter @ToString +@SuppressFBWarnings(value = "EI_EXPOSE_REP", + justification = "This is a model class used for serialization and deserialization") public class Application { - private String name; - private Topic topic; + private String name; + private Topic topic; } diff --git a/src/main/java/ch/cern/nile/common/models/Topic.java b/src/main/java/ch/cern/nile/common/models/Topic.java index a65ec396fb11b5ebbc04400c2b2e4a7ce2c6993b..686ee8aaf2c8285189134e9eb5836021bcf8aea4 100644 --- a/src/main/java/ch/cern/nile/common/models/Topic.java +++ b/src/main/java/ch/cern/nile/common/models/Topic.java @@ -1,16 +1,23 @@ package ch.cern.nile.common.models; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; +/** + * Model representing a topic, identified by its name. + * Used in serialization and deserialization processes. + */ @NoArgsConstructor @Getter @Setter @ToString +@SuppressFBWarnings(value = "EI_EXPOSE_REP", + justification = "This is a model class used for serialization and deserialization") public class Topic { - private String name; + private String name; } diff --git a/src/main/java/ch/cern/nile/common/probes/Health.java b/src/main/java/ch/cern/nile/common/probes/Health.java index 532cba07ef13d67e9a875555a077370b449efae4..1ca381edf6a2adde845a2b9353e23353a5b28f8f 100644 --- a/src/main/java/ch/cern/nile/common/probes/Health.java +++ b/src/main/java/ch/cern/nile/common/probes/Health.java @@ -1,45 +1,88 @@ package ch.cern.nile.common.probes; -import com.sun.net.httpserver.HttpServer; import java.io.IOException; import java.net.InetSocketAddress; + +import com.sun.net.httpserver.HttpServer; + import org.apache.kafka.streams.KafkaStreams; +import ch.cern.nile.common.exceptions.HealthProbeException; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +/** + * A simple HTTP server that responds to health checks at the "/health" endpoint. + * It returns a 200 OK response if the KafkaStreams instance is running, or a 500 Internal Server Error + * if it is not running. By default, the server listens on port 8899. + */ public class Health { - private static final int OK = 200; - private static final int ERROR = 500; - private static final int PORT = 8899; - - private final KafkaStreams streams; - private HttpServer server; - - public Health(KafkaStreams streams) { - this.streams = streams; - } - - /** - * Start the Health http server. - */ - public void start() { - try { - server = HttpServer.create(new InetSocketAddress(PORT), 0); - } catch (IOException ioe) { - throw new RuntimeException("Could not setup http server: ", ioe); + private static final int OK_RESPONSE = 200; + private static final int ERROR_RESPONSE = 500; + private static final int PORT = 8899; + + private final KafkaStreams streams; + private HttpServer server; + private final HttpServerFactory httpServerFactory; + + /** + * Creates a new Health instance that will respond to health checks on port 8899. + * Health checks determine the running state of the provided KafkaStreams instance. + * + * @param streams the KafkaStreams instance to check the state of + */ + public Health(final KafkaStreams streams) { + this(streams, new DefaultHttpServerFactory()); } - server.createContext("/health", exchange -> { - int responseCode = streams.state().isRunning() ? OK : ERROR; - exchange.sendResponseHeaders(responseCode, 0); - exchange.close(); - }); - server.start(); - } - - /** - * Stops the Health HTTP server. - */ - public void stop() { - server.stop(0); - } + /** + * Creates a new Health instance that will respond to health checks on port 8899, using the provided + * HttpServerFactory. This constructor is useful for testing. Health checks determine the running state + * of the provided KafkaStreams instance. + * + * @param streams the KafkaStreams instance to check the state of + * @param httpServerFactory the factory to use to create the HttpServer instance + */ + @SuppressFBWarnings(value = "EI_EXPOSE_REP2", + justification = "This is an internal class and the HttpServerFactory is not exposed to the outside") + public Health(final KafkaStreams streams, final HttpServerFactory httpServerFactory) { + this.streams = streams; + this.httpServerFactory = httpServerFactory; + } + + /** + * Starts the Health HTTP server. The server listens for health check requests and responds + * based on the state of the KafkaStreams instance. + */ + public void start() { + try { + server = httpServerFactory.createHttpServer(new InetSocketAddress(PORT), 0); + } catch (IOException ex) { + throw new HealthProbeException("Failed to create HTTP server", ex); + } + server.createContext("/health", exchange -> { + final int responseCode = streams.state().isRunning() ? OK_RESPONSE : ERROR_RESPONSE; + exchange.sendResponseHeaders(responseCode, 0); + exchange.close(); + }); + server.start(); + } + + /** + * Stops the Health HTTP server, terminating the health check responses. + */ + public void stop() { + server.stop(0); + } + + /** + * The default HttpServerFactory implementation used to create HttpServer instances. + */ + private static final class DefaultHttpServerFactory implements HttpServerFactory { + @Override + public HttpServer createHttpServer(final InetSocketAddress address, final int backlog) throws IOException { + return HttpServer.create(address, backlog); + } + } } diff --git a/src/main/java/ch/cern/nile/common/probes/HttpServerFactory.java b/src/main/java/ch/cern/nile/common/probes/HttpServerFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..51969de0cff83d70e07f3838fa8cafd8cdb1eaac --- /dev/null +++ b/src/main/java/ch/cern/nile/common/probes/HttpServerFactory.java @@ -0,0 +1,24 @@ +package ch.cern.nile.common.probes; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import com.sun.net.httpserver.HttpServer; + +/** + * Factory for creating HttpServer instances. This interface is used to allow mocking of HttpServer + * in tests and to provide flexibility in the instantiation of HttpServer, facilitating dependency + * injection and customization. + */ +public interface HttpServerFactory { + + /** + * Creates a new HttpServer instance bound to the specified address with the given backlog. + * + * @param address the address to bind the server to + * @param backlog the maximum number of pending connections + * @return the created HttpServer instance + * @throws IOException if an I/O error occurs when creating the HttpServer + */ + HttpServer createHttpServer(InetSocketAddress address, int backlog) throws IOException; +} diff --git a/src/main/java/ch/cern/nile/common/schema/JsonType.java b/src/main/java/ch/cern/nile/common/schema/JsonType.java index f15754311393ad58a7651ca960c7b2b6ad12f49d..784a9af9238b318a4dd3ce2fd4c6b86f5ec636d2 100644 --- a/src/main/java/ch/cern/nile/common/schema/JsonType.java +++ b/src/main/java/ch/cern/nile/common/schema/JsonType.java @@ -2,40 +2,54 @@ package ch.cern.nile.common.schema; import java.util.Date; -enum JsonType { - BYTE(Byte.class, "int8"), - SHORT(Short.class, "int16"), - INTEGER(Integer.class, "int32"), - LONG(Long.class, "int64"), - FLOAT(Float.class, "float"), - DOUBLE(Double.class, "double"), - BOOLEAN(Boolean.class, "boolean"), - STRING(String.class, "string"), - DATE(Date.class, "int64"), - BYTE_ARRAY(byte[].class, "bytes"); - - private final Class<?> clazz; - private final String type; +import lombok.Getter; - JsonType(Class<?> clazz, String type) { - this.clazz = clazz; - this.type = type; - } +/** + * Enum mapping Java classes to their corresponding JSON types for Connect schema(s). + * This enum provides a convenient way to determine the JSON type representation + * of various Java data types. + */ +@Getter +enum JsonType { + BYTE(Byte.class, "int8"), + SHORT(Short.class, "int16"), + INTEGER(Integer.class, "int32"), + LONG(Long.class, "int64"), + FLOAT(Float.class, "float"), + DOUBLE(Double.class, "double"), + BOOLEAN(Boolean.class, "boolean"), + STRING(String.class, "string"), + DATE(Date.class, "int64"), + BYTE_ARRAY(byte[].class, "bytes"); - public Class<?> getClazz() { - return clazz; - } + private final Class<?> clazz; + private final String type; - public String getType() { - return type; - } + /** + * Constructs a new JsonType enum constant. + * + * @param clazz the Java class associated with this JSON type + * @param type the string representation of the JSON type + */ + JsonType(final Class<?> clazz, final String type) { + this.clazz = clazz; + this.type = type; + } - public static JsonType fromClass(Class<?> clazz) { - for (JsonType jsonType : JsonType.values()) { - if (jsonType.getClazz().equals(clazz)) { - return jsonType; - } + /** + * Returns the JsonType corresponding to the given Java class. + * Throws an IllegalArgumentException if the class is not supported. + * + * @param clazz the Java class to find the corresponding JsonType for + * @return the JsonType corresponding to the given class + * @throws IllegalArgumentException if the class is not supported + */ + static JsonType fromClass(final Class<?> clazz) { + for (final JsonType jsonType : values()) { + if (jsonType.getClazz().equals(clazz)) { + return jsonType; + } + } + throw new IllegalArgumentException("Unsupported class: " + clazz.getSimpleName()); } - throw new IllegalArgumentException("Unsupported class: " + clazz.getSimpleName()); - } } diff --git a/src/main/java/ch/cern/nile/common/schema/SchemaInjector.java b/src/main/java/ch/cern/nile/common/schema/SchemaInjector.java index be2b36e8c0de9d7e2159c5de034220f01bf54794..f6e1d078c66b24f85b111f50d5264e9e7ade7e1a 100644 --- a/src/main/java/ch/cern/nile/common/schema/SchemaInjector.java +++ b/src/main/java/ch/cern/nile/common/schema/SchemaInjector.java @@ -2,77 +2,90 @@ package ch.cern.nile.common.schema; import java.util.Date; import java.util.HashMap; +import java.util.Locale; import java.util.Map; import java.util.stream.Collectors; -public final class SchemaInjector { - - private SchemaInjector() { - } +import com.google.gson.JsonPrimitive; - /** - * Injects a Connect schema into the given data. - * - * @param data Data to inject the schema into. - * @return Data with the schema injected. - */ - public static Map<String, Object> inject(Map<String, Object> data) { - Map<String, Object> dataCopy = new HashMap<>(data); - Map<String, Object> schemaMap = generateSchemaMap(dataCopy); +/** + * Utility class for injecting Connect schemas into given data. The class provides static methods + * to generate a schema based on the data types present in a map and inject this schema into the data. + */ +public final class SchemaInjector { - Map<String, Object> result = new HashMap<>(); - result.put("schema", schemaMap); - result.put("payload", dataCopy); + private SchemaInjector() { + } - return result; - } + /** + * Injects a Connect schema into the given data. The method generates a schema based on the data types + * in the input map and returns a new map containing both the original data and the generated schema. + * + * @param data the data to inject the schema into + * @return a new map containing the original data and the injected schema + */ + public static Map<String, Object> inject(final Map<String, Object> data) { + final Map<String, Object> dataCopy = new HashMap<>(data); + final Map<String, Object> schemaMap = generateSchemaMap(dataCopy); + + final Map<String, Object> result = new HashMap<>(); + result.put("schema", schemaMap); + result.put("payload", dataCopy); + + return result; + } - private static Map<String, Object> generateSchemaMap(Map<String, Object> data) { - Map<String, Object> schemaMap = new HashMap<>(); - schemaMap.put("type", "struct"); - schemaMap.put("fields", generateFieldMaps(data)); + private static Map<String, Object> generateSchemaMap(final Map<String, Object> data) { + final Map<String, Object> schemaMap = new HashMap<>(); + schemaMap.put("type", "struct"); + schemaMap.put("fields", generateFieldMaps(data)); - return schemaMap; - } + return schemaMap; + } - private static Iterable<Map<String, Object>> generateFieldMaps(Map<String, Object> data) { - return data.entrySet().stream().map(SchemaInjector::generateFieldMap).collect(Collectors.toList()); - } + private static Iterable<Map<String, Object>> generateFieldMaps(final Map<String, Object> data) { + return data.entrySet().stream().map(SchemaInjector::generateFieldMap).collect(Collectors.toList()); + } - private static Map<String, Object> generateFieldMap(Map.Entry<String, Object> entry) { - Map<String, Object> fieldMap = new HashMap<>(); - String key = entry.getKey(); - Object value = entry.getValue(); + private static Map<String, Object> generateFieldMap(final Map.Entry<String, Object> entry) { + final Map<String, Object> fieldMap = new HashMap<>(); + final String key = entry.getKey(); + Object value = entry.getValue(); - validateValue(value); + validateValue(value); + if (value instanceof JsonPrimitive) { + // TODO(#): test this quick bugfix further + value = ((JsonPrimitive) value).getAsString(); + } - JsonType type = JsonType.fromClass(value.getClass()); + final JsonType type = JsonType.fromClass(value.getClass()); - fieldMap.put("field", key); - fieldMap.put("type", type.getType()); - fieldMap.put("optional", !key.toLowerCase().contains("timestamp")); + fieldMap.put("field", key); + fieldMap.put("type", type.getType()); + fieldMap.put("optional", !key.toLowerCase(Locale.ENGLISH).contains("timestamp")); - addTimestampAndDateFields(fieldMap, key, type); + addTimestampAndDateFields(fieldMap, key, type); - return fieldMap; - } + return fieldMap; + } - private static void validateValue(Object value) { - if (value == null) { - throw new IllegalArgumentException("Null values are not allowed in the data map."); + private static void validateValue(final Object value) { + if (value == null) { + throw new IllegalArgumentException("Null values are not allowed in the data map."); + } } - } - - private static void addTimestampAndDateFields(Map<String, Object> fieldMap, String key, JsonType type) { - boolean isTimestampField = key.toLowerCase().contains("timestamp"); - boolean isDateType = type.getClazz().equals(Date.class); - - if (isTimestampField) { - fieldMap.put("name", "org.apache.kafka.connect.data.Timestamp"); - fieldMap.put("version", 1); - } else if (isDateType) { - fieldMap.put("name", "org.apache.kafka.connect.data.Date"); - fieldMap.put("version", 1); + + private static void addTimestampAndDateFields(final Map<String, Object> fieldMap, final String key, + final JsonType type) { + final boolean isTimestampField = key.toLowerCase(Locale.ENGLISH).contains("timestamp"); + final boolean isDateType = type.getClazz().equals(Date.class); + + if (isTimestampField) { + fieldMap.put("name", "org.apache.kafka.connect.data.Timestamp"); + fieldMap.put("version", 1); + } else if (isDateType) { + fieldMap.put("name", "org.apache.kafka.connect.data.Date"); + fieldMap.put("version", 1); + } } - } } diff --git a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java index 8ba7298202272fba6720f4706e3661d344ea7e00..27ca7de2dc519a67dfbadb6840c8c863784b55a4 100644 --- a/src/main/java/ch/cern/nile/common/streams/AbstractStream.java +++ b/src/main/java/ch/cern/nile/common/streams/AbstractStream.java @@ -1,144 +1,138 @@ package ch.cern.nile.common.streams; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import java.time.DateTimeException; -import java.time.Instant; -import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; + import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.kstream.ValueTransformer; -import org.apache.kafka.streams.processor.ProcessorContext; -import ch.cern.nile.common.clients.KafkaStreamsClient; -import ch.cern.nile.common.configs.StreamConfig; -import ch.cern.nile.common.probes.Health; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import ch.cern.nile.common.clients.KafkaStreamsClient; +import ch.cern.nile.common.configuration.properties.ClientProperties; +import ch.cern.nile.common.configuration.properties.DecodingProperties; +import ch.cern.nile.common.exceptions.StreamingException; +import ch.cern.nile.common.probes.Health; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; + +/** + * AbstractStream is an abstract class implementing the {@link Streaming} interface, providing a framework + * for building and managing Kafka Streams applications. It encapsulates common functionality such + * as configuring the stream, handling its lifecycle, and managing health checks. Implementations of + * this class should provide the specific logic for creating the Kafka Streams topology by overriding + * the createTopology method. + * <p> + * Usage: + * Subclasses should implement the {@link AbstractStream#createTopology(StreamsBuilder) CreateTopology} + * method to define their specific processing topology. + * They can then use this abstract class to handle common streaming functionalities + * such as starting, stopping, and monitoring the Kafka Streams application. + */ public abstract class AbstractStream implements Streaming { - private final Logger LOGGER = LoggerFactory.getLogger(getClass()); - - KafkaStreams streams; - protected Properties configs; - protected String sourceTopic; - protected String sinkTopic; - protected long lastReadOffset = -2; - private Health health; - private CountDownLatch latch; - - @Override - public void configure(Properties configs) { - this.configs = configs; - } - - @Override - public void stream(KafkaStreamsClient kafkaStreamsClient) { - init(kafkaStreamsClient); - Runtime.getRuntime().addShutdownHook(new Thread(this::shutDown, "streams-shutdown-hook")); - start(); - System.exit(0); - } - - public String getProperty(String key) { - return configs.getProperty(key); - } - - protected static void addTimestamp(JsonArray gatewayInfo, Map<String, Object> map) throws DateTimeException { - final String timestampKey = "timestamp"; - final String timeKey = "time"; - - for (JsonElement element : gatewayInfo) { - if (element.isJsonObject()) { - JsonObject entry = element.getAsJsonObject(); - if (entry.has(timeKey)) { - map.put(timestampKey, Instant.parse(entry.get(timeKey).getAsString()).toEpochMilli()); - break; - } - } - } - if (!map.containsKey(timestampKey)) { - throw new DateTimeException(String.format("No '%s' field found in gateway info (dropping the message): %s", timeKey, gatewayInfo)); - } - } + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStream.class); + @Getter(AccessLevel.PROTECTED) + private String sourceTopic; - protected static boolean filterNull(String k, Object v) { - return v != null; - } + @Getter(AccessLevel.PROTECTED) + private String sinkTopic; - protected static boolean filterEmpty(String k, Object v) { - return !(v instanceof List && ((List<?>) v).isEmpty()); - } + @Setter(AccessLevel.PROTECTED) + private long lastReadOffset = -2; - protected boolean filterRecord(String k, JsonObject v) { - return v != null && v.get("applicationID") != null && v.get("applicationName") != null && v.get("deviceName") != null && v.get("devEUI") != null - && v.get("data") != null; - } + private Properties properties; + private KafkaStreams streams; + private Health health; + private CountDownLatch latch; - protected void logStreamsException(Exception e) { - LOGGER.warn(String.format("Error reading from topic %s. Last read offset %s:", sourceTopic, lastReadOffset), e); - if (streams != null) { - LOGGER.info(String.format("Streams state is: %s", streams.state().toString())); - } - } - - public abstract void createTopology(StreamsBuilder builder); - - private void init(KafkaStreamsClient kafkaStreamsClient) { - final StreamsBuilder builder = new StreamsBuilder(); - sourceTopic = configs.getProperty(StreamConfig.ClientProperties.SOURCE_TOPIC.getValue()); - sinkTopic = configs.getProperty(StreamConfig.DecodingProperties.SINK_TOPIC.getValue()); - createTopology(builder); - final Topology topology = builder.build(); - streams = kafkaStreamsClient.create(topology); - health = new Health(streams); - latch = new CountDownLatch(1); - } - - private void start() { - LOGGER.info("Starting streams..."); - try { - streams.start(); - health.start(); - latch.await(); - } catch (Exception e) { - LOGGER.error("Could not start streams.", e); - System.exit(1); + /** + * Configures the Kafka Streams application with provided settings. + * + * @param configs the configuration settings for the Kafka Streams application + */ + @Override + @SuppressFBWarnings(value = "EI_EXPOSE_REP2", + justification = "This is an internal class and the properties are not exposed to the outside") + public void configure(final Properties configs) { + this.properties = configs; } - } - - private void shutDown() { - LOGGER.info("Shutting down streams..."); - streams.close(); - health.stop(); - latch.countDown(); - } - - public static class InjectOffsetTransformer implements ValueTransformer<JsonObject, JsonObject> { - - private ProcessorContext context; + /** + * Starts the Kafka Streams application using the provided KafkaStreamsClient and initializes + * its lifecycle management, including graceful shutdown. This method also adds a shutdown hook + * to the JVM and terminates the JVM upon completion of the stream. + * + * @param kafkaStreamsClient the client used to create and manage the Kafka Streams instance + * @throws StreamingException if an error occurs during streaming + */ @Override - public void init(ProcessorContext context) { - this.context = context; + @SuppressWarnings("PMD.DoNotTerminateVM") + @SuppressFBWarnings(value = "DM_EXIT", justification = "This is a Kafka Streams application") + public void stream(final KafkaStreamsClient kafkaStreamsClient) { + init(kafkaStreamsClient); + Runtime.getRuntime().addShutdownHook(new Thread(this::shutDown, "streams-shutdown-hook")); + start(); + System.exit(0); } - @Override - public JsonObject transform(JsonObject value) { - value.addProperty("offset", context.offset()); - return value; + /** + * Implement this method to define the topology of the Kafka Streams application. + * + * @param builder the {@link StreamsBuilder} to use to create the topology + */ + public abstract void createTopology(StreamsBuilder builder); + + /** + * Use this method to log any exceptions that occur while streaming. + * + * @param exception the exception to log + */ + protected void logStreamsException(final Exception exception) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn( + String.format("Error reading from topic %s. Last read offset %s:", sourceTopic, lastReadOffset), + exception); + } + if (streams != null & LOGGER.isInfoEnabled()) { + LOGGER.info("Current state of streams: {}", streams.state()); + } } - @Override - public void close() { + private void init(final KafkaStreamsClient kafkaStreamsClient) { + final StreamsBuilder builder = new StreamsBuilder(); + sourceTopic = properties.getProperty(ClientProperties.SOURCE_TOPIC.getValue()); + sinkTopic = properties.getProperty(DecodingProperties.SINK_TOPIC.getValue()); + createTopology(builder); + final Topology topology = builder.build(); + streams = kafkaStreamsClient.create(topology); + health = new Health(streams); + latch = new CountDownLatch(1); } - } + private void start() { + LOGGER.info("Starting streams..."); + streams.start(); + health.start(); + try { + latch.await(); + } catch (InterruptedException e) { + LOGGER.error("Error while waiting for latch", e); + throw new StreamingException("Error while waiting for latch", e); + } + + } + + private void shutDown() { + LOGGER.info("Shutting down streams..."); + streams.close(); + health.stop(); + latch.countDown(); + } } diff --git a/src/main/java/ch/cern/nile/common/streams/InjectOffsetTransformer.java b/src/main/java/ch/cern/nile/common/streams/InjectOffsetTransformer.java new file mode 100644 index 0000000000000000000000000000000000000000..4733c46a2dd75801dfa4ef9f849600ce87b6df86 --- /dev/null +++ b/src/main/java/ch/cern/nile/common/streams/InjectOffsetTransformer.java @@ -0,0 +1,64 @@ +package ch.cern.nile.common.streams; + +import com.google.gson.JsonObject; + +import org.apache.kafka.streams.kstream.ValueTransformer; +import org.apache.kafka.streams.processor.ProcessorContext; + + +/** + * The {@link InjectOffsetTransformer} is a Kafka Streams ValueTransformer that enriches each input JsonObject + * with the offset information of the current record being processed. This transformer is typically used + * in Kafka Streams topologies where tracking the position of records within a Kafka topic is necessary. + * <p> + * Usage: + * This transformer should be used within a Kafka Streams topology, typically in a transformValues() operation. + * It adds an "offset" property to the input JsonObject, which contains the offset value of the record + * in the source topic. The enhanced JsonObject can then be further processed in the stream. + * <p> + * Example: + * <pre>{@code + * StreamsBuilder builder = new StreamsBuilder(); + * builder.<String, JsonObject>stream("source-topic") + * .transformValues(InjectOffsetTransformer::new) + * .to("sink-topic"); + * }</pre> + */ +public class InjectOffsetTransformer implements ValueTransformer<JsonObject, JsonObject> { + + private ProcessorContext context; + + /** + * Initialize the transformer with the given processor context. This method is called when the + * transformer is instantiated and provides access to the ProcessorContext for retrieving metadata + * such as the record's offset. + * + * @param processorContext the processor context provided by the Kafka Streams framework + */ + @Override + public void init(final ProcessorContext processorContext) { + this.context = processorContext; + } + + /** + * Transform the input JsonObject by injecting the current record's offset. The offset is added as + * a property to the JsonObject, enabling downstream processors to access this metadata. + * + * @param value the input JsonObject to be transformed + * @return the transformed JsonObject with the offset property added + */ + @Override + public JsonObject transform(final JsonObject value) { + value.addProperty("offset", context.offset()); + return value; + } + + /** + * Close the transformer. + */ + @Override + public void close() { + // Nothing to do + } + +} diff --git a/src/main/java/ch/cern/nile/common/streams/StreamUtils.java b/src/main/java/ch/cern/nile/common/streams/StreamUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..d53e80c4701707c9136eb95c7b2676cacdc0f312 --- /dev/null +++ b/src/main/java/ch/cern/nile/common/streams/StreamUtils.java @@ -0,0 +1,113 @@ +package ch.cern.nile.common.streams; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ch.cern.nile.common.exceptions.DecodingException; + +/** + * {@link StreamUtils} is a utility class providing static methods to assist in stream processing. + */ +public final class StreamUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(StreamUtils.class); + + private StreamUtils() { + } + + /** + * Adds the most recent timestamp found in the gatewayInfo JsonArray to the provided map. + * The timestamp is added as an epoch millisecond value under the key "timestamp". + * + * @param gatewayInfo the JsonArray containing gateway information, each entry expected to + * have a "time" field with an ISO-8601 formatted timestamp + * @param map the map to which the most recent timestamp will be added + * @throws DecodingException if no valid timestamp is found in the gatewayInfo + */ + public static void addTimestamp(final JsonArray gatewayInfo, final Map<String, Object> map) { + final String timeKey = "time"; + Instant timestamp = null; + for (final JsonElement element : gatewayInfo) { + if (!element.isJsonObject()) { + continue; + } + + final JsonObject entry = element.getAsJsonObject(); + if (!entry.has(timeKey)) { + continue; + } + + final Instant currentTimestamp = Instant.parse(entry.get(timeKey).getAsString()); + if (timestamp == null || currentTimestamp.isAfter(timestamp)) { + timestamp = currentTimestamp; + } + } + + if (timestamp == null) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("No valid {} found in gatewayInfo: {}", timeKey, gatewayInfo); + } + timestamp = Instant.now(); + } + + map.put("timestamp", timestamp.toEpochMilli()); + } + + /** + * Filters out null values. + * + * @param ignored ignored parameter (unused in current implementation) + * @param value the value to be checked for null + * @return true if the value is not null, false otherwise + */ + public static boolean filterNull(final String ignored, final Object value) { + return value != null; + } + + /** + * Filters out empty lists and maps. + * Returns true if the value is neither an empty list nor an empty map, otherwise false. + * <p> + * This method is useful in stream processing scenarios where empty collections (lists or maps) are considered + * irrelevant or need to be filtered out. + * + * @param ignored ignored parameter (unused in current implementation) + * @param value the value to be checked, expected to be a List or Map + * @return true if the value is not an empty list or map, false otherwise + */ + public static boolean filterEmpty(final String ignored, final Object value) { + boolean isNotEmpty = true; + + if (value instanceof List) { + isNotEmpty = !((List<?>) value).isEmpty(); + } else if (value instanceof Map) { + isNotEmpty = !((Map<?, ?>) value).isEmpty(); + } + + return isNotEmpty; + } + + /** + * Filters records based on the presence of required fields in a JsonObject. + * Returns true if the JsonObject contains all required fields ("applicationID", "applicationName", + * "deviceName", "devEUI", and "data"), otherwise false. + * + * @param ignored ignored parameter (unused in current implementation) + * @param value the JsonObject to be checked for required fields + * @return true if all required fields are present, false otherwise + */ + public static boolean filterRecord(final String ignored, final JsonObject value) { + return value != null && value.get("applicationID") != null && value.get("applicationName") != null + && value.get("deviceName") != null && value.get("devEUI") != null + && value.get("data") != null; + } + +} diff --git a/src/main/java/ch/cern/nile/common/streams/Streaming.java b/src/main/java/ch/cern/nile/common/streams/Streaming.java index 1aa171e03453b9068aedb48d574c1d9adff04ed5..cc7987a2e53fa493b7caca26e149ff74d0002a73 100644 --- a/src/main/java/ch/cern/nile/common/streams/Streaming.java +++ b/src/main/java/ch/cern/nile/common/streams/Streaming.java @@ -1,10 +1,23 @@ package ch.cern.nile.common.streams; -import ch.cern.nile.common.configs.Configure; import ch.cern.nile.common.clients.KafkaStreamsClient; +import ch.cern.nile.common.configuration.Configure; +/** + * The Streaming interface defines the essential functions for a streaming application. + * It extends the Configure interface, allowing for configuration setup. Implementations + * of this interface are responsible for defining the streaming behavior, including + * how streams are created and managed using a KafkaStreamsClient. + */ public interface Streaming extends Configure { - void stream(KafkaStreamsClient kafkaStreamsClient); + /** + * Initializes and starts the streaming process using the provided KafkaStreamsClient. + * Implementations should define the setup and execution of the stream, including the + * creation and management of Kafka Streams instances. + * + * @param kafkaStreamsClient the KafkaStreamsClient used to create and manage the stream. + */ + void stream(KafkaStreamsClient kafkaStreamsClient); } diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties index 9ba3a4660dd6986368abf9a3a229028b46a856e0..a5459ae79b93762f407ef0faa0770e1333a10a9d 100644 --- a/src/main/resources/log4j.properties +++ b/src/main/resources/log4j.properties @@ -1,8 +1,7 @@ # Root logger option log4j.rootLogger=INFO, stdout - # Direct log messages to stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} - %m%n \ No newline at end of file +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1} - %m%n diff --git a/src/test/java/ch/cern/nile/common/clients/KafkaStreamsClientTest.java b/src/test/java/ch/cern/nile/common/clients/KafkaStreamsClientTest.java new file mode 100644 index 0000000000000000000000000000000000000000..02c9d5ce2d414151ed6f17001071a339e1067c3f --- /dev/null +++ b/src/test/java/ch/cern/nile/common/clients/KafkaStreamsClientTest.java @@ -0,0 +1,119 @@ +package ch.cern.nile.common.clients; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.UnknownHostException; +import java.util.Properties; + +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import ch.cern.nile.common.configuration.properties.ClientProperties; +import ch.cern.nile.common.exceptions.ReverseDnsLookupException; + +import lombok.SneakyThrows; + +class KafkaStreamsClientTest { + + private static final String INVALID_CLUSTER = "invalidCluster"; + + private KafkaStreamsClient client; + private Properties properties; + private Topology topology; + + @Mock + private KafkaStreams kafkaStreams; + + private AutoCloseable closeable; + + @BeforeEach + public void setup() { + closeable = MockitoAnnotations.openMocks(this); + client = new KafkaStreamsClient() { + + @Override + @SuppressWarnings("checkstyle:HiddenField") + public KafkaStreams create(final Topology topology) { + return kafkaStreams; + } + + @Override + protected String performDnsLookup(final String kafkaCluster) throws UnknownHostException { + if (INVALID_CLUSTER.equals(kafkaCluster)) { + throw new UnknownHostException("Invalid cluster"); + } + return "localhost:9092"; + } + }; + properties = new Properties(); + topology = Mockito.mock(Topology.class); + } + + @AfterEach + @SneakyThrows + public void tearDown() { + closeable.close(); + } + + @Test + void givenNonTestCluster_whenConfigure_thenKafkaStreamsCreated() { + properties.setProperty(ClientProperties.CLIENT_ID.getValue(), "testClientId"); + properties.setProperty(ClientProperties.KAFKA_CLUSTER.getValue(), "nonTestCluster"); + properties.setProperty(ClientProperties.TRUSTSTORE_LOCATION.getValue(), "/path/to/truststore"); + properties.setProperty(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); + + client.configure(properties); + + final KafkaStreams streams = client.create(topology); + assertNotNull(streams, "KafkaStreams object should not be null"); + } + + @Test + void givenTestCluster_whenConfigure_thenKafkaStreamsCreated() { + properties.setProperty(ClientProperties.CLIENT_ID.getValue(), "testClientId"); + properties.setProperty(ClientProperties.KAFKA_CLUSTER.getValue(), "test"); + properties.setProperty("bootstrap.servers", "localhost:9092"); + + client.configure(properties); + + final KafkaStreams streams = client.create(topology); + assertNotNull(streams, "KafkaStreams object should not be null"); + } + + @Test + void givenInvalidCluster_whenConfigure_thenReverseDnsLookupExceptionThrown() { + properties.setProperty(ClientProperties.CLIENT_ID.getValue(), "testClientId"); + properties.setProperty(ClientProperties.KAFKA_CLUSTER.getValue(), "invalidCluster"); + + assertThrows(ReverseDnsLookupException.class, () -> client.configure(properties), + "Should throw ReverseDnsLookupException"); + } + + @Test + void givenKnownDomain_whenPerformDnsLookup_thenResultContainsPort9093() throws UnknownHostException { + final String domain = "www.google.com"; + final String result = new KafkaStreamsClient().performDnsLookup(domain); + + assertNotNull(result, "Result should not be null"); + assertTrue(result.contains(":9093"), "Result should contain port 9093"); + } + + @Test + void givenLocalhost_whenPerformDnsLookup_thenResultContainsPort9093() throws UnknownHostException { + final String domain = "localhost"; + final String result = new KafkaStreamsClient().performDnsLookup(domain); + + assertNotNull(result, "Result should not be null"); + assertTrue(result.contains(":9093"), "Result should contain port 9093"); + } + +} diff --git a/src/test/java/ch/cern/nile/common/configs/PropertiesCheckTest.java b/src/test/java/ch/cern/nile/common/configs/PropertiesCheckTest.java deleted file mode 100644 index 523fa2214fbd3ae20db1b0429462ebe9e9de70f0..0000000000000000000000000000000000000000 --- a/src/test/java/ch/cern/nile/common/configs/PropertiesCheckTest.java +++ /dev/null @@ -1,58 +0,0 @@ -package ch.cern.nile.common.configs; - -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.util.Properties; - -import org.junit.jupiter.api.Test; - -class PropertiesCheckTest { - - @Test - void validateProperties_ThrowsRuntimeException_forIllegalArguments() { - final Properties properties = new Properties(); - - assertThrows(RuntimeException.class, - () -> PropertiesCheck.validateProperties(null, StreamType.ROUTING), - "Properties object cannot be null"); - - assertThrows(RuntimeException.class, - () -> PropertiesCheck.validateProperties(properties, null), - "Properties file is missing stream.type property"); - } - - @Test - void validateProperties_PassesValidation_forDecoding() { - final Properties properties = new Properties(); - initClientAndCommonProperties(properties); - properties.put(StreamConfig.DecodingProperties.SINK_TOPIC.getValue(), ""); - PropertiesCheck.validateProperties(properties, StreamType.DECODING); - } - - @Test - void validateProperties_PassesValidation_forRouting() { - final Properties properties = new Properties(); - initClientAndCommonProperties(properties); - properties.put(StreamConfig.RoutingProperties.ROUTING_CONFIG_PATH.getValue(), ""); - properties.put(StreamConfig.RoutingProperties.DLQ_TOPIC.getValue(), ""); - PropertiesCheck.validateProperties(properties, StreamType.ROUTING); - } - - @Test - void validateProperties_PassesValidation_forEnrichment() { - final Properties properties = new Properties(); - initClientAndCommonProperties(properties); - properties.put(StreamConfig.EnrichmentProperties.ENRICHMENT_CONFIG_PATH.getValue(), ""); - properties.put(StreamConfig.EnrichmentProperties.SINK_TOPIC.getValue(), ""); - PropertiesCheck.validateProperties(properties, StreamType.ENRICHMENT); - } - - private void initClientAndCommonProperties(Properties properties) { - properties.put(StreamConfig.ClientProperties.CLIENT_ID.getValue(), ""); - properties.put(StreamConfig.ClientProperties.KAFKA_CLUSTER.getValue(), ""); - properties.put(StreamConfig.ClientProperties.SOURCE_TOPIC.getValue(), ""); - properties.put(StreamConfig.ClientProperties.TRUSTSTORE_LOCATION.getValue(), ""); - properties.put(StreamConfig.CommonProperties.STREAM_CLASS.getValue(), ""); - properties.put(StreamConfig.CommonProperties.STREAM_TYPE.getValue(), ""); - } -} diff --git a/src/test/java/ch/cern/nile/common/configs/StreamConfigTest.java b/src/test/java/ch/cern/nile/common/configs/StreamConfigTest.java deleted file mode 100644 index e2a7674f5ecc63999bf035a3735fcf67b33ef340..0000000000000000000000000000000000000000 --- a/src/test/java/ch/cern/nile/common/configs/StreamConfigTest.java +++ /dev/null @@ -1,46 +0,0 @@ -package ch.cern.nile.common.configs; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.util.Set; -import org.junit.jupiter.api.Test; - -class StreamConfigTest { - - @Test - void testClientProperties() { - Set<String> expectedConfigs = Set.of("source.topic", "kafka.cluster", "client.id", "truststore.location"); - assertEquals(expectedConfigs, StreamConfig.ClientProperties.getValues()); - assertThrows(IllegalArgumentException.class, () -> StreamConfig.ClientProperties.valueOf("unknown.property")); - } - - @Test - void testCommonProperties() { - Set<String> expectedConfigs = Set.of("stream.type", "stream.class"); - assertEquals(expectedConfigs, StreamConfig.CommonProperties.getValues()); - assertThrows(IllegalArgumentException.class, () -> StreamConfig.CommonProperties.valueOf("unknown.property")); - } - - @Test - void testDecodingProperties() { - Set<String> expectedConfigs = Set.of("sink.topic"); - assertEquals(expectedConfigs, StreamConfig.DecodingProperties.getValues()); - assertThrows(IllegalArgumentException.class, () -> StreamConfig.DecodingProperties.valueOf("unknown.property")); - } - - @Test - void testRoutingProperties() { - Set<String> expectedConfigs = Set.of("routing.config.path", "dlq.topic"); - assertEquals(expectedConfigs, StreamConfig.RoutingProperties.getValues()); - assertThrows(IllegalArgumentException.class, () -> StreamConfig.RoutingProperties.valueOf("unknown.property")); - } - - @Test - void testEnrichmentProperties() { - Set<String> expectedConfigs = Set.of("enrichment.config.path", "sink.topic"); - assertEquals(expectedConfigs, StreamConfig.EnrichmentProperties.getValues()); - assertThrows(IllegalArgumentException.class, () -> StreamConfig.EnrichmentProperties.valueOf("unknown.property")); - } - -} diff --git a/src/test/java/ch/cern/nile/common/configs/StreamTypeTest.java b/src/test/java/ch/cern/nile/common/configs/StreamTypeTest.java deleted file mode 100644 index 5dd6bf9e89e05f43513dd2f1d7def845f16f19b9..0000000000000000000000000000000000000000 --- a/src/test/java/ch/cern/nile/common/configs/StreamTypeTest.java +++ /dev/null @@ -1,29 +0,0 @@ -package ch.cern.nile.common.configs; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import org.junit.jupiter.api.Test; - -class StreamTypeTest { - - @Test - void findByValue_MapsToRouting_forValueRouting() { - assertEquals(StreamType.ROUTING, StreamType.valueOf("ROUTING")); - } - - @Test - void findByValue_MapsToDecoding_forValueDecoding() { - assertEquals(StreamType.DECODING, StreamType.valueOf("DECODING")); - } - - @Test - void findByValue_MapsToEnrichment_forValueEnrichment() { - assertEquals(StreamType.ENRICHMENT, StreamType.valueOf("ENRICHMENT")); - } - - @Test - void findByValue_ThrowsRuntimeException_forUnknownStreamType() { - assertThrows(IllegalArgumentException.class, () -> StreamType.valueOf("Unknown")); - } -} diff --git a/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java b/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java new file mode 100644 index 0000000000000000000000000000000000000000..58aa58cdca7c9dc280ea42aad6f085061fbb0d42 --- /dev/null +++ b/src/test/java/ch/cern/nile/common/configuration/PropertiesCheckTest.java @@ -0,0 +1,86 @@ +package ch.cern.nile.common.configuration; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Properties; + +import org.junit.jupiter.api.Test; + +import ch.cern.nile.common.configuration.properties.ClientProperties; +import ch.cern.nile.common.configuration.properties.CommonProperties; +import ch.cern.nile.common.configuration.properties.DecodingProperties; +import ch.cern.nile.common.configuration.properties.EnrichmentProperties; +import ch.cern.nile.common.configuration.properties.RoutingProperties; +import ch.cern.nile.common.exceptions.MissingPropertyException; + +class PropertiesCheckTest { + + @Test + void givenNullProperties_whenValidateProperties_thenThrowsRuntimeException() { + assertThrows(RuntimeException.class, () -> PropertiesCheck.validateProperties(null, StreamType.ROUTING), + "Properties object cannot be null"); + } + + @Test + void givenNullStreamType_whenValidateProperties_thenThrowsRuntimeException() { + final Properties properties = new Properties(); + + assertThrows(RuntimeException.class, () -> PropertiesCheck.validateProperties(properties, null), + "Properties file is missing stream.type property"); + } + + @Test + void givenValidDecodingProperties_whenValidateProperties_thenPassesValidation() { + final Properties properties = new Properties(); + initClientAndCommonProperties(properties); + properties.put(DecodingProperties.SINK_TOPIC.getValue(), ""); + + assertDoesNotThrow(() -> PropertiesCheck.validateProperties(properties, StreamType.DECODING), + "Should not throw exception"); + } + + @Test + void givenValidRoutingProperties_whenValidateProperties_thenPassesValidation() { + final Properties properties = new Properties(); + initClientAndCommonProperties(properties); + properties.put(RoutingProperties.ROUTING_CONFIG_PATH.getValue(), ""); + properties.put(RoutingProperties.DLQ_TOPIC.getValue(), ""); + + assertDoesNotThrow(() -> PropertiesCheck.validateProperties(properties, StreamType.ROUTING), + "Should not throw exception"); + } + + @Test + void givenValidEnrichmentProperties_whenValidateProperties_thenPassesValidation() { + final Properties properties = new Properties(); + initClientAndCommonProperties(properties); + properties.put(EnrichmentProperties.ENRICHMENT_CONFIG_PATH.getValue(), ""); + properties.put(EnrichmentProperties.SINK_TOPIC.getValue(), ""); + + assertDoesNotThrow(() -> PropertiesCheck.validateProperties(properties, StreamType.ENRICHMENT), + "Should not throw exception"); + } + + @Test + void givenMissingRequiredProperty_whenValidateProperties_thenThrowsMissingPropertyException() { + final Properties properties = new Properties(); + initClientAndCommonProperties(properties); + // Remove a required property for routing, for example + properties.remove(RoutingProperties.ROUTING_CONFIG_PATH.getValue()); + + assertThrows(MissingPropertyException.class, + () -> PropertiesCheck.validateProperties(properties, StreamType.ROUTING), + "Properties file is missing: routing.config.path property."); + } + + private void initClientAndCommonProperties(final Properties properties) { + properties.put(ClientProperties.CLIENT_ID.getValue(), ""); + properties.put(ClientProperties.KAFKA_CLUSTER.getValue(), ""); + properties.put(ClientProperties.SOURCE_TOPIC.getValue(), ""); + properties.put(ClientProperties.TRUSTSTORE_LOCATION.getValue(), ""); + properties.put(CommonProperties.STREAM_CLASS.getValue(), ""); + properties.put(CommonProperties.STREAM_TYPE.getValue(), ""); + } + +} diff --git a/src/test/java/ch/cern/nile/common/configuration/StreamTypeTest.java b/src/test/java/ch/cern/nile/common/configuration/StreamTypeTest.java new file mode 100644 index 0000000000000000000000000000000000000000..daecfbcb10409ef31f001f22a2184070b5d0ecfc --- /dev/null +++ b/src/test/java/ch/cern/nile/common/configuration/StreamTypeTest.java @@ -0,0 +1,35 @@ +package ch.cern.nile.common.configuration; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.Test; + +class StreamTypeTest { + + @Test + void givenKnownStreamTypeRouting_whenFindByValue_thenMapsToRouting() { + final StreamType result = StreamType.valueOf("ROUTING"); + + assertEquals(StreamType.ROUTING, result, "Should return expected stream type"); + } + + @Test + void givenKnownStreamTypeDecoding_whenFindByValue_thenMapsToDecoding() { + final StreamType result = StreamType.valueOf("DECODING"); + + assertEquals(StreamType.DECODING, result, "Should return expected stream type"); + } + + @Test + void givenKnownStreamTypeEnrichment_whenFindByValue_thenMapsToEnrichment() { + final StreamType result = StreamType.valueOf("ENRICHMENT"); + + assertEquals(StreamType.ENRICHMENT, result, "Should return expected stream type"); + } + + @Test + void givenUnknownStreamType_whenFindByValue_thenThrowsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> StreamType.valueOf("Unknown"), "Should throw exception"); + } +} diff --git a/src/test/java/ch/cern/nile/common/configuration/properties/StreamConfigTest.java b/src/test/java/ch/cern/nile/common/configuration/properties/StreamConfigTest.java new file mode 100644 index 0000000000000000000000000000000000000000..8d943222ebfee23fddd21ff1f30de8c48545531f --- /dev/null +++ b/src/test/java/ch/cern/nile/common/configuration/properties/StreamConfigTest.java @@ -0,0 +1,87 @@ +package ch.cern.nile.common.configuration.properties; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Set; + +import org.junit.jupiter.api.Test; + +class StreamConfigTest { + + private static final String UNKNOWN_PROPERTY = "unknown.property"; + private static final String SHOULD_THROW_ILLEGAL_ARGUMENT_EXCEPTION = "Should throw IllegalArgumentException"; + private static final String SHOULD_RETURN_EXPECTED_SET_OF_CONFIGS = "Should return expected set of configs"; + + @Test + void givenClientPropertiesEnum_whenGetValues_thenReturnsExpectedSet() { + final Set<String> expectedConfigs = Set.of("source.topic", "kafka.cluster", "client.id", "truststore.location"); + final Set<String> actualConfigs = PropertyEnum.getValues(ClientProperties.class); + + assertEquals(expectedConfigs, actualConfigs, SHOULD_RETURN_EXPECTED_SET_OF_CONFIGS); + } + + @Test + void givenClientPropertiesEnum_whenValueOfWithUnknownProperty_thenThrowsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> ClientProperties.valueOf(UNKNOWN_PROPERTY), + SHOULD_THROW_ILLEGAL_ARGUMENT_EXCEPTION); + } + + @Test + void givenCommonPropertiesEnum_whenGetValues_thenReturnsExpectedSet() { + final Set<String> expectedConfigs = Set.of("stream.type", "stream.class"); + final Set<String> actualConfigs = PropertyEnum.getValues(CommonProperties.class); + + assertEquals(expectedConfigs, actualConfigs, SHOULD_RETURN_EXPECTED_SET_OF_CONFIGS); + } + + @Test + void givenCommonPropertiesEnum_whenValueOfWithUnknownProperty_thenThrowsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> CommonProperties.valueOf(UNKNOWN_PROPERTY), + SHOULD_THROW_ILLEGAL_ARGUMENT_EXCEPTION); + } + + @Test + void givenDecodingPropertiesEnum_whenGetValues_thenReturnsExpectedSet() { + final Set<String> expectedConfigs = Set.of("sink.topic"); + final Set<String> actualConfigs = PropertyEnum.getValues(DecodingProperties.class); + + assertEquals(expectedConfigs, actualConfigs, SHOULD_RETURN_EXPECTED_SET_OF_CONFIGS); + } + + @Test + void givenDecodingPropertiesEnum_whenValueOfWithUnknownProperty_thenThrowsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> DecodingProperties.valueOf(UNKNOWN_PROPERTY), + SHOULD_THROW_ILLEGAL_ARGUMENT_EXCEPTION); + } + + @Test + void givenRoutingPropertiesEnum_whenGetValues_thenReturnsExpectedSet() { + final Set<String> expectedConfigs = Set.of("routing.config.path", "dlq.topic"); + final Set<String> actualConfigs = PropertyEnum.getValues(RoutingProperties.class); + + assertEquals(expectedConfigs, actualConfigs, SHOULD_RETURN_EXPECTED_SET_OF_CONFIGS); + } + + @Test + void givenRoutingPropertiesEnum_whenValueOfWithUnknownProperty_thenThrowsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, () -> RoutingProperties.valueOf(UNKNOWN_PROPERTY), + SHOULD_THROW_ILLEGAL_ARGUMENT_EXCEPTION); + } + + @Test + void givenEnrichmentPropertiesEnum_whenGetValues_thenReturnsExpectedSet() { + final Set<String> expectedConfigs = Set.of("enrichment.config.path", "sink.topic"); + final Set<String> actualConfigs = PropertyEnum.getValues(EnrichmentProperties.class); + + assertEquals(expectedConfigs, actualConfigs, SHOULD_RETURN_EXPECTED_SET_OF_CONFIGS); + } + + @Test + void givenEnrichmentPropertiesEnum_whenValueOfWithUnknownProperty_thenThrowsIllegalArgumentException() { + assertThrows(IllegalArgumentException.class, + () -> EnrichmentProperties.valueOf(UNKNOWN_PROPERTY), + SHOULD_THROW_ILLEGAL_ARGUMENT_EXCEPTION); + } + +} diff --git a/src/test/java/ch/cern/nile/common/json/JsonPojoDeserializerTest.java b/src/test/java/ch/cern/nile/common/json/JsonPojoDeserializerTest.java index f5f1261a9b2cd24ef21dbd00e19ec6dd97a4424b..aeddeffeb9836281353ef1c93faaf03b6015ec0b 100644 --- a/src/test/java/ch/cern/nile/common/json/JsonPojoDeserializerTest.java +++ b/src/test/java/ch/cern/nile/common/json/JsonPojoDeserializerTest.java @@ -3,53 +3,50 @@ package ch.cern.nile.common.json; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -import java.util.Map; +import org.junit.jupiter.api.Test; + import ch.cern.nile.common.models.Application; import ch.cern.nile.common.models.Topic; -import org.junit.jupiter.api.Test; class JsonPojoDeserializerTest { - private final JsonPojoDeserializer<Application> applicationDeserializer = new JsonPojoDeserializer<>(Application.class); - private final JsonPojoDeserializer<Topic> topicDeserializer = new JsonPojoDeserializer<>(Topic.class); - - @Test - void deserialize_Application_ReturnsApplication() { - String json = "{\"name\":\"my-app\",\"topic\":{\"name\":\"my-topic\"}}"; - Application expected = new Application(); - expected.setName("my-app"); - expected.setTopic(new Topic()); - expected.getTopic().setName("my-topic"); - Application actual = applicationDeserializer.deserialize("test-topic", json.getBytes()); - assertEquals(expected.toString(), actual.toString()); - } - - @Test - void deserialize_Topic_ReturnsTopic() { - String json = "{\"name\":\"my-topic\"}"; - Topic expected = new Topic(); - expected.setName("my-topic"); - Topic actual = topicDeserializer.deserialize("test-topic", json.getBytes()); - assertEquals(expected.toString(), actual.toString()); - } - - @Test - void deserialize_NullBytes_ReturnsNull() { - assertNull(applicationDeserializer.deserialize("test-topic", null)); - } - - @Test - void deserialize_NullJson_ReturnsNull() { - assertNull(applicationDeserializer.deserialize("test-topic", "null".getBytes())); - } - - @Test - void configure_SetJsonPOJOClass_SetsClass() { - try (JsonPojoDeserializer<Topic> deserializer = new JsonPojoDeserializer<>()) { - assertNull(deserializer.tClass); - deserializer.configure(Map.of("JsonPOJOClass", Topic.class), true); - assertEquals(Topic.class, deserializer.tClass); + private static final String TEST_TOPIC = "test-topic"; + private final JsonPojoDeserializer<Application> applicationDeserializer = + new JsonPojoDeserializer<>(Application.class); + private final JsonPojoDeserializer<Topic> topicDeserializer = new JsonPojoDeserializer<>(Topic.class); + + @Test + void givenJsonWithApplication_whenDeserialize_thenReturnsApplication() { + final String json = "{\"name\":\"my-app\",\"topic\":{\"name\":\"my-topic\"}}"; + final Application expected = new Application(); + expected.setName("my-app"); + expected.setTopic(new Topic()); + expected.getTopic().setName("my-topic"); + final Application actual = applicationDeserializer.deserialize(TEST_TOPIC, json.getBytes()); + + assertEquals(expected.toString(), actual.toString(), "Application deserialized incorrectly"); + } + + @Test + void givenJsonWithTopic_whenDeserialize_thenReturnsTopic() { + final String json = "{\"name\":\"my-topic\"}"; + + final Topic expected = new Topic(); + expected.setName("my-topic"); + final Topic actual = topicDeserializer.deserialize(TEST_TOPIC, json.getBytes()); + + assertEquals(expected.toString(), actual.toString(), "Topic deserialized incorrectly"); + } + + @Test + void givenNullBytes_whenDeserialize_thenReturnsNull() { + assertNull(applicationDeserializer.deserialize(TEST_TOPIC, null), "Null bytes should return null"); + } + + @Test + void givenNullJson_whenDeserialize_thenReturnsNull() { + assertNull(applicationDeserializer.deserialize(TEST_TOPIC, "null".getBytes()), + "Null json should return null"); } - } } diff --git a/src/test/java/ch/cern/nile/common/json/JsonPojoSerializerTest.java b/src/test/java/ch/cern/nile/common/json/JsonPojoSerializerTest.java index 4f7d9b7e021882533e85b2986882baaa4bd785bb..cb4037213513de8eba06792c075047a2e887d2fe 100644 --- a/src/test/java/ch/cern/nile/common/json/JsonPojoSerializerTest.java +++ b/src/test/java/ch/cern/nile/common/json/JsonPojoSerializerTest.java @@ -1,45 +1,52 @@ package ch.cern.nile.common.json; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertNull; import java.util.Collections; import java.util.HashMap; import java.util.Map; + import org.junit.jupiter.api.Test; class JsonPojoSerializerTest { - @Test - void configure_doesNotThrowException() { - try (JsonPojoSerializer<Object> serializer = new JsonPojoSerializer<>()) { - serializer.configure(Collections.emptyMap(), true); + @Test + void givenEmptyConfig_whenConfigure_thenDoesNotThrowException() { + try (JsonPojoSerializer<Object> serializer = new JsonPojoSerializer<>()) { + assertDoesNotThrow(() -> serializer.configure(Collections.emptyMap(), true), + "Should not throw exception"); + } } - } - @Test - void serialize_withNullData_ReturnsNull() { - try (JsonPojoSerializer<Object> serializer = new JsonPojoSerializer<>()) { - assertNull(serializer.serialize("topic", null)); + @Test + void givenNullData_whenSerialize_thenReturnsNull() { + try (JsonPojoSerializer<Object> serializer = new JsonPojoSerializer<>()) { + assertNull(serializer.serialize("topic", null), "Should return null"); + } } - } - @Test - void serialize_withNonNullData_ReturnsJsonBytes() { - Map<String, String> data = new HashMap<>(); - data.put("key", "value"); + @Test + void givenNonNullData_whenSerialize_thenReturnsJsonBytes() { + final Map<String, String> data = new HashMap<>(); + data.put("key", "value"); - byte[] expectedBytes = "{\"key\":\"value\"}".getBytes(); + final byte[] expectedBytes = "{\"key\":\"value\"}".getBytes(); - try (JsonPojoSerializer<Map<String, String>> serializer = new JsonPojoSerializer<>()) { - assertArrayEquals(expectedBytes, serializer.serialize("topic", data)); - } - } + try (JsonPojoSerializer<Map<String, String>> serializer = new JsonPojoSerializer<>()) { + final byte[] actualBytes = serializer.serialize("topic", data); - @Test - void close_doesNotThrowException() { - JsonPojoSerializer<Object> serializer = new JsonPojoSerializer<>(); - serializer.close(); - } + assertArrayEquals(expectedBytes, actualBytes, "Should return expected bytes"); + } + } + @Test + @SuppressWarnings("EmptyTryBlock") + void givenSerializer_whenClosed_thenDoesNotThrowException() { + assertDoesNotThrow(() -> { + try (JsonPojoSerializer<Object> ignored = new JsonPojoSerializer<>()) { + } + }, "Should not throw exception"); + } } diff --git a/src/test/java/ch/cern/nile/common/json/JsonSerdeTest.java b/src/test/java/ch/cern/nile/common/json/JsonSerdeTest.java index 228333c7570c58754c8874243317b39068454d39..2e40e65aae52b36f55263373948d50c1bc3acd2a 100644 --- a/src/test/java/ch/cern/nile/common/json/JsonSerdeTest.java +++ b/src/test/java/ch/cern/nile/common/json/JsonSerdeTest.java @@ -4,19 +4,21 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import java.util.HashMap; import java.util.Map; + import org.junit.jupiter.api.Test; -public class JsonSerdeTest { +class JsonSerdeTest { - @Test - public void testConfigure() { - try (JsonSerde jsonSerde = new JsonSerde()) { - Map<String, Object> configs = new HashMap<>(); - configs.put("config-key", "config-value"); - jsonSerde.configure(configs, true); - assertNotNull(jsonSerde.serializer()); - assertNotNull(jsonSerde.deserializer()); - } - } + @Test + void givenEmptyConfigs_whenConfigure_thenSerializerAndDeserializerNotNull() { + try (JsonSerde jsonSerde = new JsonSerde()) { + final Map<String, Object> configs = new HashMap<>(); + configs.put("config-key", "config-value"); + jsonSerde.configure(configs, true); + + assertNotNull(jsonSerde.serializer(), "Should not be null"); + assertNotNull(jsonSerde.deserializer(), "Should not be null"); + } + } } diff --git a/src/test/java/ch/cern/nile/common/probes/HealthTest.java b/src/test/java/ch/cern/nile/common/probes/HealthTest.java new file mode 100644 index 0000000000000000000000000000000000000000..f0fcc1d2d1c4fb497eb44ff71564383d021bfe75 --- /dev/null +++ b/src/test/java/ch/cern/nile/common/probes/HealthTest.java @@ -0,0 +1,102 @@ +package ch.cern.nile.common.probes; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; + +import org.apache.kafka.streams.KafkaStreams; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +class HealthTest { + + private static final int PORT = 8899; + private static final int OK_RESPONSE = 200; + private static final int ERROR_RESPONSE = 500; + + private KafkaStreams mockStreams; + private HttpServer mockServer; + private HttpServerFactory mockFactory; + + private Health health; + + @BeforeEach + void before() throws IOException { + mockStreams = mock(KafkaStreams.class); + mockServer = mock(HttpServer.class); + mockFactory = mock(HttpServerFactory.class); + when(mockFactory.createHttpServer(any(InetSocketAddress.class), anyInt())).thenReturn(mockServer); + + health = new Health(mockStreams, mockFactory); + } + + @Test + void givenHealthServer_whenStart_thenServerStartsAndCreatesHealthContext() throws IOException { + health.start(); + + verify(mockFactory).createHttpServer(new InetSocketAddress(PORT), 0); + verify(mockServer).createContext(eq("/health"), any(HttpHandler.class)); + verify(mockServer).start(); + } + + @Test + void givenHealthServer_whenStop_thenServerStops() { + health.start(); + health.stop(); + + verify(mockServer).stop(0); + } + + @Test + @SuppressWarnings("PMD.CloseResource") + @SuppressFBWarnings(value = "CloseResource", justification = "Mocked HttpServer") + void givenKafkaStreamsRunning_whenHealthCheck_thenResponseStatus200() throws IOException { + when(mockStreams.state()).thenReturn(KafkaStreams.State.RUNNING); + health.start(); + + final ArgumentCaptor<HttpHandler> handlerCaptor = ArgumentCaptor.forClass(HttpHandler.class); + verify(mockServer).createContext(eq("/health"), handlerCaptor.capture()); + + final HttpExchange mockExchange = mock(HttpExchange.class); + handlerCaptor.getValue().handle(mockExchange); + verify(mockExchange).sendResponseHeaders(OK_RESPONSE, 0); + verify(mockExchange).close(); + } + + @Test + @SuppressWarnings("PMD.CloseResource") + @SuppressFBWarnings(value = "CloseResource", justification = "Mocked HttpServer") + void givenKafkaStreamsNotRunning_whenHealthCheck_thenResponseStatus500() throws IOException { + when(mockStreams.state()).thenReturn(KafkaStreams.State.NOT_RUNNING); + health.start(); + + final ArgumentCaptor<HttpHandler> handlerCaptor = ArgumentCaptor.forClass(HttpHandler.class); + verify(mockServer).createContext(eq("/health"), handlerCaptor.capture()); + + final HttpExchange mockExchange = mock(HttpExchange.class); + handlerCaptor.getValue().handle(mockExchange); + verify(mockExchange).sendResponseHeaders(ERROR_RESPONSE, 0); + verify(mockExchange).close(); + } + + @Test + void givenHttpServerCreationFails_whenStart_thenThrowsRuntimeException() throws IOException { + when(mockFactory.createHttpServer(any(InetSocketAddress.class), anyInt())).thenThrow(IOException.class); + + assertThrows(RuntimeException.class, () -> health.start(), "Should throw RuntimeException"); + } +} diff --git a/src/test/java/ch/cern/nile/common/schema/SchemaInjectorTest.java b/src/test/java/ch/cern/nile/common/schema/SchemaInjectorTest.java index 0b0ae4b70eccce0f9243c1a22d0cc7a58340cc37..5137a65bff45c27183dd279f8406249d727a28fc 100644 --- a/src/test/java/ch/cern/nile/common/schema/SchemaInjectorTest.java +++ b/src/test/java/ch/cern/nile/common/schema/SchemaInjectorTest.java @@ -1,60 +1,92 @@ package ch.cern.nile.common.schema; -import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.junit.jupiter.api.Assertions.*; - -class SchemaInjectorTest extends SchemaTestBase { - - @Test - @SuppressWarnings("unchecked") - void inject_ReturnsCorrectSchemaAndPayload_WhenInputDataIsValid() { - final Map<String, Object> result = SchemaInjector.inject(data); - assertNotNull(result); - assertTrue(result.containsKey("schema")); - assertTrue(result.containsKey("payload")); - final Map<String, Object> schema = (Map<String, Object>) result.get("schema"); - assertEquals("struct", schema.get("type")); - final List<Map<String, Object>> fields = (List<Map<String, Object>>) schema.get("fields"); - assertEquals(data.size(), fields.size()); - - for (Map<String, Object> field : fields) { - final String fieldName = (String) field.get("field"); - assertTrue(data.containsKey(fieldName)); - assertNotNull(field.get("type")); - - - if (fieldName.equals("timestamp_col")) { - assertFalse(Boolean.parseBoolean(field.get("optional").toString())); - } else { - assertTrue(Boolean.parseBoolean(field.get("optional").toString())); - } - - if (fieldName.equals("timestamp_col")) { - assertEquals("org.apache.kafka.connect.data.Timestamp", field.get("name")); - assertEquals(1, field.get("version")); - } else if (fieldName.equals("date_col")) { - assertEquals("org.apache.kafka.connect.data.Date", field.get("name")); - assertEquals(1, field.get("version")); - } +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class SchemaInjectorTest { + + private static final String TIMESTAMP_COL = "timestamp_col"; + private static final String TIMESTAMP_TYPE = "org.apache.kafka.connect.data.Timestamp"; + private static final Map<String, Object> DATA = new HashMap<>(); + private static final String DATE_COL = "date_col"; + + @BeforeAll + public static void before() { + DATA.put("byte_col", (byte) 1); + DATA.put("short_col", (short) 2); + DATA.put("int_col", 3); + DATA.put("long_col", (long) 4); + DATA.put("float_col", 5.0f); + DATA.put("double_col", 6.0); + DATA.put("boolean_col", true); + DATA.put("string_col", "test"); + DATA.put("timestamp_col", 1_501_834_166_000L); + DATA.put(DATE_COL, new Date()); + DATA.put("bytes_col", new byte[]{1, 2, 3}); } - final Map<String, Object> payload = (Map<String, Object>) result.get("payload"); - assertEquals(data, payload); - } - - @Test - void inject_ThrowsIllegalArgumentException_WhenNullValuePresent() { - data.put("nullValue", null); - assertThrows(IllegalArgumentException.class, () -> SchemaInjector.inject(data)); - } - - @Test - void inject_ThrowsIllegalArgumentException_WhenUnsupportedTypePresent() { - data.put("unsupportedType", new Object()); - assertThrows(IllegalArgumentException.class, () -> SchemaInjector.inject(data)); - } + @Test + void givenValidInputData_whenInject_thenReturnsCorrectSchemaAndPayload() { + final Map<String, Object> result = SchemaInjector.inject(DATA); + + assertNotNull(result, "Should not be null"); + + assertTrue(result.containsKey("schema"), "Should contain schema"); + assertTrue(result.containsKey("payload"), "Should contain payload"); + + final Map<String, Object> schema = (Map<String, Object>) result.get("schema"); + assertEquals("struct", schema.get("type"), "Should be struct"); + final List<Map<String, Object>> fields = (List<Map<String, Object>>) schema.get("fields"); + assertEquals(DATA.size(), fields.size(), "Should contain all fields"); + + for (final Map<String, Object> field : fields) { + final String fieldName = (String) field.get("field"); + assertTrue(DATA.containsKey(fieldName), String.format("Should contain field %s", fieldName)); + assertNotNull(field.get("type"), String.format("Should contain type for field %s", fieldName)); + + if (TIMESTAMP_COL.equals(fieldName)) { + assertFalse(Boolean.parseBoolean(field.get("optional").toString()), "Should not be optional"); + } else { + assertTrue(Boolean.parseBoolean(field.get("optional").toString()), "Should be optional"); + } + + if (TIMESTAMP_COL.equals(fieldName)) { + assertEquals(TIMESTAMP_TYPE, field.get("name"), "Should be timestamp"); + assertEquals(1, field.get("version"), "Should be version 1"); + } else if (DATE_COL.equals(fieldName)) { + assertEquals("org.apache.kafka.connect.data.Date", field.get("name"), "Should be date"); + assertEquals(1, field.get("version"), "Should be version 1"); + } + } + + final Map<String, Object> payload = (Map<String, Object>) result.get("payload"); + assertEquals(DATA, payload, "Should contain all fields"); + } + + @Test + void givenDataWithNullValue_whenInject_thenThrowsIllegalArgumentException() { + final Map<String, Object> data = new HashMap<>(DATA); + data.put("nullValue", null); + + assertThrows(IllegalArgumentException.class, () -> SchemaInjector.inject(data), "Should throw exception"); + } + + @Test + void givenDataWithUnsupportedType_whenInject_thenThrowsIllegalArgumentException() { + final Map<String, Object> data = new HashMap<>(DATA); + data.put("unsupportedType", new Object()); + + assertThrows(IllegalArgumentException.class, () -> SchemaInjector.inject(data), "Should throw exception"); + } } diff --git a/src/test/java/ch/cern/nile/common/schema/SchemaTestBase.java b/src/test/java/ch/cern/nile/common/schema/SchemaTestBase.java deleted file mode 100644 index 9eca0fa7a2dae6077f84b29a67536dbd92db13ef..0000000000000000000000000000000000000000 --- a/src/test/java/ch/cern/nile/common/schema/SchemaTestBase.java +++ /dev/null @@ -1,26 +0,0 @@ -package ch.cern.nile.common.schema; - -import java.util.Date; -import java.util.HashMap; -import java.util.Map; -import org.junit.jupiter.api.BeforeEach; - -public class SchemaTestBase { - public Map<String, Object> data; - - @BeforeEach - void setUp() { - data = new HashMap<>(); - data.put("byte_col", (byte) 1); - data.put("short_col", (short) 2); - data.put("int_col", 3); - data.put("long_col", (long) 4); - data.put("float_col", 5.0f); - data.put("double_col", 6.0); - data.put("boolean_col", true); - data.put("string_col", "test"); - data.put("timestamp_col", 1501834166000L); - data.put("date_col", new Date()); - data.put("bytes_col", new byte[]{1, 2, 3}); - } -} diff --git a/src/test/java/ch/cern/nile/common/streams/StreamUtilsTest.java b/src/test/java/ch/cern/nile/common/streams/StreamUtilsTest.java new file mode 100644 index 0000000000000000000000000000000000000000..cc94c90b1702474983f808df367a5b5bfd8d963c --- /dev/null +++ b/src/test/java/ch/cern/nile/common/streams/StreamUtilsTest.java @@ -0,0 +1,93 @@ +package ch.cern.nile.common.streams; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; + +import org.junit.jupiter.api.Test; + +class StreamUtilsTest { + + private static final String KEY = "key"; + + @Test + void givenValidGatewayInfo_whenAddingTimestamp_thenTimestampIsAdded() { + final JsonArray gatewayInfo = new JsonArray(); + final JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("time", Instant.now().toString()); + gatewayInfo.add(jsonObject); + + final Map<String, Object> map = new HashMap<>(); + StreamUtils.addTimestamp(gatewayInfo, map); + + assertTrue(map.containsKey("timestamp"), "Timestamp not added to map."); + } + + @Test + void givenMissingAllGatewayTimestamps_whenAddingTimestamp_thenCurrentTimestampIsAdded() { + final JsonArray gatewayInfo = new JsonArray(); + final Map<String, Object> map = new HashMap<>(); + + StreamUtils.addTimestamp(gatewayInfo, map); + assertTrue(map.containsKey("timestamp"), "Timestamp not added to map."); + } + + @Test + void givenNonNullValue_whenFilteringNull_thenReturnsTrue() { + assertTrue(StreamUtils.filterNull(KEY, new Object()), "Non-null value should return true."); + } + + @Test + void givenNullValue_whenFilteringNull_thenReturnsFalse() { + assertFalse(StreamUtils.filterNull(KEY, null), "Null value should return false."); + } + + @Test + void givenNonEmptyList_whenFilteringEmpty_thenReturnsTrue() { + assertTrue(StreamUtils.filterEmpty(KEY, List.of(1, 2, 3)), "Non-empty list should return true."); + } + + @Test + void givenEmptyList_whenFilteringEmpty_thenReturnsFalse() { + assertFalse(StreamUtils.filterEmpty(KEY, List.of()), "Empty list should return false."); + } + + @Test + void givenNonEmptyMap_whenFilteringEmpty_thenReturnsTrue() { + final Map<String, String> nonEmptyMap = new HashMap<>(); + nonEmptyMap.put(KEY, "value"); + assertTrue(StreamUtils.filterEmpty(KEY, nonEmptyMap), "Non-empty map should return true."); + } + + @Test + void givenEmptyMap_whenFilteringEmpty_thenReturnsFalse() { + final Map<String, String> emptyMap = new HashMap<>(); + assertFalse(StreamUtils.filterEmpty(KEY, emptyMap), "Empty map should return false."); + } + + @Test + void givenValidJsonObject_whenFilteringRecord_thenReturnsTrue() { + final JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("applicationID", "appId"); + jsonObject.addProperty("applicationName", "appName"); + jsonObject.addProperty("deviceName", "deviceName"); + jsonObject.addProperty("devEUI", "devEUI"); + jsonObject.addProperty("data", "data"); + + assertTrue(StreamUtils.filterRecord(KEY, jsonObject), "Valid record should return true."); + } + + @Test + void givenInvalidJsonObject_whenFilteringRecord_thenReturnsFalse() { + final JsonObject jsonObject = new JsonObject(); + + assertFalse(StreamUtils.filterRecord(KEY, jsonObject), "Invalid record should return false."); + } +}