Echtzeit-API im Kontext von Apache Kafka

Hallo zusammen. Bereits im Dezember startet bei OTUS ein neuer Durchgang des Kurses „Software Architect“. Kurz vor Kursbeginn möchte ich Ihnen die Übersetzung eines interessanten Artikels vorstellen. Außerdem empfehle ich, sich die Demo-Lektion zum Thema „Idempotenz und Kommutativität von APIs in Warteschlangen und über HTTP“ anzusehen.






Eine der kniffligen Fragen, mit denen wir beim Entwerfen von Anwendungen und Systemen im Allgemeinen ständig konfrontiert sind, ist, wie sich der Informationsaustausch zwischen Komponenten effektiv organisieren lässt, ohne die Flexibilität zu verlieren, Schnittstellen ändern zu können, ohne andere Teile des Systems übermäßig zu beeinträchtigen. Je spezifischer und optimierter eine Schnittstelle ist, desto wahrscheinlicher ist sie so situationsgebunden, dass eine Änderung ein vollständiges Umschreiben erfordert. Umgekehrt können generische Schnittstellen sehr anpassungsfähig sein und breit unterstützt werden, allerdings leider auf Kosten der Leistung.





(Events) , API (real-time APIs) , , ; , .





. , , - , , . : , , .





(state) - , NoSQL . , , - - . , (consumers) - , . (producers) , , .





, , , , . , . (payload) , , . :





  • : userLogin







    • : zbeeblebrox



      2020-08-17 16:26:39 BST







  • : CarParked







    • : A42 XYZ



      2020-08-17 16:36:27



      X42







  • : orderPlaced







    • :







      £2.25



      2020-08-17 16:35:41 BST











(, , ), (, , , , ).





, , , , - Apache Kafka®. Kafka - , :





  • Pub/Sub





    • () () , / .









    • , .









    • .





Kafka . , , , , , . , , , , NoSQL .





API-, Apache Kafka, , , .





API

, Kafka, , , , , , ( - ). , Kafka - . , , - Kafka (topic - Kafka, - ), .





Kafka . Kafka Go:





package main

import (
    "fmt"
    "log"

    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

// main publishes a single "Hello world" message to partition 0 of
// test_topic and exits with a non-zero status if that fails.
func main() {
    if err := sendHello("test_topic"); err != nil {
        log.Fatal(err)
    }
}

// sendHello connects to the broker at localhost:9092, produces one
// "Hello world" record to partition 0 of topic, and waits for the delivery
// report before returning. It returns an error if the producer cannot be
// created, the message cannot be enqueued, or delivery fails.
func sendHello(topic string) error {
    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092"})
    if err != nil {
        return fmt.Errorf("creating producer: %w", err)
    }
    defer p.Close()

    // Produce only enqueues the message. Without waiting for the delivery
    // report the program could exit before anything reaches the broker.
    reports := make(chan kafka.Event, 1)
    err = p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic,
            Partition: 0},
        Value: []byte("Hello world")}, reports)
    if err != nil {
        return fmt.Errorf("enqueuing message for %q: %w", topic, err)
    }

    // The delivery report is the produced message with its TopicPartition.Error
    // set when delivery failed.
    if m, ok := (<-reports).(*kafka.Message); ok && m.TopicPartition.Error != nil {
        return fmt.Errorf("delivering message to %q: %w", topic, m.TopicPartition.Error)
    }
    return nil
}
      
      







Kafka , , , , , ( ).





, Kafka, . pub/sub, , , . Kafka , , , A/B-, , . , , . , RabbitMQ, ActiveMQ, .





package main

import (
    "fmt"

    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

// main joins consumer group "rmoff_01" on the broker at localhost:9092,
// subscribes to test_topic and prints every message delivered on the
// consumer's events channel. It returns early, after printing the reason,
// if the consumer cannot be created or the subscription fails.
func main() {

    topic := "test_topic"

    cm := kafka.ConfigMap{
        "bootstrap.servers":        "localhost:9092",
        "go.events.channel.enable": true,
        "group.id":                 "rmoff_01"}

    c, err := kafka.NewConsumer(&cm)
    if err != nil {
        // c is unusable here; continuing would dereference a nil consumer.
        fmt.Printf("❌ Failed to create consumer: %v\n", err)
        return
    }
    defer c.Close()

    if err := c.Subscribe(topic, nil); err != nil {
        fmt.Printf("❌ Failed to subscribe to topic '%v': %v\n", topic, err)
        return
    }

    // Ranging over the channel ends the loop if the channel is ever closed,
    // instead of spinning on nil events.
    for ev := range c.Events() {
        switch e := ev.(type) {

        case *kafka.Message:
            fmt.Printf("✅ Message '%v' received from topic '%v'\n", string(e.Value), *e.TopicPartition.Topic)

        case kafka.Error:
            // Report client errors rather than dropping them silently.
            fmt.Printf("❌ Kafka error: %v\n", e)
        }
    }

}
      
      







Kafka, (Consumer Group). . -, Kafka , , , . -, - , , . Kafka , (, ).





, - . Kafka Connect API, .





Producer Consumer API Java, C/C++, Go, Python, Node.js . , HTTP Kafka? REST Proxy.





REST API Apache Kafka

, . , , , :





{
    "name": "Sheffield NCP",
    "space": "A42",
    "occupied": true
}
      
      







Kafka, . Kafka Confluent REST Proxy - REST-:





# Produce one car-park event to the "carpark" topic through the REST Proxy
# listening on localhost:8082. The car park is named "Sheffield NCP", the
# spelling used by the consumed record and the ksqlDB NAME filters in this text.
curl -X POST \
     -H "Content-Type: application/vnd.kafka.json.v2+json" \
     -H "Accept: application/vnd.kafka.v2+json" \
     --data '{"records":[{"value":{ "name": "Sheffield NCP", "space": "A42", "occupied": true }}]}' \
     "http://localhost:8082/topics/carpark"
      
      







, Consumer API, , REST-. Consumer API, , REST API, Consumer Group, (subscription). , REST API , :





curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "rmoff_consumer", "format": "json", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/rmoff_consumer

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["carpark"]}' \
 http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/subscription
      
      







:





curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
      http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/records
[
    {
        "topic": "carpark",
        "key": null,
        "value": {
            "name": "Sheffield NCP",
            "space": "A42",
            "occupied": true
        },
        "partition": 0,
        "offset": 0
    }
]
      
      







, . , REST-.





, Kafka. , pub/sub. - , , ? , , .





,

Apache Kafka pub/sub - iPhone . , … . Apache Kafka Kafka Streams API. Java Kafka . Kafka Streams, , Walmart, Ticketmaster Bloomberg, ksqlDB.





ksqlDB - , . API SQL Kafka. ksqlDB , , - .





ksqlDB :





-- Declare the stream CARPARK_EVENTS over the Kafka topic 'carpark', reading
-- each JSON value as the columns NAME, SPACE and OCCUPIED.
CREATE STREAM CARPARK_EVENTS (NAME     VARCHAR,
                              SPACE    VARCHAR,
                              OCCUPIED BOOLEAN)
                        WITH (KAFKA_TOPIC='carpark',
                              VALUE_FORMAT='JSON');
      
      







ksqlDB , , . , . , , , :





-- Return the event time (ROWTIME formatted as yyyy-MM-dd HH:mm:ss) and the
-- SPACE of every 'Sheffield NCP' event whose OCCUPIED flag is false.
-- EMIT CHANGES makes this a push query: it keeps running and emits new
-- matching rows as they arrive.
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss') AS EVENT_TS,
       SPACE
  FROM CARPARK_EVENTS
 WHERE NAME='Sheffield NCP'
   AND OCCUPIED=false
  EMIT CHANGES;
      
      



SQL-, , , , ( EMIT CHANGES). , push-, , , . ksqlDB pull- ( ), , , . , ksqlDB , , .





ksqlDB REST API, SQL :





curl --http2 'http://localhost:8088/query-stream' \
     --data-raw '{"sql":"SELECT TIMESTAMPTOSTRING(ROWTIME,'\''yyyy-MM-dd HH:mm:ss'\'') AS EVENT_TS, SPACE FROM CARPARK_EVENTS WHERE NAME='\''Sheffield NCP'\'' and OCCUPIED=false EMIT CHANGES;"}'
      
      







, :





{"queryId":"383894a7-05ee-4ec8-bb3b-c5ad39811539","columnNames":["EVENT_TS","SPACE"],"columnTypes":["STRING","STRING"]}
["2020-08-05 16:02:33","A42"]
["2020-08-05 16:07:31","D72"]
      
      







ksqlDB . SELECT



CREATE STREAM streamname AS



Kafka. , ksqlDB , , .. , Kafka. ksqlDB , , :





-- Derive the stream CARPARKS from CARPARK_EVENTS, enriching each event with
-- LOCATION and CAPACITY from CARPARK_REFERENCE (inner join on NAME).
-- OCCUPIED_IND is 1 when OCCUPIED is TRUE and -1 otherwise.
CREATE STREAM CARPARKS AS
    SELECT E.NAME AS NAME, E.SPACE,
           R.LOCATION, R.CAPACITY,
           E.OCCUPIED,
           CASE
               WHEN OCCUPIED=TRUE THEN 1
               ELSE -1
           END AS OCCUPIED_IND
    FROM   CARPARK_EVENTS E
           INNER JOIN
           CARPARK_REFERENCE R
           ON E.NAME = R.NAME;
      
      







, CASE , . CREATE STREAM Kafka, :





+----------------+-------+----------+----------------------------+----------+--------------+
|NAME            |SPACE  |OCCUPIED  |LOCATION                    |CAPACITY  |OCCUPIED_IND  |
+----------------+-------+----------+----------------------------+----------+--------------+
|Sheffield NCP   |E48    |true      |{LAT=53.4265964, LON=-1.8426|1000      |1             |
|                |       |          |386}                        |          |              |
      
      







, , ksqlDB . , SQL, :





-- Build the table CARPARK_SPACES from CARPARKS: one row per NAME whose
-- OCCUPIED_SPACES column is the sum of OCCUPIED_IND for that NAME.
CREATE TABLE CARPARK_SPACES AS
    SELECT NAME,
           SUM(OCCUPIED_IND) AS OCCUPIED_SPACES
        FROM CARPARKS
        GROUP BY NAME;
      
      



ksqlDB REST API:





curl --http2 'http://localhost:8088/query-stream' \
     --data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='\''Birmingham NCP'\'';"}'
      
      







, , ( "pull- ", " push- ") , :





{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}
[30]
      
      



, -





curl --http2 'http://localhost:8088/query-stream' \
     --data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='\''Birmingham NCP'\'';"}'
{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}
[29]
      
      



Java ksqlDB Python Go.





Apache Kafka , , , ( ), .





, , , , , :

















Apache Kafka Connect API, , Kafka. , Kafka S3 :





curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-s3/config \
    -d ' {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "carpark",
        "s3.bucket.name": "rmoff-carparks",
        "s3.region": "us-west-2",
        "flush.size": "1024",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat"
        }'
      
      







, , , S3. . , Snowflake, Kafka Connect; . Kafka Connect Kafka. , CARPARK_REFERENCE



, ksqlDB , (CDC - change data capture) , .





Apache Kafka , . , , , , , .





API Kafka , ksqlDB, , / . API , REST.





Apache Kafka, developer.confluent.io. Confluent Platform - Apache Kafka, , . , Confluent CloudGitHub Docker Compose , . - , Kafka, « - ».






- : " API HTTP".













All Articles