You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
The reactor-pool project aims at providing a generic object pool to reactive application that:
exposes a reactive API (Publisher input types, Mono return types)
is non-blocking (never blocking a user that makes an attempt to acquire() a resource)
has lazy acquire behavior
For use-cases where granular control of the release() is needed, the classic path of acquire() is offered, which exposes a PooledRef wrapper to the resource. This also allows access to statistics about the resource lifecycle in the pool.
// given Pool<T> poolMono<PooledRef<T>> grabResource = pool.acquire();
//no resource is actually requested yet at this pointgrabResouce.subscribe();
//now one resource is requested from the pool asynchronously//Another example, this time synchronously acquiring, immediately followed up by a release:PooledRef<T> ref = grabResource.block(); //second subscription requests a second resourceref.release().block(); //release() is also asynchronous and lazy
For use-cases where the resource itself can be consumed reactively (exposes a reactive API), a scoped mode of acquisition is offered. withPoolable:
let the consumer declaratively use the resource
provides a scope / closure in which the resource is acquired, used as instructed and released automatically
avoids dealing with an indirection (the resource is directly exposed)
//given at DbConnection type and a Pool<DbConnection> poolpool.withPoolable(resource -> resource//we declare using the connection to create a Statement...
.createStatement()
//...then performing a SELECT query...
.flatMapMany(st -> st.query("SELECT * FROM foo"))
//...then marshalling the rows to JSON
.map(row -> rowToJson(row))
//(all of which need the live resource)
)
//at this point the rest of the steps are outside the scope//so the resource can be released
.map(json -> sanitize(json));