Browse Source

Consumer is being difficult, doesn't allow multiple @Incoming statements

master
Jonathan Christison 5 years ago
parent
commit
dd68b4475b
  1. 7
      quarkus-kafaka-homework/README.md
  2. 59
      quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java
  3. 2
      quarkus-kafaka-homework/kafka-consumer/src/main/resources/META-INF/resources/index.html
  4. 1
      quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties

7
quarkus-kafaka-homework/README.md

@ -4,8 +4,8 @@ Brief
Implement an automated OpenShift 4 solution that will include: Implement an automated OpenShift 4 solution that will include:
[ ] Strimzi [ ] Strimzi - meh, sort of, working with local strimzi
[ ] Quarkus Kafka producer application. This application should periodically send data (e.g. timestamp) to Kafka and should also have a JAX-RS endpoint to send different data per user request. [X] Quarkus Kafka producer application. This application should periodically send data (e.g. timestamp) to Kafka and should also have a JAX-RS endpoint to send different data per user request. - Done standalone
[ ] Quarkus Kafka consumer application. This application should present the data to a websocket. There should be a way for the user to display the data easily. [ ] Quarkus Kafka consumer application. This application should present the data to a websocket. There should be a way for the user to display the data easily.
The solution should be comfortable to deploy, test, debug and use. The implementation time frame is one week. The solution should be comfortable to deploy, test, debug and use. The implementation time frame is one week.
@ -30,3 +30,6 @@ If you do not have an OCP4 cluster available, feel free to use CRC - https://dev
* HTML+JS * HTML+JS
* Debugging mode (maven profile) * Debugging mode (maven profile)
For testing `npm install wscat` and then `wscat -c http://localhost:8080/kafka/message/foo`

59
quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java

@ -18,6 +18,10 @@ import javax.websocket.OnOpen;
import javax.websocket.server.PathParam; import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpoint;
import javax.websocket.Session; import javax.websocket.Session;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.annotations.Merge;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Incoming;
@ -25,6 +29,8 @@ import io.smallrye.reactive.messaging.annotations.Channel;
import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.Message;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecordMetadata;
import java.util.Optional;
//@Path("/kafka") //@Path("/kafka")
@ServerEndpoint("/kafka/message/{topic}") @ServerEndpoint("/kafka/message/{topic}")
@ -37,7 +43,7 @@ public class Consumer {
@OnOpen @OnOpen
public void onOpen(Session session, @PathParam("topic") String topic) public void onOpen(Session session, @PathParam("topic") String topic)
{ {
//Join kafka topic matching topid name (timestamp or user-message) //Join kafka topic matching topic name (timestamp or user-message)
sessions.put(topic, session); sessions.put(topic, session);
LOG.debug("%h will watch topic \"" + topic + "\""); LOG.debug("%h will watch topic \"" + topic + "\"");
} }
@ -50,19 +56,68 @@ public class Consumer {
// return usermessage; // return usermessage;
//} //}
//Multiple incomming anotations dont work for some reason, its supported in the smallrye docs -
//https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.2/advanced/incomings.html
/*
@Incoming("timestamp")
@Incoming("user-message")
public CompletionStage<Void> consume(Message<String> record){
Optional<IncomingKafkaRecordMetadata> metadata = record.getMetadata(IncomingKafkaRecordMetadata.class);
System.out.println(record.getPayload());
metadata.ifPresent(m -> {
sendToSubscriber(m.getTopic(), record.getPayload());
});
//sendToSubscriber("timestamp", record.getPayload());
return record.ack();
}
*/
@Incoming("timestamp")
public CompletionStage<Void> consumeTimestamp(Message<String> record){
return consume(record);
}
@Incoming("user-message")
public CompletionStage<Void> consumeUserMessage(Message<String> record){
return consume(record);
}
public CompletionStage<Void> consume(Message<String> record)
{
Optional<IncomingKafkaRecordMetadata> metadata = record.getMetadata(IncomingKafkaRecordMetadata.class);
metadata.ifPresent(m -> {
sendToSubscriber(m.getTopic(), record.getPayload());
});
return record.ack();
}
/*
@Incoming("timestamp") @Incoming("timestamp")
public CompletionStage<Void> consume(Message<String> timestamp){ public CompletionStage<Void> consume(Message<String> timestamp){
Optional<IncomingKafkaRecordMetadata> metadata = timestamp.getMetadata(IncomingKafkaRecordMetadata.class);
System.out.println(timestamp.getPayload()); System.out.println(timestamp.getPayload());
sendToSubscriber("timestamp", timestamp.getPayload()); sendToSubscriber("timestamp", timestamp.getPayload());
return timestamp.ack(); return timestamp.ack();
} }
*/
/*
@Incoming("user-message") @Incoming("user-message")
public CompletionStage<Void> consume2(Message<String> usermessage){ public CompletionStage<Void> consume2(Message<String> usermessage){
System.out.println(usermessage.getPayload()); System.out.println(usermessage.getPayload());
sendToSubscriber("user-message", usermessage.getPayload()); sendToSubscriber("user-message", usermessage.getPayload());
return usermessage.ack(); return usermessage.ack();
} }
*/
//https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.2/advanced/incomings.html
@Incoming("timestamp")
@Incoming("user-message")
public CompletionStage<Void> consume3(Message<String> record)
{
System.out.println(record.getPayload());
sendToSubscriber("timestamp", record.getPayload());
return record.ack();
}
private void sendToSubscriber(String topic, String message) { private void sendToSubscriber(String topic, String message) {
if(sessions.containsKey(topic)) if(sessions.containsKey(topic))

2
quarkus-kafaka-homework/kafka-consumer/src/main/resources/META-INF/resources/index.html

@ -3,7 +3,7 @@
<head> <head>
<meta charset="UTF-8"> <meta charset="UTF-8">
<title>Quarkus Chat!</title> <title>Quarkus Kafka example stolen from chat application :-)</title>
<link rel="stylesheet" type="text/css" href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css"> <link rel="stylesheet" type="text/css" href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly.min.css">
<link rel="stylesheet" type="text/css" href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css"> <link rel="stylesheet" type="text/css" href="https://cdnjs.cloudflare.com/ajax/libs/patternfly/3.24.0/css/patternfly-additions.min.css">

1
quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties

@ -1,5 +1,6 @@
# Configuration file # Configuration file
# key = value # key = value
quarkus.http.port=8080
kafka.bootstrap.servers=localhost:9092 kafka.bootstrap.servers=localhost:9092
# Configure the Kafka sink (we write to it) # Configure the Kafka sink (we write to it)

Loading…
Cancel
Save