Behandlung von Java Reactor mit Kotlin Coroutines

Beim aktuellen Job schreiben wir in Reactor. Die Technologie ist cool, aber wie immer gibt es viele ABER. Einige Dinge sind ärgerlich, der Code ist schwieriger zu schreiben und zu lesen, und ThreadLocal ist eine echte Katastrophe. Ich habe mich entschlossen zu sehen, welche Probleme verschwinden, wenn Sie zu Kotlin Coroutines wechseln, und welche Probleme im Gegenteil hinzugefügt werden.





Patientenkarte

Ich habe ein kleines Projekt für den Artikel geschrieben und die Probleme reproduziert, auf die ich bei der Arbeit gestoßen bin. Der Hauptcode ist hier . Der Algorithmus hat ihn bewusst nicht in separate Methoden zerlegt, sodass die Probleme besser erkennbar sind.





Kurz gesagt zum Algorithmus: 





Wir überweisen Geld von einem Konto auf ein anderes und zeichnen Transaktionen über die Tatsache der Überweisung auf. 





Die Übersetzung ist idempotent. Wenn sich die Transaktion bereits in der Datenbank befindet, antworten wir dem Kunden, dass alles in Ordnung ist. Beim Einfügen einer Transaktion kann eine DataIntegrityViolationException ausgelöst werden. Dies bedeutet auch, dass die Transaktion bereits vorhanden ist.





Um nicht ins Negative zu geraten, wird der + Optimistic-Sperrcode überprüft, der keine wettbewerbsfähige Aktualisierung von Konten ermöglicht. Damit es funktioniert, müssen Sie es erneut versuchen und zusätzliche Fehler behandeln.





Für diejenigen, die den Algorithmus selbst nicht mögen

Der Algorithmus für das Projekt wurde gewählt, um die Probleme zu reproduzieren, nicht um effizient und architektonisch korrekt zu sein. Anstelle einer Transaktion müssen Sie Halbleiter einfügen, eine optimistische Sperre ist überhaupt nicht erforderlich (stattdessen eine Überprüfung der Positivität eines Kontos in SQL). Wählen Sie + Einfügen sollte durch Upsert ersetzt werden.





Patientenbeschwerden

  1. Stacktrace .





  2. , . 





  3. - flatMap.





  4. .





  5. Mono.empty().





  6. , - , traceId. ( , ThreadLocal , SpringSecurity)





  7. .





  8. api .





PR Java Kotlin. 









com.fasterxml.jackson.module:jackson-module-kotlin data org.jetbrains.kotlin.plugin.spring open .





suspend fun transfer(@RequestBody request: TransferRequest)



public Mono<Void> transfer(@RequestBody TransferRequest request)







suspend fun save(account: Account): Account



Mono<Account> save(Account account);



, , suspend , , Reactor .





runBlocking { … }



, suspend .





Retry kotlin-retry. , , ( PR).





, , . -.





:





public Mono<Void> transfer(String transactionKey, long fromAccountId,
                           long toAccountId, BigDecimal amount) {
  return transactionRepository.findByUniqueKey(transactionKey)
    .map(Optional::of)
    .defaultIfEmpty(Optional.empty())
    .flatMap(withMDC(foundTransaction -> {
      if (foundTransaction.isPresent()) {
        log.warn("retry of transaction " + transactionKey);
        return Mono.empty();
      }
      return accountRepository.findById(fromAccountId)
        .switchIfEmpty(Mono.error(new AccountNotFound()))
        .flatMap(fromAccount -> accountRepository.findById(toAccountId)
          .switchIfEmpty(Mono.error(new AccountNotFound()))
          .flatMap(toAccount -> {
            var transactionToInsert = Transaction.builder()
              .amount(amount)
              .fromAccountId(fromAccountId)
              .toAccountId(toAccountId)
              .uniqueKey(transactionKey)
              .build();
            var amountAfter = fromAccount.getAmount().subtract(amount);
            if (amountAfter.compareTo(BigDecimal.ZERO) < 0) {
              return Mono.error(new NotEnoghtMoney());
            }
            return transactionalOperator.transactional(
              transactionRepository.save(transactionToInsert)
                .onErrorResume(error -> {
                  //transaction was inserted on parallel transaction,
                  //we may return success response
                  if (error instanceof DataIntegrityViolationException
             && error.getMessage().contains("TRANSACTION_UNIQUE_KEY")) {
                    return Mono.empty();
                  } else {
                    return Mono.error(error);
                  }
                })
                .then(accountRepository.transferAmount(
                  fromAccount.getId(), fromAccount.getVersion(), 
                  amount.negate()
                ))
                .then(accountRepository.transferAmount(
                  toAccount.getId(), toAccount.getVersion(), amount
                ))
            );
          }));
    }))
    .retryWhen(Retry.backoff(3, Duration.ofMillis(1))
      .filter(OptimisticLockException.class::isInstance)
      .onRetryExhaustedThrow((__, retrySignal) -> retrySignal.failure())
    )
    .onErrorMap(
      OptimisticLockException.class,
      e -> new ResponseStatusException(
        BANDWIDTH_LIMIT_EXCEEDED,
        "limit of OptimisticLockException exceeded", e
      )
    )
    .onErrorResume(withMDC(e -> {
      log.error("error on transfer", e);
      return Mono.error(e);
    }));
}
      
      



:





suspend fun transfer(transactionKey: String, fromAccountId: Long,
                     toAccountId: Long, amount: BigDecimal) {
  try {
    try {
      retry(limitAttempts(3) + filter { it is OptimisticLockException }) {
        val foundTransaction = transactionRepository
          .findByUniqueKey(transactionKey)
        if (foundTransaction != null) {
          logger.warn("retry of transaction $transactionKey")
          return@retry
        }

        val fromAccount = accountRepository.findById(fromAccountId)
          ?: throw AccountNotFound()
        val toAccount = accountRepository.findById(toAccountId)
          ?: throw AccountNotFound()

        if (fromAccount.amount - amount < BigDecimal.ZERO) {
          throw NotEnoghtMoney()
        }
        val transactionToInsert = Transaction(
          amount = amount,
          fromAccountId = fromAccountId,
          toAccountId = toAccountId,
          uniqueKey = transactionKey
        )
        transactionalOperator.executeAndAwait {
          try {
            transactionRepository.save(transactionToInsert)
          } catch (e: DataIntegrityViolationException) {
            if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {
              throw e;
            }
          }

          accountRepository.transferAmount(
            fromAccount.id!!, fromAccount.version, amount.negate()
          )
          accountRepository.transferAmount(
            toAccount.id!!, toAccount.version, amount
          )
        }
      }
    } catch (e: OptimisticLockException) {
      throw ResponseStatusException(
        BANDWIDTH_LIMIT_EXCEEDED, 
        "limit of OptimisticLockException exceeded", e
      )
    }
  } catch (e: Exception) {
    logger.error(e) { "error on transfer" }
    throw e;
  }
}
      
      



Stacktraces

, . 





:





o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockException
	at c.g.c.v.r.services.Ledger.lambda$transfer$5(Ledger.java:75)
	...
Caused by: c.g.c.v.r.OptimisticLockException: null
	at c.g.c.v.r.repos.AccountRepositoryImpl.lambda$transferAmount$0(AccountRepositoryImpl.java:27)
	at r.c.p.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
  ...
      
      



:





error on transfer o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockException
	at c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:70)
	at c.g.c.v.r.services.Ledger$transfer$1.invokeSuspend(Ledger.kt)
	...
Caused by: c.g.c.v.r.OptimisticLockException: null
	at c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)
	...
	at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)
	at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)
	at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)
	(Coroutine boundary)
	at o.s.t.r.TransactionalOperatorExtensionsKt.executeAndAwait(TransactionalOperatorExtensions.kt:31)
	at c.g.c.v.r.services.Ledger$transfer$3.invokeSuspend(Ledger.kt:56)
	at com.github.michaelbull.retry.RetryKt$retry$3.invokeSuspend(Retry.kt:38)
	at c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:35)
	at c.g.c.v.r.controllers.LedgerController$transfer$2$1.invokeSuspend(LedgerController.kt:20)
	at c.g.c.v.r.controllers.LedgerController$transfer$2.invokeSuspend(LedgerController.kt:19)
	at kotlin.reflect.full.KCallables.callSuspend(KCallables.kt:55)
	at o.s.c.CoroutinesUtils$invokeSuspendingFunction$mono$1.invokeSuspend(CoroutinesUtils.kt:64)
	(Coroutine creation stacktrace)
	at k.c.i.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:122)
	at k.c.i.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
	...
Caused by: c.g.c.v.r.OptimisticLockException: null
	at c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)
	...
	at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)
	at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)
	at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)
	...

      
      



, ( , ).





Java . , . . . Kotlin .





, - . ? . , - traceId (thread name ) .





Kotlin , , . ( : ).





flatMap. - try catch, .





:





return accountRepository.findById(fromAccountId)
  .switchIfEmpty(Mono.error(new AccountNotFound()))
  .flatMap(fromAccount -> accountRepository.findById(toAccountId)
    .switchIfEmpty(Mono.error(new AccountNotFound()))
    .flatMap(toAccount -> {
      ...
    })
      
      



:





val fromAccount = accountRepository.findById(fromAccountId)
  ?: throw AccountNotFound()
val toAccount = accountRepository.findById(toAccountId)
  ?: throw AccountNotFound()
...
      
      



try catch, .





:





return transactionRepository.findByUniqueKey(transactionKey)
  ...
  .onErrorMap(
    OptimisticLockException.class,
    e -> new ResponseStatusException(
      BANDWIDTH_LIMIT_EXCEEDED, 
      "limit of OptimisticLockException exceeded", e
    )
  )
      
      



:





try {
  val foundTransaction = transactionRepository
    .findByUniqueKey(transactionKey)
  ...
} catch (e: OptimisticLockException) {
  throw ResponseStatusException(
    BANDWIDTH_LIMIT_EXCEEDED, 
    "limit of OptimisticLockException exceeded", e
  )
}
      
      



throw, . Reactor :





.flatMap(foo -> {
  if (foo.isEmpty()) { 
    return Mono.error(new IllegalStateException());
  } else {
    return Mono.just(foo);
  }
})
      
      



, , . - .





Mono.empty()

. null . ¨C5C. 





Ide , mono . . , - .





Kotlin not null , , . nullable - .





:





:





return transactionRepository.findByUniqueKey(transactionKey)
  .map(Optional::of)
  .defaultIfEmpty(Optional.empty())
  .flatMap(foundTransaction -> {
    if (foundTransaction.isPresent()) {
      log.warn("retry of transaction " + transactionKey);
      return Mono.empty();
    }
...
      
      



:





val foundTransaction = transactionRepository
  .findByUniqueKey(transactionKey)
if (foundTransaction != null) {
  logger.warn("retry of transaction $transactionKey")
  return@retry
}
...
      
      



, - Reactor, .





, traceId . ThreadLocal , MDC ( ). ?





. Reactor Coroutines immutable, MDC ( ).





Java , traceId :





@Component
public class TraceIdFilter implements WebFilter {
  @Override
  public Mono<Void> filter(
    ServerWebExchange exchange, WebFilterChain chain
  ) {
    var traceId = Optional.ofNullable(
      exchange.getRequest().getHeaders().get("X-B3-TRACEID")
    )
      .orElse(Collections.emptyList())
      .stream().findAny().orElse(UUID.randomUUID().toString());
    return chain.filter(exchange)
      .contextWrite(context ->
        LoggerHelper.addEntryToMDCContext(context, "traceId", traceId)
      );
  }
}
      
      



, - , traceId MDC:





public static <T, R> Function<T, Mono<R>> withMDC(
  Function<T, Mono<R>> block
) {
  return value -> Mono.deferContextual(context -> {
    Optional<Map<String, String>> mdcContext = context
      .getOrEmpty(MDC_ID_KEY);
    if (mdcContext.isPresent()) {
      try {
        MDC.setContextMap(mdcContext.get());
        return block.apply(value);
      } finally {
        MDC.clear();
      }
    } else {
      return block.apply(value);
    }
  });
}
      
      



, Mono. .. , Mono. :





.onErrorResume(withMDC(e -> {
  log.error("error on transfer", e);
  return Mono.error(e);
}))
      
      



Kotlin . , traceId MDC:





@Component
class TraceIdFilter : WebFilter {
  override fun filter(
    exchange: ServerWebExchange, chain: WebFilterChain
  ): Mono<Void> {
    val traceId = exchange.request.headers["X-B3-TRACEID"]?.first() 
    MDC.put("traceId", traceId ?: UUID.randomUUID().toString())
    return chain.filter(exchange)
  }
}
      
      



withContext(MDCContext()) { … }







, MDC traceId. .





Java Reactor , : , , breakpoints ...





: stepOver, , ( ). 





, suspend . issue. , , Java Reactor evaluate , .





, , .





:





return Mono.zip(
  transactionRepository.findByUniqueKey(transactionKey)
    .map(Optional::of)
    .defaultIfEmpty(Optional.empty()),
  accountRepository.findById(fromAccountId)
    .switchIfEmpty(Mono.error(new AccountNotFound())),
  accountRepository.findById(toAccountId)
    .switchIfEmpty(Mono.error(new AccountNotFound())),
).flatMap(withMDC(fetched -> {
  var foundTransaction = fetched.getT1();
  var fromAccount = fetched.getT2();
  var toAccount = fetched.getT3();
  if (foundTransaction.isPresent()) {
    log.warn("retry of transaction " + transactionKey);
    return Mono.empty();
  }
  ...
}
      
      



:





coroutineScope {
  val foundTransactionAsync = async {
    logger.info("async fetch of transaction $transactionKey")
    transactionRepository.findByUniqueKey(transactionKey)
  }
  val fromAccountAsync = async { 
    accountRepository.findById(fromAccountId) 
  }
  val toAccountAsync = async { 
    accountRepository.findById(toAccountId) 
  }

  if (foundTransactionAsync.await() != null) {
    logger.warn("retry of transaction $transactionKey")
    return@retry
  }

  val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()
  val toAccount = toAccountAsync.await() ?: throw AccountNotFound()
  ...
}
      
      



Kotlin “ ”, “ ” Reactor.





, -. Reactor , . - foundTransactionAsync.await(). , transactionRepository.findByUniqueKey() , , accountRepository.findById() ( ).





. , Reactor :





coroutineScope {
  val foundTransactionAsync = async {
    logger.info("async fetch of transaction $transactionKey")
    transactionRepository.findByUniqueKey(transactionKey)
  }
  val fromAccountAsync = async {
    accountRepository.findById(fromAccountId)
  }
  val toAccountAsync = async {
    accountRepository.findById(toAccountId)
  }

  if (foundTransactionAsync.await() != null) {
    logger.warn("retry of transaction $transactionKey")
    return@retry
  }

  val transactionToInsert = Transaction(
    amount = amount,
    fromAccountId = fromAccountId,
    toAccountId = toAccountId,
    uniqueKey = transactionKey
  )
  transactionalOperator.executeAndAwait {
    try {
      transactionRepository.save(transactionToInsert)
    } catch (e: DataIntegrityViolationException) {
      if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {
        throw e;
      }
    }
    val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()
    val toAccount = toAccountAsync.await() ?: throw AccountNotFound()
    if (fromAccount.amount - amount < BigDecimal.ZERO) {
      throw NotEnoghtMoney()
    }

    accountRepository.transferAmount(
      fromAccount.id!!, fromAccount.version, amount.negate()
    )
    accountRepository.transferAmount(
      toAccount.id!!, toAccount.version, amount
    )
  }
}
      
      



. .. , . , , ( ).





, , .





context scope

, :





  1. scope. , , .





  2. context. .





Spring , :





@PutMapping("/transfer")
suspend fun transfer(@RequestBody request: TransferRequest) {
  coroutineScope {
    withContext(MDCContext()) {
      ledger.transfer(request.transactionKey, request.fromAccountId, 
                      request.toAccountId, request.amount)
    }
  }
}
      
      



, regexp , . - .





AOP suspend

, , . aspect suspend .





Ich habe es schließlich geschafft, diesen Aspekt zu schreiben. Um zu erklären, wie dies funktioniert, benötigen Sie einen separaten Artikel.





Ich hoffe, dass es einen angemesseneren Weg gibt, Aspekte zu schreiben (ich werde versuchen, dazu beizutragen).





Behandlungsbewertung

Alle Probleme verschwanden. Ein paar neue wurden hinzugefügt, aber es ist erträglich.





Ich muss sagen, dass sich Coroutinen schnell entwickeln und ich nur eine bessere Arbeit mit ihnen erwarte.





Es ist ersichtlich, dass das JetBrains-Team auf die Probleme der Entwickler aufmerksam war. Soweit ich weiß, gab es vor etwa einem Jahr immer noch Probleme mit Debugging und Stact-Racing.





Am wichtigsten ist, dass Sie bei Coroutinen nicht alle Funktionen von Reactor und seiner mächtigen API berücksichtigen müssen. Sie schreiben einfach den Code.








All Articles