diff --git a/quarkus-kafaka-homework/kafka-producer/pom.xml b/quarkus-kafaka-homework/kafka-producer/pom.xml
index 12d907a..726edf3 100644
--- a/quarkus-kafaka-homework/kafka-producer/pom.xml
+++ b/quarkus-kafaka-homework/kafka-producer/pom.xml
@@ -75,6 +75,11 @@
${testcontainers.version}
test
+
+ org.awaitility
+ awaitility
+ test
+
diff --git a/quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties b/quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties
index 19e0674..c0cd619 100644
--- a/quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties
+++ b/quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties
@@ -24,3 +24,12 @@ mp.messaging.outgoing.user-message.value.serializer=org.apache.kafka.common.seri
# Configure the Kafka source (we read from it)
mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
+
+#Tests
+#%test.mp.messaging.incoming.user-message.connector=smallrye-kafka
+#%test.mp.messaging.incoming.user-message.topic=user-message
+#%test.mp.messaging.incoming.user-message.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
+#%test.mp.messaging.incoming.timestamp.connector=smallrye-kafka
+#%test.mp.messaging.incoming.timestamp.topic=timestamp
+#%test.mp.messaging.incoming.timestamp.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
\ No newline at end of file
diff --git a/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/KafkaResource.java b/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/KafkaResource.java
index 698d0bd..5c9d3e5 100644
--- a/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/KafkaResource.java
+++ b/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/KafkaResource.java
@@ -1,20 +1,32 @@
package me.jochrist.kafka;
import java.util.Collections;
import java.util.Map;
+
+import io.smallrye.reactive.messaging.connectors.InMemoryConnector;
import org.testcontainers.containers.KafkaContainer;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import java.util.HashMap;
+import java.util.Map;
public class KafkaResource implements QuarkusTestResourceLifecycleManager {
- private final KafkaContainer kafka = new KafkaContainer();
@Override
- public Map start() {
- kafka.start();
- return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());
+ public Map start()
+ {
+ Map env = new HashMap<>();
+ Map timestampInMemoryOutgoing = InMemoryConnector.switchOutgoingChannelsToInMemory("timestamp");
+ Map userMessageInMemoryOutgoing = InMemoryConnector.switchOutgoingChannelsToInMemory("user-message");
+ //Map timestampInMemoryIncoming = InMemoryConnector.switchIncomingChannelsToInMemory("timestamp");
+ //Map userMessageInMemoryIncoming = InMemoryConnector.switchIncomingChannelsToInMemory("user-message");
+ //env.putAll(timestampInMemoryIncoming);
+ //env.putAll(userMessageInMemoryIncoming);
+ env.putAll(timestampInMemoryOutgoing);
+ env.putAll(userMessageInMemoryOutgoing);
+ return env;
}
@Override
public void stop() {
- kafka.close();
+ InMemoryConnector.clear();
}
}
diff --git a/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java b/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java
index 9b621e1..82d06ff 100644
--- a/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java
+++ b/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java
@@ -1,54 +1,50 @@
package me.jochrist.kafka;
+
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Test;
-
import static io.restassured.RestAssured.given;
-import static org.hamcrest.CoreMatchers.is;
import io.smallrye.reactive.messaging.connectors.InMemoryConnector;
-import io.smallrye.reactive.messaging.connectors.InMemorySource;
import io.smallrye.reactive.messaging.connectors.InMemorySink;
-import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
import javax.enterprise.inject.Any;
import javax.inject.Inject;
import io.quarkus.test.common.QuarkusTestResource;
-
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.sse.SseEventSource;
+import java.util.concurrent.TimeUnit;
+import static org.awaitility.Awaitility.await;
@QuarkusTest
@QuarkusTestResource(KafkaResource.class)
public class ProducerTest {
+ @Inject @Any
+ InMemoryConnector connector;
//See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/204
//As we dont need to return anything 204 is still a success
@Test
public void testMessage() {
+ //InMemorySink timestamps = connector.sink("timestamp");
+ String message = "test";
+
+ InMemorySink userMessageSink = connector.sink("user-message");
given()
- .when().get("/kafka/message/test")
+ .when().get("/kafka/message/"+message)
.then()
.statusCode(204);
- //InMemorySink results = connector.sink("user-message");
- }
+ Assertions.assertEquals(1, userMessageSink.received().size(), "In memory messaging sink is the wrong size");
- /*
- @Inject @Any
- InMemoryConnector connector;
+ String pl = userMessageSink.received().get(0).getPayload();
+ Assertions.assertEquals(message, pl, "In memory messaging sink has the wrong string, have we picked it up out of order?");
+ }
@Test
public void testTimestamp()
{
- InMemorySink timestamps = connector.sink("timestamp");
- System.out.println(timestamps);
+ InMemorySink timestampSink = connector.sink("timestamp");
+ await().atMost(5, TimeUnit.SECONDS).until(() -> timestampSink.received().size() > 0);
}
- */
}