Eine ereignisgesteuerte Beispielanwendung, die auf Webhooks im Mail.ru Cloud Solutions S3-Objektspeicher basiert



Rube Goldberg Kaffeemaschine Eine



ereignisgesteuerte Architektur erhöht die Kosteneffizienz der verwendeten Ressourcen, da sie nur dann verwendet werden, wenn sie benötigt werden. Es gibt viele Möglichkeiten, dies zu implementieren und keine zusätzlichen Cloud-Entitäten als Worker-Anwendungen zu erstellen. Und heute werde ich nicht über FaaS sprechen, sondern über Webhooks. Ich zeige Ihnen ein Tutorial-Beispiel für die Behandlung von Ereignissen mit Object Storage-Webhooks.



Ein paar Worte zu Objektspeicherung und Webhooks. Mit Objektspeichern können Sie alle Daten in der Cloud als Objekte speichern, auf die über S3 oder eine andere API (abhängig von der Implementierung) über HTTP / HTTPS zugegriffen werden kann. Webhooks sind im Allgemeinen benutzerdefinierte HTTP-Rückrufe. Sie werden normalerweise durch ein Ereignis ausgelöst, z. B. eine Codeübermittlung an ein Repository oder einen in einem Blog veröffentlichten Kommentar. Wenn ein Ereignis eintritt, sendet die Ursprungssite eine HTTP-Anforderung an die für den Webhook angegebene URL. Infolgedessen können Sie Ereignisse auf einer Site Aktionen auf einer anderen Site auslösen lassen ( Wiki ). Wenn es sich bei der Quellwebsite um Objektspeicher handelt, werden die Ereignisse an ihrem Inhalt geändert.



Beispiele für einfache Fälle, in denen eine solche Automatisierung verwendet werden kann:



  1. . « », .
  2. , , .
  3. ( , , , ).
  4. , , Kubernetes, , .


Als Beispiel erstellen wir eine Variante von Aufgabe 1, wenn Änderungen im Objektspeicher-Bucket von Mail.ru Cloud Solutions (MCS) mithilfe von Webhooks im AWS-Objektspeicher synchronisiert werden. In einem real geladenen Fall sollten Sie für asynchrone Arbeit sorgen, indem Sie Webhooks in der Warteschlange registrieren. Für die Lernaufgabe werden wir die Implementierung jedoch ohne diese durchführen.



Arbeitsschema



Das Kommunikationsprotokoll wird ausführlich im S3-Webhooks-Handbuch zu MCS beschrieben . Das Arbeitsschema enthält folgende Elemente:



  • Ein Veröffentlichungsdienst , der sich auf der S3-Seite befindet und HTTP-Anforderungen veröffentlicht, wenn ein Webnhook ausgelöst wird.
  • Ein Webhook-Empfangsserver , der auf Anforderungen vom HTTP-Veröffentlichungsdienst wartet und entsprechende Maßnahmen ergreift. Der Server kann in jeder Sprache geschrieben werden. In unserem Beispiel schreiben wir den Server in Go.


Die Besonderheit der Webhook-Implementierung in der S3-API ist die Registrierung des Webhook-Empfangsservers beim Veröffentlichungsdienst. Insbesondere muss der Webhook-Empfangsserver das Abonnement für die Veröffentlichungsdienstnachrichten bestätigen (in anderen Webhook-Implementierungen ist es normalerweise nicht erforderlich, das Abonnement zu bestätigen).



Dementsprechend muss der Webhook-Empfangsserver zwei Hauptvorgänge unterstützen:



  • auf eine Anfrage des Publikationsdienstes zur Bestätigung der Registrierung antworten,
  • eingehende Ereignisse verarbeiten.


Installieren des Servers zum Empfangen von Webhooks



Um den Webhook-Empfangsserver auszuführen, benötigen Sie einen Linux-Server. In diesem Artikel verwenden wir als Beispiel eine virtuelle Instanz, die wir für MCS bereitstellen.



Installieren Sie die erforderliche Software und starten Sie den Webhook-Server.



ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...


Klonen Sie den Ordner mit dem Webhook-Empfangsserver:



ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.


Starten wir den Server:



ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80


Abonnieren des Veröffentlichungsdienstes



Sie können Ihren Server für den Empfang von Webhooks über eine API oder eine Weboberfläche registrieren. Der Einfachheit halber werden wir uns über die Weboberfläche registrieren:



  1. Gehen Sie zum Bereich Eimer im Kontrollraum.
  2. Wir gehen zum Eimer, für den wir Webhooks einrichten, und klicken auf die Ausrüstung:






Gehen Sie zur Registerkarte Webhooks und klicken Sie auf Hinzufügen:





Füllen Sie die Felder aus:







ID - Der Name des Webhooks.



Ereignis - welche Ereignisse gesendet werden sollen. Wir haben die Übertragung aller Ereignisse festgelegt, die beim Arbeiten mit Dateien auftreten (Hinzufügen und Löschen).



URL - Adresse des Webhook-Empfangsservers.



Filterpräfix / -suffix ist ein Filter, mit dem Webhooks nur für Objekte generiert werden können, deren Namen bestimmten Regeln entsprechen. Damit der Webhook nur für Dateien mit der Erweiterung .png funktioniert, schreiben Sie beispielsweise "png" in das Filtersuffix .



Derzeit werden nur die Ports 80 und 443 für den Zugriff auf den Webhook-Empfangsserver unterstützt.



Klicken Sie auf





Hook hinzufügen und sehen Sie Folgendes: Hook hinzugefügt.



Der Server zum Empfangen von Webhooks in den Protokollen zeigt den Fortschritt des Hook-Registrierungsprozesses an:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d


Die Registrierung ist beendet. Im nächsten Abschnitt werden wir uns den Algorithmus für den Server, der Webhooks empfängt, genauer ansehen.



Beschreibung des Servers zum Empfangen von Webhooks



In unserem Beispiel ist der Server in Go geschrieben. Lassen Sie uns die Grundprinzipien seiner Arbeit analysieren.



package main

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %s\n", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pong\n")
}

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}


Betrachten wir die Hauptfunktionen:



  • Ping () ist eine Route, die mit URL / Ping antwortet, der einfachsten Implementierung einer Liveness-Sonde.
  • Webhook () - Hauptroute, URL / Webhook-Handler:

    • bestätigt die Registrierung beim Veröffentlichungsdienst (Übergang zur SubscriptionConfirmation-Funktion),
    • verarbeitet eingehende Webhooks (Gotrecords-Funktion).
  • Die Funktionen HmacSha256 und HmacSha256hex sind Implementierungen der Verschlüsselungsalgorithmen HMAC-SHA256 und HMAC-SHA256, wobei die Ausgabe als Folge von Hexadezimalzahlen für die Signatursubtraktion ausgegeben wird.
  • main ist die Hauptfunktion, verarbeitet Befehlszeilenparameter und registriert URL-Handler.


Vom Server akzeptierte Befehlszeilenparameter:



  • -port ist der Port, den der Server abhört.
  • -address ist die IP-Adresse, die der Server abhört.
  • -script ist ein externes Programm, das bei jedem eingehenden Hook aufgerufen wird.


Schauen wir uns einige Funktionen genauer an:



//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %s\n", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), 
"\"Type\":\"SubscriptionConfirmation\"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}


Diese Funktion bestimmt, was gekommen ist - eine Anfrage zur Bestätigung der Registrierung oder ein Webhook. Wie aus der Dokumentation hervorgeht , enthält die Registrierungsanforderung im Falle einer Registrierungsbestätigung die folgende Json-Struktur:



POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}


Diese Anfrage muss beantwortet werden:



content-type: application/json

{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}


Wo die Unterschrift berechnet wird als:



signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))


Wenn ein Webhook eintrifft, sieht die Struktur der Post-Anfrage folgendermaßen aus:



POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}


Dementsprechend müssen Sie je nach Anforderung verstehen, wie die Daten verarbeitet werden. Ich habe einen Datensatz als Indikator ausgewählt "Type":"SubscriptionConfirmation", da er in der Anfrage zur Bestätigung des Abonnements enthalten ist und nicht im Webhook enthalten ist. Basierend auf dem Vorhandensein / Fehlen dieses Datensatzes in der POST-Anforderung geht die weitere Programmausführung entweder in eine Funktion SubscriptionConfirmationoder in eine Funktion GotRecords.



Wir werden die SubscriptionConfirmation-Funktion nicht im Detail betrachten, sie wird gemäß den in der Dokumentation festgelegten Prinzipien implementiert . Sie können den Quellcode für diese Funktion im Git-Repository des Projekts überprüfen .



Die GotRecords-Funktion analysiert die eingehende Anforderung und ruft für jedes Record-Objekt ein externes Skript (dessen Name im Parameter -script übergeben wurde) mit den folgenden Parametern auf:



  • Eimername
  • Objektschlüssel
  • Handlung:

    • Kopieren - wenn in der ursprünglichen Anforderung EventName = ObjectCreated | PutObject | PutObjectCopy
    • löschen - wenn in der ursprünglichen Anforderung EventName = ObjectRemoved | DeleteObject


Wenn also ein Hook mit einer Post-Anforderung wie oben beschrieben und dem Parameter -script = script.sh eintrifft, wird das Skript wie folgt aufgerufen:



script.sh  bucketA some-file-to-bucket copy


Es versteht sich, dass dieser Webhook-Empfangsserver keine vollständige Produktionslösung ist, sondern ein vereinfachtes Beispiel für eine mögliche Implementierung.



Beispiel der Arbeit



Synchronisieren wir die Dateien des Haupt-Buckets in MCS mit dem Backup-Bucket in AWS. Der Haupt-Bucket heißt myfiles-ash, die Sicherung ist myfiles-backup (die Konfiguration eines Buckets unter AWS liegt außerhalb des Geltungsbereichs dieses Artikels). Wenn eine Datei im Haupt-Bucket abgelegt wird, sollte ihre Kopie in der Sicherung erscheinen. Wenn sie aus der Haupt-Datei gelöscht wird, sollte sie in der Sicherung gelöscht werden.



Wir werden mit Buckets arbeiten, die das Dienstprogramm awscli verwenden, mit dem sowohl MCS-Cloud-Speicher als auch AWS-Cloud-Speicher kompatibel sind.



ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...


Konfigurieren wir den Zugriff auf die S3 MCS-API:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:


Konfigurieren Sie den Zugriff auf die AWS S3-API:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:


Lassen Sie uns die Zugriffe überprüfen:



Zu AWS:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup


Fügen Sie für MCS, wenn der Befehl ausgeführt wird, --endpoint-url hinzu:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash


Zugriff.



Schreiben wir nun ein Skript zur Behandlung des eingehenden Hooks und nennen es s3_backup_mcs_aws.sh



#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm ${TEMP}
    ;;

    "delete")
    ${AWSCLI_AWS} rm ${TARGET}
    ;;

    *)
    echo "Usage: ${0} sourcebucket sourcefile copy/delete"
    exit 1
    ;;
esac


Wir starten den Server:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh


Überprüfen Sie, wie es funktioniert. Fügen Sie über die Weboberfläche MCS die Datei test.txt in den Bucket myfiles-ash ein. In den Protokollen in der Konsole können Sie sehen, dass eine Anforderung an den Webhook-Server gesendet wurde:



2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt


Lassen Sie uns den Inhalt des myfiles-backup-Buckets in AWS überprüfen:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt


Löschen Sie nun über die Weboberfläche die Datei aus dem Eimer myfiles-ash.



Serverprotokolle:



2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt


Eimerinhalt:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$


Datei gelöscht, Problem gelöst.



Fazit und ToDo



Der gesamte in diesem Artikel verwendete Code befindet sich in meinem Repository . Es gibt auch Beispiele für Skripte und Beispiele für das Zählen von Signaturen zum Registrieren von Webhooks.



Dieser Code ist nichts weiter als ein Beispiel dafür, wie Sie S3-Webhooks in Ihren Aktivitäten verwenden können. Wie eingangs erwähnt, müssen Sie, wenn Sie einen solchen Server in der Produktion verwenden möchten, den Server zumindest für asynchrone Arbeiten neu schreiben: Eingehende Webhooks in einer Warteschlange (RabbitMQ oder NATS) registrieren und von dort aus von Worker-Anwendungen zerlegen und verarbeiten. Andernfalls kann es bei der massiven Ankunft von Webhooks zu einem Mangel an Serverressourcen für die Ausführung von Aufgaben kommen. Das Vorhandensein von Warteschlangen ermöglicht es Ihnen, den Server und die Mitarbeiter zu verteilen und Probleme mit sich wiederholenden Aufgaben im Falle von Fehlern zu lösen. Es ist auch wünschenswert, die Protokollierung auf eine detailliertere und standardisiertere zu ändern.



Viel Glück!



Lesen Sie mehr zum Thema:






All Articles