简介
我问自己一个问题——“我如何在 Dart 上编写多线程程序?”。
Dart 有一个内置机制,允许你实现多线程代码执行——隔离区(isolates)。
就其本身而言,Dart 中的隔离区是 actor 模型实现的一种变体(使用独立的内存,通过发送消息进行通信),但它们不具备简单创建一组相互通信的隔离区的工具(需要不断将一些隔离区的 Send 端口传递给其他隔离区以实现它们之间的通信)、错误处理场景、负载均衡器。
在创建这个包时,我受到了 Akka.NET 和其他实现 actor 模型的框架的启发。但我并没有设定将 Akka.NET 移植到 Dart 的目标,而只是从中选取了一些我喜欢的部分并为自己进行了改造。
目前,这个包仍在开发中,我非常乐意听取任何人的评论、想法或发现的问题。
关于 Theater
Theater 是一个简化 Dart 中多线程工作、简化与隔离区工作的包。
它提供了
- 一个用于在 actor(隔离区)之间路由消息的系统,它封装了 Receive 和 Send 端口的工作;
- 在单个 actor 或一组 actor 级别的错误处理系统;
- 配置消息路由的能力(特殊的 actor——路由器,允许你设置其子 actor 之间所提供的消息路由策略之一,能够为特定类型的消息设置优先级);
- 在 actor 之间进行负载均衡(消息)的能力,创建 actor 池。
目前正在开发的是通过网络向位于其他 Dart VM 中的 actor 系统发送消息的功能。
安装
将 Theater 添加到你的 pubspec.yaml 文件中
dependencies:
theater: ^0.1.0
在使用 Theater 的文件中导入它
import 'package:theater/theater.dart';
什么是 Actor
Actor 是一个具有行为并在独立隔离区中执行的实体。它在 actor 系统中拥有自己独特的地址(路径)。它可以使用与其他 actor 的链接或仅使用其在 actor 系统中的地址(路径)接收和发送消息。每个 actor 都有在其生命周期中调用的方法(这些方法重复其隔离区的生命周期)
- onStart()。Actor 启动后调用;
- onPause()。Actor 暂停前调用;
- onResume()。Actor 恢复后调用;
- onKill()。Actor 被终止前调用。
每个 actor 都有一个邮箱。这是发送给它的消息在到达 actor 之前存放的地方。关于邮箱的类型,你可以在此处阅读。
Actor 可以创建子 actor。并充当它们的监督者(监控它们的生命周期,处理其中发生的错误)。子 actor 的生命周期也取决于其父 actor 的生命周期。
关于 actor 的注意事项
当一个 actor 暂停时,它的所有子 actor 会首先暂停。
示例:有 3 个 actor A1、A2、A3。A1 创建了 A2,A2 创建了 A3。如果 A1 暂停 A2,A3 也会被暂停。在这种情况下,A3 会先暂停,然后是 A2。
当一个 actor 被终止时,它的所有子 actor 会首先被终止。
示例:有 3 个 actor A1、A2、A3。A1 创建了 A2,A2 创建了 A3。如果 A1 销毁 A2,A3 也会被终止。在这种情况下,A3 会先被终止,然后是 A2。
使用 Actor
你可以通过阅读本 README 并查看 README 中的示例或此处的示例来了解 actor 的工作原理。
不过,我认为值得一提的是我建议如何在 Dart 程序中使用 actor。
一个 actor 必须封装一个特定的任务,如果任务可以分解为子任务,那么在这种情况下,应该为实现大任务的 actor 创建子 actor,并重复此操作,直到一个 actor 执行了某个特定的任务。
应该记住,actor(隔离区)的使用并不适用于所有任务。在隔离区之间转发消息需要一些时间,并且只应在并行计算的性能增益超过发送消息所浪费的时间时才使用。
首先,这种方法将允许在服务器上更有效地使用 Dart(更轻松、更快地实现多线程请求处理,构建更复杂的隔离区之间交互方案),但此包也可以在 Flutter 应用程序中使用。
Actor 系统
一个 actor 系统是一组以树状分层结构存在的 actor。在该包中,actor 系统由 ActorSystem 类表示。在使用它(创建 actor、发送消息等)之前,你需要对其进行初始化。在初始化期间,actor 系统将创建其操作所需的系统 actor。
在 actor 系统初始化期间创建的 actor
- 根 actor。actor 系统在初始化时创建的唯一 actor。它的独特之处在于它没有其他 actor 形式的父级,其父级和控制其生命周期的是 actor 系统。在启动时,它会创建两个 actor,一个系统守护者和一个用户守护者;
- 系统守护者。所有系统 actor 的祖先 actor;
- 用户守护者。用户创建的所有顶级 actor 的父 actor。
创建并初始化一个 actor 系统,创建一个测试 actor,并输出“Hello, world!”。
// Create actor class
class TestActor extends UntypedActor {
// Override onStart method which will be executed at actor startup
@override
Future<void> onStart(UntypedActorContext context) async {
// Print 'Hello, world!'
print('Hello, world!');
}
}
void main(List<String> arguments) async {
// Create actor system with name 'test_system'
var system = ActorSystem('test_system');
// Initialize actor system before work with her
await system.initialize();
// Create top-level actor in actor system with name 'test_actor'
await system.actorOf('test_actor', TestActor());
}
上面示例中创建的测试 actor 在 actor 系统中的绝对路径将是 “test_system/root/user/test_actor”。
ActorSystem 具有暂停、恢复和终止所有 actor 的方法。
dispose ActorSystem 方法会终止所有 actor,并关闭所有 Stream 并释放 actor 系统使用的所有资源。调用 dispose 方法后,无法再使用相同的 ActorSystem 实例。
如果你在终止 actor 系统中的所有 actor 后调用了 kill 方法,那么要继续使用相同的 ActorSystem 实例,你必须再次调用其 initialize 方法。但是,在这种情况下,所有顶级 actor 都必须重新创建。
Actor 树
在 Theater 中,actor 系统表示为 actor 的层次结构,这种结构称为 actor 树。
下图展示了 actor 树的结构
树中的 actor 分为 2 类
- 监督者 (supervisor)。监督者是可以创建自己的子 actor 的 actor(它们本身也反过来拥有一个监督者 actor);
- 被观察者 (observable)。被观察者 actor 是不能创建子 actor 的 actor。
监督者控制其子 actor 的生命周期(启动、终止、停止、恢复、重启),它们接收子 actor 中发生的错误消息,并根据既定策略(SupervisorStrategy)做出决策。你可以在[这里](#supervising-and-error-handling)阅读更多关于子 actor 错误处理的信息。
如果我们将这两类概念转换为更接近树结构的术语,这些类别可以称之为
- 监督者 actor 是树的节点;
- 被观察者 actor 是树的叶子。
actor 节点的特例是根 actor。这是一个拥有子 actor,但同时没有其他 actor 形式的监督者的 actor。它的监督者是 actor 系统本身。
Actor 类型
在 Theater 中,向用户提供了以下 actor 类型供使用
- 无类型 actor (Untyped actor)。一种没有特殊用途的通用 actor。可以接收和发送消息给其他 actor。可以创建子 actor;
- 路由器 (Routers)。根据既定的路由策略,在子 actor 之间路由传入请求的 actor;
- 池路由器 actor (Pool Router Actor)。一个路由器 actor,启动时会创建一个相同类型的 WorkerActor 池。你不能直接向其 Worker 池发送消息,所有对池的请求都只能通过它。它可以向其他 actor 发送消息,所有接收到的消息都会路由到它自己的 actor 池;
- 组路由器 actor (Group Router Actor)。一个路由器 actor。可以向其他 actor 发送消息,但它收到的所有消息都会路由给它的子 actor。它与 PoolRouterActor 的区别在于,可以直接向其子 actor 组发送消息,而不仅仅通过它;
- 工作 actor (Working actor)。一个工作 actor 在 actor 池中使用 PoolRouterActor,类似于 UntypedActor,但它不能创建子 actor,并且在工作方式上有一些内部差异。
路由
Actor 地址
Theater 中的消息路由与 actor 地址(即其路径)的概念密不可分。需要澄清的是,actor 的地址是唯一的,也就是说,不能存在两个具有相同地址的 actor。
actor 的绝对路径从 actor 系统的名称开始。如果讨论的是用户创建的 actor,actor 的路径还包括 actor 系统、根 actor 和用户守护者(user)的名称。
显示创建的顶级 actor 绝对路径的示例。
// Create actor class
class TestActor extends UntypedActor {
// Override onStart method which will be executed at actor startup
@override
Future<void> onStart(UntypedActorContext context) async {
print(context.path);
}
}
void main(List<String> arguments) async {
// Create actor system
var actorSystem = ActorSystem('test_system');
// Initialize actor system before work with her
await actorSystem.initialize();
// Create top-level actor in actor system with name 'test_actor'
await actorSystem.actorOf('test_actor', TestActor());
}
预期输出
tcp://test_system/root/user/test_actor
在示例中,actor 的完整路径开头还有“tcp”。这意味着什么?目前正在开发通过网络(udp、tcp)在不同 Dart VM 中通信多个 actor 系统的能力。路径开头的 LLL 前缀将表示 actor 系统中用于其他 actor 系统通过网络进行通信的网络协议。
邮箱
每个 actor 在 Theater 中都有一个邮箱。邮箱是发送给 actor 的请求存放的地方。
邮箱分为 2 种类型
- 不可靠的;
- 可靠的。
不可靠邮箱
不可靠邮箱是没有交付确认的邮箱。每个 actor 默认都有一个不可靠邮箱。
可靠邮箱
可靠邮箱是带交付确认的邮箱。
交付确认意味着邮箱在向 actor 发送消息后,会等待来自 actor 的消息,以确认消息已交付。只有在收到确认后,邮箱才会向 actor 发送下一条消息。
actor 收到消息,我们指的是消息已经被接收,并且为其分配的处理程序已经启动,而不是所有分配给它的处理程序都已执行。
这会降低性能,但会增加流量,并提供一些额外的保证,即 actor 将收到发送给它的消息。由于流量的增加和发送额外消息、等待接收所浪费的时间,发送消息的速度会降低两倍以上。
在什么情况下 actor 可能收不到发送给它的消息?
如果 actor 在工作过程中被终止,他将不会处理发送给他的消息,直到它再次启动,并且这些消息当时在他的邮箱中。
但是,除此之外,每个 actor 级别还有其他内部手段,当 actor 被销毁时,可以不丢失发送给他的消息(它们会等待 actor 再次启动),使用带确认的邮箱是一种附加措施。
实际上,丢失消息的可能性微乎其微,在测试过程中未发现此类情况。
通常,使用经过身份验证的邮箱是可选的,并且会降低性能,但它允许实现优先级邮箱。
优先级邮箱
这是一种特殊类型的交付确认邮箱,你可以在其中设置消息的优先级。优先级决定了消息将被发送到 actor(其隔离区)的事件循环的顺序。
优先级通过 PriorityGenerator 类设置。
创建具有优先级邮箱的 actor(在此示例中,String 类型的消息优先级高于 int 类型的消息),向其发送消息。
// Create actor class
class TestActor extends UntypedActor {
@override
Future<void> onStart(UntypedActorContext context) async {
// Set handler to all String type messages which actor received
context.receive<String>((message) async {
print(message);
});
// Set handler to all int type messages which actor received
context.receive<int>((message) async {
print(message);
});
}
// Override createMailboxFactory method
@override
MailboxFactory createMailboxFactory() => PriorityReliableMailboxFactory(
priorityGenerator: TestPriorityGenerator());
}
// Create priority generator class
class TestPriorityGenerator extends PriorityGenerator {
@override
int generatePriority(object) {
if (object is String) {
return 1;
} else {
return 0;
}
}
}
void main(List<String> arguments) async {
// Create actor system with name 'test_system'
var system = ActorSystem('test_system');
// Initialize actor system before work with her
await system.initialize();
// Create top-level actor in actor system with name 'hello_actor' and get ref to it
var ref = await system.actorOf('test_actor', TestActor());
for (var i = 0; i < 5; i++) {
ref.send(i < 3 ? i : i.toString()); // Send messages 0, 1, 2, "3", "4"
}
}
在上面的例子中,向 actor 发送了 5 条消息——0、1、2、“3”、“4”。
预期输出
0
3
4
1
2
在输出中,你会注意到除了第一条消息之外,所有消息都按照其优先级被 actor 接收。这是因为第一条进入邮箱的消息在其余消息到达邮箱并重建邮箱中的优先级队列之前就已发送给 actor。
优先级邮箱的使用,就像交付邮箱一样,是可选的并且会降低性能,但将它们与不可靠邮箱结合使用可以平衡性能、可靠性和可用性。
发送消息
在 Theater 中,actor 可以通过其邮箱的链接相互发送消息。链接可以在创建 actor 时获得。但是,有一种方法可以在不引用其他 actor 的情况下通过其地址发送消息,否则在消息向上层级结构的情况下会很不方便。
通过链接发送
Actor 链接封装了 SendPort,用于向 actor 的邮箱发送消息。
可以通过使用 actor 系统创建顶级 actor,也可以通过 actor 上下文创建子 actor 来获取链接。
在这些示例中,我们使用 actor 系统创建一个顶级 actor 并获取其链接,然后向其发送消息。
// Create actor class
class TestActor extends UntypedActor {
// Override onStart method which will be executed at actor startup
@override
Future<void> onStart(UntypedActorContext context) async {
// Set handler to all String type messages which actor received
context.receive<String>((message) async {
print(message);
});
}
}
void main(List<String> arguments) async {
// Create actor system
var system = ActorSystem('test_system');
// Initialize actor system before work with her
await system.initialize();
// Create top-level actor in actor system with name 'hello_actor'
var ref = await system.actorOf('test_actor', TestActor());
// Send 'Hello, from main!' message to actor
ref.send('Hello, from main!');
}
在此示例中,我们使用 UntypedActor 上下文创建其子 actor,获取其链接并向其发送消息。
class FirstTestActor extends UntypedActor {
// Override onStart method which will be executed at actor startup
@override
Future<void> onStart(UntypedActorContext context) async {
// Create child with name 'second_test_actor'
var ref = await context.actorOf('second_test_actor', SecondTestActor());
// Send message
ref.send('Luke, I am your father.');
}
}
class SecondTestActor extends UntypedActor {
// Override onStart method which will be executed at actor startup
@override
Future<void> onStart(UntypedActorContext context) async {
// Set handler to all String type messages which actor received
context.receive<String>((message) async {
if (message == 'Luke, I am your father.') {
print('Nooooooo!');
}
});
}
}
void main(List<String> arguments) async {
// Create actor system
var system = ActorSystem('system');
// Initialize actor system before work with her
await system.initialize();
// Create top-level actor in actor system with name 'first_test_actor'
await system.actorOf('first_test_actor', FirstTestActor());
}
因此,你可以通过 actor 的链接向它们发送消息。如果需要,链接可以传递给其他 actor。
不通过链接发送
在 Theater 中,你可以使用 actor 链接向 actor 发送消息,该链接在你使用 actor 系统或通过 actor 上下文创建 actor 时获得。
然而,使用链接可能并不总是方便的,例如,在 actor 将消息发送给位于其上方 actor 树中的 actor 的情况下。
为了避免这种不便,Theater 有一种特殊类型的消息,其中包含收件人信息。当 actor 在其邮箱中收到此类消息时,它会验证其地址和消息中指定的地址。如果消息不是发送给它的,它会根据指定的地址将此消息沿 actor 树向上或向下传输。
要发送此类消息,你需要使用 actor 系统或 actor 上下文的 send 方法。有 2 种设置地址类型
- 绝对路径;
- 相对路径。
绝对路径是 actor 的完整路径,从 actor 系统的名称开始,例如——“test_system/root/user/test_actor”。
相对路径是相对于当前 actor 路径(通过 actor 上下文发送消息时)或相对于用户守护者(通过 actor 系统发送消息时)指定的路径。一个相对路径的示例,如果我们通过 actor 系统发送消息,并且 actor 的绝对路径是“test_system/root/user/test_actor”,那么相对路径是“../test_actor”。
使用 actor 系统通过绝对路径向 actor 发送消息的示例。
// Create actor class
class TestActor extends UntypedActor {
// Override onStart method which will be executed at actor startup
@override
Future<void> onStart(UntypedActorContext context) async {
// Set handler to all String type messages which actor received
context.receive<String>((message) async {
print(message);
});
}
}
void main(List<String> arguments) async {
// Create actor system
var system = ActorSystem('test_system');
// Initialize actor system before work with her
await system.initialize();
// Create top-level actor in actor system with name 'hello_actor'
await system.actorOf('test_actor', TestActor());
// Send message to actor using absolute path
system.send('test_system/root/user/test_actor', 'Hello, from main!');
}
使用 actor 系统通过相对路径向 actor 发送消息的示例。
// Create actor class
class TestActor extends UntypedActor {
// Override onStart method which will be executed at actor startup
@override
Future<void> onStart(UntypedActorContext context) async {
// Set handler to all String type messages which actor received
context.receive<String>((message) async {
print(message);
});
}
}
void main(List<String> arguments) async {
// Create actor system
var system = ActorSystem('test_system');
// Initialize actor system before work with her
await system.initialize();
// Create top-level actor in actor system with name 'hello_actor'
await system.actorOf('test_actor', TestActor());
// Send message to actor using relative path
system.send('../test_actor', 'Hello, from main!');
}
使用 actor 上下文通过绝对路径向 actor 层次结构中更高级别的 actor 发送消息的示例。
// Create first actor class
class FirstTestActor extends UntypedActor {
@override
Future<void> onStart(UntypedActorContext context) async {
// Set handler to all String type messages which actor received
context.receive<String>((message) async {
print(message);
});
// Create actor child with name 'test_child'
await context.actorOf('test_child', SecondTestActor());
}
}
// Create second actor class
class SecondTestActor extends UntypedActor {
@override
Future<void> onStart(UntypedActorContext context) async {
// Send message to parent using absolute path
context.send('test_system/root/user/test_actor', 'Hello, from child!');
}
}
void main(List<String> arguments) async {
// Create actor system
var system = ActorSystem('test_system');
// Initialize actor system before work with her
await system.initialize();
// Create top-level actor in actor system with name 'hello_actor'
await system.actorOf('test_actor', FirstTestActor());
}
使用 actor 上下文通过相对路径向子 actor 发送消息的示例。
// Create first actor class
class FirstTestActor extends UntypedActor {
@override
Future<void> onStart(UntypedActorContext context) async {
// Create actor child with name 'test_child'
await context.actorOf('test_child', SecondTestActor());
// Send message to child using relative path
context.send('../test_child', 'Hello, from parent!');
}
}
// Create second actor class
class SecondTestActor extends UntypedActor {
@override
Future<void> onStart(UntypedActorContext context) async {
// Set handler to all String type messages which actor received
context.receive<String>((message) async {
print(message);
});
}
}
void main(List<String> arguments) async {
// Create actor system
var system = ActorSystem('test_system');
// Initialize actor system before work with her
await system.initialize();
// Create top-level actor in actor system with name 'hello_actor'
await system.actorOf('test_actor', FirstTestActor());
}
接收消息
每个 actor 都可以接收和处理消息。要为 actor 分配特定类型消息的处理器,你可以使用 actor 上下文中的 receive 方法。可以为相同类型的消息分配多个处理器。
创建 actor 类并在启动时分配 String 和 int 类型消息接收处理程序的示例。
// Create actor class
class TestActor extends UntypedActor {
// Override onStart method which will be executed at actor startup
@override
Future<void> onStart(UntypedActorContext context) async {
// Set handler to all String type messages which actor received
context.receive<String>((message) async {
print(message);
});
// Set handler to all int type messages which actor received
context.receive<int>((message) async {
print(message);
});
}
}
获取消息响应
当通过链接或不通过链接向 actor 发送消息时,可能需要接收消息的响应,这可以通过在消息本身中发送 SendPort 以进行响应来实现,或者提前在创建 actor 时向其发送一个特定的 SendPort。或者,在不使用链接通过绝对或相对路径发送消息时,你可能会错误地指定路径,这意味着消息将找不到其收件人,并且希望能够了解何时发生这种情况。Theater 为此提供了一种机制——MessageSubscription。
当你通过引用或使用路径发送消息时,你总是使用 actor 系统或 actor 上下文的 send 方法获取 MessageSubscription 实例。
使用 onResponse 方法,你可以分配一个处理程序来接收关于消息状态的响应。
可能的消息状态
- DeliveredSuccessfullyResult – 表示消息已成功传递给 actor,但他没有给你回复;
- RecipientNotFoundResult – 表示 actor 树中不存在此地址的 actor;
- MessageResult – 表示消息已成功传递,收件人已回复你的消息。
向 actor 发送消息并接收其响应的示例。
// Create actor class
class TestActor extends UntypedActor {
// Override onStart method which will be executed at actor startup
@override
Future<void> onStart(context) async {
// Set handler to all String type messages which actor received
context.receive<String>((message) async {
// Print message
print(message);
// Send message result
return MessageResult(data: 'Hello, from actor!');
});
}
}
void main(List<String> arguments) async {
// Create actor system
var system = ActorSystem('system');
// Initialize actor system before work with her
await system.initialize();
// Create top-level actor in actor system with name 'hello_actor'
var ref = await system.actorOf('actor', TestActor());
// Send message 'Hello, from main!' to actor and get message subscription
var subscription = ref.send('Hello, from main!');
// Set onResponse handler
subscription.onResponse((response) {
if (response is MessageResult) {
print(response.data);
}
});
}
预期输出
Hello, from main!
Hello, from actor!
消息订阅封装了一个 ReceivePort,一个常规的消息订阅会在收到一条消息的结果后关闭其 ReceivePort。
但是,例如,在使用路由器 actor 时,你可能需要为每条消息接收来自不同 actor 的多个响应。或者如果你为相同类型的消息创建了多个处理程序,并且你期望从两个处理程序接收多个响应。
为此,你可以使用 asMultipleSubscription() 方法将 MessageSubscription 转换为 MultipleMessageSubscription。这样的订阅在收到第一条消息后不会关闭其 ReceivePort,但是,由于在订阅内部使用了 ReceivePort,这可能会创建一个不完全透明的情况,你需要在使用订阅的 cancel() 方法时自己关闭它——在你不再需要订阅时。
通过 Actor 系统监听消息
在 Theater 中,你可以轻松地从一个 actor 向另一个 actor 发送消息,发送或接收已发送消息的回复。但是,可能会出现一种情况,你希望在不向 actor 发送消息的情况下监听它们的消息。为此,actor 系统具有 Topic 这样的功能。
使用 ActorSystem 类,你可以订阅感兴趣的主题,以及该主题中特定类型的消息。
在此示例中,我们创建了两个 actor,订阅了主题“test_topic”中 String 类型的消息
// Create first actor class
class FirstTestActor extends UntypedActor {
// Override onStart method which will be executed at actor startup
@override
Future<void> onStart(UntypedActorContext context) async {
// Send message to actor system topic with name 'test_topic'
context.sendToTopic('test_topic', 'Hello, from first test actor!');
}
}
// Create second actor class
class SecondTestActor extends UntypedActor {
// Override onStart method which will be executed at actor startup
@override
Future<void> onStart(UntypedActorContext context) async {
// Send message to actor system topic with name 'test_topic'
context.sendToTopic('test_topic', 'Hello, from second test actor!');
}
}
void main(List<String> arguments) async {
// Create actor system
var system = ActorSystem('test_system');
// Initialize actor system before work with her
await system.initialize();
// Create handler to messages as String from topic with name 'test_topic'
system.listenTopic<String>('test_topic', (message) async {
print(message);
});
// Create top-level actor in actor system with name 'first_test_actor'
await system.actorOf('first_test_actor', FirstTestActor());
// Create top-level actor in actor system with name 'second_test_actor'
await system.actorOf('second_test_actor', SecondTestActor());
}
预期输出
Hello, from first test actor!
Hello, from second test actor!
在此示例中,我们订阅了几个不同的主题,并发布了来自主题“first_test_topic”的消息的回复
// Create first actor class
class FirstTestActor extends UntypedActor {
// Override onStart method which will be executed at actor startup
@override
Future<void> onStart(UntypedActorContext context) async {
// Send message to actor system topic with name 'first_test_topic' and get subscription to response
var subscription =
context.sendToTopic('first_test_topic', 'This is String');
// Set handler to response
subscription.onResponse((response) {
if (response is MessageResult) {
print(response.data);
}
});
}
}
// Create second actor class
class SecondTestActor extends UntypedActor {
// Override onStart method which will be executed at actor startup
@override
Future<void> onStart(UntypedActorContext context) async {
// Send message to actor system topic with name 'second_test_topic'
context.sendToTopic('second_test_topic', 123.4);
}
}
void main(List<String> arguments) async {
// Create actor system
var system = ActorSystem('test_system');
// Initialize actor system before work with her
await system.initialize();
// Create handler to messages as String from topic with name 'first_test_topic'
system.listenTopic<String>('first_test_topic', (message) async {
print(message);
return MessageResult(data: 'Hello, from main!');
});
// Create handler to messages as double from topic with name 'second_test_topic'
system.listenTopic<double>('second_test_topic', (message) async {
print(message * 2);
});
// Create top-level actor in actor system with name 'first_test_actor'
await system.actorOf('first_test_actor', FirstTestActor());
// Create top-level actor in actor system with name 'second_test_actor'
await system.actorOf('second_test_actor', SecondTestActor());
}
预期输出
This is String
Hello, from main!
246.8
路由器
Theater 中有一种特殊类型的 actor——路由器。
这些 actor 根据其指定的部署策略创建子 actor。根据其指定的消息路由策略将所有发送给它们的消息转发给其子 actor。此类 actor 的主要目的是通过在 actor 之间平衡消息来创建。
Theater 中有两种类型的路由器 actor
- 组路由器 (Group router);
- 池路由器 (Pool router)。
组路由器
组路由器是一种路由器,它创建一组节点 actor 作为子 actor(即 UntypedActor 或其他路由器可以作为此组中的 actor)。与池路由器不同,它允许你直接向其子 actor 发送消息,也就是说,不必仅通过路由器向它们发送消息。
具有以下消息路由策略
- 广播 (Broadcast)。路由器收到的消息被转发到其组中的所有 actor;
- 随机 (Random)。路由器收到的消息被转发到其组中的一个随机 actor;
- 轮询 (Round robin)。路由器收到的消息以循环方式发送给其组中的 actor。也就是说,如果收到 3 条消息,而 actor 组中有 2 个 actor,则第 1 条消息将由 actor 1 接收,第 2 条消息由 actor 2 接收,第 3 条消息由 actor 1 接收。
使用广播路由策略的组路由器示例。
// Create first test actor class
class FirstTestActor extends UntypedActor {
@override
Future<void> onStart(UntypedActorContext context) async {
// Create router actor
await context.actorOf('test_router', TestRouter());
// Send message to router
context.send('../test_router', 'Second hello!');
// Send message to second without router
context.send('../test_router/second_test_actor', 'First hello!');
}
}
// Create router class
class TestRouter extends GroupRouterActor {
// Override createDeployementStrategy method, configurate group router actor
@override
GroupDeployementStrategy createDeployementStrategy() {
return GroupDeployementStrategy(
routingStrategy: GroupRoutingStrategy.broadcast,
group: [
ActorInfo(name: 'second_test_actor', actor: SecondTestActor()),
ActorInfo(name: 'third_test_actor', actor: ThirdTestActor())
]);
}
}
// Create second test actor class
class SecondTestActor extends UntypedActor {
@override
Future<void> onStart(UntypedActorContext context) async {
// Set handler to all String type messages which actor received
context.receive<String>((message) async {
print('Second actor received message: ' + message);
});
}
}
// Create third test actor class
class ThirdTestActor extends UntypedActor {
@override
Future<void> onStart(UntypedActorContext context) async {
// Set handler to all String type messages which actor received
context.receive<String>((message) async {
print('Third actor received message: ' + message);
});
}
}
void main(List<String> arguments) async {
// Create actor system
var actorSystem = ActorSystem('test_system');
// Initialize actor system before work with her
await actorSystem.initialize();
// Create top-level actor in actor system with name 'hello_actor'
await actorSystem.actorOf('first_test_actor', FirstTestActor());
}
预期输出
Second actor received message: Second hello!
Third actor received message: Second hello!
Second actor received message: First hello!
示例中创建的 actor 系统中的 actor 树结构
从示例中可以看出,我们创建了一个名为“first_test_actor”的 actor,它创建了一个名为“test_router”的路由器 actor,其中包含 2 个 actor,并发送了 2 条消息。第一条消息发送给路由器(后来发送给其组中的所有 actor),第二条消息仅发送给名为“second_test_actor”的 actor。
池路由器
池路由器是一种路由器,它将分配给它的相同类型的工作 actor 创建为一个池作为子 actor。与路由器不同,组不允许直接访问其池中的工作 actor,即只能通过路由器根据分配的路由策略向它们发送消息。
什么是工作 actor?工作 actor 是一种特殊的 actor,用于池路由器中。通常,该 actor 类似于 UntypedActor,但它不能创建子 actor,并且其内部工作方式也存在差异。
内部工作差异表现在:工作 actor 在处理完每条消息后,完成所有分配给该消息的处理程序后,会向其 actor 管理器发送一份报告消息。这在使用池路由器时会产生额外的流量,但允许你使用自己的路由策略,从而更有效地平衡池中 actor 和工作器之间的负载。
具有以下消息路由策略
- 广播 (Broadcast)。路由器收到的消息被转发到其组中的所有 actor;
- 随机 (Random)。路由器收到的消息被转发到其组中的一个随机 actor;
- 轮询 (Round robin)。路由器收到的消息以循环方式发送给其组中的 actor。也就是说,如果收到 3 条消息,而 actor 组中有 2 个 actor,则第 1 条消息将由 actor 1 接收,第 2 条消息由 actor 2 接收,第 3 条消息由 actor 1 接收;
- 负载均衡 (Load balancing)。平衡池中工作器之间的负载,同时考虑每个工作器中包含的未处理消息数量。
使用随机路由策略创建池路由器的示例。
// Create actor class
class TestActor extends UntypedActor {
@override
Future<void> onStart(UntypedActorContext context) async {
// Create router actor and get ref to him
var ref = await context.actorOf('test_router', TestRouter());
for (var i = 0; i < 5; i++) {
// Send message to pool router
ref.send('Hello message №' + i.toString());
}
}
}
// Create pool router class
class TestRouter extends PoolRouterActor {
// Override createDeployementStrategy method, configurate group router actor
@override
PoolDeployementStrategy createDeployementStrategy() {
return PoolDeployementStrategy(
workerFactory: TestWorkerFactory(),
routingStrategy: PoolRoutingStrategy.random,
poolSize: 5);
}
}
// Create actor worker class
class TestWorker extends WorkerActor {
@override
Future<void> onStart(UntypedActorContext context) async {
// Set handler to all String type messages which actor received
context.receive<String>((message) async {
print('Received by the worker with path: ' +
context.path.toString() +
', message: ' +
message);
});
}
}
// Create worker factory class
class TestWorkerFactory extends WorkerActorFactory {
@override
WorkerActor create() => TestWorker();
}
void main(List<String> arguments) async {
// Create actor system
var actorSystem = ActorSystem('test_system');
// Initialize actor system before work with her
await actorSystem.initialize();
// Create top-level actor in actor system with name 'test_actor'
await actorSystem.actorOf('test_actor', TestActor());
}
示例中创建的 actor 系统中的 actor 树结构
可能的输出结果之一
Received by the worker with path: tcp://test_system/root/user/test_actor/test_router/worker-1, message: Hello message №1
Received by the worker with path: tcp://test_system/root/user/test_actor/test_router/worker-2, message: Hello message №0
Received by the worker with path: tcp://test_system/root/user/test_actor/test_router/worker-4, message: Hello message №2
Received by the worker with path: tcp://test_system/root/user/test_actor/test_router/worker-2, message: Hello message №3
Received by the worker with path: tcp://test_system/root/user/test_actor/test_router/worker-1, message: Hello message №4
监督和错误处理
在 Theater 中,除了根 actor 之外,每个 actor 都有一个父 actor,负责管理其生命周期并处理来自它的错误,并且每个拥有子 actor 的 actor 都充当其子 actor 的监督者。
每个监督者都有一个控制策略(SupervisorStrategy),它处理从子 actor 接收到的错误,并根据子 actor 中发生的异常,接收到关于如何处理它的指令。
指令类型
- 恢复 (Resume);
- 重启 (Restart);
- 暂停 (Pause);
- 终止 (Kill);
- 将错误发送给监督者 actor (escalate)。
策略分为 2 种类型
- 一对一 (OneForOne);
- 一应俱全 (OneForAll)。
这两种策略的区别在于,OneForOne 策略将收到的指令应用于发生错误的 actor,而 OneForAll 策略将指令应用于做出此决定的 actor 的所有子 actor。OneForAll 策略在 actor 有几个子 actor,它们的关联非常紧密,并且其中一个出现错误应该导致应用于所有这些子 actor 的决策的情况下可能很有用。
默认情况下,每个监督者都采用 OneForOne 策略,将错误传递给上游监督者。当错误到达用户的守护者时,它也会将其传递给根 actor,根 actor 又将错误传递给 actor 系统,actor 系统会杀死所有 actor 并抛出异常,显示错误通过的所有 actor 的堆栈跟踪。
使用 OneForOne 策略处理错误的示例
// Create first actor class
class FirstTestActor extends UntypedActor {
// Override onStart method which will be executed at actor startup
@override
Future<void> onStart(UntypedActorContext context) async {
// Create child actor with name 'second_test_actor'
await context.actorOf('second_test_actor', SecondTestActor());
}
// Override createSupervisorStrategy method, set decider and restartDelay
@override
SupervisorStrategy createSupervisorStrategy() => OneForOneStrategy(
decider: TestDecider(), restartDuration: Duration(milliseconds: 500));
}
// Create decider class
class TestDecider extends Decider {
@override
Directive decide(Exception exception) {
if (exception is FormatException) {
return Directive.restart;
} else {
return Directive.escalate;
}
}
}
// Create second actor class
class SecondTestActor extends UntypedActor {
// Override onStart method which will be executed at actor startup
@override
Future<void> onStart(UntypedActorContext context) async {
print('Hello, from second test actor!');
// Someone random factor or something where restarting might come in handy
if (Random().nextBool()) {
throw FormatException();
}
}
}
void main(List<String> arguments) async {
// Create actor system
var system = ActorSystem('test_system');
// Initialize actor system before work with her
await system.initialize();
// Create top-level actor in actor system with name 'first_test_actor'
await system.actorOf('first_test_actor', FirstTestActor());
}
在此示例中,actor 树及其在发生错误时的情况可以表示如下
实用工具
调度器
调度器是一个类,它使创建一些需要一段时间后重复执行的任务更加方便。每个 actor 上下文都有自己的调度器实例,但你可以自己创建自己的调度器实例。
在 Dart 中,使用 Timer 可以很容易地实现定期执行的任务,因此在 Theater 中,调度器只是为此提供了一个方便的抽象。例如,Theater 调度器允许你使用 CancellationToken 一次性取消多个任务。
目前,调度器正在开发中,计划向其中添加例如将取消令牌传输到其他 actor 的功能(目前尚不可行),这将允许从其他 actor 取消计划任务。
使用调度器创建任务、使用取消令牌在 3 秒后取消计划任务的示例。
// Create actor class
class TestActor extends UntypedActor {
// Override onStart method which will be executed at actor startup
@override
Future<void> onStart(UntypedActorContext context) async {
// Create cancellation token
var cancellationToken = CancellationToken();
// Create first repeatedly action in scheduler
context.scheduler.scheduleActionRepeatedly(
interval: Duration(seconds: 1),
action: () {
print('Hello, from first action!');
},
cancellationToken: cancellationToken);
// Create second repeatedly action in scheduler
context.scheduler.scheduleActionRepeatedly(
initDelay: Duration(seconds: 1),
interval: Duration(milliseconds: 500),
action: () {
print('Hello, from second action!');
},
cancellationToken: cancellationToken);
Future.delayed(Duration(seconds: 3), () {
// Cancel actions after 3 seconds
cancellationToken.cancel();
});
}
}
void main(List<String> arguments) async {
// Create actor system
var system = ActorSystem('test_system');
// Initialize actor system before work with her
await system.initialize();
// Create top-level actor in actor system with name 'test_actor'
await system.actorOf('test_actor', TestActor());
}
预期输出
Hello, from first action!
Hello, from second action!
Hello, from first action!
Hello, from second action!
Hello, from second action!
路线图
目前正在开发中
- 通过网络(udp、tcp)与位于其他 Dart VM 中的 actor 系统进行通信;
- 改进消息路由系统(更多功能,必要时进行优化);
- 改进错误处理系统,错误日志记录。




