原文 工具:
core.Thread
std.concurrency.spawn
std.parallelism.task / taskPool
vibe.d.runTask
etc.
代码:
auto fun() {
return spawn(...);
}
void gun() {
auto task = fun();
// ...
join(task);
}
//
auto fun() {
return new ...;
}
void gun() {
auto task = fun();
// ...
delete task;
}
这不是,结构化
的并发.
所有权和生命期
每个异步计算
都需要有个所有者
所有者需要比它拥有
的所有异步计算
生命更长
错误
必须向上传播到所有者链
异步计算
应该支持取消
异步计算
需要一种方法来:
表示正常完成
(返回值)
表示错误(抛出
异常)
表示取消
标准C++
异步模型,基于三个关键抽象
:调度器,发送器和接收器
,及一组可定制的异步算法
.
发送者
/// 发送T类型单值的发送器
struct ValueSender(T) {
alias Value = T;
T value;
static struct OperationalState(Receiver) {
Receiver receiver;
T value;
void start() @safe nothrow {
receiver.setValue(value); // 用返回值完成
}
}
auto connect(Receiver)(return Receiver receiver) @safe scope return {
// 确保 NVRO
auto op = OperationalState!(Receiver)(receiver, value);
return op;
}
}
发送者与接受者
,通过另一线程:
auto just(T t) @safe {
return ValueSender!T(t);
}
void main() @safe {
auto w = just(42).via(ThreadSender).syncWait();
assert(w.value == 42);
}
调度器:
void main() @safe {
auto pool = stdTaskPool(2);
auto w = just(42)
.on(pool.getScheduler)
.syncWait();
assert(w.value == 42);
}
//当所有
auto fooSender() @safe { /* ... */ }
auto barSender() @safe { /* ... */ }
void foobar() @safe {
whenAll(fooSender, barSender)
.syncWait(); // what ought to happen if either `fooSender` or `barSender` fails?
}
重试
算法:
auto fooSender() @safe { /* ... */ }
auto barSender() @safe { /* ... */ }
auto foobar() @safe {
return whenAll(fooSender.retry(MaxTimes(3)), barSender);
}
void fun() @safe {
foobar().syncWait(); // What ought to happen if `barSender` fails?
}
竞争
算法:
auto fooSender() @safe { /* ... */ }
auto barSender() @safe { /* ... */ }
auto foobar() @safe {
return whenAll(fooSender.retry(MaxTimes(3)), barSender);
}
void fun() @safe {
foobar().timeout(10.secs).syncWait();
}
auto timeout(Sender)(Sender s, Duration d) @safe {
return race(s, delay(duration));
}
流:
void main() @safe {
intervalStream(1.secs)
.collect(() => writeln("Hello World!"))
.syncWait();
}
//
void main() @safe {
auto w = intervalStream(1.secs, true)
.scan((int i) => i + 1, 0)
.filter((int i) => i % 2)
.take(5)
.toList()
.syncWait();
assert(w.value == [0, 2, 4, 6, 8]);
}
多生产单消费
队列示例:
final class MPSCQueue(Node) {
// [...]
@safe nothrow @nogc shared
bool push(Node* n) { */ ... */ }
@safe nothrow @nogc
Node* pop() { /* ... */ }
alias Producer = shared(MPSCQueueProducer!Node);
@trusted nothrow @nogc
Producer producer() {
return Producer(cast(shared)this);
}
// [...]
}
但,生产者是共享的.
struct MPSCQueueProducer(Node) {
private shared MPSCQueue!(Node)* q;
@safe nothrow @nogc shared
void push(Node* node) {
q.push(node);
}
}
rest示例
:
struct Application {
@Path("/") shared @safe
auto getXyz() { /* ... */ }
}
void main() @safe {
auto pool = stdTaskPool(16);
auto app = shared Application();
auto listener = HttpListener("0.0.0.0", 8080);
listener
.handleRequests(app)
.withScheduler(pool.getScheduler())
.syncWait().assumeOk;
}
序化器:
auto mySerializer = new Serializer(1);
auto fun() @safe {
return sequence(
step1(),
step2().on(mySerializer),
step3()
);
}
未来
有些地方需要更多的线程安全
嵌套流
中存在令人讨厌的背压问题
集成纤程
集成,集成事件循环(liburing,kqueue,IOCP
等)
工作窃取
调度程序
实现更多的异步算法
实现HTTP
服务器
教程/指南
如何编写自定义发送者,
任务本地GC
?等等