diff --git a/quarkus-kafaka-homework/kafka-producer/pom.xml b/quarkus-kafaka-homework/kafka-producer/pom.xml
index 02d6cac..12d907a 100644
--- a/quarkus-kafaka-homework/kafka-producer/pom.xml
+++ b/quarkus-kafaka-homework/kafka-producer/pom.xml
@@ -63,11 +63,20 @@
io.quarkus
quarkus-smallrye-openapi
+
- io.quarkus
- quarkus-container-image-jib
+ io.smallrye.reactive
+ smallrye-reactive-messaging-in-memory
+ test
+
+
+ org.testcontainers
+ kafka
+ ${testcontainers.version}
+ test
-
+
+
diff --git a/quarkus-kafaka-homework/kafka-producer/src/main/java/me/jochrist/kafka/MessageProducer.java b/quarkus-kafaka-homework/kafka-producer/src/main/java/me/jochrist/kafka/MessageProducer.java
index 172f416..5b68801 100644
--- a/quarkus-kafaka-homework/kafka-producer/src/main/java/me/jochrist/kafka/MessageProducer.java
+++ b/quarkus-kafaka-homework/kafka-producer/src/main/java/me/jochrist/kafka/MessageProducer.java
@@ -1,15 +1,10 @@
package me.jochrist.kafka;
-import io.reactivex.Flowable;
-import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
-import java.sql.Timestamp;
-import java.util.Date;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
import org.jboss.logging.Logger;
+
@ApplicationScoped
public class MessageProducer {
diff --git a/quarkus-kafaka-homework/kafka-producer/src/main/java/me/jochrist/kafka/Producer.java b/quarkus-kafaka-homework/kafka-producer/src/main/java/me/jochrist/kafka/Producer.java
index c588f7f..bc53f7c 100644
--- a/quarkus-kafaka-homework/kafka-producer/src/main/java/me/jochrist/kafka/Producer.java
+++ b/quarkus-kafaka-homework/kafka-producer/src/main/java/me/jochrist/kafka/Producer.java
@@ -14,17 +14,6 @@ import org.jboss.resteasy.annotations.jaxrs.PathParam;
@Path("/kafka")
public class Producer {
- @Inject
- Vertx vertx;
-
- /* Simple example
- @GET
- @Produces(MediaType.TEXT_PLAIN)
- @Path("/message/{message}")
- public String hello(@PathParam String message) {
- return "hello " + message;
- }
- */
@Inject MessageProducer messageProducer;
@GET
@Produces(MediaType.TEXT_PLAIN)
@@ -34,13 +23,6 @@ public class Producer {
messageProducer.sendMessage(message);
}
- /*
- public Uni sendKafkaMessage()
- {
- return Uni.createFrom().item(() -> )
- }
- */
@Inject TimeStampProducer timeStampProducer;
-
}
\ No newline at end of file
diff --git a/quarkus-kafaka-homework/kafka-producer/src/main/java/me/jochrist/kafka/TimeStampProducer.java b/quarkus-kafaka-homework/kafka-producer/src/main/java/me/jochrist/kafka/TimeStampProducer.java
index 15c588a..2b6ff1b 100644
--- a/quarkus-kafaka-homework/kafka-producer/src/main/java/me/jochrist/kafka/TimeStampProducer.java
+++ b/quarkus-kafaka-homework/kafka-producer/src/main/java/me/jochrist/kafka/TimeStampProducer.java
@@ -4,13 +4,10 @@ import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.sql.Timestamp;
-import java.util.Date;
-import java.util.Random;
import java.util.concurrent.TimeUnit;
@ApplicationScoped
public class TimeStampProducer {
- //private Timestamp timestamp = new Timestamp(System.currentTimeMillis());
@Outgoing("timestamp")
public Flowable generate()
diff --git a/quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties b/quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties
index e484437..19e0674 100644
--- a/quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties
+++ b/quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties
@@ -3,6 +3,7 @@
quarkus.kubernetes-client.trust-certs=true
kafka.bootstrap.servers=kafka-cluster-kafka-bootstrap:9092
%dev.kafka.bootstrap.servers=localhost:9092
+%test.kafka.bootstrap.servers=localhost:9092
%dev.quarkus.http.port=8080
quarkus.http.port=8080
quarkus.swagger-ui.always-include=true
diff --git a/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/KafkaResource.java b/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/KafkaResource.java
new file mode 100644
index 0000000..698d0bd
--- /dev/null
+++ b/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/KafkaResource.java
@@ -0,0 +1,20 @@
+package me.jochrist.kafka;
+import java.util.Collections;
+import java.util.Map;
+import org.testcontainers.containers.KafkaContainer;
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+
+public class KafkaResource implements QuarkusTestResourceLifecycleManager {
+ private final KafkaContainer kafka = new KafkaContainer();
+
+ @Override
+ public Map start() {
+ kafka.start();
+ return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());
+ }
+
+ @Override
+ public void stop() {
+ kafka.close();
+ }
+}
diff --git a/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java b/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java
index 07bb236..68996b9 100644
--- a/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java
+++ b/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java
@@ -5,18 +5,56 @@ import org.junit.jupiter.api.Test;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.is;
+import io.smallrye.reactive.messaging.connectors.InMemoryConnector;
+import io.smallrye.reactive.messaging.connectors.InMemorySource;
+import io.smallrye.reactive.messaging.connectors.InMemorySink;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import javax.enterprise.inject.Any;
+import javax.inject.Inject;
+import io.quarkus.test.common.QuarkusTestResource;
+
@QuarkusTest
+@QuarkusTestResource(KafkaResource.class)
public class ProducerTest {
-/*
- * @Test
- public void testHelloEndpoint() {
+ @BeforeAll
+ public static void switchMyChannels() {
+ InMemoryConnector.switchIncomingChannelsToInMemory("timestamp");
+ InMemoryConnector.switchIncomingChannelsToInMemory("user-message");
+ InMemoryConnector.switchOutgoingChannelsToInMemory("timestamp");
+ InMemoryConnector.switchOutgoingChannelsToInMemory("user-message");
+ }
+
+ @AfterAll
+ public static void revertMyChannels() {
+ InMemoryConnector.clear();
+ }
+
+
+ //See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/204
+ //As we dont need to return anything 204 is still a success
+ @Test
+ public void testMessage() {
given()
- .when().get("/kafka/message")
+ .when().get("/kafka/message/test")
.then()
- .statusCode(200)
- .body(is("hello"));
+ .statusCode(204);
+ //InMemorySink results = connector.sink("user-message");
+
+ }
+
+ @Inject @Any
+ InMemoryConnector connector;
+
+ @Test
+ public void testTimestamp()
+ {
+ InMemorySink timestamps = connector.sink("timestamp");
+ System.out.println(timestamps);
}
-*/
+
}
diff --git a/quarkus-kafaka-homework/pom.xml b/quarkus-kafaka-homework/pom.xml
index e0e4f96..dd9a897 100644
--- a/quarkus-kafaka-homework/pom.xml
+++ b/quarkus-kafaka-homework/pom.xml
@@ -9,6 +9,21 @@
jochrist hacky Kafka project :: Parent
+
+
+
+ docker
+
+ false
+
+
+ docker-resources
+
+
+
kafka-consumer
kafka-producer
@@ -27,23 +42,10 @@
1.8.1.Final
3.0.0-M5
s2i
+ 1.14.1
-
-
-
- docker
-
- false
-
-
- docker-resources
-
-
-
+
scm:git:ssh://git@gitlab.cee.redhat.com/FOO
1.0-SNAPSHOT