0
点赞
收藏
分享

微信扫一扫

d的跟踪.

​​这里​​​ :需要手动初化.​​__gshared​​是正确的方法吗?
这是​​线程本地存储​​用途.正如D运行时对​​dmd​​的​​-profile​​命令行开关那样,可拥有模块级​​Appender​​,它为该线程​​收集数据​​而无需担心​​竞争条件​​.
:解锁互斥锁?
仅当​​每个线程​​退出并​​组合​​结果时,才需要​​互斥锁​​.每个​​static ~this()​​都可获取​​互斥锁​​并添加​​其结果​​到全局​​Appender​​.而该​​"全局"Appender​​是​​shared或__gshared​​不重要.
​:__gshared​​并不在分析器上放​​共享​​类型属性.正确.
高效无锁​​appender​​会很酷.

import std;
import std.datetime.stopwatch;
import core.thread;
import core.atomic;
import core.internal.spinlock;

enum workerCount = 8;
enum threadRunTime = 4.seconds;
enum mainRunTime = threadRunTime + 1.seconds;

shared struct ScopeLock {
@disable this(this);
@disable void opAssign(ref const(typeof(this)));

SpinLock * lock;

this(shared(SpinLock) * lock) {
this.lock = lock;
lock.lock();
}

~this() {
lock.unlock();
}
}

struct Collector {
long[] data;

shared(SpinLock) lock;

auto scopeLock() shared {
return ScopeLock(&lock);
}

// 加`收集器`的数据指针
void add(long i) shared {
auto sl = scopeLock();

/// 加针方法,读代码
data ~= i;
}

// 添加`此收集器的数据`到`指定数组`中,
void aggregate(ref long[] where) shared {
auto sl = scopeLock();

where ~= data.sum;
data.length = 0;
(cast(long[])data).assumeSafeAppend();
}
}

// 帮助我们信任代码的变量,主结束时打印
long allThatHasBeenDumped = 0;
// 仅用于验证代码
shared long allCollectedByThreads;

synchronized class Dumper {
private:
shared(Collector)*[] collectors;

void register(shared(Collector) * collector) shared {
writeln("注册 ", collector);
collectors ~= collector;
}

// 转储当前结果
void dump(File output) shared {
long[] data;

foreach (collector; collectors) {
collector.aggregate(data);
}

const allData = data.sum;

if (allData != 0) {
stdout.writefln!"收集了:%-(\n %,s%)"(data);
allThatHasBeenDumped += allData;
}
}
}

shared(Dumper) dumper;

shared static this() {
writeln("造转储器");
dumper = new Dumper();
}

shared(Collector) * collector;

static this() {
writeln("造收集器");
collector = new shared(Collector)();
dumper.register(cast(shared)collector);
}

// 主线程
void doWork() {
try {
doWorkImpl();

} catch (Throwable exc) {
stderr.writeln("抓可抛:", exc.msg);
}
}

// 每个线程的实现
void doWorkImpl() {
auto sw = StopWatch();
sw.start();

long i = 0;
while (sw.peek < threadRunTime) {
(cast(shared)collector).add(i);
++i;
}

--i;
auto total = i * (i + 1) / 2;
writefln("收集了%s,%s,%s",i,total,collector);

atomicOp!"+="(allCollectedByThreads, total);
}

void main() {
writeln("主开始");
iota(workerCount).each!(_ => spawn(&doWork));

auto sw = StopWatch();
sw.start();

while (sw.peek < mainRunTime) {
dumper.dump(stdout);
Thread.sleep(100.msecs);
}

//最终收集(并转储)
dumper.dump(stdout);

assert(allThatHasBeenDumped == allCollectedByThreads);
}


举报

相关推荐

0 条评论