Vor einigen Jahren mussten wir in einem unserer Projekte die Ausführung einer Aktion um einen bestimmten Zeitraum verschieben. Ermitteln Sie beispielsweise den Status der Zahlung in drei Stunden oder senden Sie die Benachrichtigung nach 45 Minuten erneut. Zu diesem Zeitpunkt fanden wir jedoch keine geeigneten Bibliotheken, die "verschoben" werden konnten und keine zusätzliche Zeit für Konfiguration und Betrieb benötigten. Wir haben die möglichen Optionen analysiert und unsere eigene kleine Bibliothek für verzögerte Warteschlangen in Java mit Redis als Repository geschrieben. In diesem Artikel werde ich über die Fähigkeiten der Bibliothek, ihre Alternativen und die "Rechen" sprechen, auf die wir dabei gestoßen sind.
Funktionalität
Was macht die verzögerte Warteschlange? Ein der ausstehenden Warteschlange hinzugefügtes Ereignis wird im angegebenen Zeitintervall an den Handler übermittelt. Wenn die Verarbeitung fehlschlägt, wird das Ereignis später erneut zugestellt. Darüber hinaus ist die maximale Anzahl von Versuchen begrenzt. Redis garantiert keine Sicherheit und Sie müssen auf den Verlust von Ereignissen vorbereitet sein. In der Cluster-Version weist Redis jedoch eine ziemlich hohe Zuverlässigkeit auf, und wir haben dies in anderthalb Betriebsjahren noch nie erlebt.
API
Fügen Sie der Warteschlange ein Ereignis hinzu
eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1)).subscribe();
Beachten Sie, dass die Methode zurückgegeben Monowird. Zum Ausführen müssen Sie einen der folgenden Schritte ausführen:
subscribe(...)block()
Ausführlichere Erläuterungen finden Sie in der Dokumentation zu Project Reactor. Der Kontext wird dem Ereignis wie folgt hinzugefügt:
eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1), Map.of("key", "value")).subscribe();
Event-Handler registrieren
eventService.addHandler(DummyEvent.class, e -> Mono.just(true), 1);
, :
eventService.addHandler(
DummyEvent.class,
e -> Mono
.subscriberContext()
.doOnNext(ctx -> {
Map<String, String> eventContext = ctx.get("eventContext");
log.info("context key {}", eventContext.get("key"));
})
.thenReturn(true),
1
);
eventService.removeHandler(DummyEvent.class);
"-":
import static com.github.fred84.queue.DelayedEventService.delayedEventService;
var eventService = delayedEventService().client(redisClient).build();
:
import static com.github.fred84.queue.DelayedEventService.delayedEventService;
var eventService = delayedEventService()
.client(redisClient)
.mapper(objectMapper)
.handlerScheduler(Schedulers.fromExecutorService(executor))
.schedulingInterval(Duration.ofSeconds(1))
.schedulingBatchSize(SCHEDULING_BATCH_SIZE)
.enableScheduling(false)
.pollingTimeout(POLLING_TIMEOUT)
.eventContextHandler(new DefaultEventContextHandler())
.dataSetPrefix("")
.retryAttempts(10)
.metrics(new NoopMetrics())
.refreshSubscriptionsInterval(Duration.ofMinutes(5))
.build();
( Redis) eventService.close() , @javax.annotation.PreDestroy.
- , . :
- , Redis;
- , ( "delayed.queue.ready.for.handling.count" )
, delayed queue. 2018
Amazon Web Services.
, . : " , Amazon-, ".
:
- , JMS . SQS , 15 .
" " . , Redis :
- sorted sets,
- "sorted_set" "list" ( )
, Netflix dyno-queues
. , , .
, " " sorted set list, ( ):
var events = redis.zrangebyscore("delayed_events", Range.create(-1, System.currentTimeMillis()), 100);
events.forEach(key -> {
var payload = extractPayload(key);
var listName = extractType(key);
redis.lpush(listName, payload);
redis.zrem("delayed_events", key);
});
redis.brpop(listName)
.
"list" (, ), list . Redis , 2 .
events.forEach(key -> {
...
redis.multi();
redis.zrem("delayed_events", key);
redis.lpush(listName, payload);
redis.exec();
});
list-a . , . "sorted_set" .
events.forEach(key -> {
...
redis.multi();
redis.zadd("delayed_events", nextAttempt(key))
redis.zrem("delayed_events", key);
redis.lpush(listName, payload);
redis.exec();
});
, , " " "delayed queue" . "sorted set"
metadata;payload, payload , metadata - . . , metadata payload Redis hset "sorted set" .
var envelope = metadata + SEPARATOR + payload;
redis.zadd(envelope, scheduledAt);
var envelope = metadata + SEPARATOR + payload;
var key = eventType + SEPARATOR + eventId;
redis.multi();
redis.zadd(key, scheduledAt);
redis.hset("metadata", key, envelope)
redis.exec();
, . , list . TTL :
redis.set(lockKey, "value", ex(lockTimeout.toMillis() * 1000).nx());
Spring, . " " :
Lettuce , . Project Reactor , " ".
, Subscriber
redis
.reactive()
.brpop(timeout, queue)
.map(e -> deserialize(e))
.subscribe(new InnerSubscriber<>(handler, ... params ..))
class InnerSubscriber<T extends Event> extends BaseSubscriber<EventEnvelope<T>> {
@Override
protected void hookOnNext(@NotNull EventEnvelope<T> envelope) {
Mono<Boolean> promise = handler.apply(envelope.getPayload());
promise.subscribe(r -> request(1));
}
}
, ( Netflix dyno queue, poll- ).
?
- Kotlin DSL. Kotlin
suspend funAPIProject Reactor