Verwenden der Spring Cloud Stream-Bindung mit Kafka Message Broker

Hallo zusammen! Mein Name ist Vitaly, ich bin Entwickler bei Web3Tech. In diesem Beitrag werde ich die grundlegenden Konzepte und Konstrukte des Spring Cloud Stream-Frameworks für die Unterstützung und Zusammenarbeit mit Kafka-Nachrichtenbrokern mit einer vollständigen Schleife ihrer kontextbezogenen Komponententests vorstellen. Wir verwenden ein solches Schema in unserem Projekt der rein russischen elektronischen Abstimmung auf der Blockchain-Plattform von Waves Enterprise .





Als Teil des Spring Cloud-Projektteams basiert Spring Cloud Stream auf Spring Boot und verwendet Spring Integration, um die Kommunikation mit Nachrichtenbrokern bereitzustellen. Es lässt sich jedoch problemlos in verschiedene Nachrichtenbroker integrieren und erfordert nur eine minimale Konfiguration, um ereignisgesteuerte oder nachrichtengesteuerte Mikrodienste zu erstellen.





Konfiguration und Abhängigkeiten

Zuerst müssen wir die Abhängigkeit Spring-Cloud-Starter-Stream-Kafka zu build.gradle hinzufügen :





dependencies {
   implementation(kotlin("stdlib"))
   implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
   implementation("com.fasterxml.jackson.module:jackson-module-kotlin")

   implementation("org.springframework.boot:spring-boot-starter-web")
   implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")

   testImplementation("org.springframework.boot:spring-boot-starter-test")
   testImplementation("org.springframework.cloud:spring-cloud-stream-test-support")
   testImplementation("org.springframework.kafka:spring-kafka-test:springKafkaTestVersion")
}
      
      



In der Konfiguration des Spring Cloud Stream-Projekts müssen Sie die Kafka-Broker-URL, den Warteschlangennamen (Thema) und andere Bindungsparameter angeben. Hier ist ein Beispiel für eine YAML-Konfiguration für den Dienst application.yaml :





spring:
 application:
   name: cloud-stream-binding-kafka-app
 cloud:
   stream:
     kafka:
       binder:
         brokers: 0.0.0.0:8080
         configuration:
           auto-offset-reset: latest
     bindings:
       customChannel:                   #Channel name
         destination: 0.0.0.0:8080      #Destination to which the message is sent (topic)
         group: input-group-N
         contentType: application/json
         consumer:
           max-attempts: 1
           autoCommitOffset: true
           autoCommitOnError: false
      
      



Konzept und Unterricht

, , Spring Cloud Stream, , (SpringCloudStreamBindingKafkaApp.kt):





@EnableBinding(ProducerBinding::class)

@SpringBootApplication
 
 class SpringCloudStreamBindingKafkaApp

 fun main(args: Array<String>) {

    SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args)

 }
      
      



@EnableBinding , .





.





Binding — , .

Binder — middleware .

Channel — middleware .

StreamListeners — (beans), , MessageConverter middleware “DTO”.

Message Schema — , . .





send/receive, producer consumer. , Spring Cloud Stream.





Producer Kafka, (ProducerBinding.kt):





interface ProducerBinding {

   @Output(BINDING_TARGET_NAME)
   fun messageChannel(): MessageChannel
}
      
      



onsumer Kafka .





ConsumerBinding.kt:





interface ConsumerBinding {

   companion object {
       const val BINDING_TARGET_NAME = "customChannel"
   }

   @Input(BINDING_TARGET_NAME)
   fun messageChannel(): MessageChannel
}
      
      



Consumer.kt:





@EnableBinding(ConsumerBinding::class)
class Consumer(val messageService: MessageService) {

   @StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME)
   fun process(
       @Payload message: Map<String, Any?>,
       @Header(value = KafkaHeaders.OFFSET, required = false) offset: Int?
   ) {
       messageService.consume(message)
   }
}
      
      



Kafka . Kafka, spring-kafka-test.





MessageCollector

, . ProducerBinding payload ProducerTest.kt:





@SpringBootTest
class ProducerTest {

   @Autowired
   lateinit var producerBinding: ProducerBinding

   @Autowired
   lateinit var messageCollector: MessageCollector

   @Test
   fun `should produce somePayload to channel`() {
       // ARRANGE
       val request = mapOf(1 to "foo", 2 to "bar", "three" to 10101)

       // ACT
producerBinding.messageChannel().send(MessageBuilder.withPayload(request).build())
       val payload = messageCollector.forChannel(producerBinding.messageChannel())
           .poll()
           .payload

       // ASSERT
       val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java)
       assertTrue(request.entries.stream().allMatch { re ->
           re.value == payloadAsMap[re.key.toString()]
       })

       messageCollector.forChannel(producerBinding.messageChannel()).clear()
   }
}
      
      



Embedded Kafka

@ClassRule . Kafka Zookeeper , . Kafka Zookeper (ConsumerTest.kt):





@SpringBootTest
@ActiveProfiles("test")
@EnableAutoConfiguration(exclude = [TestSupportBinderAutoConfiguration::class])
@EnableBinding(ProducerBinding::class)
class ConsumerTest {

   @Autowired
   lateinit var producerBinding: ProducerBinding

   @Autowired
   lateinit var objectMapper: ObjectMapper

   @MockBean
   lateinit var messageService: MessageService

   companion object {
       @ClassRule @JvmField
       var embeddedKafka = EmbeddedKafkaRule(1, true, "any-name-of-topic")
   }

   @Test
   fun `should consume via txConsumer process`() {
       // ACT
       val request = mapOf(1 to "foo", 2 to "bar")
       producerBinding.messageChannel().send(MessageBuilder.withPayload(request)
           .setHeader("someHeaderName", "someHeaderValue")
           .build())

       // ASSERT
       val requestAsMap = objectMapper.readValue<Map<String, Any?>>(objectMapper.writeValueAsString(request))
       runBlocking {
           delay(20)
           verify(messageService).consume(requestAsMap)
       }
   }
}
      
      



In diesem Beitrag habe ich die Funktionen von Spring Cloud Stream und die Verwendung mit Kafka demonstriert. Spring Cloud Stream bietet eine benutzerfreundliche Oberfläche mit vereinfachten Nuancen der Brokerkonfiguration, ist schnell implementiert, funktioniert stabil und unterstützt moderne beliebte Broker wie Kafka. Als Ergebnis gab ich eine Reihe von Beispielen mit Unit-Tests basierend auf der EmbeddedKafkaRule unter Verwendung des MessageCollector.





Alle Quellen finden Sie auf Github . Danke fürs Lesen!








All Articles