@groovy.util.logging.Log abstract class RegulatingActor<T> extends groovyx.gpars.actor.DefaultActor implements java.lang.Runnable
An Actor that exerts back-pressure to regulate the rate of incoming messages.
The regulating actor counts how many messages are pending versus how many it has processed. If the number of pending messages exceeds a soft threshold, it blocks for a short time before queueing the next message. If the number of pending messages exceeds a hard threshold, it blocks for a long time before queueing the message. This bounds how much memory the actor can consume in its queue.
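The soft and hard thresholds described above can be illustrated with a small, self-contained sketch. It is not the RegulatingActor implementation: `ThrottlePolicy`, `pauseFor`, and the pause durations (50 ms and 3000 ms) are hypothetical names and values chosen for illustration, and the sketch only computes the pause instead of sleeping.

```groovy
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper (not part of RegulatingActor): decides how long a
// sender should pause before enqueueing, given the current pending count.
class ThrottlePolicy {
    int softLimit
    int hardLimit
    long softPauseMs = 50      // assumed "short" pause
    long hardPauseMs = 3000    // assumed "long" pause

    long pauseFor(int pending) {
        if (pending > hardLimit) return hardPauseMs
        if (pending > softLimit) return softPauseMs
        return 0L
    }
}

def pending = new AtomicInteger()
def policy = new ThrottlePolicy(softLimit: 2, hardLimit: 4)

// Simulate six sends with no consumer draining the queue
def pauses = (1..6).collect {
    long pause = policy.pauseFor(pending.get())
    // a real sender would call Thread.sleep(pause) here before enqueueing
    pending.incrementAndGet()
    pause
}
println pauses
```

In this sketch the first sends are free, later sends incur the short pause once the pending count passes the soft limit, and the long pause applies once it passes the hard limit. A consumer decrementing the counter after each processed message would bring the pauses back down.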
The RegulatingActor tracks how many messages are pending by incrementing a counter (pendingMessageCount). A user of a RegulatingActor should therefore send messages to it using the sendTo method, so that this counter can be incremented.
Stopping a RegulatingActor
Although a RegulatingActor can be terminated the usual way (with the terminate() method), an orderly shutdown can be initiated with the sendStop() method, which enqueues a stop message (poison pill) that is processed automatically.
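The poison-pill pattern mentioned above can be sketched with plain JDK classes. This is an illustration of the technique only, not the RegulatingActor code: the queue, the worker thread, and the local `STOP` sentinel are stand-ins. With the real class, the corresponding calls documented on this page are sendStop() followed by join().

```groovy
import java.util.concurrent.LinkedBlockingQueue

// Sentinel playing the role of the stop message; it is compared by identity
// so it can never be confused with a real payload.
final Object STOP = new Object()

def queue = new LinkedBlockingQueue<Object>()
def processed = Collections.synchronizedList([])

// Worker drains the queue in order and exits when it meets the sentinel
def worker = Thread.start {
    while (true) {
        def msg = queue.take()
        if (msg.is(STOP)) break
        processed << msg
    }
}

queue.put('a')
queue.put('b')
queue.put(STOP)   // returns immediately; the worker stops when it reaches it
worker.join()     // wait for the orderly shutdown to complete

println processed
```

Because the sentinel travels through the same queue as ordinary messages, everything enqueued before it is still processed, which is what distinguishes this from an immediate termination.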
Sometimes you may want a 'per-client' pending count; that is, you don't want one over-zealous producer to block other producers. This can be especially important if there are dependencies such that blocking some producers could result in a deadlock. You can implement this by supplying your own pendingCount instead of using the shared one. To do that, use the send(AcknowledgeableMessage message) method, constructing the AcknowledgeableMessage yourself and supplying it with your own counter.
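The idea of a per-client counter can be sketched without the library. Everything here is hypothetical and for illustration only: `CountedMessage` merely stands in for a message that carries its sender's counter (the actual AcknowledgeableMessage constructor is not shown on this page), and `enqueue`, `processOne`, and `softLimit` are invented names.

```groovy
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a message that carries its sender's counter
class CountedMessage {
    Object payload
    AtomicInteger counter
}

def queue = new ArrayDeque<CountedMessage>()

// Enqueue a message and charge it to the supplied counter only
def enqueue = { Object payload, AtomicInteger counter ->
    counter.incrementAndGet()
    queue.add(new CountedMessage(payload: payload, counter: counter))
}

// Process one message and acknowledge it against the counter it came with
def processOne = { ->
    def msg = queue.poll()
    msg.counter.decrementAndGet()
    msg.payload
}

def greedy = new AtomicInteger()   // counter owned by a busy producer
def modest = new AtomicInteger()   // counter owned by an occasional producer

5.times { enqueue("bulk-$it", greedy) }
enqueue('urgent', modest)

int softLimit = 3
println "greedy over limit: ${greedy.get() > softLimit}"
println "modest over limit: ${modest.get() > softLimit}"

def first = processOne()           // acknowledges against greedy's counter
```

Since each producer is throttled against its own counter, the busy producer exceeding the limit does not delay the occasional one, which is the property the paragraph above is after.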
Since chaining together regulated actors is a common pattern, this class provides support for a default "downstream". If the downstream actor is supplied in the constructor, you can send messages to it that are automatically per-client limited using the sendDownstream method.
If you are constructing a chain of actors with multiple layers, it can be important to place these layers into separate thread pools. This is because RegulatingActor instances can hard-block when downstream actors are overwhelmed, starving the downstream actors of threads. To assign a layer its own pool, set the DefaultActor#parallelGroup property of the actor before starting it.
The simplest way to use a regulating actor is to create one with the static actor method and send it messages with sendTo, e.g.:
def received = []
actor = RegulatingActor.actor {
    received << it
}
actor.start()
actor.sendTo("hello")
Thread.sleep(500)
println received // has message "hello"
T - the type of the messages to be processed
Modifiers | Name | Description |
---|---|---|
class | RegulatingActor.1 | |
class | RegulatingActor.2 | |
Fields inherited from class | Fields |
---|---|
class groovyx.gpars.actor.DefaultActor | CANNOT_SEND_REPLIES_NO_SENDER_HAS_BEEN_REGISTERED, TIMEOUT, serialHandle |
Type | Name and description |
---|---|
static java.lang.Object | STOP |
RegulatingActor | downstream |
java.util.concurrent.atomic.AtomicInteger | downstreamCounter: Count of messages pending in downstream due to this actor |
java.util.concurrent.atomic.AtomicInteger | pendingMessageCount: Count of messages that have been sent to this actor but not processed |
ProgressCounter | progress |
boolean | stopped |
long | throttleWarningMs |
boolean | throttled |
boolean | verbose |
Constructor and description |
---|
RegulatingActor(RegulatingActor downstream, int softLimit, int hardLimit) |
RegulatingActor(int softLimit, int hardLimit) |
RegulatingActor(): Create a RegulatingActor with basic defaults for the queue sizes that suit many tasks |
Type Params | Return Type | Name and description |
---|---|---|
 | void | act() |
<T> | static RegulatingActor<T> | actor(groovy.lang.Closure c) |
 | void | doTerminate() |
 | void | onEnd() |
 | abstract void | process(T message) |
 | void | run() |
 | groovyx.gpars.actor.impl.MessageStream | send(AcknowledgeableMessage message) |
 | void | sendDownstream(java.lang.Object message) |
 | groovyx.gpars.actor.impl.MessageStream | sendLimited(AcknowledgeableMessage message, boolean throttle) |
 | void | sendStop(): Send a pre-determined STOP message to the actor which, when received, causes it to call terminate(). |
 | groovyx.gpars.actor.impl.MessageStream | sendTo(T o) |
Methods inherited from class | Name |
---|---|
class groovyx.gpars.actor.DefaultActor | start(), loop(java.lang.Runnable), react(groovy.lang.Closure), silentStart(), stop(), isActive(), isFair(), reply(java.lang.Object), send(java.lang.Object), terminate(), makeFair(), getSender(), setParallelGroup(groovyx.gpars.group.PGroup), replyIfExists(java.lang.Object), join(), join(long, java.util.concurrent.TimeUnit), join(groovy.time.BaseDuration), join(groovyx.gpars.actor.impl.MessageStream), onStop(groovy.lang.Closure), isActorThread(), getJoinLatch(), getParallelGroup(), sendAndContinue(java.lang.Object, groovy.lang.Closure), sendAndPromise(java.lang.Object), threadBoundActor(), leftShift(java.lang.Object), call(java.lang.Object), send(), send(java.lang.Object, groovyx.gpars.actor.impl.MessageStream), sendAndWait(java.lang.Object, groovy.time.Duration), sendAndWait(java.lang.Object, long, java.util.concurrent.TimeUnit), sendAndWait(java.lang.Object), getRemoteClass(), getOrCreateSerialHandle(), wait(long), wait(long, int), wait(), equals(java.lang.Object), toString(), hashCode(), getClass(), notify(), notifyAll() |
sendStop() sends a pre-determined STOP message to the actor which, when received, causes it to call terminate(). This method returns immediately. To wait for the stop to occur, call the join() method.