0
点赞
收藏
分享

微信扫一扫

d结构并发


​​原文​​ 工具:

core.Thread
std.concurrency.spawn
std.parallelism.task / taskPool
vibe.d.runTask
etc.

代码:

auto fun() {
return spawn(...);
}

void gun() {
auto task = fun();
// ...
join(task);
}
//
auto fun() {
return new ...;
}

void gun() {
auto task = fun();
// ...
delete task;
}

这不是,​​结构化​​的并发.

所有权和生命期

每个​​异步计算​​​都需要有个​​所有者​​​ 所有者需要比它​​拥有​​的所有​​异步计算​​生命更长
​错误​​必须向上传播到所有者链

​异步计算​​​应该支持​​取消​​​​异步计算​​需要一种方法来:
表示​​正常完成​​(返回值)
表示错误(​​抛出​​异常)
表示​​取消​

标准​​C++​​​异步模型,基于三个​​关键抽象​​​:​​调度器,发送器和接收器​​​,及一组可定制的​​异步算法​​.

发送者

/// 发送T类型单值的发送器
struct ValueSender(T) {
alias Value = T;
T value;
static struct OperationalState(Receiver) {
Receiver receiver;
T value;
void start() @safe nothrow {
receiver.setValue(value); // 用返回值完成
}
}
auto connect(Receiver)(return Receiver receiver) @safe scope return {
// 确保 NVRO
auto op = OperationalState!(Receiver)(receiver, value);
return op;
}
}

​发送者与接受者​​,通过另一线程:

auto just(T t) @safe {
return ValueSender!T(t);
}

void main() @safe {
auto w = just(42).via(ThreadSender).syncWait();
assert(w.value == 42);
}

调度器:

void main() @safe {
auto pool = stdTaskPool(2);

auto w = just(42)
.on(pool.getScheduler)
.syncWait();

assert(w.value == 42);
}
//当所有
auto fooSender() @safe { /* ... */ }
auto barSender() @safe { /* ... */ }

void foobar() @safe {
whenAll(fooSender, barSender)
.syncWait(); // what ought to happen if either `fooSender` or `barSender` fails?
}

​重试​​算法:

auto fooSender() @safe { /* ... */ }
auto barSender() @safe { /* ... */ }

auto foobar() @safe {
return whenAll(fooSender.retry(MaxTimes(3)), barSender);
}

void fun() @safe {
foobar().syncWait(); // What ought to happen if `barSender` fails?
}

​竞争​​算法:

auto fooSender() @safe { /* ... */ }
auto barSender() @safe { /* ... */ }

auto foobar() @safe {
return whenAll(fooSender.retry(MaxTimes(3)), barSender);
}

void fun() @safe {
foobar().timeout(10.secs).syncWait();
}

auto timeout(Sender)(Sender s, Duration d) @safe {
return race(s, delay(duration));
}

流:

void main() @safe {
intervalStream(1.secs)
.collect(() => writeln("Hello World!"))
.syncWait();
}
//
void main() @safe {
auto w = intervalStream(1.secs, true)
.scan((int i) => i + 1, 0)
.filter((int i) => i % 2)
.take(5)
.toList()
.syncWait();

assert(w.value == [0, 2, 4, 6, 8]);
}

​多生产单消费​​队列示例:

final class MPSCQueue(Node) {
// [...]

@safe nothrow @nogc shared
bool push(Node* n) { */ ... */ }

@safe nothrow @nogc
Node* pop() { /* ... */ }

alias Producer = shared(MPSCQueueProducer!Node);

@trusted nothrow @nogc
Producer producer() {
return Producer(cast(shared)this);
}

// [...]
}

但,生产者是共享的.

struct MPSCQueueProducer(Node) {
private shared MPSCQueue!(Node)* q;

@safe nothrow @nogc shared
void push(Node* node) {
q.push(node);
}
}

​rest示例​​:

struct Application {
@Path("/") shared @safe
auto getXyz() { /* ... */ }
}

void main() @safe {
auto pool = stdTaskPool(16);
auto app = shared Application();
auto listener = HttpListener("0.0.0.0", 8080);
listener
.handleRequests(app)
.withScheduler(pool.getScheduler())
.syncWait().assumeOk;
}

序化器:

auto mySerializer = new Serializer(1);

auto fun() @safe {
return sequence(
step1(),
step2().on(mySerializer),
step3()
);
}

未来

有些地方需要更多的​​线程安全​​​​嵌套流​​中存在令人讨厌的背压问题
集成​​纤程​​集成,集成事件循环​​(liburing,kqueue,IOCP​​等)
​工作窃取​​调度程序
实现更多的异步算法
实现​​HTTP​​服务器
​教程/指南​​如何编写自定义发送者,
任务本地​​GC​​?等等


举报

相关推荐

0 条评论