From dd68b4475ba7b19104e6924a4dc87c2b9cfc91aa Mon Sep 17 00:00:00 2001 From: Jonathan Christison Date: Wed, 23 Sep 2020 15:30:18 +0100 Subject: [PATCH] Consumer is being difficult, doesn't allow multiple @Incoming statements --- quarkus-kafaka-homework/README.md | 7 ++- .../main/java/me/jochrist/kafka/Consumer.java | 59 ++++++++++++++++++- .../resources/META-INF/resources/index.html | 2 +- .../src/main/resources/application.properties | 1 + 4 files changed, 64 insertions(+), 5 deletions(-) diff --git a/quarkus-kafaka-homework/README.md b/quarkus-kafaka-homework/README.md index f51f807..f348bf3 100644 --- a/quarkus-kafaka-homework/README.md +++ b/quarkus-kafaka-homework/README.md @@ -4,8 +4,8 @@ Brief Implement an automated OpenShift 4 solution that will include: -[ ] Strimzi -[ ] Quarkus Kafka producer application. This application should periodically send data (e.g. timestamp) to Kafka and should also have a JAX-RS endpoint to send different data per user request. +[ ] Strimzi - meh, sort of, working with local strimzi +[X] Quarkus Kafka producer application. This application should periodically send data (e.g. timestamp) to Kafka and should also have a JAX-RS endpoint to send different data per user request. - Done standalone [ ] Quarkus Kafka consumer application. This application should present the data to a websocket. There should be a way for the user to display the data easily. The solution should be comfortable to deploy, test, debug and use. The implementation time frame is one week. @@ -30,3 +30,6 @@ If you do not have an OCP4 cluster available, feel free to use CRC - https://dev * HTML+JS * Debugging mode (maven profile) +For testing `npm install wscat` and then `wscat -c http://localhost:8080/kafka/message/foo` + + diff --git a/quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java b/quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java index d9c4fd6..d578c6e 100644 --- a/quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java +++ b/quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java @@ -18,6 +18,10 @@ import javax.websocket.OnOpen; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import javax.websocket.Session; + +import io.smallrye.reactive.messaging.annotations.Blocking; +import io.smallrye.reactive.messaging.annotations.Merge; +import org.eclipse.microprofile.reactive.messaging.Metadata; import org.jboss.logging.Logger; import org.eclipse.microprofile.reactive.messaging.Incoming; @@ -25,6 +29,8 @@ import io.smallrye.reactive.messaging.annotations.Channel; import org.eclipse.microprofile.reactive.messaging.Message; import java.util.concurrent.CompletionStage; import org.reactivestreams.Publisher; +import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata; +import java.util.Optional; //@Path("/kafka") @ServerEndpoint("/kafka/message/{topic}") @@ -37,7 +43,7 @@ public class Consumer { @OnOpen public void onOpen(Session session, @PathParam("topic") String topic) { - //Join kafka topic matching topid name (timestamp or user-message) + //Join kafka topic matching topic name (timestamp or user-message) sessions.put(topic, session); LOG.debug("%h will watch topic \"" + topic + "\""); } @@ -50,19 +56,68 @@ public class Consumer { // return usermessage; //} + //Multiple incomming anotations dont work for some reason, its supported in the smallrye docs - + //https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.2/advanced/incomings.html + /* + @Incoming("timestamp") + @Incoming("user-message") + public CompletionStage consume(Message record){ + Optional metadata = record.getMetadata(IncomingKafkaRecordMetadata.class); + System.out.println(record.getPayload()); + + metadata.ifPresent(m -> { + sendToSubscriber(m.getTopic(), record.getPayload()); + }); + //sendToSubscriber("timestamp", record.getPayload()); + return record.ack(); + } + */ + @Incoming("timestamp") + public CompletionStage consumeTimestamp(Message record){ + return consume(record); + } + + @Incoming("user-message") + public CompletionStage consumeUserMessage(Message record){ + return consume(record); + } + + public CompletionStage consume(Message record) + { + Optional metadata = record.getMetadata(IncomingKafkaRecordMetadata.class); + metadata.ifPresent(m -> { + sendToSubscriber(m.getTopic(), record.getPayload()); + }); + return record.ack(); + } + + /* @Incoming("timestamp") public CompletionStage consume(Message timestamp){ + Optional metadata = timestamp.getMetadata(IncomingKafkaRecordMetadata.class); System.out.println(timestamp.getPayload()); sendToSubscriber("timestamp", timestamp.getPayload()); return timestamp.ack(); } - + */ + /* @Incoming("user-message") public CompletionStage consume2(Message usermessage){ System.out.println(usermessage.getPayload()); sendToSubscriber("user-message", usermessage.getPayload()); return usermessage.ack(); } + */ + //https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.2/advanced/incomings.html + @Incoming("timestamp") + @Incoming("user-message") + public CompletionStage consume3(Message record) + { + System.out.println(record.getPayload()); + sendToSubscriber("timestamp", record.getPayload()); + return record.ack(); + } + private void sendToSubscriber(String topic, String message) { if(sessions.containsKey(topic)) diff --git a/quarkus-kafaka-homework/kafka-consumer/src/main/resources/META-INF/resources/index.html b/quarkus-kafaka-homework/kafka-consumer/src/main/resources/META-INF/resources/index.html index fc3ba1a..becb7ff 100644 --- a/quarkus-kafaka-homework/kafka-consumer/src/main/resources/META-INF/resources/index.html +++ b/quarkus-kafaka-homework/kafka-consumer/src/main/resources/META-INF/resources/index.html @@ -3,7 +3,7 @@ - Quarkus Chat! + Quarkus Kafka example stolen from chat application :-) diff --git a/quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties b/quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties index daa96f8..cf240e1 100644 --- a/quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties +++ b/quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties @@ -1,5 +1,6 @@ # Configuration file # key = value +quarkus.http.port=8080 kafka.bootstrap.servers=localhost:9092 # Configure the Kafka sink (we write to it)