From 6fe78f815cccafc6cd06cc95382455039aa28d15 Mon Sep 17 00:00:00 2001 From: Jonathan Christison Date: Mon, 28 Sep 2020 09:52:01 +0100 Subject: [PATCH] Add in memory messaging test and test basic user-message and timestamp --- .../kafka-producer/pom.xml | 5 +++ .../src/main/resources/application.properties | 9 +++++ .../java/me/jochrist/kafka/KafkaResource.java | 22 +++++++++--- .../java/me/jochrist/kafka/ProducerTest.java | 36 +++++++++---------- 4 files changed, 47 insertions(+), 25 deletions(-) diff --git a/quarkus-kafaka-homework/kafka-producer/pom.xml b/quarkus-kafaka-homework/kafka-producer/pom.xml index 12d907a..726edf3 100644 --- a/quarkus-kafaka-homework/kafka-producer/pom.xml +++ b/quarkus-kafaka-homework/kafka-producer/pom.xml @@ -75,6 +75,11 @@ ${testcontainers.version} test + + org.awaitility + awaitility + test + diff --git a/quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties b/quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties index 19e0674..c0cd619 100644 --- a/quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties +++ b/quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties @@ -24,3 +24,12 @@ mp.messaging.outgoing.user-message.value.serializer=org.apache.kafka.common.seri # Configure the Kafka source (we read from it) mp.messaging.incoming.prices.connector=smallrye-kafka mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer + +#Tests +#%test.mp.messaging.incoming.user-message.connector=smallrye-kafka +#%test.mp.messaging.incoming.user-message.topic=user-message +#%test.mp.messaging.incoming.user-message.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer + +#%test.mp.messaging.incoming.timestamp.connector=smallrye-kafka +#%test.mp.messaging.incoming.timestamp.topic=timestamp +#%test.mp.messaging.incoming.timestamp.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ No newline at end of file diff --git a/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/KafkaResource.java b/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/KafkaResource.java index 698d0bd..5c9d3e5 100644 --- a/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/KafkaResource.java +++ b/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/KafkaResource.java @@ -1,20 +1,32 @@ package me.jochrist.kafka; import java.util.Collections; import java.util.Map; + +import io.smallrye.reactive.messaging.connectors.InMemoryConnector; import org.testcontainers.containers.KafkaContainer; import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; +import java.util.HashMap; +import java.util.Map; public class KafkaResource implements QuarkusTestResourceLifecycleManager { - private final KafkaContainer kafka = new KafkaContainer(); @Override - public Map start() { - kafka.start(); - return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers()); + public Map start() + { + Map env = new HashMap<>(); + Map timestampInMemoryOutgoing = InMemoryConnector.switchOutgoingChannelsToInMemory("timestamp"); + Map userMessageInMemoryOutgoing = InMemoryConnector.switchOutgoingChannelsToInMemory("user-message"); + //Map timestampInMemoryIncoming = InMemoryConnector.switchIncomingChannelsToInMemory("timestamp"); + //Map userMessageInMemoryIncoming = InMemoryConnector.switchIncomingChannelsToInMemory("user-message"); + //env.putAll(timestampInMemoryIncoming); + //env.putAll(userMessageInMemoryIncoming); + env.putAll(timestampInMemoryOutgoing); + env.putAll(userMessageInMemoryOutgoing); + return env; } @Override public void stop() { - kafka.close(); + InMemoryConnector.clear(); } } diff --git a/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java b/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java index 9b621e1..82d06ff 100644 --- a/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java +++ b/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java @@ -1,54 +1,50 @@ package me.jochrist.kafka; + import io.quarkus.test.junit.QuarkusTest; import org.junit.jupiter.api.Test; - import static io.restassured.RestAssured.given; -import static org.hamcrest.CoreMatchers.is; import io.smallrye.reactive.messaging.connectors.InMemoryConnector; -import io.smallrye.reactive.messaging.connectors.InMemorySource; import io.smallrye.reactive.messaging.connectors.InMemorySink; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; import javax.enterprise.inject.Any; import javax.inject.Inject; import io.quarkus.test.common.QuarkusTestResource; - -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.WebTarget; -import javax.ws.rs.sse.SseEventSource; +import java.util.concurrent.TimeUnit; +import static org.awaitility.Awaitility.await; @QuarkusTest @QuarkusTestResource(KafkaResource.class) public class ProducerTest { + @Inject @Any + InMemoryConnector connector; //See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/204 //As we dont need to return anything 204 is still a success @Test public void testMessage() { + //InMemorySink timestamps = connector.sink("timestamp"); + String message = "test"; + + InMemorySink userMessageSink = connector.sink("user-message"); given() - .when().get("/kafka/message/test") + .when().get("/kafka/message/"+message) .then() .statusCode(204); - //InMemorySink results = connector.sink("user-message"); - } + Assertions.assertEquals(1, userMessageSink.received().size(), "In memory messaging sink is the wrong size"); - /* - @Inject @Any - InMemoryConnector connector; + String pl = userMessageSink.received().get(0).getPayload(); + Assertions.assertEquals(message, pl, "In memory messaging sink has the wrong string, have we picked it up out of order?"); + } @Test public void testTimestamp() { - InMemorySink timestamps = connector.sink("timestamp"); - System.out.println(timestamps); + InMemorySink timestampSink = connector.sink("timestamp"); + await().atMost(5, TimeUnit.SECONDS).until(() -> timestampSink.received().size() > 0); } - */ }