Browse Source

Cut down junk code, comment out tests which haven't been written yet

master
Jonathan Christison 5 years ago
parent
commit
5e80c2b958
  1. 4
      quarkus-kafaka-homework/README.md
  2. 4
      quarkus-kafaka-homework/kafka-consumer/pom.xml
  3. 57
      quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java
  4. 2
      quarkus-kafaka-homework/kafka-consumer/src/main/resources/META-INF/resources/index.html
  5. 30
      quarkus-kafaka-homework/kafka-consumer/src/test/java/me/jochrist/kafka/ConsumerTest.java

4
quarkus-kafaka-homework/README.md

@ -4,9 +4,9 @@ Brief
Implement an automated OpenShift 4 solution that will include:
[ ] Strimzi - meh, sort of, working with local strimzi
[X] Strimzi - meh, sort of, working with local strimzi
[X] Quarkus Kafka producer application. This application should periodically send data (e.g. timestamp) to Kafka and should also have a JAX-RS endpoint to send different data per user request. - Done standalone
[ ] Quarkus Kafka consumer application. This application should present the data to a websocket. There should be a way for the user to display the data easily.
[X] Quarkus Kafka consumer application. This application should present the data to a websocket. There should be a way for the user to display the data easily.
The solution should be comfortable to deploy, test, debug and use. The implementation time frame is one week.
If you do not have an OCP4 cluster available, feel free to use CRC - https://developers.redhat.com/products/codeready-containers/overview

4
quarkus-kafaka-homework/kafka-consumer/pom.xml

@ -57,6 +57,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-openshift</artifactId>
</dependency>
</dependencies>
<build>
<plugins>

57
quarkus-kafaka-homework/kafka-consumer/src/main/java/me/jochrist/kafka/Consumer.java

@ -48,13 +48,6 @@ public class Consumer {
LOG.debug("%h will watch topic \"" + topic + "\"");
}
//@Inject @Channel("user-message") Publisher<String> usermessage;
//@Inject @Channel("timestamp") Publisher<String> timestamp;
//public Publisher<String> stream()
//{
// LOG.info("timestamp: " + timestamp);
// return usermessage;
//}
//Multiple incomming anotations dont work for some reason, its supported in the smallrye docs -
//https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.2/advanced/incomings.html
@ -72,6 +65,7 @@ public class Consumer {
return record.ack();
}
*/
@Incoming("timestamp")
public CompletionStage<Void> consumeTimestamp(Message<String> record){
return consume(record);
@ -91,34 +85,6 @@ public class Consumer {
return record.ack();
}
/*
@Incoming("timestamp")
public CompletionStage<Void> consume(Message<String> timestamp){
Optional<IncomingKafkaRecordMetadata> metadata = timestamp.getMetadata(IncomingKafkaRecordMetadata.class);
System.out.println(timestamp.getPayload());
sendToSubscriber("timestamp", timestamp.getPayload());
return timestamp.ack();
}
*/
/*
@Incoming("user-message")
public CompletionStage<Void> consume2(Message<String> usermessage){
System.out.println(usermessage.getPayload());
sendToSubscriber("user-message", usermessage.getPayload());
return usermessage.ack();
}
*/
//https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.2/advanced/incomings.html
@Incoming("timestamp")
@Incoming("user-message")
public CompletionStage<Void> consume3(Message<String> record)
{
System.out.println(record.getPayload());
sendToSubscriber("timestamp", record.getPayload());
return record.ack();
}
private void sendToSubscriber(String topic, String message) {
if(sessions.containsKey(topic))
{
@ -131,25 +97,4 @@ public class Consumer {
});
}
}
/*
sessions.values().stream()
.filter(s -> topic.equals(s.getKey()))
.map(map -> map.getAsyncRemote().sendObject(message), result ->
{
if (result.getException() != null) {
System.out.println("Unable to send message: " + result.getException());
}
});
}
*/
private void broadcast(String message) {
sessions.values().forEach(s -> {
s.getAsyncRemote().sendObject(message, result -> {
if (result.getException() != null) {
System.out.println("Unable to send message: " + result.getException());
}
});
});
}
}

2
quarkus-kafaka-homework/kafka-consumer/src/main/resources/META-INF/resources/index.html

@ -79,7 +79,7 @@
console.log("Connected to the web socket");
$("#send").attr("disabled", false);
$("#connect").attr("disabled", true);
$("#name").attr("disabled", true);
$("#name").attr("disabled", false);
$("#msg").focus();
};
socket.onmessage =function(m) {

30
quarkus-kafaka-homework/kafka-consumer/src/test/java/me/jochrist/kafka/ConsumerTest.java

@ -1,21 +1,31 @@
package me.jochrist.kafka;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Test;
import javax.websocket.ClientEndpoint;
import javax.websocket.ContainerProvider;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import org.junit.jupiter.api.Assertions;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.is;
@QuarkusTest
public class ConsumerTest {
/*
//https://github.com/quarkusio/quarkus-quickstarts/blob/master/websockets-quickstart/src/test/java/org/acme/websockets/ChatTest.java
//https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-panache-quickstart/src/test/java/org/acme/panache/KafkaResource.java
@Test
public void testHelloEndpoint() {
given()
.when().get("/hello")
.then()
.statusCode(200)
.body(is("hello"));
@TestHTTPResource("/kafka/message/timestamp")
public void testTimestampWebSocket() {
try (Session session = ContainerProvider.getWebSocketContainer().connectToServer(Client.class, uri)) {
Assertions.assertEquals("CONNECT", MESSAGES.poll(10, TimeUnit.SECONDS));
Assertions.assertEquals("User stu joined", MESSAGES.poll(10, TimeUnit.SECONDS));
session.getAsyncRemote().sendText("hello world");
Assertions.assertEquals(">> stu: hello world", MESSAGES.poll(10, TimeUnit.SECONDS));
}
}
*/
}
Loading…
Cancel
Save