개발

How Do I Wrap a Synchronous, Blocking Call

빠빠담 2023. 7. 4. 00:11
반응형

https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking

 

It is often the case that a source of information is synchronous and blocking. To deal with such sources in your Reactor applications, apply the following pattern:

 

Mono blockingWrapper = Mono.fromCallable(() -> { (1)
    return /* make a remote synchronous call */ (2)
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic()); (3)

(1) Create a new Mono by using fromCallable.
(2) Return the asynchronous, blocking resource.
(3) Ensure each subscription happens on a dedicated single-threaded worker from Schedulers.boundedElastic()

 

You should use a Mono, because the source returns one value. You should use Schedulers.boundedElastic, because it creates a dedicated thread to wait for the blocking resource without impacting other non-blocking processing, while also ensuring that there is a limit to the amount of threads that can be created, and blocking tasks that can be enqueued and deferred during a spike.

Note that subscribeOn does not subscribe to the Mono. It specifies what kind of Scheduler to use when a subscribe call happens.

Also, note that subscribeOn operator should immediately follow the source and any further operators are defined after the subscribeOn wrapper.

 

https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html

 

Schedulers provides various Scheduler flavors usable by publishOn or subscribeOn :

  • parallel()
    • A fixed pool of workers that is tuned for parallel work.
    • Optimized for fast Runnable non-blocking executions
  • single(): 
    • A single, reusable thread.
    • Optimized for low-latency Runnable one-off executions
  • newSingle()
    • A per-call dedicated thread.
  • boundedElastic()
    • Optimized for longer executions, an alternative for blocking tasks where the number of active tasks (and threads) is capped
    • An elastic thread pool. It creates new worker pools as needed, and reuse idle ones. This is a good choice for I/O blocking work for instance.
  • immediate()
    • Current thread.
    • to immediately run submitted Runnable instead of scheduling them (somewhat of a no-op or "null object" Scheduler)
  • fromExecutorService(ExecutorService) to create new instances around Executors

Factories prefixed with new (eg. newBoundedElastic(int, int, String) return a new instance of their flavor of Scheduler, while other factories like boundedElastic() return a shared instance - which is the one used by operators requiring that flavor as their default Scheduler. All instances are returned in a initialized state.

반응형

'개발' 카테고리의 다른 글

Java21 - Virtual Thread  (0) 2023.08.17
What is diffrent between Callable and Supplier interface?  (0) 2023.07.09
Thread Pool  (0) 2023.07.03
Retry  (0) 2023.07.03
[Redis] Replication / Cluster / Sentinel  (0) 2023.06.18