0
点赞
收藏
分享

微信扫一扫

《MySQL 8.0.22执行器源码分析(2)解读函数 ExecuteIteratorQuery》

函数代码

bool SELECT_LEX_UNIT::ExecuteIteratorQuery(THD *thd) {
THD_STAGE_INFO(thd, stage_executing);
DEBUG_SYNC(thd, "before_join_exec");

Opt_trace_context *const trace = &thd->opt_trace;
Opt_trace_object trace_wrapper(trace);
Opt_trace_object trace_exec(trace, "join_execution");
if (is_simple()) {
trace_exec.add_select_number(first_select()->select_number);
}
Opt_trace_array trace_steps(trace, "steps");

if (ClearForExecution(thd)) {
return true;
}

mem_root_deque<Item *> *fields = get_field_list();
Query_result *query_result = this->query_result();
DBUG_ASSERT(query_result != nullptr);

if (query_result->start_execution(thd)) return true;

if (query_result->send_result_set_metadata(
thd, *fields, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) {
return true;
}

set_executed();

// Hand over the query to the secondary engine if needed.
if (first_select()->join->override_executor_func != nullptr) {
thd->current_found_rows = 0;
for (SELECT_LEX *select = first_select(); select != nullptr;
select = select->next_select()) {
if (select->join->override_executor_func(select->join)) {
return true;
}
thd->current_found_rows += select->join->send_records;
}
const bool calc_found_rows =
(first_select()->active_options() & OPTION_FOUND_ROWS);
if (!calc_found_rows) {
// This is for backwards compatibility reasons only;
// we have documented that without SQL_CALC_FOUND_ROWS,
// we return the actual number of rows returned.
thd->current_found_rows =
std::min(thd->current_found_rows, select_limit_cnt);
}
return query_result->send_eof(thd);
}

if (item) {
item->reset_value_registration();

if (item->assigned()) {
item->assigned(false); // Prepare for re-execution of this unit
item->reset();
}
}

// We need to accumulate in the first join's send_records as long as
// we support SQL_CALC_FOUND_ROWS, since LimitOffsetIterator will use it
// for reporting rows skipped by OFFSET or LIMIT. When we get rid of
// SQL_CALC_FOUND_ROWS, we can use a local variable here instead.
ha_rows *send_records_ptr;
if (fake_select_lex != nullptr) {
// UNION with LIMIT: found_rows() applies to the outermost block.
// LimitOffsetIterator will write skipped OFFSET rows into the
// fake_select_lex's send_records, so use that.
send_records_ptr = &fake_select_lex->join->send_records;
} else if (is_simple()) {
// Not an UNION: found_rows() applies to the join.
// LimitOffsetIterator will write skipped OFFSET rows into the JOIN's
// send_records, so use that.
send_records_ptr = &first_select()->join->send_records;
} else {
// UNION, but without a fake_select_lex (may or may not have a
// LIMIT): found_rows() applies to the outermost block. See
// SELECT_LEX_UNIT::send_records for more information.
send_records_ptr = &send_records;
}
*send_records_ptr = 0;

thd->get_stmt_da()->reset_current_row_for_condition();

{
auto join_cleanup = create_scope_guard([this, thd] {
for (SELECT_LEX *sl = first_select(); sl; sl = sl->next_select()) {
JOIN *join = sl->join;
join->join_free();
thd->inc_examined_row_count(join->examined_rows);
}
if (fake_select_lex != nullptr) {
thd->inc_examined_row_count(fake_select_lex->join->examined_rows);
}
});

if (m_root_iterator->Init()) {
return true;
}

PFSBatchMode pfs_batch_mode(m_root_iterator.get());

for (;;) {
int error = m_root_iterator->Read();
DBUG_EXECUTE_IF("bug13822652_1", thd->killed = THD::KILL_QUERY;);

if (error > 0 || thd->is_error()) // Fatal error
return true;
else if (error < 0)
break;
else if (thd->killed) // Aborted by user
{
thd->send_kill_message();
return true;
}

++*send_records_ptr;

if (query_result->send_data(thd, *fields)) {
return true;
}
thd->get_stmt_da()->inc_current_row_for_condition();
}

// NOTE: join_cleanup must be done before we send EOF, so that we get the
// row counts right.
}

thd->current_found_rows = *send_records_ptr;

return query_result->send_eof(thd);
}

函数过程浅析

1、is_simple()函数用来判断一个查询表达式是否有union或者多级order,如果没有说明这个查询语句简单。就执行​​add_select_number​​,TODO

2、运行​​ClearForExecution​​函数。

在初始化root迭代器之前,把之前的执行迭代器的数据清除。

3、运行​​get_field_list()​​​,获取查询表达式的字段列表,并将所有字段都放到一个deque中,即​​mem_root_deque<Item*>​​;对于查询块的并集,返回在准备期间生成的字段列表,对于单个查询块,尽可能返回字段列表。

4、运行​​start_execution​​,准备执行查询表达式或DML查询。

5、接下来的一些操作与第二引擎有关,关于该引擎见https://www.h5w3.com/123061.html

总结一下就是:Secondary Engine实际上是MySQL sever上同时支持两个存储引擎,把一部分主引擎上的数据,在Secondary Engine上也保存一份,然后查询的时候会根据优化器的的选择决定在哪个引擎上处理数据。

我们这里先不看这一部分

6、如果该查询用于子查询,那么重新reset,指向子查询。

7、接下来是对于复杂句以及简单句的不同处理,从而给​​send_records_ptr​​赋值。

函数对于这个情况的解释如下:

We need to accumulate in the first join's send_records as long as
we support SQL_CALC_FOUND_ROWS, since LimitOffsetIterator will use it
for reporting rows skipped by OFFSET or LIMIT. When we get rid of
SQL_CALC_FOUND_ROWS, we can use a local variable here instead.

情况一:如果该查询块具有UNION或者多级的ORDER BY/LIMIT的话

UNION with LIMIT的话,​​found_rows()​​用于最外层

​LimitOffsetIterator​​​跳过偏移量行写入​​send_records​

情况二:如果是个简单句的话

​found_rows()​​直接用到join上。

​LimitOffsetIterator​​​跳过偏移量行写入​​send_records​

情况三:如果是UNION,但是没有LIMIT

​found_rows()​​用于最外层。

8、重置计数器

9、接下来是一个对查询块遍历,逐个释放内存的操作,用以增加并发性并减少内存消耗。

10、初始化根迭代器

11、然后for循环,从根迭代器一直到引擎的handler,调用读取数据。如果出错就直接返回。

如果收到kill信号,也返回。

在循环中对​​send_records_ptr​​进行累加。

行计数器++,指向下一行。

12、将​​send_records_ptr​​​赋值给该线程的​​current_found_rows​


举报

相关推荐

0 条评论