DelayedQueue-Warteschlange

Vor einigen Jahren mussten wir in einem unserer Projekte die Ausführung einer Aktion um einen bestimmten Zeitraum verschieben. Ermitteln Sie beispielsweise den Status der Zahlung in drei Stunden oder senden Sie die Benachrichtigung nach 45 Minuten erneut. Zu diesem Zeitpunkt fanden wir jedoch keine geeigneten Bibliotheken, die "verschoben" werden konnten und keine zusätzliche Zeit für Konfiguration und Betrieb benötigten. Wir haben die möglichen Optionen analysiert und unsere eigene kleine Bibliothek für verzögerte Warteschlangen in Java mit Redis als Repository geschrieben. In diesem Artikel werde ich über die Fähigkeiten der Bibliothek, ihre Alternativen und die "Rechen" sprechen, auf die wir dabei gestoßen sind.



Funktionalität



Was macht die verzögerte Warteschlange? Ein der ausstehenden Warteschlange hinzugefügtes Ereignis wird im angegebenen Zeitintervall an den Handler übermittelt. Wenn die Verarbeitung fehlschlägt, wird das Ereignis später erneut zugestellt. Darüber hinaus ist die maximale Anzahl von Versuchen begrenzt. Redis garantiert keine Sicherheit und Sie müssen auf den Verlust von Ereignissen vorbereitet sein. In der Cluster-Version weist Redis jedoch eine ziemlich hohe Zuverlässigkeit auf, und wir haben dies in anderthalb Betriebsjahren noch nie erlebt.



API



Fügen Sie der Warteschlange ein Ereignis hinzu



eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1)).subscribe();


Beachten Sie, dass die Methode zurückgegeben Monowird. Zum Ausführen müssen Sie einen der folgenden Schritte ausführen:



  • subscribe(...)
  • block()


Ausführlichere Erläuterungen finden Sie in der Dokumentation zu Project Reactor. Der Kontext wird dem Ereignis wie folgt hinzugefügt:



eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1), Map.of("key", "value")).subscribe();


Event-Handler registrieren



eventService.addHandler(DummyEvent.class, e -> Mono.just(true), 1);


, :



eventService.addHandler(
        DummyEvent.class,
        e -> Mono
            .subscriberContext()
            .doOnNext(ctx -> {
                Map<String, String> eventContext = ctx.get("eventContext");
                log.info("context key {}", eventContext.get("key"));
            })
            .thenReturn(true),
        1
);




eventService.removeHandler(DummyEvent.class);




"-":



import static com.github.fred84.queue.DelayedEventService.delayedEventService;

var eventService = delayedEventService().client(redisClient).build();


:



import static com.github.fred84.queue.DelayedEventService.delayedEventService;

var eventService = delayedEventService()
        .client(redisClient)
        .mapper(objectMapper)
        .handlerScheduler(Schedulers.fromExecutorService(executor))
        .schedulingInterval(Duration.ofSeconds(1))
        .schedulingBatchSize(SCHEDULING_BATCH_SIZE)
        .enableScheduling(false)
        .pollingTimeout(POLLING_TIMEOUT)
        .eventContextHandler(new DefaultEventContextHandler())
        .dataSetPrefix("")
        .retryAttempts(10)
        .metrics(new NoopMetrics())
        .refreshSubscriptionsInterval(Duration.ofMinutes(5))
        .build();


( Redis) eventService.close() , @javax.annotation.PreDestroy.





- , . :



  • , Redis;
  • , ( "delayed.queue.ready.for.handling.count" )




, delayed queue. 2018

Amazon Web Services.

, . : " , Amazon-, ".





:





- , JMS . SQS , 15 .





" " . , Redis :





, Netflix dyno-queues

. , , .



, " " sorted set list, ( ):



var events = redis.zrangebyscore("delayed_events", Range.create(-1, System.currentTimeMillis()), 100);
events.forEach(key -> {
  var payload = extractPayload(key);
  var listName = extractType(key);
  redis.lpush(listName, payload);
  redis.zrem("delayed_events", key);
});


Spring Integration, :



redis.brpop(listName)


.





"list" (, ), list . Redis , 2 .



events.forEach(key -> {
  ...
  redis.multi();
  redis.zrem("delayed_events", key);
  redis.lpush(listName, payload);
  redis.exec();
});




list-a . , . "sorted_set" .



events.forEach(key -> {
  ...
  redis.multi();
  redis.zadd("delayed_events", nextAttempt(key))
  redis.zrem("delayed_events", key);
  redis.lpush(listName, payload);
  redis.exec();
});




, , " " "delayed queue" . "sorted set"

metadata;payload, payload , metadata - . . , metadata payload Redis hset "sorted set" .



var envelope = metadata + SEPARATOR + payload;
redis.zadd(envelope, scheduledAt);




var envelope = metadata + SEPARATOR + payload;
var key = eventType + SEPARATOR + eventId;

redis.multi();
redis.zadd(key, scheduledAt);
redis.hset("metadata", key, envelope)
redis.exec();




, . , list . TTL :



redis.set(lockKey, "value", ex(lockTimeout.toMillis() * 1000).nx());




Spring, . " " :





Lettuce , . Project Reactor , " ".

, Subscriber



redis
  .reactive()
  .brpop(timeout, queue)
  .map(e -> deserialize(e))
  .subscribe(new InnerSubscriber<>(handler, ... params ..))




class InnerSubscriber<T extends Event> extends BaseSubscriber<EventEnvelope<T>> {

    @Override
    protected void hookOnNext(@NotNull EventEnvelope<T> envelope) {
        Mono<Boolean> promise = handler.apply(envelope.getPayload());
        promise.subscribe(r -> request(1));
    }
}


, ( Netflix dyno queue, poll- ).



?



  • Kotlin DSL. Kotlin suspend fun API Project Reactor


Links






All Articles