Browse Source

Add in memory messaging test and test basic user-message and timestamp

master
Jonathan Christison 5 years ago
parent
commit
6fe78f815c
  1. 5
      quarkus-kafaka-homework/kafka-producer/pom.xml
  2. 9
      quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties
  3. 22
      quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/KafkaResource.java
  4. 36
      quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java

5
quarkus-kafaka-homework/kafka-producer/pom.xml

@ -75,6 +75,11 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

9
quarkus-kafaka-homework/kafka-producer/src/main/resources/application.properties

@ -24,3 +24,12 @@ mp.messaging.outgoing.user-message.value.serializer=org.apache.kafka.common.seri
# Configure the Kafka source (we read from it)
mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
#Tests
#%test.mp.messaging.incoming.user-message.connector=smallrye-kafka
#%test.mp.messaging.incoming.user-message.topic=user-message
#%test.mp.messaging.incoming.user-message.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#%test.mp.messaging.incoming.timestamp.connector=smallrye-kafka
#%test.mp.messaging.incoming.timestamp.topic=timestamp
#%test.mp.messaging.incoming.timestamp.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

22
quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/KafkaResource.java

@ -1,20 +1,32 @@
package me.jochrist.kafka;
import java.util.Collections;
import java.util.Map;
import io.smallrye.reactive.messaging.connectors.InMemoryConnector;
import org.testcontainers.containers.KafkaContainer;
import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import java.util.HashMap;
import java.util.Map;
public class KafkaResource implements QuarkusTestResourceLifecycleManager {
private final KafkaContainer kafka = new KafkaContainer();
@Override
public Map<String, String> start() {
kafka.start();
return Collections.singletonMap("kafka.bootstrap.servers", kafka.getBootstrapServers());
public Map<String, String> start()
{
Map<String,String> env = new HashMap<>();
Map<String, String> timestampInMemoryOutgoing = InMemoryConnector.switchOutgoingChannelsToInMemory("timestamp");
Map<String, String> userMessageInMemoryOutgoing = InMemoryConnector.switchOutgoingChannelsToInMemory("user-message");
//Map<String, String> timestampInMemoryIncoming = InMemoryConnector.switchIncomingChannelsToInMemory("timestamp");
//Map<String, String> userMessageInMemoryIncoming = InMemoryConnector.switchIncomingChannelsToInMemory("user-message");
//env.putAll(timestampInMemoryIncoming);
//env.putAll(userMessageInMemoryIncoming);
env.putAll(timestampInMemoryOutgoing);
env.putAll(userMessageInMemoryOutgoing);
return env;
}
@Override
public void stop() {
kafka.close();
InMemoryConnector.clear();
}
}

36
quarkus-kafaka-homework/kafka-producer/src/test/java/me/jochrist/kafka/ProducerTest.java

@ -1,54 +1,50 @@
package me.jochrist.kafka;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Test;
import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.is;
import io.smallrye.reactive.messaging.connectors.InMemoryConnector;
import io.smallrye.reactive.messaging.connectors.InMemorySource;
import io.smallrye.reactive.messaging.connectors.InMemorySink;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import javax.enterprise.inject.Any;
import javax.inject.Inject;
import io.quarkus.test.common.QuarkusTestResource;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.sse.SseEventSource;
import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
@QuarkusTest
@QuarkusTestResource(KafkaResource.class)
public class ProducerTest {
@Inject @Any
InMemoryConnector connector;
//See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/204
//As we dont need to return anything 204 is still a success
@Test
public void testMessage() {
//InMemorySink<String> timestamps = connector.sink("timestamp");
String message = "test";
InMemorySink<String> userMessageSink = connector.sink("user-message");
given()
.when().get("/kafka/message/test")
.when().get("/kafka/message/"+message)
.then()
.statusCode(204);
//InMemorySink<Integer> results = connector.sink("user-message");
}
Assertions.assertEquals(1, userMessageSink.received().size(), "In memory messaging sink is the wrong size");
/*
@Inject @Any
InMemoryConnector connector;
String pl = userMessageSink.received().get(0).getPayload();
Assertions.assertEquals(message, pl, "In memory messaging sink has the wrong string, have we picked it up out of order?");
}
@Test
public void testTimestamp()
{
InMemorySink<String> timestamps = connector.sink("timestamp");
System.out.println(timestamps);
InMemorySink<String> timestampSink = connector.sink("timestamp");
await().atMost(5, TimeUnit.SECONDS).until(() -> timestampSink.received().size() > 0);
}
*/
}

Loading…
Cancel
Save