From 3f4a81ee338de773d84ba64b3fdfe37c5644ac17 Mon Sep 17 00:00:00 2001 From: Jonathan Christison Date: Thu, 24 Sep 2020 14:04:03 +0100 Subject: [PATCH] Not really cleanup --- .../main/java/me/jochrist/kafka/Consumer.java | 25 ++++--------------- .../java/me/jochrist/kafka/ProducerTest.java | 3 ++- 2 files changed, 7 insertions(+), 21 deletions(-) diff --git a/quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java b/quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java index 1577a40..dff37b0 100644 --- a/quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java +++ b/quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java @@ -1,38 +1,24 @@ package me.jochrist.kafka; -import io.netty.handler.logging.LogLevel; - -import javax.inject.Inject; -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.enterprise.context.ApplicationScoped; -import javax.websocket.OnClose; -import javax.websocket.OnError; -import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import javax.websocket.Session; -import io.smallrye.reactive.messaging.annotations.Blocking; -import io.smallrye.reactive.messaging.annotations.Merge; -import org.eclipse.microprofile.reactive.messaging.Metadata; import org.jboss.logging.Logger; import org.eclipse.microprofile.reactive.messaging.Incoming; -import io.smallrye.reactive.messaging.annotations.Channel; import org.eclipse.microprofile.reactive.messaging.Message; import java.util.concurrent.CompletionStage; -import org.reactivestreams.Publisher; + import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata; import java.util.Optional; -//@Path("/kafka") + @ServerEndpoint("/kafka/message/{topic}") @ApplicationScoped public class Consumer { @@ -59,9 +45,8 @@ public class Consumer { System.out.println(record.getPayload()); metadata.ifPresent(m -> { - sendToSubscriber(m.getTopic(), record.getPayload()); + sendToWebsocketSession(m.getTopic(), record.getPayload()); }); - //sendToSubscriber("timestamp", record.getPayload()); return record.ack(); } */ @@ -80,12 +65,12 @@ public class Consumer { { Optional metadata = record.getMetadata(IncomingKafkaRecordMetadata.class); metadata.ifPresent(m -> { - sendToSubscriber(m.getTopic(), record.getPayload()); + sendToWebsocketSession(m.getTopic(), record.getPayload()); }); return record.ack(); } - private void sendToSubscriber(String topic, String message) { + private void sendToWebsocketSession(String topic, String message) { if(sessions.containsKey(topic)) { Session s = sessions.get(topic); diff --git a/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java b/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java index 3f2fca1..9b621e1 100644 --- a/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java +++ b/quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java @@ -31,7 +31,6 @@ public class ProducerTest { @Test public void testMessage() { - SseEventSource source = SseEventSource.target(target).build(); given() .when().get("/kafka/message/test") .then() @@ -40,6 +39,7 @@ public class ProducerTest { } + /* @Inject @Any InMemoryConnector connector; @@ -49,5 +49,6 @@ public class ProducerTest { InMemorySink timestamps = connector.sink("timestamp"); System.out.println(timestamps); } + */ }