diff --git a/quarkus-kafaka-homework/README.md b/quarkus-kafaka-homework/README.md
index f348bf3..22728c4 100644
--- a/quarkus-kafaka-homework/README.md
+++ b/quarkus-kafaka-homework/README.md
@@ -4,9 +4,9 @@ Brief
Implement an automated OpenShift 4 solution that will include:
-[ ] Strimzi - meh, sort of, working with local strimzi
+[X] Strimzi - meh, sort of, working with local strimzi
[X] Quarkus Kafka producer application. This application should periodically send data (e.g. timestamp) to Kafka and should also have a JAX-RS endpoint to send different data per user request. - Done standalone
-[ ] Quarkus Kafka consumer application. This application should present the data to a websocket. There should be a way for the user to display the data easily.
+[X] Quarkus Kafka consumer application. This application should present the data to a websocket. There should be a way for the user to display the data easily.
The solution should be comfortable to deploy, test, debug and use. The implementation time frame is one week.
If you do not have an OCP4 cluster available, feel free to use CRC - https://developers.redhat.com/products/codeready-containers/overview
diff --git a/quarkus-kafaka-homework/kafka-consumer/pom.xml b/quarkus-kafaka-homework/kafka-consumer/pom.xml
index d6a15fb..872f8f7 100644
--- a/quarkus-kafaka-homework/kafka-consumer/pom.xml
+++ b/quarkus-kafaka-homework/kafka-consumer/pom.xml
@@ -57,6 +57,10 @@
io.quarkus
quarkus-smallrye-reactive-messaging-kafka
+
+ io.quarkus
+ quarkus-openshift
+
diff --git a/quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java b/quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java
index d578c6e..1577a40 100644
--- a/quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java
+++ b/quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java
@@ -48,13 +48,6 @@ public class Consumer {
LOG.debug("%h will watch topic \"" + topic + "\"");
}
- //@Inject @Channel("user-message") Publisher usermessage;
- //@Inject @Channel("timestamp") Publisher timestamp;
- //public Publisher stream()
- //{
- // LOG.info("timestamp: " + timestamp);
- // return usermessage;
- //}
//Multiple incomming anotations dont work for some reason, its supported in the smallrye docs -
//https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.2/advanced/incomings.html
@@ -72,6 +65,7 @@ public class Consumer {
return record.ack();
}
*/
+
@Incoming("timestamp")
public CompletionStage consumeTimestamp(Message record){
return consume(record);
@@ -91,34 +85,6 @@ public class Consumer {
return record.ack();
}
- /*
- @Incoming("timestamp")
- public CompletionStage consume(Message timestamp){
- Optional metadata = timestamp.getMetadata(IncomingKafkaRecordMetadata.class);
- System.out.println(timestamp.getPayload());
- sendToSubscriber("timestamp", timestamp.getPayload());
- return timestamp.ack();
- }
- */
- /*
- @Incoming("user-message")
- public CompletionStage consume2(Message usermessage){
- System.out.println(usermessage.getPayload());
- sendToSubscriber("user-message", usermessage.getPayload());
- return usermessage.ack();
- }
- */
- //https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.2/advanced/incomings.html
- @Incoming("timestamp")
- @Incoming("user-message")
- public CompletionStage consume3(Message record)
- {
- System.out.println(record.getPayload());
- sendToSubscriber("timestamp", record.getPayload());
- return record.ack();
- }
-
-
private void sendToSubscriber(String topic, String message) {
if(sessions.containsKey(topic))
{
@@ -131,25 +97,4 @@ public class Consumer {
});
}
}
- /*
- sessions.values().stream()
- .filter(s -> topic.equals(s.getKey()))
- .map(map -> map.getAsyncRemote().sendObject(message), result ->
- {
- if (result.getException() != null) {
- System.out.println("Unable to send message: " + result.getException());
- }
- });
- }
- */
-
- private void broadcast(String message) {
- sessions.values().forEach(s -> {
- s.getAsyncRemote().sendObject(message, result -> {
- if (result.getException() != null) {
- System.out.println("Unable to send message: " + result.getException());
- }
- });
- });
- }
}
\ No newline at end of file
diff --git a/quarkus-kafaka-homework/kafka-consumer/src/main/resources/META-INF/resources/index.html b/quarkus-kafaka-homework/kafka-consumer/src/main/resources/META-INF/resources/index.html
index becb7ff..d922981 100644
--- a/quarkus-kafaka-homework/kafka-consumer/src/main/resources/META-INF/resources/index.html
+++ b/quarkus-kafaka-homework/kafka-consumer/src/main/resources/META-INF/resources/index.html
@@ -79,7 +79,7 @@
console.log("Connected to the web socket");
$("#send").attr("disabled", false);
$("#connect").attr("disabled", true);
- $("#name").attr("disabled", true);
+ $("#name").attr("disabled", false);
$("#msg").focus();
};
socket.onmessage =function(m) {
diff --git a/quarkus-kafaka-homework/kafka-consumer/src/test/java/me/jochrist/kafka/ConsumerTest.java b/quarkus-kafaka-homework/kafka-consumer/src/test/java/me/jochrist/kafka/ConsumerTest.java
index 69b16f1..558b51d 100644
--- a/quarkus-kafaka-homework/kafka-consumer/src/test/java/me/jochrist/kafka/ConsumerTest.java
+++ b/quarkus-kafaka-homework/kafka-consumer/src/test/java/me/jochrist/kafka/ConsumerTest.java
@@ -1,21 +1,31 @@
package me.jochrist.kafka;
+import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Test;
+import javax.websocket.ClientEndpoint;
+import javax.websocket.ContainerProvider;
+import javax.websocket.OnMessage;
+import javax.websocket.OnOpen;
+import javax.websocket.Session;
+import org.junit.jupiter.api.Assertions;
+
-import static io.restassured.RestAssured.given;
-import static org.hamcrest.CoreMatchers.is;
@QuarkusTest
public class ConsumerTest {
+/*
+//https://github.com/quarkusio/quarkus-quickstarts/blob/master/websockets-quickstart/src/test/java/org/acme/websockets/ChatTest.java
+//https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-panache-quickstart/src/test/java/org/acme/panache/KafkaResource.java
- @Test
- public void testHelloEndpoint() {
- given()
- .when().get("/hello")
- .then()
- .statusCode(200)
- .body(is("hello"));
+ @TestHTTPResource("/kafka/message/timestamp")
+ public void testTimestampWebSocket() {
+ try (Session session = ContainerProvider.getWebSocketContainer().connectToServer(Client.class, uri)) {
+ Assertions.assertEquals("CONNECT", MESSAGES.poll(10, TimeUnit.SECONDS));
+ Assertions.assertEquals("User stu joined", MESSAGES.poll(10, TimeUnit.SECONDS));
+ session.getAsyncRemote().sendText("hello world");
+ Assertions.assertEquals(">> stu: hello world", MESSAGES.poll(10, TimeUnit.SECONDS));
+ }
}
-
+*/
}
\ No newline at end of file