Wie funktioniert Auto-Commit in Kafka und können wir uns darauf verlassen?

In diesem Artikel möchte ich etwas detaillierter erklären, wie der Auto-Commit-Mechanismus für Listener in der kafka-clients-Bibliothek funktioniert (siehe Version 2.6.0).





In der Dokumentation finden Sie die folgende Formulierung, die beschreibt, wie das automatische Festschreiben funktioniert:





Auto-Commit funktioniert im Grunde genommen als Cron mit einem Zeitraum, der über die Konfigurationseigenschaft auto.commit.interval.ms festgelegt wird. Wenn der Consumer abstürzt, wird nach einem Neustart oder einer Neuverteilung die Position aller Partitionen des abgestürzten Consumer auf den zuletzt festgeschriebenen Offset zurückgesetzt.





Die Java-Dokumente für KafkaConsumer enthalten wiederum die folgende Beschreibung:





Der Verbraucher kann entweder automatisch automatisch Offsets festschreiben; Sie können diese festgeschriebene Position auch manuell steuern, indem Sie eine der Festschreibungs-APIs aufrufen (z. B. commitSync und commitAsync).





Aus diesen Formulierungen kann ein Missverständnis entstehen, dass ein nicht blockierendes automatisches Offset-Commit im Hintergrund auftritt, und es ist nicht ganz klar, wie es sich auf den Prozess des Empfangs von Nachrichten durch einen bestimmten Verbraucher bezieht und vor allem, welche Zustellgarantien wir haben ?





Schauen wir uns den Mechanismus zum Empfangen von Nachrichten durch den Listener mit der Einstellung enable.auto.commit = true am Beispiel der Implementierung der KafkaConsumer- Klasse aus der Bibliothek org.apache.kafka genauer an: kafka-clients: 2.6.0 





Betrachten Sie dazu das Beispiel im Java-Dokument KafkaConsumer :





Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  for (ConsumerRecord<String, String> record : records)
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
      
      



Wie erfolgt in diesem Fall das automatische Festschreiben? Die Antwort sollte in der Methode selbst zum Empfangen neuer Nachrichten gefunden werden. 





consumer.poll(Duration.ofMillis(100));
      
      



. KafkaConsumer auto-commit enable.auto.commit auto.commit.interval.ms ConsumerCoordinator , auto-commit.





maybeAutoCommitOffsetsAsync





public void maybeAutoCommitOffsetsAsync(long now) {
  if (autoCommitEnabled) {
    nextAutoCommitTimer.update(now);
    if (nextAutoCommitTimer.isExpired()) {
      nextAutoCommitTimer.reset(autoCommitIntervalMs);
      doAutoCommitOffsetsAsync();
    }
  }
}
      
      



enable.auto.commit = true auto.commit.interval.ms , , ( doAutoCommitOffsetsAsync)





private void doAutoCommitOffsetsAsync() {
  Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
  log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);

  commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
    if (exception != null) {
      if (exception instanceof RetriableCommitFailedException) {
        log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
                  exception);
        nextAutoCommitTimer.updateAndReset(rebalanceConfig.retryBackoffMs);
      } else {
        log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
      }
    } else {
      log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
    }
  });
}
      
      



poll KafkaConsumer. updateAssignmentMetadataIfNeeded, poll ConsumerCoordinator, , maybeAutoCommitOffsetsAsync





poll KafkaConsumer:





  1. offset





  2. .





KafkaConsumer , . 





.1 enable.auto.commit = true auto.commit.interval.ms. .. poll() 3 , auto.commit.interval.ms=6000, . 





? “at least once delivery”, . 








All Articles