Hallo zusammen! Mein Name ist Vitaly, ich bin Entwickler bei Web3Tech. In diesem Beitrag werde ich die grundlegenden Konzepte und Konstrukte des Spring Cloud Stream-Frameworks für die Unterstützung und Zusammenarbeit mit Kafka-Nachrichtenbrokern mit einer vollständigen Schleife ihrer kontextbezogenen Komponententests vorstellen. Wir verwenden ein solches Schema in unserem Projekt der rein russischen elektronischen Abstimmung auf der Blockchain-Plattform von Waves Enterprise .
Als Teil des Spring Cloud-Projektteams basiert Spring Cloud Stream auf Spring Boot und verwendet Spring Integration, um die Kommunikation mit Nachrichtenbrokern bereitzustellen. Es lässt sich jedoch problemlos in verschiedene Nachrichtenbroker integrieren und erfordert nur eine minimale Konfiguration, um ereignisgesteuerte oder nachrichtengesteuerte Mikrodienste zu erstellen.
Konfiguration und Abhängigkeiten
Zuerst müssen wir die Abhängigkeit Spring-Cloud-Starter-Stream-Kafka zu build.gradle hinzufügen :
dependencies {
implementation(kotlin("stdlib"))
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.cloud:spring-cloud-stream-test-support")
testImplementation("org.springframework.kafka:spring-kafka-test:springKafkaTestVersion")
}
In der Konfiguration des Spring Cloud Stream-Projekts müssen Sie die Kafka-Broker-URL, den Warteschlangennamen (Thema) und andere Bindungsparameter angeben. Hier ist ein Beispiel für eine YAML-Konfiguration für den Dienst application.yaml :
spring:
application:
name: cloud-stream-binding-kafka-app
cloud:
stream:
kafka:
binder:
brokers: 0.0.0.0:8080
configuration:
auto-offset-reset: latest
bindings:
customChannel: #Channel name
destination: 0.0.0.0:8080 #Destination to which the message is sent (topic)
group: input-group-N
contentType: application/json
consumer:
max-attempts: 1
autoCommitOffset: true
autoCommitOnError: false
Konzept und Unterricht
, , Spring Cloud Stream, , (SpringCloudStreamBindingKafkaApp.kt):
@EnableBinding(ProducerBinding::class)
@SpringBootApplication
class SpringCloudStreamBindingKafkaApp
fun main(args: Array<String>) {
SpringApplication.run(SpringCloudStreamBindingKafkaApp::class.java, *args)
}
@EnableBinding , .
.
Binding — , .
Binder — middleware .
Channel — middleware .
StreamListeners — (beans), , MessageConverter middleware “DTO”.
Message Schema — , . .
send/receive, producer consumer. , Spring Cloud Stream.
Producer Kafka, (ProducerBinding.kt):
interface ProducerBinding {
@Output(BINDING_TARGET_NAME)
fun messageChannel(): MessageChannel
}
onsumer Kafka .
ConsumerBinding.kt:
interface ConsumerBinding {
companion object {
const val BINDING_TARGET_NAME = "customChannel"
}
@Input(BINDING_TARGET_NAME)
fun messageChannel(): MessageChannel
}
Consumer.kt:
@EnableBinding(ConsumerBinding::class)
class Consumer(val messageService: MessageService) {
@StreamListener(target = ConsumerBinding.BINDING_TARGET_NAME)
fun process(
@Payload message: Map<String, Any?>,
@Header(value = KafkaHeaders.OFFSET, required = false) offset: Int?
) {
messageService.consume(message)
}
}
Kafka . Kafka, spring-kafka-test.
MessageCollector
, . ProducerBinding payload ProducerTest.kt:
@SpringBootTest
class ProducerTest {
@Autowired
lateinit var producerBinding: ProducerBinding
@Autowired
lateinit var messageCollector: MessageCollector
@Test
fun `should produce somePayload to channel`() {
// ARRANGE
val request = mapOf(1 to "foo", 2 to "bar", "three" to 10101)
// ACT
producerBinding.messageChannel().send(MessageBuilder.withPayload(request).build())
val payload = messageCollector.forChannel(producerBinding.messageChannel())
.poll()
.payload
// ASSERT
val payloadAsMap = jacksonObjectMapper().readValue(payload.toString(), Map::class.java)
assertTrue(request.entries.stream().allMatch { re ->
re.value == payloadAsMap[re.key.toString()]
})
messageCollector.forChannel(producerBinding.messageChannel()).clear()
}
}
Embedded Kafka
@ClassRule . Kafka Zookeeper , . Kafka Zookeper (ConsumerTest.kt):
@SpringBootTest
@ActiveProfiles("test")
@EnableAutoConfiguration(exclude = [TestSupportBinderAutoConfiguration::class])
@EnableBinding(ProducerBinding::class)
class ConsumerTest {
@Autowired
lateinit var producerBinding: ProducerBinding
@Autowired
lateinit var objectMapper: ObjectMapper
@MockBean
lateinit var messageService: MessageService
companion object {
@ClassRule @JvmField
var embeddedKafka = EmbeddedKafkaRule(1, true, "any-name-of-topic")
}
@Test
fun `should consume via txConsumer process`() {
// ACT
val request = mapOf(1 to "foo", 2 to "bar")
producerBinding.messageChannel().send(MessageBuilder.withPayload(request)
.setHeader("someHeaderName", "someHeaderValue")
.build())
// ASSERT
val requestAsMap = objectMapper.readValue<Map<String, Any?>>(objectMapper.writeValueAsString(request))
runBlocking {
delay(20)
verify(messageService).consume(requestAsMap)
}
}
}
In diesem Beitrag habe ich die Funktionen von Spring Cloud Stream und die Verwendung mit Kafka demonstriert. Spring Cloud Stream bietet eine benutzerfreundliche Oberfläche mit vereinfachten Nuancen der Brokerkonfiguration, ist schnell implementiert, funktioniert stabil und unterstützt moderne beliebte Broker wie Kafka. Als Ergebnis gab ich eine Reihe von Beispielen mit Unit-Tests basierend auf der EmbeddedKafkaRule unter Verwendung des MessageCollector.
Alle Quellen finden Sie auf Github . Danke fürs Lesen!