文章目录
- 前言
- 基本接口使用
- fdb_setup_network 实现
- fdb_run_network 实现
- fdb_create_database 实现
- fdb_database_create_transaction 实现
- fdb_transaction_set_option 实现
- fdb_transaction_commit 实现
- fdb_transaction_on_error 实现
- tenant 相关的接口实现
- 结语
前言
😐 八九月份实在是太忙了,大量的周末时间被消耗在学车上 😦 (给大学补课),周内的时间基本都在处理工作项目上的问题,没有集中时间好好得梳理这段时间学习到的东西,利用国庆假期补补课吧…
本文主要介绍一些foundationdb client端实现的一些细节,能帮助我们快速掌握api接口层的使用方式以及相关参数的影响范围,并且简单介绍一下flow中的异步编程框架如何在fdb内部使用。
foundationdb 其实在国内使用非常小众,虽然其发表了两篇顶会且是2021年的best paper,但是其实现复杂且易用性较低,实在难以像单机的rocksdb这样的kv引擎被大众所接纳。
复杂度除了本身架构设计上的众多角色之外 还有其贯穿整个代码体系的异步编程模式,从api层开始直到os的aio 都是全链路异步,这样的代码实现下如果不了解底层原理,很容易出现使用不当导致的应用问题且极难排查;所以foundationdb是真小众,但是其作为snowflake的元数据存储,且大规模应用在apple内部的元数据存储需求,还是值得研究学习的。
本文涉及到的源代码版本:7.1.9
,且 fdbclient 代码行数在不包含测试的情况下已经有7.3w 行了。
基本接口使用
展示的是c-api的接口,后面介绍的实现都是 C++ 的核心实现。
所有的c-api都在fdb_c.h
,安装fdb package之后会默认放在/usr/include/foundationdb
下面。
基本接口的使用不会在这里描述,官方文档已经非常全面了 fdb C-API,直接看一个简单连接客户端的例子,如下:
#define FDB_API_VERSION 710
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fdb_c.h>
#include <unistd.h>
FDBTenant *fdb_tenant = NULL;
FDBTransaction *tr = NULL;
FDBDatabase *db = NULL;
pthread_t netThread;
static void checkError(fdb_error_t errorNum) {
if (errorNum) {
fprintf(stderr, "Error (%d): %s\n", errorNum, fdb_get_error(errorNum));
exit(errorNum);
}
}
static void waitAndCheckError(FDBFuture *future) {
checkError(fdb_future_block_until_ready(future));
if (fdb_future_get_error(future) != 0) {
checkError(fdb_future_get_error(future));
}
}
static void runNetwork() { checkError(fdb_run_network()); }
void createDataInDatabase() {
int committed = 0;
/* Create transaction. */
checkError(fdb_database_create_transaction(db, &tr));
while (!committed) {
/* Create data */
char *key1 = "Test Key1";
char *val1 = "Test Value1";
fdb_transaction_set(tr, key1, (int)strlen(key1), val1, (int)strlen(val1));
/* Commit to database.*/
FDBFuture *commitFuture = fdb_transaction_commit(tr);
checkError(fdb_future_block_until_ready(commitFuture));
if (fdb_future_get_error(commitFuture) != 0) {
waitAndCheckError(
fdb_transaction_on_error(tr, fdb_future_get_error(commitFuture)));
} else {
committed = 1;
}
fdb_future_destroy(commitFuture);
}
/* Destroy transaction. */
fdb_transaction_destroy(tr);
}
void createTenantDataAndReadData() {
char *tenant_name = "example";
FDBTransaction *tr = NULL;
FDBTransaction *tr2 = NULL;
fdb_bool_t valuePresent;
int valueLength;
const uint8_t *value = NULL;
const char *k1 = "tenant key1";
const char *v1 = "tenant value1";
checkError(fdb_database_open_tenant(db, (uint8_t const *)tenant_name,
strlen(tenant_name), &fdb_tenant));
checkError(fdb_tenant_create_transaction(fdb_tenant, &tr));
fdb_transaction_set(tr, k1, (int)strlen(k1), v1, (int)strlen(v1));
FDBFuture *commitFuture = fdb_transaction_commit(tr);
checkError(fdb_future_block_until_ready(commitFuture));
if (fdb_future_get_error(commitFuture) != 0) {
waitAndCheckError(
fdb_transaction_on_error(tr, fdb_future_get_error(commitFuture)));
}
fdb_transaction_destroy(tr);
fdb_future_destroy(commitFuture);
checkError(fdb_tenant_create_transaction(fdb_tenant, &tr2));
FDBFuture *getFuture = fdb_transaction_get(tr2, k1, (int)strlen(k1), 0);
waitAndCheckError(getFuture);
checkError(
fdb_future_get_value(getFuture, &valuePresent, &value, &valueLength));
printf("Get value from tenant , key : %s, value : %s, value-len : %d\n", k1,
value, valueLength);
fdb_transaction_destroy(tr2);
fdb_future_destroy(getFuture);
}
void readDataFromDatabase() {
FDBTransaction *tr = NULL;
const uint8_t *value = NULL;
fdb_bool_t valuePresent;
int valueLength;
char *key = "Test Key1";
checkError(fdb_database_create_transaction(db, &tr));
FDBFuture *getFuture = fdb_transaction_get(tr, key, (int)strlen(key), 0);
waitAndCheckError(getFuture);
checkError(
fdb_future_get_value(getFuture, &valuePresent, &value, &valueLength));
printf("Got Value from db. %s: '%.*s'\n", key, valueLength, value);
fdb_transaction_destroy(tr);
fdb_future_destroy(getFuture);
}
int main() {
/* Default fdb cluster file. */
char *cluster_file = "/etc/foundationdb/fdb.cluster";
/* Setup network. */
checkError(fdb_select_api_version(FDB_API_VERSION));
checkError(fdb_setup_network());
puts("Created network.");
pthread_create(&netThread, NULL, (void *)runNetwork, NULL);
checkError(fdb_create_database(cluster_file, &db));
puts("Created database.");
/*Create tenant and do nothing.*/
createDataInDatabase();
readDataFromDatabase();
createTenantDataAndReadData();
puts("Program done. Now exiting...");
fdb_tenant_destroy(fdb_tenant);
fdb_tenant = NULL;
fdb_database_destroy(db);
db = NULL;
checkError(fdb_stop_network());
int rc = pthread_join(netThread, NULL);
if (rc)
fprintf(stderr, "ERROR: network thread failed to join\n");
exit(0);
}
编译的话需要连接 -lfdb_c -lpthread
两个库,因为 fdbclient的代码都在 libfdb_c.so
之中,用作用户和fdb集群进行交互。
main函数中能看到,想要进行k/v的读写,需要先执行:
// 选择api版本,因为代码中有一些api仅在指定的版本下才能支持 (租户相关的api)
checkError(fdb_select_api_version(FDB_API_VERSION));
// 启动异步处理相关的线程
checkError(fdb_setup_network());
// 创建一个执行 fdb_run_network函数的线程,用于进程退出时完成正在异步处理的请求
pthread_create(&netThread, NULL, (void *)runNetwork, NULL);
// 创建数据库对象,如果是多租户,后面还需要有创建访问租户的对象
checkError(fdb_create_database(cluster_file, &db));
后续就可以进行transaction相关的读写操作了。
1. fdb_database_create_transaction
2. fdb_transaction_set
3. fdb_transaction_commit
4. fdb_future_block_until_ready
5. fdb_future_get_error
6. fdb_future_destroy
7. fdb_transaction_destroy
一条kv从写入事务到完成提交,获取提交之后的状态,需要上面这个七个接口的参与…很麻烦。
fdb_setup_network 实现
在fdb_select_api_version
中指定了 >=14 版本的 一些接口被变更为 #_impl
这样的实现。
extern "C" DLLEXPORT fdb_error_t fdb_select_api_version_impl(int runtime_version, int header_version) {
...
FDB_API_CHANGED(fdb_setup_network, 14);
...
}
#define FDB_API_CHANGED(func, ver) \
if (header_version < ver) \
fdb_api_ptr_##func = (void*)&(func##_v##ver##_PREV); \
else if (fdb_api_ptr_##func == (void*)&fdb_api_ptr_unimpl) \
fdb_api_ptr_##func = (void*)&(func##_impl);
即 fdb_setup_network的入口是在 fdb_setup_network_impl
之中,调用C++ api。
fdb_error_t fdb_setup_network_impl() {
// API 是 MultiVersionApi::api,static成员,标识一个进程
// 无论调用多少次setup_network 只会启动一个,多次调用会返回失败。
CATCH_AND_RETURN(API->setupNetwork(););
}
简单提一下fdbclient 层的API抽象:
MultiVersionApi # 最上层
---> ThreadSafeApi # client底层api,用于和fdb cluster进行交互
所以启动过程会先进入 MultiVersionApi::setupNetwork
,正常初始化的话会进入 ThreadSafeApi::setupNetwork
。
这里需要强调一下,这个函数字面意思是启动fdbclient和 集群的网络链接,但本质上并没有和cluseter的coordinator 有任何的网络交互。
验证的话可以通过tcpdump 监听的fdb.cluster 中的ip:port即可。
sudo tcpdump -i lo host 127.0.0.1 and port 4500 -vv -s0 -nn
因为我的server是在本地启动的,所以需要监听lo网口,-vv抓取整个网络包且 -nn 指定包展示完整的ip-port信息,不展示域名/hostname信息。
fdb_setup_network
做的事情如下几点:
- 检测
networkStartup
标识,已经初始化过一次的就会返回 2009 -- network_already_setup
错误。 - 初始化
localClient
,其就是更下一层的 client api -ThreadSafeApi
。 - 执行localClient->api->setupNetwork ,即
ThreadSafeApi::setupNetwork
- 在
ThreadSafeApi::setupNetwork
中:
- 先 check 是否已经启动过了,是则返回2009错误
- 设置默认的 网络相关的 knob options ,如果在调用当前
fdb_setup_network
接口之前通过fdb_network_set_option
设置了网络相关的 knob option,则这里会完成对应option的填充,否则就用默认的。(option 的name和value 是保存在std::map中,这里只是取值) - 初始化
g_network
,默认是初始化 N2::Net2
- 添加
fdb_stop_network
底层要执行的函数到g_network的callback中 - 初始化
FlowTransport
,并放在g_network
中,提供了用于传输用户请求到fdb-cluster的接口 - 创建
Net2FileSystem
,让g_network
能够对外提供一些读写文件的能力,比如客户端的trace-log的持久化,就是通过g_network
的文件系统对象完成的。
需要注意的是在 setupNetwork
执行过程中可能遇到 1510-io_error
问题,你没看错,就是io_error
。
这里是一个比较坑的地方,算是 fdbclient 的一个bug,你的fdb_setup_network
的接口会收到一个1510的错误码,具体背景在这个issue里面 fdb_setup_network get io-error without trace-log。就是因为linux 内核 配置上的一些问题,我们在fdb_setup_network 接口中收到1510的 io-error 错误码,第一反应应该是磁盘相关的一些问题,而且肯定是fdbserver 返回的(存储引擎写某一个盘写满了,或者有坏盘),但是server这里并没有任何trace-log的日志。是花了很长时间调试了fdbserver,最后就是结合fdb_setup_network的代码以及前面提到的tcpdump抓包来确认该 io-error 是在客户端层面的。但是客户端又没有什么日志爆出,最后在Net2FileSystem
代码中初始化 AsyncFileKAIO
确认了这个问题。
AsyncFileKAIO::init(Reference<IEventFD>(N2::ASIOReactor::getEventFD()), ioTimeout);
static void AsyncFileKAIO::init(Reference<IEventFD> ev, double ioTimeout) {
...
// 因为超过os 的 aio-max-nr 配置,可能会返回io-error
int rc = io_setup(FLOW_KNOBS->MAX_OUTSTANDING, &ctx.iocx);
if (rc < 0) {
// 这里并不会持久化到trace-log文件,因为此时并没有完成 AsyncFileKAIO的初始化,
TraceEvent("IOSetupError").GetLastError();
throw io_error();
}
setTimeout(ioTimeout);
ctx.evfd = ev->getFD();
poll(ev);
g_network->setGlobal(INetwork::enRunCycleFunc, (flowGlobalType)&AsyncFileKAIO::launch);
}
为什么会有aio 的 io_setup
接口报 io-error的问题,是我们单机启动的集群过多,每一次调用io_setup
接口会提交一部分请求到aio调度队列,os默认允许的队列中最大的请求数为 aio-max-nr = 65536
,我们多集群在一小段时间内启动就会有超过 aio-max-nr的情况。
最坑的地方在于这里并没有log文件,如果不看代码,基本发现不了这个问题(根本不知道fdbclient 还有可能返回io-error的地方)。
总结来说,关于fdb的api使用上遇到的相关问题 或者 其他类似nosql 系统的api的问题,排查方式建议如下:
- 缩小问题范围,需明确区分该错误是cluster返回的 还是client 返回的:直接的方式就是抓包。
- 开启client 或者 fdbserver的trace-log , server本身是默认开启的,client需要通过
fdb_netwokr_set_option
设置 FDB_NET_OPTION_TRACE_ENABLE
指定log的路径,以及设置 FDB_NET_OPTION_KNOB
中的 min_trace_severity=10
,表示默认追踪的是info级别的 log信息。
出现问题之后,优先看看trace-log中是否有serverity>=30的情况,有则看其对应的异常是否和接口返回的异常一样,一样则即可明确问题。 - 考虑到目前fdb在易用性上的问题,代码比较复杂且难追踪始末链路。如果是io-error或者platfor-error相关的问题,大多都是使用上的问题,但具体是在哪里出现的使用问题,可能需要根据返回这一些问题相关的代码来综合查看。比如,io-error相关的问题,那就需要全局搜索哪里有
throw io_error
的情况,根据自己的接口所处的代码区域来综合看,清楚对应错误的上下文。 - 既然要使用了,必须加强对接口实现代码的掌控。对于
fdb_setup_network
这样的接口返回io-error问题,没有日志,没有错误码,不知道哪一个系统调用产生的,也就是在不看源代码的情况下根本无法使用外部trace 工具来去追踪。因为该问题仅在特定的场景下才能产生,根本无法用gdb trace执行。所以核心还是需要对代码熟悉,这样才能有效决策错误产生场景及解决方案。比如,不抓包、不看源代码的情况下根本不可能知道fdb_setup_network 不和fdbserver通信;不看其实现代码的情况下,根本不可能知道io_error是其初始化AIO相关的文件系统时执行了io_setup
系统调用产生的。所以,使用了什么接口,且要上生产环境,必须对其实现原理以及可能的问题有精确的掌控,才能最大可能得防止损失的方式,有效得提高系统的容错能力。
PS:吐槽一点,fdb的异步框架让代码变得极不易读。这一部分代码逻辑很简单,因为还没有涉及到 flow 异步框架的介入,所以代码顺序看就好。但是到了 后续要介绍的 transaction相关的接口,就会发现异步框架让我们对细节的掌控变得很难,一个错误码的抛出并不是这个接口产生的,而是前面的某一个接口产生,内部将错误码异步传递给了其他接口的异步请求。。。总之,异步框架让fdb可以承担极高的并发,但是让内核代码变得极度复杂,难以快速上手维护。
到此 fdb_setup_network 的实现主要是初始化一些client端要使用的一些全局变量,以及准备好能够和server端通信变量(FlowTransport 这样的)。
接下来要执行fdb_run_network
的逻辑。
fdb_run_network 实现
该接口的主要功能是 初始化能够异步处理客户端请求的全局(进程级别,每一个进程仅有一个)调度线程,并且要执行能够cluster交互的部分。
实现链路也是:
MultiVersionApi::runNetwork
ThreadSafeApi::runNetwork
NativeAPI::runNetwork
核心逻辑是在 NativeAPI::runNetwork
中,主要做了以下几个事情:
- 检查
g_network
是否已经创建 且 可运行,否则会返回错误码。 - 开启客户端的 profiling-trace,可以通过
FDBNetworkOptions::ENABLE_RUN_LOOP_PROFILING
配置开启。 - 通过
g_network->run
函数启动运行Net2::run
,因为这个函数会循环运行,所以需要 fdb_run_network
函数单独放在一个线程执行。直到执行了 fdb_stop_network
,将stop标记为true,run内部的循环才会结束。 - 关于
Net2::run
函数中,主体逻辑是在如下之中。
while (!ready.empty()) {
++countTasks;
currentTaskID = ready.top().taskID;
priorityMetric = static_cast<int64_t>(currentTaskID);
Task* task = ready.top().task;
ready.pop();
try {
++tasksSinceReact;
(*task)();
} catch (Error& e) {
TraceEvent(SevError, "TaskError").error(e);
} catch (...) {
TraceEvent(SevError, "TaskError").error(unknown_error());
}
if (currentTaskID < minTaskID) {
trackAtPriority(currentTaskID, taskBegin);
minTaskID = currentTaskID;
}
...
}
- 主要是维护了一个
priority_queue
保存来自客户端请求的task,即执行函数,请求的实际执行会在这里,也就是在 Net2::run
所处的独立线程中。ready 中的task则是执行其他接口:比如 fdb_transaction_set_option 时底层执行函数内部注册的task 填充到ready 优先级队列中。
fdb_create_database 实现
接下来就是初始化 fdbclient 和 cluster链接的database 对象,需要指定连接集群的fdb.cluster
路径,这个接口是需要和cluster通信的。
实现链路的C++代码调用栈如下:
MultiVersionApi::createDatabase
ThreadSafeApi::createDatabase
ThreadSafeDatabase::ThreadSafeDatabase
Database::createDatabase
核心实现还是在 NativeAPI.actor.cpp
中的 Database::createDatabase
函数中,拿着传入的cluster-file 路径构造的 IClusterConnectionRecord
对象和cluster进行通信。
大体逻辑如下:
- check 前面
fdb_setup_network
初始化好的 g_network
在这里是否有效,否则返回2008 - 通过
g_network
初始化 TLS 认证连接,保证后续 client和cluster 通信的加密性 - 通过
IClusterConnectionRecord
对象和cluster 的 coordinator 进行rpc通信,主要是拿到 commitProxies
和 GrvProxies
的信息,方便后续client 的读写调度,因为后续的读请求会直接和GrvProxies
交互, 写请求的调度则直接和commitProxies
交互,这一些节点的信息需要被客户端或渠到。这一部分的逻辑是在 monitorProxies --> monitorProxiesOneGeneration
函数中实现的,底层通信是通过 fdbrpc 完成的。 - 构造
DatabaseContext
对象,并转为 Database
对象返回。
总的来说 createDatabase的核心目的还是为了通过 cluster-file 拿到集群中的关键角色的信息(commitProxies 和 GrvProxies),保证后续client 的读写操作能够直连这一些角色。
fdb_database_create_transaction 实现
这个接口用来在指定的 database中创建transaction对象,在多租户场景中则需要使用fdb_tenant_create_transaction
创建指定租户的事务对象,租户相关的接口后文会继续介绍。
在这个接口的实现中有一些细节需要我们注意,不然可能在接口的使用上会有误判。
这里也通过调用前面创建好的database对象来初始化transaction 对象:Reference<ITransaction> tr = DB(d)->createTransaction();
。
调用栈如下:
MultiVersionDatabase::createTransaction
MultiVersionTransaction::MultiVersionTransaction
MultiVersionTransaction::updateTransaction
ThreadSafeDatabase::createTransaction
ThreadSafeTransaction::ThreadSafeTransaction
ReadYourWritesTransaction::construct
ReadYourWritesTransaction::ReadYourWritesTransaction
在构造 MultiVersionTransaction
对象的时候会保存一下这个事务的startTime
,即事务最开始被创建或者reset 的时间,用来后续对事务执行是否超时来做判断,这里是事务超时机制非常重要的一个细节,如果我们忽略,可能会在接口使用上存在很多莫名其妙的 1031-transaction_timed_out
问题。
在ThreadSafeTransaction
中默认的 ISingleThreadTransaction::Type
为 RYW
即 ReadYourWriteTransaciton,可以通过 fdb_database_set_option
来配置 USE_CONFIG_DATABASE
选项,变更要创建的 transaction 类型。
对于 RYW 来说即read-your-write transaction,这个是实现客户端transaction 基本的形态,提供的事务隔离级别是 RR (repeatable-read),即读的时候可以从 GrvProxies 中 拿一个read-version 来读这个read-version之前的版本,之后的提交则读不到(目前看到transaction相关的配置,并没有办法配置transaction 的隔离级别)。
在ReadYourWriteTransaciton 的初始化代码中需要注意两个比较重要的变量:
ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx, Optional<TenantName> tenantName)
: ISingleThreadTransaction(cx->deferredError), tr(cx, tenantName), cache(&arena), writes(&arena), retries(0),
approximateSize(0), creationTime(now()), commitStarted(false), versionStampFuture(tr.getVersionstamp()),
specialKeySpaceWriteMap(std::make_pair(false, Optional<Value>()), specialKeys.end), options(tr) {
std::copy(
cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(), std::back_inserter(persistentOptions));
applyPersistentOptions();
}
1. 初始化 tr – 即Transaction
类:
Transaction::Transaction(Database const& cx, Optional<TenantName> const& tenant)
: trState(makeReference<TransactionState>(cx,
tenant,
cx->taskID,
generateSpanID(cx->transactionTracingSample),
createTrLogInfoProbabilistically(cx))),
span(trState->spanID, "Transaction"_loc), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), tr(trState->spanID) {
if (DatabaseContext::debugUseTags) {
debugAddTags(trState);
}
}
这里需要注意backoff
,它是用来防止transaction 提交失败频繁重试的,用来延迟一段时间保证系统有足够的响应时间,默认是10ms。
2. 初始化 writes – 即 WriteMap
其实际上是一个保存在内存中的 Balanced binary tree
,fdb这里起的名字叫做 PTree
– persistent balanced binary tree
,不过并没有看到持久化相关的逻辑。
仅占用O(1)空间的情况下,就能满足 RYW 事务对于transaction的多版本要求。所有的 fdb_transaction_set
接口写入的k/v数据 会先被 RYW 事务缓存到 本地的WriteMap,因为一个transaction 保持提交性能稳定的情况下建议最大的缓存数据量为10M,k/v个数有限,所以单个事务占用的 PTree的空间是常量空间,维持binary-tree的目的则是保证查询性能。
fdb_transaction_set_option 实现
这个接口是用来设置 transaction级别的配置的,用来指定一个transaction的行为,比如:是否禁止RYW-transaction,当前transaction的超时时间,请求执行失败可以重试的次数,大小等等。更多的参数在:fdb_c_options.g.h
– FDBTransactionOption
枚举类型中。
实现调用栈如下:
MultiVersionTransaction::setOption
ThreadSafeTransaction::setOption
调用链路还没有结束,这里在 ThreadSafTransaction::setOption中能够看会取 this->tr 作为自己的transaction类型,这块我们在前面创建transaction也知道这里默认是 RYW transaction。
void ThreadSafeTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
auto itr = FDBTransactionOptions::optionInfo.find(option);
if (itr == FDBTransactionOptions::optionInfo.end()) {
TraceEvent("UnknownTransactionOption").detail("Option", option);
throw invalid_option();
}
ISingleThreadTransaction* tr = this->tr;
Standalone<Optional<StringRef>> passValue = value;
// ThreadSafeTransaction is not allowed to do anything with options except pass them through to RYW.
onMainThreadVoid([tr, option, passValue]() { tr->setOption(option, passValue.contents()); }, &tr->deferredError);
}
这里会通过 onMainThreadVoid
函数将当前函数的执行时机交给一个 promise 变量,这个promise 变量会被填充到g_network 管理的 ready
优先级队列中。由这个promise变量在 我们前面 g_network run时所在的线程去调度,从而达到异步执行的目的。
void onMainThreadVoid(F f, Error* err = nullptr, TaskPriority taskID = TaskPriority::DefaultOnMainThread) {
Promise<Void> signal;
internal_thread_helper::doOnMainThreadVoid(signal.getFuture(), f, err);
g_network->onMainThread(std::move(signal), taskID);
}
// 让当前 onMainThreadVoid 函数等待在一个signal(future)之上,后续该future被promise
// 唤醒, 则能够恢复执行传入的 function。
void doOnMainThreadVoid(Future<Void> signal, F f, Error* err) {
wait(signal);
if (err && err->code() != invalid_error_code)
return;
try {
f();
} catch (Error& e) {
if (err)
*err = e;
}
}
// g_network->onMainThread 的 Net2 实现
void Net2::onMainThread(Promise<Void>&& signal, TaskPriority taskID) {
if (stopped)
return;
// 创建好promise task,后面填充到ready 队列中
PromiseTask* p = new PromiseTask(std::move(signal));
int64_t priority = int64_t(taskID) << 32;
if (thread_network == this) {
processThreadReady();
this->ready.push(OrderedTask(priority - (++tasksIssued), taskID, p));
} else {
if (threadReady.push(OrderedTask(priority, taskID, p)))
reactor.wake();
}
}
这块的实现逻辑在 ThreadSafeTransaction
相关的调度接口中都是一样的,包括 set
,get
,commit
等等,而最后lazy执行的对应的函数是 RYW
中的对应的函数。
这块异步调度流程实际如下图,其中还包括了后续要介绍的几个commit/block_until/onError接口等:
上图从上往下代表的是时间线,绿色可以理解为我们外部调用的api接口,数字代表的是执行顺序。
需要注意的是 1 和 6 两个接口,因为4 是异步执行,所以又可能4发生之前6已经执行,数字仅描述的是一种理想的状态。不过在2-5之间的执行顺序是严格有序的。
两个比较主要的变量分别是 ready
和 resetPromise
- 有一个
g_network->run
所在进程唯一的thread 维护的 ready-queue 用来调度来自客户端的promise请求。 - 还有一个 transaction 唯一的
resetPromise
变量,用来在事务内部 唤醒/传递 error。
以fdb_transaction_set_option
接口为例,其实现的步骤如下:
- 按照前面提到的调用栈 进入
ThreadsafeTransaction::setOption
逻辑中的onMainThreadVoid
函数进行调度,封装好 ReadYourWritesTransaction::setOption
函数作为异步执行函数参数。 - 在调度函数中,通过
doOnMainThreadVoid
让 RYW 对应的setOption 的执行 wait 在一个promise变量上,等待被异步唤醒执行。 - 通过
g_network->onMainThread
将promise 变量塞入到 ready 优先级队列中,等待g_network thread进行调度。
void Net2::onMainThread(Promise<Void>&& signal, TaskPriority taskID) {
if (stopped)
return;
PromiseTask* p = new PromiseTask(std::move(signal));
int64_t priority = int64_t(taskID) << 32;
if (thread_network == this) {
processThreadReady();
this->ready.push(OrderedTask(priority - (++tasksIssued), taskID, p));
} else {
if (threadReady.push(OrderedTask(priority, taskID, p)))
reactor.wake();
}
}
- 在 g_network->run 函数中,通过
Net2::run
函数中的如下逻辑唤醒等待在当前 promise上的待执行函数。
void Net2::run() {
...
while (!ready.empty()) {
++countTasks;
currentTaskID = ready.top().taskID;
priorityMetric = static_cast<int64_t>(currentTaskID);
Task* task = ready.top().task;
ready.pop();
try {
++tasksSinceReact;
(*task)();
} catch (Error& e) {
TraceEvent(SevError, "TaskError").error(e);
} catch (...) {
TraceEvent(SevError, "TaskError").error(unknown_error());
}
}
...
}
主要是从ready中取出task,这个task实际的对象类型是前面push 进ready中的 PromiseTask
,执行(*task)()
则是调用 PromiseTask
类的 ()
operator,通过promise.send(Void());
唤醒前面的wait逻辑,执行真正的 RYW的setOption逻辑。
- 在
ReadYourWritesTransaction::setOption
中参数就是 option 类型和要设置的value,调用 setOptionImpl
进行参数设置。 - 这里需要提一下 option的
TIMEOUT
选项,有隐藏的坑,fdb transaction的timeout 并不是单纯描述设置option之后一个操作的执行时间是否超时,而是看当前transaction 从创建到完成后面操作的时间是否超时。
看代码就知道了,先调用 resetTimeout
:
void ReadYourWritesTransaction::resetTimeout() {
timeoutActor =
options.timeoutInSeconds == 0.0 ? Void() :
timebomb(options.timeoutInSeconds + creationTime, resetPromise);
}
ACTOR Future<Void> timebomb(double endTime, Promise<Void> resetPromise) {
while (now() < endTime) {
wait(delayUntil(std::min(endTime + 0.0001,
now() + CLIENT_KNOBS->TRANSACTION_TIMEOUT_DELAY_INTERVAL)));
}
if (!resetPromise.isSet())
resetPromise.sendError(transaction_timed_out());
throw transaction_timed_out();
}
假如我们设置了timeout配置,那肯定会进入 timebomb
逻辑,可以看到第一个参数其实代表的是从现在开始要delay的end-time,即从事务的creationTime
开始 到 事务的设置的超时时间结束。如果这段时间内没有完成事务的相关操作(commit/get),就会通过resetPormise
send一个timeout error。
这块的逻辑比较重要,假如我们设置的超时时间是100ms
,但是我们上层从创建事务到执行commit操作之前有一些额外的操作消耗掉了这100ms的时间,那在执行接下来的commit/get
操作就有可能收到超时错误。
这个错误并不是由 setoption 接口返回的,但是是其内部的 RYW事务的 timebomb 函数send给 transaction共享的 resetPromise
,会由后续的 onError
取出来返回给用户。这样的一个调度链路就是setOption设置超时时间时起内部产生的错误码会被 commit 之后的 onError
捕获,该错误并不是单纯的commit报出的错误,所以如果不看代码,根本不清楚这块的调度链路,误导大家对错误的判断。
此外,对于超时的处理如果想要单纯的check 某一个接口的操作是否会超时,则需要将现在到createtime 之间的时间也加进来才行。
对于其他类型的 transaction option 的设置直接看 ReadYourWritesTransaction::setOptionImpl
的处理,有不同类型的处理方式,有的是不需要产生异步 promise 由 g_network thread 去调度的。
fdb_transaction_commit 实现
前面提到的 setoption 在整个client 调度链路的图在当前 commit 这里也适用。
同样会生成一个 PromiseTask
对象被 g_network thread 异步调度,执行链路是和前面setOption一样的,我们主要是关注 commit部分在 ReadYourWritesTransaction
中的实现逻辑。
核心commit 的逻辑实现调用栈如下:
RYWImpl::commit
Transaction::commit
commitAndWatch (NativeAPI.actor.cpp)
Transaction::commitMutations
tryCommit (NativeAPI.actor.cpp)
在 Transaction::commitMutations
前面主要是做一些 option 上的check 和对应逻辑的处理,最终会进入到tryCommit
逻辑,在这个函数中核心就是拿着前面 fdb_create_database
中保存的 clientInfo
即 commitProxies 的信息,直接通过fdb_setup_network
时构造的通信组件FlowTransport
发送rpc 到 firstCommitProxy
这个节点进行提交。
ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState,
CommitTransactionRequest req,
Future<Version> readVersion) {
...
try {
// 开启buggify,则会随机生成一些错误码返回,用于为上层客户端提供有效的稳定性测试。
if (CLIENT_BUGGIFY) {
throw deterministicRandom()->randomChoice(std::vector<Error>{
not_committed(), transaction_too_old(), proxy_memory_limit_exceeded(), commit_unknown_result() });
}
...
req.debugID = commitID;
state Future<CommitID> reply;
if (trState->options.commitOnFirstProxy) {
if (trState->cx->clientInfo->get().firstCommitProxy.present()) {
reply = throwErrorOr(brokenPromiseToMaybeDelivered(
trState->cx->clientInfo->get().firstCommitProxy.get().commit.tryGetReply(req)));
} else {
const std::vector<CommitProxyInterface>& proxies = trState->cx->clientInfo->get().commitProxies;
reply = proxies.size() ? throwErrorOr(brokenPromiseToMaybeDelivered(proxies[0].commit.tryGetReply(req)))
: Never();
}
} else {
reply = basicLoadBalance(trState->cx->getCommitProxies(trState->useProvisionalProxies),
&CommitProxyInterface::commit,
req,
TaskPriority::DefaultPromiseEndpoint,
AtMostOnce::True);
}
...
}
后续的 commit req请求的处理就到 fdbserver 了,这块的实现后续的文章再单独描述吧,本篇就不提了。流程上可以参考:foundationdb read and write path internal。
fdb_transaction_on_error 实现
这个接口是对一个transaction的错误处理,并在返回前根据相关的事务处理的错误码进行对应的错误处理逻辑,需要注意的是这个接口仅能在有错误码的情况下使用,如果本身处理是成功的,这个接口传入的error-code 为 0 的话就会返回2000 错误码,表示接口使用不当。
调用栈和前面的setOption一样,不过在 ThreadSafeTransaction
丢promise 进g_network thread时变成阻塞方式的了,需要等到RYW 的 onError
执行返回才会给上层返回。
主体逻辑在 RYWImpl::onError
中。
ReadYourWritesTransaction::onError
RYWImpl::onError
大概的处理逻辑是:
- check 当前事务处理失败 并 尝试retry的次数是否超过
FDBTransactionOptions::RETRY_LIMIT
的限制,超过则throw error. - 调用
Transaction::onError
函数继续处理错误码:
- 针对如下错误码 – 认为是可重试的错误,会触发一个backoff策略,简单来说就是delay 一段backoff的时间,并且reset 当前事务。
if (e.code() == error_code_not_committed || e.code() == error_code_commit_unknown_result ||
e.code() == error_code_database_locked || e.code() == error_code_proxy_memory_limit_exceeded ||
e.code() == error_code_process_behind || e.code() == error_code_batch_transaction_throttled ||
e.code() == error_code_tag_throttled) {
if (e.code() == error_code_not_committed)
++trState->cx->transactionsNotCommitted;
else if (e.code() == error_code_commit_unknown_result)
++trState->cx->transactionsMaybeCommitted;
else if (e.code() == error_code_proxy_memory_limit_exceeded)
++trState->cx->transactionsResourceConstrained;
else if (e.code() == error_code_process_behind)
++trState->cx->transactionsProcessBehind;
else if (e.code() == error_code_batch_transaction_throttled || e.code() == error_code_tag_throttled) {
++trState->cx->transactionsThrottled;
}
double backoff = getBackoff(e.code());
reset();
return delay(backoff, trState->taskID);
}
- 后面还有两种会执行 backoff 和 reset 的错误码场景
if (e.code() == error_code_transaction_too_old ||
e.code() == error_code_future_version) {
if (e.code() == error_code_transaction_too_old)
++trState->cx->transactionsTooOld;
else if (e.code() == error_code_future_version)
++trState->cx->transactionsFutureVersions;
double maxBackoff = trState->options.maxBackoff;
reset();
return delay(std::min(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, maxBackoff), trState->taskID);
}
if (e.code() == error_code_unknown_tenant) {
double maxBackoff = trState->options.maxBackoff;
reset();
return delay(std::min(CLIENT_KNOBS->UNKNOWN_TENANT_RETRY_DELAY, maxBackoff), trState->taskID);
}
在 onError内部,会自动对以上错误码调度 reset,只是针对不同的错误码会有不同的backoff策略,体现在delay的时间长或者短。
tenant 相关的接口实现
多租户在 foundationdb 中是一个比较重要的特性,因为不同用户的数据应该要能够快速识别,并且在逻辑访问上是隔离的,这样一个fdb集群就能够为多用户提供访问特性,且保证了每一个用户的访问性能,目前tenant 相关的特性还处于实验阶段。
实现上是比较简单的,就是在commit的时候,在用户写入的key前面加一个对应的租户prefix,这个prefix是利用租户名生成的,可以转为一个int64_t
的id标识。
如下tryCommit 函数,我们前面介绍事务的commit api实现的时候提到过:
ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState,
CommitTransactionRequest req,
Future<Version> readVersion) {
...
state Key tenantPrefix;
if (trState->tenant().present()) {
KeyRangeLocationInfo locationInfo = wait(getKeyLocation(trState,
""_sr,
&StorageServerInterface::getValue,
Reverse::False,
UseTenant::True,
req.transaction.read_snapshot));
applyTenantPrefix(req, locationInfo.tenantEntry.prefix);
tenantPrefix = locationInfo.tenantEntry.prefix;
}
...
}
只需要在读的时候保证能够将租户相关的prefix 移除,用户要读的数据 返回给用户即可。
结语
fdb其实还有一些比较有趣的接口,比如 fdb_transaction_watch
能够由用户指定对想要审计的key 添加 watcher,这样在后续其他的用户修改了这个key 对应的value 就可以触发用户设置的回掉函数。其实现也是利用的 promise,客户端利用unordered_map 保存客户端设置的wather,在commit k/v的时候会检测提交的key 是否在 这个map 中,在则触发用户外部设置的回调函数。
本文并没有完整展示所有客户端接口的实现,只是挑了一些比较有代表性且重要的接口做了一些介绍,通过本文的介绍,在fdbclient 侧的源代码中基本就能游刃有余了,能够准确的得使用fdb 提供的api。 不过核心代码还是在 fdbserver (分布式事务,共识算法,存储引擎)以及 fdbrpc 和 fdb提供的 simulator测试框架 中,当然这就需要花费更多的时间去深入了。