0
点赞
收藏
分享

微信扫一扫

关于 foundationdb client接口的基本实现 以及 使用上的一些小细节


文章目录

  • ​​前言​​
  • ​​基本接口使用​​
  • ​​fdb_setup_network 实现​​
  • ​​fdb_run_network 实现​​
  • ​​fdb_create_database 实现​​
  • ​​fdb_database_create_transaction 实现​​
  • ​​fdb_transaction_set_option 实现​​
  • ​​fdb_transaction_commit 实现​​
  • ​​fdb_transaction_on_error 实现​​
  • ​​tenant 相关的接口实现​​
  • ​​结语​​

前言

😐 八九月份实在是太忙了,大量的周末时间被消耗在学车上 😦 (给大学补课),周内的时间基本都在处理工作项目上的问题,没有集中时间好好得梳理这段时间学习到的东西,利用国庆假期补补课吧…

本文主要介绍一些foundationdb client端实现的一些细节,能帮助我们快速掌握api接口层的使用方式以及相关参数的影响范围,并且简单介绍一下flow中的异步编程框架如何在fdb内部使用。

foundationdb 其实在国内使用非常小众,虽然其发表了两篇顶会且是2021年的best paper,但是其实现复杂且易用性较低,实在难以像单机的rocksdb这样的kv引擎被大众所接纳。

复杂度除了本身架构设计上的众多角色之外 还有其贯穿整个代码体系的异步编程模式,从api层开始直到os的aio 都是全链路异步,这样的代码实现下如果不了解底层原理,很容易出现使用不当导致的应用问题且极难排查;所以foundationdb是真小众,但是其作为snowflake的元数据存储,且大规模应用在apple内部的元数据存储需求,还是值得研究学习的。

本文涉及到的源代码版本:​​7.1.9​​,且 fdbclient 代码行数在不包含测试的情况下已经有7.3w 行了。

关于 foundationdb client接口的基本实现 以及 使用上的一些小细节_nosql

基本接口使用

展示的是c-api的接口,后面介绍的实现都是 C++ 的核心实现。
所有的c-api都在​​​fdb_c.h​​​,安装fdb package之后会默认放在​​/usr/include/foundationdb​​下面。

基本接口的使用不会在这里描述,官方文档已经非常全面了 ​​fdb C-API​​,直接看一个简单连接客户端的例子,如下:

#define FDB_API_VERSION 710

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <fdb_c.h>
#include <unistd.h>

FDBTenant *fdb_tenant = NULL;
FDBTransaction *tr = NULL;
FDBDatabase *db = NULL;
pthread_t netThread;

static void checkError(fdb_error_t errorNum) {
if (errorNum) {
fprintf(stderr, "Error (%d): %s\n", errorNum, fdb_get_error(errorNum));
exit(errorNum);
}
}

static void waitAndCheckError(FDBFuture *future) {
checkError(fdb_future_block_until_ready(future));
if (fdb_future_get_error(future) != 0) {
checkError(fdb_future_get_error(future));
}
}

static void runNetwork() { checkError(fdb_run_network()); }

void createDataInDatabase() {
int committed = 0;
/* Create transaction. */
checkError(fdb_database_create_transaction(db, &tr));

while (!committed) {
/* Create data */
char *key1 = "Test Key1";
char *val1 = "Test Value1";
fdb_transaction_set(tr, key1, (int)strlen(key1), val1, (int)strlen(val1));

/* Commit to database.*/
FDBFuture *commitFuture = fdb_transaction_commit(tr);
checkError(fdb_future_block_until_ready(commitFuture));
if (fdb_future_get_error(commitFuture) != 0) {
waitAndCheckError(
fdb_transaction_on_error(tr, fdb_future_get_error(commitFuture)));
} else {
committed = 1;
}
fdb_future_destroy(commitFuture);
}
/* Destroy transaction. */
fdb_transaction_destroy(tr);
}

void createTenantDataAndReadData() {
char *tenant_name = "example";
FDBTransaction *tr = NULL;
FDBTransaction *tr2 = NULL;
fdb_bool_t valuePresent;
int valueLength;
const uint8_t *value = NULL;
const char *k1 = "tenant key1";
const char *v1 = "tenant value1";

checkError(fdb_database_open_tenant(db, (uint8_t const *)tenant_name,
strlen(tenant_name), &fdb_tenant));
checkError(fdb_tenant_create_transaction(fdb_tenant, &tr));
fdb_transaction_set(tr, k1, (int)strlen(k1), v1, (int)strlen(v1));
FDBFuture *commitFuture = fdb_transaction_commit(tr);
checkError(fdb_future_block_until_ready(commitFuture));
if (fdb_future_get_error(commitFuture) != 0) {
waitAndCheckError(
fdb_transaction_on_error(tr, fdb_future_get_error(commitFuture)));
}
fdb_transaction_destroy(tr);
fdb_future_destroy(commitFuture);

checkError(fdb_tenant_create_transaction(fdb_tenant, &tr2));
FDBFuture *getFuture = fdb_transaction_get(tr2, k1, (int)strlen(k1), 0);
waitAndCheckError(getFuture);
checkError(
fdb_future_get_value(getFuture, &valuePresent, &value, &valueLength));

printf("Get value from tenant , key : %s, value : %s, value-len : %d\n", k1,
value, valueLength);
fdb_transaction_destroy(tr2);
fdb_future_destroy(getFuture);
}

void readDataFromDatabase() {
FDBTransaction *tr = NULL;
const uint8_t *value = NULL;
fdb_bool_t valuePresent;
int valueLength;
char *key = "Test Key1";

checkError(fdb_database_create_transaction(db, &tr));
FDBFuture *getFuture = fdb_transaction_get(tr, key, (int)strlen(key), 0);
waitAndCheckError(getFuture);

checkError(
fdb_future_get_value(getFuture, &valuePresent, &value, &valueLength));

printf("Got Value from db. %s: '%.*s'\n", key, valueLength, value);
fdb_transaction_destroy(tr);
fdb_future_destroy(getFuture);
}

int main() {
/* Default fdb cluster file. */
char *cluster_file = "/etc/foundationdb/fdb.cluster";

/* Setup network. */
checkError(fdb_select_api_version(FDB_API_VERSION));
checkError(fdb_setup_network());
puts("Created network.");

pthread_create(&netThread, NULL, (void *)runNetwork, NULL);

checkError(fdb_create_database(cluster_file, &db));
puts("Created database.");

/*Create tenant and do nothing.*/
createDataInDatabase();
readDataFromDatabase();

createTenantDataAndReadData();

puts("Program done. Now exiting...");
fdb_tenant_destroy(fdb_tenant);
fdb_tenant = NULL;
fdb_database_destroy(db);
db = NULL;
checkError(fdb_stop_network());
int rc = pthread_join(netThread, NULL);
if (rc)
fprintf(stderr, "ERROR: network thread failed to join\n");
exit(0);
}

编译的话需要连接 ​​-lfdb_c -lpthread​​​ 两个库,因为 fdbclient的代码都在 ​​libfdb_c.so​​之中,用作用户和fdb集群进行交互。

main函数中能看到,想要进行k/v的读写,需要先执行:

// 选择api版本,因为代码中有一些api仅在指定的版本下才能支持 (租户相关的api)
checkError(fdb_select_api_version(FDB_API_VERSION));
// 启动异步处理相关的线程
checkError(fdb_setup_network());
// 创建一个执行 fdb_run_network函数的线程,用于进程退出时完成正在异步处理的请求
pthread_create(&netThread, NULL, (void *)runNetwork, NULL);
// 创建数据库对象,如果是多租户,后面还需要有创建访问租户的对象
checkError(fdb_create_database(cluster_file, &db));

后续就可以进行transaction相关的读写操作了。

1. fdb_database_create_transaction
2. fdb_transaction_set
3. fdb_transaction_commit
4. fdb_future_block_until_ready
5. fdb_future_get_error
6. fdb_future_destroy
7. fdb_transaction_destroy

一条kv从写入事务到完成提交,获取提交之后的状态,需要上面这个七个接口的参与…很麻烦。

fdb_setup_network 实现

在​​fdb_select_api_version​​​ 中指定了 >=14 版本的 一些接口被变更为 ​​#_impl​​ 这样的实现。

extern "C" DLLEXPORT fdb_error_t fdb_select_api_version_impl(int runtime_version, int header_version) {
...
FDB_API_CHANGED(fdb_setup_network, 14);
...
}

#define FDB_API_CHANGED(func, ver) \
if (header_version < ver) \
fdb_api_ptr_##func = (void*)&(func##_v##ver##_PREV); \
else if (fdb_api_ptr_##func == (void*)&fdb_api_ptr_unimpl) \
fdb_api_ptr_##func = (void*)&(func##_impl);

即 fdb_setup_network的入口是在 ​​fdb_setup_network_impl​​ 之中,调用C++ api。

fdb_error_t fdb_setup_network_impl() {
// API 是 MultiVersionApi::api,static成员,标识一个进程
// 无论调用多少次setup_network 只会启动一个,多次调用会返回失败。
CATCH_AND_RETURN(API->setupNetwork(););
}

简单提一下fdbclient 层的API抽象:

MultiVersionApi # 最上层
---> ThreadSafeApi # client底层api,用于和fdb cluster进行交互

所以启动过程会先进入 ​​MultiVersionApi::setupNetwork​​​ ,正常初始化的话会进入 ​​ThreadSafeApi::setupNetwork​​。

这里需要强调一下,这个函数字面意思是启动fdbclient和 集群的网络链接,但本质上并没有和cluseter的coordinator 有任何的网络交互。

验证的话可以通过tcpdump 监听的fdb.cluster 中的ip:port即可。
​​​sudo tcpdump -i lo host 127.0.0.1 and port 4500 -vv -s0 -nn​​ 因为我的server是在本地启动的,所以需要监听lo网口,-vv抓取整个网络包且 -nn 指定包展示完整的ip-port信息,不展示域名/hostname信息。

​fdb_setup_network​​ 做的事情如下几点:

  1. 检测​​networkStartup​​​标识,已经初始化过一次的就会返回 ​​2009 -- network_already_setup​​错误。
  2. 初始化 ​​localClient​​​,其就是更下一层的 client api -​​ThreadSafeApi​​。
  3. 执行localClient->api->setupNetwork ,即 ​​ThreadSafeApi::setupNetwork​
  4. 在​​ThreadSafeApi::setupNetwork​​ 中:
  • 先 check 是否已经启动过了,是则返回2009错误
  • 设置默认的 网络相关的 knob options ,如果在调用当前​​fdb_setup_network​​​ 接口之前通过​​fdb_network_set_option​​设置了网络相关的 knob option,则这里会完成对应option的填充,否则就用默认的。(option 的name和value 是保存在std::map中,这里只是取值)
  • 初始化 ​​g_network​​​,默认是初始化 ​​N2::Net2​
  • 添加​​fdb_stop_network​​ 底层要执行的函数到g_network的callback中
  • 初始化​​FlowTransport​​​ ,并放在​​g_network​​中,提供了用于传输用户请求到fdb-cluster的接口
  • 创建​​Net2FileSystem​​​,让​​g_network​​​ 能够对外提供一些读写文件的能力,比如客户端的trace-log的持久化,就是通过​​g_network​​的文件系统对象完成的。

需要注意的是在 ​​setupNetwork​​​ 执行过程中可能遇到 ​​1510-io_error​​​ 问题,你没看错,就是​​io_error​​。

这里是一个比较坑的地方,算是 fdbclient 的一个bug,你的​​fdb_setup_network​​​的接口会收到一个1510的错误码,具体背景在这个issue里面 ​​fdb_setup_network get io-error without trace-log​​​。就是因为linux 内核 配置上的一些问题,我们在fdb_setup_network 接口中收到1510的 io-error 错误码,第一反应应该是磁盘相关的一些问题,而且肯定是fdbserver 返回的(存储引擎写某一个盘写满了,或者有坏盘),但是server这里并没有任何trace-log的日志。是花了很长时间调试了fdbserver,最后就是结合fdb_setup_network的代码以及前面提到的tcpdump抓包来确认该 io-error 是在客户端层面的。但是客户端又没有什么日志爆出,最后在​​Net2FileSystem​​​ 代码中初始化 ​​AsyncFileKAIO​​ 确认了这个问题。

AsyncFileKAIO::init(Reference<IEventFD>(N2::ASIOReactor::getEventFD()), ioTimeout);

static void AsyncFileKAIO::init(Reference<IEventFD> ev, double ioTimeout) {
...
// 因为超过os 的 aio-max-nr 配置,可能会返回io-error
int rc = io_setup(FLOW_KNOBS->MAX_OUTSTANDING, &ctx.iocx);
if (rc < 0) {
// 这里并不会持久化到trace-log文件,因为此时并没有完成 AsyncFileKAIO的初始化,
TraceEvent("IOSetupError").GetLastError();
throw io_error();
}
setTimeout(ioTimeout);
ctx.evfd = ev->getFD();
poll(ev);

g_network->setGlobal(INetwork::enRunCycleFunc, (flowGlobalType)&AsyncFileKAIO::launch);
}

为什么会有aio 的 ​​io_setup​​​ 接口报 io-error的问题,是我们单机启动的集群过多,每一次调用​​io_setup​​​接口会提交一部分请求到aio调度队列,os默认允许的队列中最大的请求数为 ​​aio-max-nr = 65536​​​,我们多集群在一小段时间内启动就会有超过 aio-max-nr的情况。
最坑的地方在于这里并没有log文件,如果不看代码,基本发现不了这个问题(根本不知道fdbclient 还有可能返回io-error的地方)。

总结来说,关于fdb的api使用上遇到的相关问题 或者 其他类似nosql 系统的api的问题,排查方式建议如下:

  1. 缩小问题范围,需明确区分该错误是cluster返回的 还是client 返回的:直接的方式就是抓包。
  2. 开启client 或者 fdbserver的trace-log , server本身是默认开启的,client需要通过 ​​fdb_netwokr_set_option​​​设置 ​​FDB_NET_OPTION_TRACE_ENABLE​​​ 指定log的路径,以及设置 ​​FDB_NET_OPTION_KNOB​​​ 中的 ​​min_trace_severity=10​​​,表示默认追踪的是info级别的 log信息。
    出现问题之后,优先看看trace-log中是否有serverity>=30的情况,有则看其对应的异常是否和接口返回的异常一样,一样则即可明确问题。
  3. 考虑到目前fdb在易用性上的问题,代码比较复杂且难追踪始末链路。如果是io-error或者platfor-error相关的问题,大多都是使用上的问题,但具体是在哪里出现的使用问题,可能需要根据返回这一些问题相关的代码来综合查看。比如,io-error相关的问题,那就需要全局搜索哪里有​​throw io_error​​ 的情况,根据自己的接口所处的代码区域来综合看,清楚对应错误的上下文。
  4. 既然要使用了,必须加强对接口实现代码的掌控。对于​​fdb_setup_network​​​ 这样的接口返回io-error问题,没有日志,没有错误码,不知道哪一个系统调用产生的,也就是在不看源代码的情况下根本无法使用外部trace 工具来去追踪。因为该问题仅在特定的场景下才能产生,根本无法用gdb trace执行。所以核心还是需要对代码熟悉,这样才能有效决策错误产生场景及解决方案。比如,不抓包、不看源代码的情况下根本不可能知道fdb_setup_network 不和fdbserver通信;不看其实现代码的情况下,根本不可能知道io_error是其初始化AIO相关的文件系统时执行了​​io_setup​​​ 系统调用产生的。所以,使用了什么接口,且要上生产环境,必须对其实现原理以及可能的问题有精确的掌控,才能最大可能得防止损失的方式,有效得提高系统的容错能力。
    PS:吐槽一点,fdb的异步框架让代码变得极不易读。这一部分代码逻辑很简单,因为还没有涉及到 flow 异步框架的介入,所以代码顺序看就好。但是到了 后续要介绍的 transaction相关的接口,就会发现异步框架让我们对细节的掌控变得很难,一个错误码的抛出并不是这个接口产生的,而是前面的某一个接口产生,内部将错误码异步传递给了其他接口的异步请求。。。总之,异步框架让fdb可以承担极高的并发,但是让内核代码变得极度复杂,难以快速上手维护。

到此 fdb_setup_network 的实现主要是初始化一些client端要使用的一些全局变量,以及准备好能够和server端通信变量(FlowTransport 这样的)。

接下来要执行​​fdb_run_network​​ 的逻辑。

fdb_run_network 实现

该接口的主要功能是 初始化能够异步处理客户端请求的全局(进程级别,每一个进程仅有一个)调度线程,并且要执行能够cluster交互的部分。
实现链路也是:

MultiVersionApi::runNetwork
ThreadSafeApi::runNetwork
NativeAPI::runNetwork

核心逻辑是在 ​​NativeAPI::runNetwork​​ 中,主要做了以下几个事情:

  1. 检查​​g_network​​ 是否已经创建 且 可运行,否则会返回错误码。
  2. 开启客户端的 profiling-trace,可以通过​​FDBNetworkOptions::ENABLE_RUN_LOOP_PROFILING​​ 配置开启。
  3. 通过​​g_network->run​​​函数启动运行​​Net2::run​​​,因为这个函数会循环运行,所以需要 ​​fdb_run_network​​​函数单独放在一个线程执行。直到执行了 ​​fdb_stop_network​​,将stop标记为true,run内部的循环才会结束。
  4. 关于 ​​Net2::run​​ 函数中,主体逻辑是在如下之中。

while (!ready.empty()) {
++countTasks;
currentTaskID = ready.top().taskID;
priorityMetric = static_cast<int64_t>(currentTaskID);
Task* task = ready.top().task;
ready.pop();

try {
++tasksSinceReact;
(*task)();
} catch (Error& e) {
TraceEvent(SevError, "TaskError").error(e);
} catch (...) {
TraceEvent(SevError, "TaskError").error(unknown_error());
}

if (currentTaskID < minTaskID) {
trackAtPriority(currentTaskID, taskBegin);
minTaskID = currentTaskID;
}
...
}

  1. 主要是维护了一个​​priority_queue​​​ 保存来自客户端请求的task,即执行函数,请求的实际执行会在这里,也就是在 ​​Net2::run​​ 所处的独立线程中。ready 中的task则是执行其他接口:比如 fdb_transaction_set_option 时底层执行函数内部注册的task 填充到ready 优先级队列中。

fdb_create_database 实现

接下来就是初始化 fdbclient 和 cluster链接的database 对象,需要指定连接集群的​​fdb.cluster​​​路径,这个接口是需要和cluster通信的。
实现链路的C++代码调用栈如下:

MultiVersionApi::createDatabase
ThreadSafeApi::createDatabase
ThreadSafeDatabase::ThreadSafeDatabase
Database::createDatabase

核心实现还是在 ​​NativeAPI.actor.cpp​​​ 中的 ​​Database::createDatabase​​​函数中,拿着传入的cluster-file 路径构造的 ​​IClusterConnectionRecord​​ 对象和cluster进行通信。

大体逻辑如下:

  1. check 前面​​fdb_setup_network​​​ 初始化好的 ​​g_network​​ 在这里是否有效,否则返回2008
  2. 通过 ​​g_network​​ 初始化 TLS 认证连接,保证后续 client和cluster 通信的加密性
  3. 通过 ​​IClusterConnectionRecord​​​ 对象和cluster 的 coordinator 进行rpc通信,主要是拿到 ​​commitProxies​​​ 和 ​​GrvProxies​​​的信息,方便后续client 的读写调度,因为后续的读请求会直接和​​GrvProxies​​​交互, 写请求的调度则直接和​​commitProxies​​​交互,这一些节点的信息需要被客户端或渠到。这一部分的逻辑是在 ​​monitorProxies --> monitorProxiesOneGeneration​​函数中实现的,底层通信是通过 fdbrpc 完成的。
  4. 构造​​DatabaseContext​​​ 对象,并转为 ​​Database​​ 对象返回。

总的来说 createDatabase的核心目的还是为了通过 cluster-file 拿到集群中的关键角色的信息(commitProxiesGrvProxies),保证后续client 的读写操作能够直连这一些角色。

fdb_database_create_transaction 实现

这个接口用来在指定的 database中创建transaction对象,在多租户场景中则需要使用​​fdb_tenant_create_transaction​​创建指定租户的事务对象,租户相关的接口后文会继续介绍。

在这个接口的实现中有一些细节需要我们注意,不然可能在接口的使用上会有误判。

这里也通过调用前面创建好的database对象来初始化transaction 对象:​​Reference<ITransaction> tr = DB(d)->createTransaction();​​​。
调用栈如下:

MultiVersionDatabase::createTransaction
MultiVersionTransaction::MultiVersionTransaction
MultiVersionTransaction::updateTransaction
ThreadSafeDatabase::createTransaction
ThreadSafeTransaction::ThreadSafeTransaction
ReadYourWritesTransaction::construct
ReadYourWritesTransaction::ReadYourWritesTransaction

在构造 ​​MultiVersionTransaction​​​ 对象的时候会保存一下这个事务的​​startTime​​​,即事务最开始被创建或者reset 的时间,用来后续对事务执行是否超时来做判断,这里是事务超时机制非常重要的一个细节,如果我们忽略,可能会在接口使用上存在很多莫名其妙的 ​​1031-transaction_timed_out​​ 问题。

在​​ThreadSafeTransaction​​​ 中默认的 ​​ISingleThreadTransaction::Type​​​ 为 ​​RYW​​ 即 ReadYourWriteTransaciton,可以通过 ​​fdb_database_set_option​​​ 来配置 ​​USE_CONFIG_DATABASE​​ 选项,变更要创建的 transaction 类型。

对于 RYW 来说即read-your-write transaction,这个是实现客户端transaction 基本的形态,提供的事务隔离级别是 RR (repeatable-read),即读的时候可以从 GrvProxies 中 拿一个read-version 来读这个read-version之前的版本,之后的提交则读不到(目前看到transaction相关的配置,并没有办法配置transaction 的隔离级别)。

在ReadYourWriteTransaciton 的初始化代码中需要注意两个比较重要的变量:

ReadYourWritesTransaction::ReadYourWritesTransaction(Database const& cx, Optional<TenantName> tenantName)
: ISingleThreadTransaction(cx->deferredError), tr(cx, tenantName), cache(&arena), writes(&arena), retries(0),
approximateSize(0), creationTime(now()), commitStarted(false), versionStampFuture(tr.getVersionstamp()),
specialKeySpaceWriteMap(std::make_pair(false, Optional<Value>()), specialKeys.end), options(tr) {
std::copy(
cx.getTransactionDefaults().begin(), cx.getTransactionDefaults().end(), std::back_inserter(persistentOptions));
applyPersistentOptions();
}

1. 初始化 tr – 即​​Transaction​​类:

Transaction::Transaction(Database const& cx, Optional<TenantName> const& tenant)
: trState(makeReference<TransactionState>(cx,
tenant,
cx->taskID,
generateSpanID(cx->transactionTracingSample),
createTrLogInfoProbabilistically(cx))),
span(trState->spanID, "Transaction"_loc), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), tr(trState->spanID) {
if (DatabaseContext::debugUseTags) {
debugAddTags(trState);
}
}

这里需要注意​​backoff​​,它是用来防止transaction 提交失败频繁重试的,用来延迟一段时间保证系统有足够的响应时间,默认是10ms。

2. 初始化 writes – 即 ​​WriteMap​​​ 其实际上是一个保存在内存中的 ​​Balanced binary tree​​,fdb这里起的名字叫做 ​​PTree​​ – ​​persistent balanced binary tree​​,不过并没有看到持久化相关的逻辑。
仅占用O(1)空间的情况下,就能满足 RYW 事务对于transaction的多版本要求。所有的 ​​fdb_transaction_set​​ 接口写入的k/v数据 会先被 RYW 事务缓存到 本地的WriteMap,因为一个transaction 保持提交性能稳定的情况下建议最大的缓存数据量为10M,k/v个数有限,所以单个事务占用的 PTree的空间是常量空间,维持binary-tree的目的则是保证查询性能。

fdb_transaction_set_option 实现

这个接口是用来设置 transaction级别的配置的,用来指定一个transaction的行为,比如:是否禁止RYW-transaction,当前transaction的超时时间,请求执行失败可以重试的次数,大小等等。更多的参数在:​​fdb_c_options.g.h​​​ – ​​FDBTransactionOption​​枚举类型中。

实现调用栈如下:

MultiVersionTransaction::setOption
ThreadSafeTransaction::setOption

调用链路还没有结束,这里在 ThreadSafTransaction::setOption中能够看会取 this->tr 作为自己的transaction类型,这块我们在前面创建transaction也知道这里默认是 RYW transaction。

void ThreadSafeTransaction::setOption(FDBTransactionOptions::Option option, Optional<StringRef> value) {
auto itr = FDBTransactionOptions::optionInfo.find(option);
if (itr == FDBTransactionOptions::optionInfo.end()) {
TraceEvent("UnknownTransactionOption").detail("Option", option);
throw invalid_option();
}
ISingleThreadTransaction* tr = this->tr;
Standalone<Optional<StringRef>> passValue = value;

// ThreadSafeTransaction is not allowed to do anything with options except pass them through to RYW.
onMainThreadVoid([tr, option, passValue]() { tr->setOption(option, passValue.contents()); }, &tr->deferredError);
}

这里会通过 ​​onMainThreadVoid​​​ 函数将当前函数的执行时机交给一个 promise 变量,这个promise 变量会被填充到g_network 管理的 ​​ready​​ 优先级队列中。由这个promise变量在 我们前面 g_network run时所在的线程去调度,从而达到异步执行的目的。

void onMainThreadVoid(F f, Error* err = nullptr, TaskPriority taskID = TaskPriority::DefaultOnMainThread) {
Promise<Void> signal;
internal_thread_helper::doOnMainThreadVoid(signal.getFuture(), f, err);
g_network->onMainThread(std::move(signal), taskID);
}
// 让当前 onMainThreadVoid 函数等待在一个signal(future)之上,后续该future被promise
// 唤醒, 则能够恢复执行传入的 function。
void doOnMainThreadVoid(Future<Void> signal, F f, Error* err) {
wait(signal);
if (err && err->code() != invalid_error_code)
return;
try {
f();
} catch (Error& e) {
if (err)
*err = e;
}
}
// g_network->onMainThread 的 Net2 实现
void Net2::onMainThread(Promise<Void>&& signal, TaskPriority taskID) {
if (stopped)
return;
// 创建好promise task,后面填充到ready 队列中
PromiseTask* p = new PromiseTask(std::move(signal));
int64_t priority = int64_t(taskID) << 32;

if (thread_network == this) {
processThreadReady();
this->ready.push(OrderedTask(priority - (++tasksIssued), taskID, p));
} else {
if (threadReady.push(OrderedTask(priority, taskID, p)))
reactor.wake();
}
}

这块的实现逻辑在 ​​ThreadSafeTransaction​​​ 相关的调度接口中都是一样的,包括 ​​set​​​,​​get​​​,​​commit​​​等等,而最后lazy执行的对应的函数是 ​​RYW​​中的对应的函数。

这块异步调度流程实际如下图,其中还包括了后续要介绍的几个commit/block_until/onError接口等:

关于 foundationdb client接口的基本实现 以及 使用上的一些小细节_c++_02


上图从上往下代表的是时间线,绿色可以理解为我们外部调用的api接口,数字代表的是执行顺序。

需要注意的是 1 和 6 两个接口,因为4 是异步执行,所以又可能4发生之前6已经执行,数字仅描述的是一种理想的状态。不过在2-5之间的执行顺序是严格有序的。

两个比较主要的变量分别是 ​​ready​​​ 和 ​​resetPromise​

  • 有一个​​g_network->run​​ 所在进程唯一的thread 维护的 ready-queue 用来调度来自客户端的promise请求。
  • 还有一个 transaction 唯一的 ​​resetPromise​​ 变量,用来在事务内部 唤醒/传递 error。

以​​fdb_transaction_set_option​​接口为例,其实现的步骤如下:

  1. 按照前面提到的调用栈 进入 ​​ThreadsafeTransaction::setOption​​ 逻辑中的​​onMainThreadVoid​​函数进行调度,封装好 ​​ReadYourWritesTransaction::setOption​​ 函数作为异步执行函数参数。
  2. 在调度函数中,通过 ​​doOnMainThreadVoid​​ 让 RYW 对应的setOption 的执行 wait 在一个promise变量上,等待被异步唤醒执行。
  3. 通过 ​​g_network->onMainThread​​ 将promise 变量塞入到 ready 优先级队列中,等待g_network thread进行调度。

void Net2::onMainThread(Promise<Void>&& signal, TaskPriority taskID) {
if (stopped)
return;
PromiseTask* p = new PromiseTask(std::move(signal));
int64_t priority = int64_t(taskID) << 32;

if (thread_network == this) {
processThreadReady();
this->ready.push(OrderedTask(priority - (++tasksIssued), taskID, p));
} else {
if (threadReady.push(OrderedTask(priority, taskID, p)))
reactor.wake();
}
}

  1. 在 g_network->run 函数中,通过​​Net2::run​​ 函数中的如下逻辑唤醒等待在当前 promise上的待执行函数。

void Net2::run() {
...
while (!ready.empty()) {
++countTasks;
currentTaskID = ready.top().taskID;
priorityMetric = static_cast<int64_t>(currentTaskID);
Task* task = ready.top().task;
ready.pop();

try {
++tasksSinceReact;
(*task)();
} catch (Error& e) {
TraceEvent(SevError, "TaskError").error(e);
} catch (...) {
TraceEvent(SevError, "TaskError").error(unknown_error());
}
}
...
}

主要是从ready中取出task,这个task实际的对象类型是前面push 进ready中的 ​​PromiseTask​​​,执行​​(*task)()​​​ 则是调用 ​​PromiseTask​​​ 类的 ​​()​​​ operator,通过​​promise.send(Void());​​ 唤醒前面的wait逻辑,执行真正的 RYW的setOption逻辑。

  1. 在 ​​ReadYourWritesTransaction::setOption​​​ 中参数就是 option 类型和要设置的value,调用 ​​setOptionImpl​​进行参数设置。
  2. 这里需要提一下 option的 ​​TIMEOUT​​​ 选项,有隐藏的坑,fdb transaction的timeout 并不是单纯描述设置option之后一个操作的执行时间是否超时,而是看当前transaction 从创建到完成后面操作的时间是否超时。
    看代码就知道了,先调用 ​​​resetTimeout​​:

void ReadYourWritesTransaction::resetTimeout() {
timeoutActor =
options.timeoutInSeconds == 0.0 ? Void() :
timebomb(options.timeoutInSeconds + creationTime, resetPromise);
}

ACTOR Future<Void> timebomb(double endTime, Promise<Void> resetPromise) {
while (now() < endTime) {
wait(delayUntil(std::min(endTime + 0.0001,
now() + CLIENT_KNOBS->TRANSACTION_TIMEOUT_DELAY_INTERVAL)));
}
if (!resetPromise.isSet())
resetPromise.sendError(transaction_timed_out());
throw transaction_timed_out();
}

假如我们设置了timeout配置,那肯定会进入 ​​timebomb​​​逻辑,可以看到第一个参数其实代表的是从现在开始要delay的end-time,即从事务的​​creationTime​​​开始 到 事务的设置的超时时间结束。如果这段时间内没有完成事务的相关操作(commit/get),就会通过​​resetPormise​​​ send一个timeout error。
这块的逻辑比较重要,假如我们设置的超时时间是​​​100ms​​​,但是我们上层从创建事务到执行commit操作之前有一些额外的操作消耗掉了这100ms的时间,那在执行接下来的​​commit/get​​操作就有可能收到超时错误。

这个错误并不是由 setoption 接口返回的,但是是其内部的 RYW事务的 timebomb 函数send给 transaction共享的 ​​resetPromise​​​ ,会由后续的 ​​onError​​​ 取出来返回给用户。这样的一个调度链路就是setOption设置超时时间时起内部产生的错误码会被 commit 之后的 ​​onError​​捕获,该错误并不是单纯的commit报出的错误,所以如果不看代码,根本不清楚这块的调度链路,误导大家对错误的判断。

此外,对于超时的处理如果想要单纯的check 某一个接口的操作是否会超时,则需要将现在到createtime 之间的时间也加进来才行。

对于其他类型的 transaction option 的设置直接看 ​​ReadYourWritesTransaction::setOptionImpl​​的处理,有不同类型的处理方式,有的是不需要产生异步 promise 由 g_network thread 去调度的。

fdb_transaction_commit 实现

前面提到的 setoption 在整个client 调度链路的图在当前 commit 这里也适用。

关于 foundationdb client接口的基本实现 以及 使用上的一些小细节_c++_02


同样会生成一个 ​​​PromiseTask​​​ 对象被 g_network thread 异步调度,执行链路是和前面setOption一样的,我们主要是关注 commit部分在 ​​ReadYourWritesTransaction​​​ 中的实现逻辑。

核心commit 的逻辑实现调用栈如下:

RYWImpl::commit
Transaction::commit
commitAndWatch (NativeAPI.actor.cpp)
Transaction::commitMutations
tryCommit (NativeAPI.actor.cpp)

在 ​​Transaction::commitMutations​​​ 前面主要是做一些 option 上的check 和对应逻辑的处理,最终会进入到​​tryCommit​​​ 逻辑,在这个函数中核心就是拿着前面 ​​fdb_create_database​​​ 中保存的 ​​clientInfo​​​即 commitProxies 的信息,直接通过​​fdb_setup_network​​​ 时构造的通信组件​​FlowTransport​​​ 发送rpc 到 ​​firstCommitProxy​​这个节点进行提交。

ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState,
CommitTransactionRequest req,
Future<Version> readVersion) {
...
try {
// 开启buggify,则会随机生成一些错误码返回,用于为上层客户端提供有效的稳定性测试。
if (CLIENT_BUGGIFY) {
throw deterministicRandom()->randomChoice(std::vector<Error>{
not_committed(), transaction_too_old(), proxy_memory_limit_exceeded(), commit_unknown_result() });
}
...
req.debugID = commitID;
state Future<CommitID> reply;
if (trState->options.commitOnFirstProxy) {
if (trState->cx->clientInfo->get().firstCommitProxy.present()) {
reply = throwErrorOr(brokenPromiseToMaybeDelivered(
trState->cx->clientInfo->get().firstCommitProxy.get().commit.tryGetReply(req)));
} else {
const std::vector<CommitProxyInterface>& proxies = trState->cx->clientInfo->get().commitProxies;
reply = proxies.size() ? throwErrorOr(brokenPromiseToMaybeDelivered(proxies[0].commit.tryGetReply(req)))
: Never();
}
} else {
reply = basicLoadBalance(trState->cx->getCommitProxies(trState->useProvisionalProxies),
&CommitProxyInterface::commit,
req,
TaskPriority::DefaultPromiseEndpoint,
AtMostOnce::True);
}
...
}

后续的 commit req请求的处理就到 fdbserver 了,这块的实现后续的文章再单独描述吧,本篇就不提了。流程上可以参考:​​foundationdb read and write path internal​​。

fdb_transaction_on_error 实现

这个接口是对一个transaction的错误处理,并在返回前根据相关的事务处理的错误码进行对应的错误处理逻辑,需要注意的是这个接口仅能在有错误码的情况下使用,如果本身处理是成功的,这个接口传入的error-code 为 0 的话就会返回2000 错误码,表示接口使用不当。

调用栈和前面的setOption一样,不过在 ​​ThreadSafeTransaction​​​ 丢promise 进g_network thread时变成阻塞方式的了,需要等到RYW 的 ​​onError​​​执行返回才会给上层返回。
主体逻辑在 ​​​RYWImpl::onError​​ 中。

ReadYourWritesTransaction::onError
RYWImpl::onError

大概的处理逻辑是:

  1. check 当前事务处理失败 并 尝试retry的次数是否超过 ​​FDBTransactionOptions::RETRY_LIMIT​​的限制,超过则throw error.
  2. 调用 ​​Transaction::onError​​ 函数继续处理错误码:
  • 针对如下错误码 – 认为是可重试的错误,会触发一个backoff策略,简单来说就是delay 一段backoff的时间,并且reset 当前事务。

if (e.code() == error_code_not_committed || e.code() == error_code_commit_unknown_result ||
e.code() == error_code_database_locked || e.code() == error_code_proxy_memory_limit_exceeded ||
e.code() == error_code_process_behind || e.code() == error_code_batch_transaction_throttled ||
e.code() == error_code_tag_throttled) {
if (e.code() == error_code_not_committed)
++trState->cx->transactionsNotCommitted;
else if (e.code() == error_code_commit_unknown_result)
++trState->cx->transactionsMaybeCommitted;
else if (e.code() == error_code_proxy_memory_limit_exceeded)
++trState->cx->transactionsResourceConstrained;
else if (e.code() == error_code_process_behind)
++trState->cx->transactionsProcessBehind;
else if (e.code() == error_code_batch_transaction_throttled || e.code() == error_code_tag_throttled) {
++trState->cx->transactionsThrottled;
}

double backoff = getBackoff(e.code());
reset();
return delay(backoff, trState->taskID);
}

  • 后面还有两种会执行 backoff 和 reset 的错误码场景

if (e.code() == error_code_transaction_too_old || 
e.code() == error_code_future_version) {
if (e.code() == error_code_transaction_too_old)
++trState->cx->transactionsTooOld;
else if (e.code() == error_code_future_version)
++trState->cx->transactionsFutureVersions;

double maxBackoff = trState->options.maxBackoff;
reset();
return delay(std::min(CLIENT_KNOBS->FUTURE_VERSION_RETRY_DELAY, maxBackoff), trState->taskID);
}
if (e.code() == error_code_unknown_tenant) {
double maxBackoff = trState->options.maxBackoff;
reset();
return delay(std::min(CLIENT_KNOBS->UNKNOWN_TENANT_RETRY_DELAY, maxBackoff), trState->taskID);
}

在 onError内部,会自动对以上错误码调度 reset,只是针对不同的错误码会有不同的backoff策略,体现在delay的时间长或者短。

tenant 相关的接口实现

多租户在 foundationdb 中是一个比较重要的特性,因为不同用户的数据应该要能够快速识别,并且在逻辑访问上是隔离的,这样一个fdb集群就能够为多用户提供访问特性,且保证了每一个用户的访问性能,目前tenant 相关的特性还处于实验阶段。

实现上是比较简单的,就是在commit的时候,在用户写入的key前面加一个对应的租户prefix,这个prefix是利用租户名生成的,可以转为一个​​int64_t​​的id标识。

如下tryCommit 函数,我们前面介绍事务的commit api实现的时候提到过:

ACTOR static Future<Void> tryCommit(Reference<TransactionState> trState,
CommitTransactionRequest req,
Future<Version> readVersion) {
...
state Key tenantPrefix;
if (trState->tenant().present()) {
KeyRangeLocationInfo locationInfo = wait(getKeyLocation(trState,
""_sr,
&StorageServerInterface::getValue,
Reverse::False,
UseTenant::True,
req.transaction.read_snapshot));
applyTenantPrefix(req, locationInfo.tenantEntry.prefix);
tenantPrefix = locationInfo.tenantEntry.prefix;
}
...
}

只需要在读的时候保证能够将租户相关的prefix 移除,用户要读的数据 返回给用户即可。

结语

fdb其实还有一些比较有趣的接口,比如 ​​fdb_transaction_watch​​ 能够由用户指定对想要审计的key 添加 watcher,这样在后续其他的用户修改了这个key 对应的value 就可以触发用户设置的回掉函数。其实现也是利用的 promise,客户端利用unordered_map 保存客户端设置的wather,在commit k/v的时候会检测提交的key 是否在 这个map 中,在则触发用户外部设置的回调函数。

本文并没有完整展示所有客户端接口的实现,只是挑了一些比较有代表性且重要的接口做了一些介绍,通过本文的介绍,在fdbclient 侧的源代码中基本就能游刃有余了,能够准确的得使用fdb 提供的api。 不过核心代码还是在 fdbserver (分布式事务,共识算法,存储引擎)以及 fdbrpc 和 fdb提供的 simulator测试框架 中,当然这就需要花费更多的时间去深入了。


举报

相关推荐

0 条评论