Browse Source

Not really cleanup

master
Jonathan Christison 5 years ago
parent
commit
3f4a81ee33
  1. 25
      quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java
  2. 3
      quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java

25
quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java

@ -1,38 +1,24 @@
package me.jochrist.kafka;
import io.netty.handler.logging.LogLevel;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.enterprise.context.ApplicationScoped;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import javax.websocket.Session;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.annotations.Merge;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.jboss.logging.Logger;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import io.smallrye.reactive.messaging.annotations.Channel;
import org.eclipse.microprofile.reactive.messaging.Message;
import java.util.concurrent.CompletionStage;
import org.reactivestreams.Publisher;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata;
import java.util.Optional;
//@Path("/kafka")
@ServerEndpoint("/kafka/message/{topic}")
@ApplicationScoped
public class Consumer {
@ -59,9 +45,8 @@ public class Consumer {
System.out.println(record.getPayload());
metadata.ifPresent(m -> {
sendToSubscriber(m.getTopic(), record.getPayload());
sendToWebsocketSession(m.getTopic(), record.getPayload());
});
//sendToSubscriber("timestamp", record.getPayload());
return record.ack();
}
*/
@ -80,12 +65,12 @@ public class Consumer {
{
Optional<IncomingKafkaRecordMetadata> metadata = record.getMetadata(IncomingKafkaRecordMetadata.class);
metadata.ifPresent(m -> {
sendToSubscriber(m.getTopic(), record.getPayload());
sendToWebsocketSession(m.getTopic(), record.getPayload());
});
return record.ack();
}
private void sendToSubscriber(String topic, String message) {
private void sendToWebsocketSession(String topic, String message) {
if(sessions.containsKey(topic))
{
Session s = sessions.get(topic);

3
quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java

@ -31,7 +31,6 @@ public class ProducerTest {
@Test
public void testMessage() {
SseEventSource source = SseEventSource.target(target).build();
given()
.when().get("/kafka/message/test")
.then()
@ -40,6 +39,7 @@ public class ProducerTest {
}
/*
@Inject @Any
InMemoryConnector connector;
@ -49,5 +49,6 @@ public class ProducerTest {
InMemorySink<String> timestamps = connector.sink("timestamp");
System.out.println(timestamps);
}
*/
}

Loading…
Cancel
Save