Einführung
Es gibt verschiedene Möglichkeiten, Tests mit Apache Kafka zu schreiben. Beispielsweise können TestContainer und EmbeddedKafka verwendet werden . Sie können dies beispielsweise hier lesen: Die Fallstricke beim Testen von Kafka Streams . Es gibt aber auch die Möglichkeit, Tests mit KafkaServer zu schreiben.
Was wird getestet?
Angenommen, Sie müssen einen Dienst zum Senden von Nachrichten über verschiedene Kanäle entwickeln: E-Mail, Telegramm usw.
Der Dienstname sei: SenderService.
Der Dienst muss: den angegebenen Kanal abhören, die benötigten Nachrichten aus dem Kanal auswählen, Nachrichten analysieren und über den gewünschten Kanal für die endgültige Zustellung von Nachrichten senden.
Um den Dienst zu testen, müssen Sie eine Nachricht erstellen, die über den E-Mail-Sendekanal gesendet werden soll, und sicherstellen, dass die Nachricht an den endgültigen Kanal gesendet wurde.
In realen Anwendungen werden Tests natürlich schwieriger. Um den gewählten Ansatz zu veranschaulichen, ist ein solcher Test jedoch ausreichend.
Der Dienst und der Test werden implementiert mit: Java 1.8, Kafka 2.1.0, JUnit 5.5.2, Maven 3.6.1.
Bedienung
Der Dienst kann seine Arbeit starten und stoppen.
void start()
void stop()
Zu Beginn müssen Sie mindestens folgende Parameter einstellen:
String bootstrapServers
String senderTopic
EmailService emailService
bootstrapServers – kafka.
senderTopic – , .
emailService – .
.
«», , . «» . «» : Introducing the Kafka Consumer: Getting Started with the New Apache Kafka 0.9 Consumer Client.
Collection<AutoCloseable> closeables = new ArrayList<>();
ExecutorService senderTasksExecutor = Executors.newFixedThreadPool(senderTasksN);
ExecutorService tasksExecutorService = Executors.newFixedThreadPool(tasksN);
for (int i = 0; i < senderTasksN; i++) {
SenderConsumerLoop senderConsumerLoop =
new SenderConsumerLoop(
bootstrapServers,
senderTopic,
"sender",
"sender",
tasksExecutorService,
emailService
);
closeables.add(senderConsumerLoop);
senderTasksExecutor.submit(senderConsumerLoop);
}
«», .
«» . .
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
for (AutoCloseable autoCloseable : closeables) {
try {
autoCloseable.close();
} catch (Exception e) {
e.printStackTrace();
}
}
senderTasksExecutor.shutdown();
tasksExecutorService.shutdown();
stop();
try {
senderTasksExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
.
«»
«» :
void run()
void close()
: run.
@Override
public void run() {
kafkaConsumer = createKafkaConsumerStringString(bootstrapServers, clientId, groupId);
kafkaConsumer.subscribe(Collections.singleton(topic));
while (true) {
calculate(kafkaConsumer.poll(Duration.ofSeconds(1)));
}
}
«kafka-». «kafka-» . . .
json- , , .
:
{
"subject": {
"subject_type": "send"
},
"body": {
"method": "email",
"recipients": "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml",
"title": "42",
"message": "73"
}
}
subject_type — . «send».
method – . «email» — .
recipients – .
title – .
message – .
:
void calculate(ConsumerRecords<String, String> records) {
for (ConsumerRecord<String, String> record : records) {
calculate(record);
}
}
:
void calculate(ConsumerRecord<String, String> record) {
JSONParser jsonParser = new JSONParser();
Object parsedObject = null;
try {
parsedObject = jsonParser.parse(record.value());
} catch (ParseException e) {
e.printStackTrace();
}
if (parsedObject instanceof JSONObject) {
JSONObject jsonObject = (JSONObject) parsedObject;
JSONObject jsonSubject = (JSONObject) jsonObject.get(SUBJECT);
String subjectType = jsonSubject.get(SUBJECT_TYPE).toString();
if (SEND.equals(subjectType)) {
JSONObject jsonBody = (JSONObject) jsonObject.get(BODY);
calculate(jsonBody);
}
}
}
:
void calculate(JSONObject jsonBody) {
String method = jsonBody.get(METHOD).toString();
if (EMAIL_METHOD.equals(method)) {
String recipients = jsonBody.get(RECIPIENTS).toString();
String title = jsonBody.get(TITLE).toString();
String message = jsonBody.get(MESSAGE).toString();
sendEmail(recipients, title, message);
}
}
:
void sendEmail(String recipients, String title, String message) {
tasksExecutorService.submit(() -> emailService.send(recipients, title, message));
}
.
.
«kafka-»:
static KafkaConsumer<String, String> createKafkaConsumerStringString(
String bootstrapServers,
String clientId,
String groupId
) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<>(properties);
}
:
interface EmailService {
void send(String recipients, String title, String message);
}
.
«kafka-».
«kafka-».
.
«kafka-». .
public class SenderServiceTest {
@Test
void consumeEmail() throws InterruptedException {
String brokerHost = "127.0.0.1";
int brokerPort = 29092;
String bootstrapServers = brokerHost + ":" + brokerPort;
String senderTopic = "sender_data";
try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {
kafkaServerService.start();
kafkaServerService.createTopic(senderTopic);
}
}
}
. «kafka-». «kafka-» . .
«mock» :
SenderService.EmailService emailService = mock(SenderService.EmailService.class);
:
SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
senderService.start();
:
String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
String title = "42";
String message = "73";
:
kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));
:
Thread.sleep(6000);
, :
verify(emailService).send(recipients, title, message);
:
senderService.stop();
:
public class SenderServiceTest {
@Test
void consumeEmail() throws InterruptedException {
String brokerHost = "127.0.0.1";
int brokerPort = 29092;
String bootstrapServers = brokerHost + ":" + brokerPort;
String senderTopic = "sender_data";
try (KafkaServerService kafkaServerService = new KafkaServerService(brokerHost, brokerPort)) {
kafkaServerService.start();
kafkaServerService.createTopic(senderTopic);
SenderService.EmailService emailService = mock(SenderService.EmailService.class);
SenderService senderService = new SenderService(bootstrapServers, senderTopic, emailService);
senderService.start();
String recipients = "mrbrown@ml.ml;mrblack@ml.ml;mrwhite@ml.ml";
String title = "42";
String message = "73";
kafkaServerService.send(senderTopic, key(), createMessage(EMAIL_METHOD, recipients, title, message));
Thread.sleep(6000);
verify(emailService).send(recipients, title, message);
senderService.stop();
}
}
}
:
public class SenderFactory {
public static final String SUBJECT = "subject";
public static final String SUBJECT_TYPE = "subject_type";
public static final String BODY = "body";
public static final String METHOD = "method";
public static final String EMAIL_METHOD = "email";
public static final String RECIPIENTS = "recipients";
public static final String TITLE = "title";
public static final String MESSAGE = "message";
public static final String SEND = "send";
public static String key() {
return UUID.randomUUID().toString();
}
public static String createMessage(String method, String recipients, String title, String message) {
Map<String, Object> map = new HashMap<>();
Map<String, Object> subject = new HashMap<>();
Map<String, Object> body = new HashMap<>();
map.put(SUBJECT, subject);
subject.put(SUBJECT_TYPE, SEND);
map.put(BODY, body);
body.put(METHOD, method);
body.put(RECIPIENTS, recipients);
body.put(TITLE, title);
body.put(MESSAGE, message);
return JSONObject.toJSONString(map);
}
}
«kafka-»
:
void start()
void close()
void createTopic(String topic)
«start» .
Erstellen Sie "zookeeper" und speichern Sie seine Adresse:
zkServer = new EmbeddedZookeeper();
String zkConnect = zkHost + ":" + zkServer.port();
Erstellen Sie einen Zookeeper-Client:
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
zkUtils = ZkUtils.apply(zkClient, false);
Festlegen von Eigenschaften für den Server:
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
try {
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
} catch (IOException e) {
throw new RuntimeException(e);
}
brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
brokerProps.setProperty("offsets.topic.replication.factor", "1");
KafkaConfig config = new KafkaConfig(brokerProps);
Servererstellung:
kafkaServer = TestUtils.createServer(config, new MockTime());
Alle zusammen:
public void start() {
zkServer = new EmbeddedZookeeper();
String zkConnect = zkHost + ":" + zkServer.port();
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
zkUtils = ZkUtils.apply(zkClient, false);
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
try {
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
} catch (IOException e) {
throw new RuntimeException(e);
}
brokerProps.setProperty("listeners", "PLAINTEXT://" + brokerHost + ":" + brokerPort);
brokerProps.setProperty("offsets.topic.replication.factor", "1");
KafkaConfig config = new KafkaConfig(brokerProps);
kafkaServer = TestUtils.createServer(config, new MockTime());
}
Beenden des Dienstes:
@Override
public void close() {
kafkaServer.shutdown();
zkClient.close();
zkServer.shutdown();
}
Themenerstellung:
public void createTopic(String topic) {
AdminUtils.createTopic(
zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
}
Fazit
Abschließend ist anzumerken, dass der hier angegebene Code nur die gewählte Methode veranschaulicht.
Um Dienste mit "kafka" zu erstellen und zu testen, können Sie auf die folgende Ressource verweisen:
kafka-Streams-Beispiele