https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
It is often the case that a source of information is synchronous and blocking. To deal with such sources in your Reactor applications, apply the following pattern:
Mono blockingWrapper = Mono.fromCallable(() -> { // (1)
    return /* make a remote synchronous call */; // (2)
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic()); // (3)
(1) Create a new Mono by using fromCallable.
(2) Return the result of the synchronous, blocking call.
(3) Ensure each subscription happens on a dedicated single-threaded worker from Schedulers.boundedElastic().
You should use a Mono, because the source returns one value. You should use Schedulers.boundedElastic, because it creates a dedicated thread to wait for the blocking resource without impacting other non-blocking processing. It also limits both the number of threads that can be created and the number of blocking tasks that can be enqueued and deferred during a spike.
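As a minimal, self-contained sketch of the pattern above: `fetchGreeting` is a hypothetical stand-in for the remote synchronous call (it only sleeps), and the class and method names are illustrative, not part of Reactor. The example assumes reactor-core is on the classpath and uses `block()` only so the result can be printed from a plain `main`.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingWrapperExample {

    // Hypothetical stand-in for a remote synchronous call: sleeps, then
    // reports which thread executed it.
    static String fetchGreeting() throws InterruptedException {
        Thread.sleep(100);
        return "hello from " + Thread.currentThread().getName();
    }

    // Wraps the blocking call so that it runs on the shared boundedElastic Scheduler.
    static Mono<String> greeting() {
        return Mono.fromCallable(BlockingWrapperExample::fetchGreeting)
                   .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        // block() is used here only to keep the demo inside a plain main method.
        System.out.println(greeting().block());
    }
}
```

The printed thread name should start with `boundedElastic-`, showing that the sleep happened off the calling thread.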
Note that subscribeOn does not subscribe to the Mono. It specifies what kind of Scheduler to use when a subscribe call happens.
Also, note that the subscribeOn operator should immediately follow the source, and any further operators should be defined after it.
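Both notes can be checked with a small sketch. The `calls` counter, `blockingCall`, and `pipeline` are hypothetical names introduced for illustration. The counter records whether the wrapped call has actually run, and the `map` operator is placed after `subscribeOn`, as recommended.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class LazySubscribeExample {

    // Counts how many times the (hypothetical) blocking call actually ran.
    static final AtomicInteger calls = new AtomicInteger();

    static String blockingCall() {
        calls.incrementAndGet();
        return "value";
    }

    // subscribeOn directly follows the source; further operators come after it.
    static Mono<String> pipeline() {
        return Mono.fromCallable(LazySubscribeExample::blockingCall)
                   .subscribeOn(Schedulers.boundedElastic())
                   .map(String::toUpperCase);
    }

    public static void main(String[] args) {
        Mono<String> mono = pipeline();
        // Assembling the pipeline, including subscribeOn, runs nothing yet.
        System.out.println("calls before subscribe: " + calls.get());
        // Subscribing (here through block()) triggers the blocking call.
        System.out.println("result: " + mono.block());
        System.out.println("calls after subscribe: " + calls.get());
    }
}
```

The counter stays at 0 until `block()` subscribes, at which point the call runs once and the mapped result is `VALUE`.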
https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html
Schedulers provides various Scheduler flavors usable by publishOn or subscribeOn:
- parallel()
  - A fixed pool of workers that is tuned for parallel work.
  - Optimized for fast, non-blocking Runnable executions.
- single()
  - A single, reusable thread.
  - Optimized for low-latency, one-off Runnable executions.
- newSingle()
  - A per-call dedicated thread.
- boundedElastic()
  - Optimized for longer executions; an alternative for blocking tasks where the number of active tasks (and threads) is capped.
  - An elastic thread pool. It creates new worker pools as needed and reuses idle ones. This is a good choice for blocking I/O work, for instance.
- immediate()
  - Runs on the current thread.
  - Immediately runs the submitted Runnable instead of scheduling it (somewhat of a no-op or "null object" Scheduler).
- fromExecutorService(ExecutorService)
  - Creates new instances around an existing ExecutorService.
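One way to see the difference between these flavors is to print the name of the thread each one uses. The following is a rough sketch, assuming reactor-core is on the classpath; `threadNameOn` is a hypothetical helper, and the exact thread numbering may differ between runs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SchedulerFlavorsExample {

    // Runs a trivial task on the given Scheduler and returns the executing thread's name.
    static String threadNameOn(Scheduler scheduler) {
        return Mono.fromCallable(() -> Thread.currentThread().getName())
                   .subscribeOn(scheduler)
                   .block();
    }

    public static void main(String[] args) {
        System.out.println("parallel:       " + threadNameOn(Schedulers.parallel()));
        System.out.println("single:         " + threadNameOn(Schedulers.single()));
        System.out.println("boundedElastic: " + threadNameOn(Schedulers.boundedElastic()));
        // immediate() runs the task on the calling thread.
        System.out.println("immediate:      " + threadNameOn(Schedulers.immediate()));

        // Wrapping an existing ExecutorService; the caller still owns its lifecycle.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println("executor:       "
                    + threadNameOn(Schedulers.fromExecutorService(executor)));
        } finally {
            executor.shutdown();
        }
    }
}
```

The shared flavors use thread names prefixed with the flavor name (for example `parallel-` or `boundedElastic-`), while `immediate()` reports the caller's own thread.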
Factories prefixed with new (e.g. newBoundedElastic(int, int, String)) return a new instance of their flavor of Scheduler, while other factories like boundedElastic() return a shared instance, which is the one used by operators requiring that flavor as their default Scheduler. All instances are returned in an initialized state.
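The shared-versus-new distinction can be sketched as follows. The caps (4 threads, 100 queued tasks) and the name `blocking-io` are arbitrary values chosen for illustration, and `runOnDedicatedScheduler` is a hypothetical helper.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SharedVsNewSchedulerExample {

    // Runs a task on a freshly created bounded elastic Scheduler and disposes it afterwards.
    static String runOnDedicatedScheduler() {
        // Illustrative caps: at most 4 threads and at most 100 queued tasks.
        Scheduler dedicated = Schedulers.newBoundedElastic(4, 100, "blocking-io");
        try {
            return Mono.fromCallable(() -> Thread.currentThread().getName())
                       .subscribeOn(dedicated)
                       .block();
        } finally {
            // Instances created by new* factories are owned by the caller.
            dedicated.dispose();
        }
    }

    public static void main(String[] args) {
        // The un-prefixed factory hands back the same shared instance on every call.
        System.out.println(Schedulers.boundedElastic() == Schedulers.boundedElastic());
        System.out.println(runOnDedicatedScheduler());
    }
}
```

Because a Scheduler created through a `new*` factory is not shared, disposing it when it is no longer needed releases its threads without affecting the shared instance.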