Limiting Concurrency Per-Method with Concurrency Groups#
Besides setting the max concurrency overall for an actor, Ray allows methods to be separated into concurrency groups, each with its own threads(s). This allows you to limit the concurrency per-method, e.g., allow a health-check method to be given its own concurrency quota separate from request serving methods.
Tip
Concurrency groups work with both asyncio and threaded actors. The syntax is the same.
Defining Concurrency Groups#
This defines two concurrency groups, “io” with max concurrency = 2 and
“compute” with max concurrency = 4. The methods f1
and f2
are
placed in the “io” group, and the methods f3
and f4
are placed
into the “compute” group. Note that there is always a default
concurrency group for actors, which has a default concurrency of 1000
AsyncIO actors and 1 otherwise.
You can define concurrency groups for actors using the concurrency_group
decorator argument:
import ray
@ray.remote(concurrency_groups={"io": 2, "compute": 4})
class AsyncIOActor:
def __init__(self):
pass
@ray.method(concurrency_group="io")
async def f1(self):
pass
@ray.method(concurrency_group="io")
async def f2(self):
pass
@ray.method(concurrency_group="compute")
async def f3(self):
pass
@ray.method(concurrency_group="compute")
async def f4(self):
pass
async def f5(self):
pass
a = AsyncIOActor.remote()
a.f1.remote() # executed in the "io" group.
a.f2.remote() # executed in the "io" group.
a.f3.remote() # executed in the "compute" group.
a.f4.remote() # executed in the "compute" group.
a.f5.remote() # executed in the default group.
You can define concurrency groups for concurrent actors using the API setConcurrencyGroups()
argument:
class ConcurrentActor {
public long f1() {
return Thread.currentThread().getId();
}
public long f2() {
return Thread.currentThread().getId();
}
public long f3(int a, int b) {
return Thread.currentThread().getId();
}
public long f4() {
return Thread.currentThread().getId();
}
public long f5() {
return Thread.currentThread().getId();
}
}
ConcurrencyGroup group1 =
new ConcurrencyGroupBuilder<ConcurrentActor>()
.setName("io")
.setMaxConcurrency(1)
.addMethod(ConcurrentActor::f1)
.addMethod(ConcurrentActor::f2)
.build();
ConcurrencyGroup group2 =
new ConcurrencyGroupBuilder<ConcurrentActor>()
.setName("compute")
.setMaxConcurrency(1)
.addMethod(ConcurrentActor::f3)
.addMethod(ConcurrentActor::f4)
.build();
ActorHandle<ConcurrentActor> myActor = Ray.actor(ConcurrentActor::new)
.setConcurrencyGroups(group1, group2)
.remote();
myActor.task(ConcurrentActor::f1).remote(); // executed in the "io" group.
myActor.task(ConcurrentActor::f2).remote(); // executed in the "io" group.
myActor.task(ConcurrentActor::f3, 3, 5).remote(); // executed in the "compute" group.
myActor.task(ConcurrentActor::f4).remote(); // executed in the "compute" group.
myActor.task(ConcurrentActor::f5).remote(); // executed in the "default" group.
Default Concurrency Group#
By default, methods are placed in a default concurrency group which has a concurrency limit of 1000 for AsyncIO actors and 1 otherwise.
The concurrency of the default group can be changed by setting the max_concurrency
actor option.
The following actor has 2 concurrency groups: “io” and “default”. The max concurrency of “io” is 2, and the max concurrency of “default” is 10.
@ray.remote(concurrency_groups={"io": 2})
class AsyncIOActor:
async def f1(self):
pass
actor = AsyncIOActor.options(max_concurrency=10).remote()
The following concurrent actor has 2 concurrency groups: “io” and “default”. The max concurrency of “io” is 2, and the max concurrency of “default” is 10.
class ConcurrentActor:
public long f1() {
return Thread.currentThread().getId();
}
ConcurrencyGroup group =
new ConcurrencyGroupBuilder<ConcurrentActor>()
.setName("io")
.setMaxConcurrency(2)
.addMethod(ConcurrentActor::f1)
.build();
ActorHandle<ConcurrentActor> myActor = Ray.actor(ConcurrentActor::new)
.setConcurrencyGroups(group1)
.setMaxConcurrency(10)
.remote();
Setting the Concurrency Group at Runtime#
You can also dispatch actor methods into a specific concurrency group at runtime.
The following snippet demonstrates setting the concurrency group of the
f2
method dynamically at runtime.
You can use the .options
method.
# Executed in the "io" group (as defined in the actor class).
a.f2.options().remote()
# Executed in the "compute" group.
a.f2.options(concurrency_group="compute").remote()
You can use setConcurrencyGroup
method.
// Executed in the "io" group (as defined in the actor creation).
myActor.task(ConcurrentActor::f2).remote();
// Executed in the "compute" group.
myActor.task(ConcurrentActor::f2).setConcurrencyGroup("compute").remote();