From 5e80c2b958f3a83c4e95d45ce7679f1c785268f1 Mon Sep 17 00:00:00 2001 From: Jonathan Christison Date: Wed, 23 Sep 2020 17:12:07 +0100 Subject: [PATCH] Cut down junk code, comment out tests which haven't been written yet --- quarkus-kafaka-homework/README.md | 4 +- .../kafka-consumer/pom.xml | 4 ++ .../main/java/me/jochrist/kafka/Consumer.java | 57 +------------------ .../resources/META-INF/resources/index.html | 2 +- .../java/me/jochrist/kafka/ConsumerTest.java | 30 ++++++---- 5 files changed, 28 insertions(+), 69 deletions(-) diff --git a/quarkus-kafaka-homework/README.md b/quarkus-kafaka-homework/README.md index f348bf3..22728c4 100644 --- a/quarkus-kafaka-homework/README.md +++ b/quarkus-kafaka-homework/README.md @@ -4,9 +4,9 @@ Brief Implement an automated OpenShift 4 solution that will include: -[ ] Strimzi - meh, sort of, working with local strimzi +[X] Strimzi - meh, sort of, working with local strimzi [X] Quarkus Kafka producer application. This application should periodically send data (e.g. timestamp) to Kafka and should also have a JAX-RS endpoint to send different data per user request. - Done standalone -[ ] Quarkus Kafka consumer application. This application should present the data to a websocket. There should be a way for the user to display the data easily. +[X] Quarkus Kafka consumer application. This application should present the data to a websocket. There should be a way for the user to display the data easily. The solution should be comfortable to deploy, test, debug and use. The implementation time frame is one week. If you do not have an OCP4 cluster available, feel free to use CRC - https://developers.redhat.com/products/codeready-containers/overview diff --git a/quarkus-kafaka-homework/kafka-consumer/pom.xml b/quarkus-kafaka-homework/kafka-consumer/pom.xml index d6a15fb..872f8f7 100644 --- a/quarkus-kafaka-homework/kafka-consumer/pom.xml +++ b/quarkus-kafaka-homework/kafka-consumer/pom.xml @@ -57,6 +57,10 @@ io.quarkus quarkus-smallrye-reactive-messaging-kafka + + io.quarkus + quarkus-openshift + diff --git a/quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java b/quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java index d578c6e..1577a40 100644 --- a/quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java +++ b/quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java @@ -48,13 +48,6 @@ public class Consumer { LOG.debug("%h will watch topic \"" + topic + "\""); } - //@Inject @Channel("user-message") Publisher usermessage; - //@Inject @Channel("timestamp") Publisher timestamp; - //public Publisher stream() - //{ - // LOG.info("timestamp: " + timestamp); - // return usermessage; - //} //Multiple incomming anotations dont work for some reason, its supported in the smallrye docs - //https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.2/advanced/incomings.html @@ -72,6 +65,7 @@ public class Consumer { return record.ack(); } */ + @Incoming("timestamp") public CompletionStage consumeTimestamp(Message record){ return consume(record); @@ -91,34 +85,6 @@ public class Consumer { return record.ack(); } - /* - @Incoming("timestamp") - public CompletionStage consume(Message timestamp){ - Optional metadata = timestamp.getMetadata(IncomingKafkaRecordMetadata.class); - System.out.println(timestamp.getPayload()); - sendToSubscriber("timestamp", timestamp.getPayload()); - return timestamp.ack(); - } - */ - /* - @Incoming("user-message") - public CompletionStage consume2(Message usermessage){ - System.out.println(usermessage.getPayload()); - sendToSubscriber("user-message", usermessage.getPayload()); - return usermessage.ack(); - } - */ - //https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.2/advanced/incomings.html - @Incoming("timestamp") - @Incoming("user-message") - public CompletionStage consume3(Message record) - { - System.out.println(record.getPayload()); - sendToSubscriber("timestamp", record.getPayload()); - return record.ack(); - } - - private void sendToSubscriber(String topic, String message) { if(sessions.containsKey(topic)) { @@ -131,25 +97,4 @@ public class Consumer { }); } } - /* - sessions.values().stream() - .filter(s -> topic.equals(s.getKey())) - .map(map -> map.getAsyncRemote().sendObject(message), result -> - { - if (result.getException() != null) { - System.out.println("Unable to send message: " + result.getException()); - } - }); - } - */ - - private void broadcast(String message) { - sessions.values().forEach(s -> { - s.getAsyncRemote().sendObject(message, result -> { - if (result.getException() != null) { - System.out.println("Unable to send message: " + result.getException()); - } - }); - }); - } } \ No newline at end of file diff --git a/quarkus-kafaka-homework/kafka-consumer/src/main/resources/META-INF/resources/index.html b/quarkus-kafaka-homework/kafka-consumer/src/main/resources/META-INF/resources/index.html index becb7ff..d922981 100644 --- a/quarkus-kafaka-homework/kafka-consumer/src/main/resources/META-INF/resources/index.html +++ b/quarkus-kafaka-homework/kafka-consumer/src/main/resources/META-INF/resources/index.html @@ -79,7 +79,7 @@ console.log("Connected to the web socket"); $("#send").attr("disabled", false); $("#connect").attr("disabled", true); - $("#name").attr("disabled", true); + $("#name").attr("disabled", false); $("#msg").focus(); }; socket.onmessage =function(m) { diff --git a/quarkus-kafaka-homework/kafka-consumer/src/test/java/me/jochrist/kafka/ConsumerTest.java b/quarkus-kafaka-homework/kafka-consumer/src/test/java/me/jochrist/kafka/ConsumerTest.java index 69b16f1..558b51d 100644 --- a/quarkus-kafaka-homework/kafka-consumer/src/test/java/me/jochrist/kafka/ConsumerTest.java +++ b/quarkus-kafaka-homework/kafka-consumer/src/test/java/me/jochrist/kafka/ConsumerTest.java @@ -1,21 +1,31 @@ package me.jochrist.kafka; +import io.quarkus.test.common.http.TestHTTPResource; import io.quarkus.test.junit.QuarkusTest; import org.junit.jupiter.api.Test; +import javax.websocket.ClientEndpoint; +import javax.websocket.ContainerProvider; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import org.junit.jupiter.api.Assertions; + -import static io.restassured.RestAssured.given; -import static org.hamcrest.CoreMatchers.is; @QuarkusTest public class ConsumerTest { +/* +//https://github.com/quarkusio/quarkus-quickstarts/blob/master/websockets-quickstart/src/test/java/org/acme/websockets/ChatTest.java +//https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-panache-quickstart/src/test/java/org/acme/panache/KafkaResource.java - @Test - public void testHelloEndpoint() { - given() - .when().get("/hello") - .then() - .statusCode(200) - .body(is("hello")); + @TestHTTPResource("/kafka/message/timestamp") + public void testTimestampWebSocket() { + try (Session session = ContainerProvider.getWebSocketContainer().connectToServer(Client.class, uri)) { + Assertions.assertEquals("CONNECT", MESSAGES.poll(10, TimeUnit.SECONDS)); + Assertions.assertEquals("User stu joined", MESSAGES.poll(10, TimeUnit.SECONDS)); + session.getAsyncRemote().sendText("hello world"); + Assertions.assertEquals(">> stu: hello world", MESSAGES.poll(10, TimeUnit.SECONDS)); + } } - +*/ } \ No newline at end of file