本文共 3748 字,大约阅读时间需要 12 分钟。
Akka MessageDispatcher 是维持 Akka Actor “运作”的部分, 可以说它是整个机器的引擎. 所有的MessageDispatcher 实现也同时是一个 ExecutionContext, 这意味着它们可以用来执行任何代码, 例如 .
在没有为 Actor作配置的情况下,每个 ActorSystem 将有一个缺省的派发器。 缺省派发器是可配置的,缺省情况下是一个确定的default-executor的 Dispatcher。如果通过传递ExecutionContext 来创建ActorSystem ,在ActorSystem中,此ExecutionContext 将作为所有派发器的defalut-executor 。如果没有指定ExecutionContext,将后退到akka.actor.default-dispatcher.default-executor.fallback 的executor。缺省情况下的”fork-join-executor”,在大多数情况下拥有非常好的性能。
派发器实现ExecutionContext 接口,因此可以用来运行Future 调用 等待。
// for use with Futures, Scheduler, etc.final ExecutionContext ex = system.dispatchers().lookup("my-dispatcher");
在你想为Actor配置一个不同派发器而不是默认情况下,你需要做两样东西,首先是配置派发器:
my-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher # What kind of ExecutionService to use executor = "fork-join-executor" # Configuration for the fork join pool fork-join-executor { # Min number of threads to cap factor-based parallelism number to parallelism-min = 2 # Parallelism (threads) ... ceil(available processors * factor) parallelism-factor = 2.0 # Max number of threads to cap factor-based parallelism number to parallelism-max = 10 } # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. # Set to 1 for as fair as possible. throughput = 100}
接着使用 “thread-pool-executor”:
my-thread-pool-dispatcher { # Dispatcher is the name of the event-based dispatcher type = Dispatcher # What kind of ExecutionService to use executor = "thread-pool-executor" # Configuration for the thread pool thread-pool-executor { # minimum number of threads to cap factor-based core number to core-pool-size-min = 2 # No of core threads ... ceil(available processors * factor) core-pool-size-factor = 2.0 # maximum number of threads to cap factor-based number to core-pool-size-max = 10 } # Throughput defines the maximum number of messages to be # processed per actor before the thread jumps to the next actor. # Set to 1 for as fair as possible. throughput = 100}
更多细节选项,请见默认派发器章节
ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class), "myactor");
akka.actor.deployment { /myactor { dispatcher = my-dispatcher }}
一种代替部署配置方法是定义派发器在代码里面。如果在部署配置里面定义派发器则该值将被使用代替编码设置参数。
ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor3");
注意:你在withDispatcher中指定的 “dispatcherId” 其实是配置中的一个路径. 所以在这种情况下它位于配置的顶层,但你可以把它放在下面的层次,用.来代表子层次,象这样: “foo.bar.my-dispatcher”。
一共有4种类型的消息派发器:
配置PinnedDispatcher:
my-pinned-dispatcher { executor = "thread-pool-executor" type = PinnedDispatcher}
接着使用它:
ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class) .withDispatcher("my-pinned-dispatcher"));
注意:thread-pool-executor 配置按照上面my-thread-pool-dispatcher例子是不适用的。这是因为当使用PinnedDispatcher时候,每一个角色将有自己的线程池,线程池将只有一个线程。
注意:随着时间推移这将不保证一直使用相同线程,由于核心池超时用于PinnedDispatcher 在闲置角色情况下,降低资源使用。为了一直使用相同的线程,你需要添加 thread-pool-executor.allow-core-timeout=off到PinnedDispatcher配置中。