内联头
source/server/server.cc 325行
void registerCustomInlineHeadersFromBootstrap(
const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {//注册内联头
for (const auto& inline_header : bootstrap.inline_headers()) {//遍历内联头
const Http::LowerCaseString lower_case_name(inline_header.inline_header_name());//小写头名称
if (!canBeRegisteredAsInlineHeader(lower_case_name)) {//如果不能注册为内联头,则抛异常
throw EnvoyException(fmt::format("Header {} cannot be registered as an inline header.",
inline_header.inline_header_name()));
}
switch (inline_header.inline_header_type()) {//内联头的类型
case envoy::config::bootstrap::v3::CustomInlineHeader::REQUEST_HEADER://请求头类型
Http::CustomInlineHeaderRegistry::registerInlineHeader<
Http::RequestHeaderMap::header_map_type>(lower_case_name);//注册内联头
break;
case envoy::config::bootstrap::v3::CustomInlineHeader::REQUEST_TRAILER://请求trailer类型
Http::CustomInlineHeaderRegistry::registerInlineHeader<
Http::RequestTrailerMap::header_map_type>(lower_case_name);//注册请求头
break;
case envoy::config::bootstrap::v3::CustomInlineHeader::RESPONSE_HEADER://响应头
Http::CustomInlineHeaderRegistry::registerInlineHeader<
Http::ResponseHeaderMap::header_map_type>(lower_case_name);//注册内联头
break;
case envoy::config::bootstrap::v3::CustomInlineHeader::RESPONSE_TRAILER://响应trailer
Http::CustomInlineHeaderRegistry::registerInlineHeader<
Http::ResponseTrailerMap::header_map_type>(lower_case_name);//注册内联头
break;
default:
PANIC("not implemented");
}
}
}
source/common/http/header_map_impl.cc 670行
std::vector<HeaderMapImplUtility::HeaderMapImplInfo>
HeaderMapImplUtility::getAllHeaderMapImplInfo() {//获取所有的HeaderMapImplInfo
std::vector<HeaderMapImplUtility::HeaderMapImplInfo> ret;
ret.push_back(makeHeaderMapImplInfo<RequestHeaderMapImpl>("request header map"));
ret.push_back(makeHeaderMapImplInfo<RequestTrailerMapImpl>("request trailer map"));
ret.push_back(makeHeaderMapImplInfo<ResponseHeaderMapImpl>("response header map"));
ret.push_back(makeHeaderMapImplInfo<ResponseTrailerMapImpl>("response trailer map"));
return ret;
}
source/common/http/header_map_impl.cc 654行
template <class T>
HeaderMapImplUtility::HeaderMapImplInfo makeHeaderMapImplInfo(absl::string_view name) {
// Constructing a header map implementation will force the custom headers and sizing to be
// finalized, so do that first.
auto header_map = T::create();//创建对象
HeaderMapImplUtility::HeaderMapImplInfo info;
info.name_ = std::string(name);//设置名称
info.size_ = T::inlineHeadersSize() + sizeof(T);//设置size
for (const auto& header : CustomInlineHeaderRegistry::headers<T::header_map_type>()) {//遍历内联头
info.registered_headers_.push_back(header.first.get());//追加registered_headers_
}
return info;
}
工厂
source/common/config/utility.h 333行
template <class Factory, class ProtoMessage>
static Factory& getAndCheckFactory(const ProtoMessage& message) {//获取和检查工厂
return *getAndCheckFactory<Factory>(message, false);
}
source/common/config/utility.h 311行
template <class Factory, class ProtoMessage>
static Factory* getAndCheckFactory(const ProtoMessage& message, bool is_optional) {
Factory* factory = Utility::getFactoryByType<Factory>(message.typed_config());//根据类型获取工厂
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.no_extension_lookup_by_name")) {//判断是否启用了特性
if (factory == nullptr && !is_optional) {//如果工厂是空的,并且不是可选的,报异常
ExceptionUtil::throwEnvoyException(
fmt::format("Didn't find a registered implementation for '{}' with type URL: '{}'",
message.name(), getFactoryType(message.typed_config())));
}
return factory;
} else if (factory != nullptr) {
return factory;
}
return Utility::getAndCheckFactoryByName<Factory>(message.name(), is_optional);//根据名称获取工厂
}
source/common/config/utility.h 368行
*/
template <class Factory> static Factory* getFactoryByType(const ProtobufWkt::Any& typed_config) {
if (typed_config.type_url().empty()) {//如果类型type_url为空,返回空指针
return nullptr;
}
return Registry::FactoryRegistry<Factory>::getFactoryByType(getFactoryType(typed_config));//根据类型获取工厂
}
source/common/config/utility.h 342行
static std::string getFactoryType(const ProtobufWkt::Any& typed_config) {//获取工程类型
static const std::string& typed_struct_type =
xds::type::v3::TypedStruct::default_instance().GetDescriptor()->full_name();//获取类型名称
static const std::string& legacy_typed_struct_type =
udpa::type::v1::TypedStruct::default_instance().GetDescriptor()->full_name();//获取遗留类型名称
// Unpack methods will only use the fully qualified type name after the last '/'.
// https://github.com/protocolbuffers/protobuf/blob/3.6.x/src/google/protobuf/any.proto#L87
auto type = std::string(TypeUtil::typeUrlToDescriptorFullName(typed_config.type_url()));//type_url转类型
if (type == typed_struct_type) {//如果类型相同
xds::type::v3::TypedStruct typed_struct;
MessageUtil::unpackTo(typed_config, typed_struct);
// Not handling nested structs or typed structs in typed structs
return std::string(TypeUtil::typeUrlToDescriptorFullName(typed_struct.type_url()));//获取类型
} else if (type == legacy_typed_struct_type) {//如果类型相同
udpa::type::v1::TypedStruct typed_struct;
MessageUtil::unpackTo(typed_config, typed_struct);
// Not handling nested structs or typed structs in typed structs
return std::string(TypeUtil::typeUrlToDescriptorFullName(typed_struct.type_url()));//获取类型
}
return type;
}
envoy/registry/registry.h 297行
static Base* getFactoryByType(absl::string_view type) {//根据类型获取工厂
auto it = factoriesByType().find(type);//查找类型匹配的工程
if (it == factoriesByType().end()) {//如果是最后一个则返回空指针
return nullptr;
}
return it->second;//返回工厂
}
source/common/runtime/runtime_features.cc 163行
bool runtimeFeatureEnabled(absl::string_view feature) {
absl::CommandLineFlag* flag = RuntimeFeaturesDefaults::get().getFlag(feature);//获取flag
if (flag == nullptr) {//flag是控制者返回false
IS_ENVOY_BUG(absl::StrCat("Unable to find runtime feature ", feature));
return false;
}
// We validate in map creation that the flag is a boolean.
return flag->TryGet<bool>().value();//返回flag值
}
source/common/config/utility.h 408行
template <class Factory>
static ProtobufTypes::MessagePtr
translateAnyToFactoryConfig(const ProtobufWkt::Any& typed_config,
ProtobufMessage::ValidationVisitor& validation_visitor,
Factory& factory) {//转换any成工厂配置
ProtobufTypes::MessagePtr config = factory.createEmptyConfigProto();//创建空配置
// Fail in an obvious way if a plugin does not return a proto.
RELEASE_ASSERT(config != nullptr, "");
// Check that the config type is not google.protobuf.Empty
RELEASE_ASSERT(config->GetDescriptor()->full_name() != "google.protobuf.Empty", "");
translateOpaqueConfig(typed_config, validation_visitor, *config);//转换配置
return config;
}
source/common/config/utility.h 252行
void Utility::translateOpaqueConfig(const ProtobufWkt::Any& typed_config,
ProtobufMessage::ValidationVisitor& validation_visitor,
Protobuf::Message& out_proto) {//转换配置
static const std::string struct_type =
ProtobufWkt::Struct::default_instance().GetDescriptor()->full_name();
static const std::string typed_struct_type =
xds::type::v3::TypedStruct::default_instance().GetDescriptor()->full_name();
static const std::string legacy_typed_struct_type =
udpa::type::v1::TypedStruct::default_instance().GetDescriptor()->full_name();
if (!typed_config.value().empty()) {//配置不空
// Unpack methods will only use the fully qualified type name after the last '/'.
// https://github.com/protocolbuffers/protobuf/blob/3.6.x/src/google/protobuf/any.proto#L87
absl::string_view type = TypeUtil::typeUrlToDescriptorFullName(typed_config.type_url());//type_url转type
if (type == typed_struct_type) {
xds::type::v3::TypedStruct typed_struct;
MessageUtil::unpackTo(typed_config, typed_struct);
// if out_proto is expecting Struct, return directly
if (out_proto.GetDescriptor()->full_name() == struct_type) {
out_proto.CopyFrom(typed_struct.value());
} else {
// The typed struct might match out_proto, or some earlier version, let
// MessageUtil::jsonConvert sort this out.
MessageUtil::jsonConvert(typed_struct.value(), validation_visitor, out_proto);
}
} else if (type == legacy_typed_struct_type) {
udpa::type::v1::TypedStruct typed_struct;
MessageUtil::unpackTo(typed_config, typed_struct);
// if out_proto is expecting Struct, return directly
if (out_proto.GetDescriptor()->full_name() == struct_type) {
out_proto.CopyFrom(typed_struct.value());
} else {
// The typed struct might match out_proto, or some earlier version, let
// MessageUtil::jsonConvert sort this out.
MessageUtil::jsonConvert(typed_struct.value(), validation_visitor, out_proto);
}
} // out_proto is expecting Struct, unpack directly
else if (type != struct_type || out_proto.GetDescriptor()->full_name() == struct_type) {
MessageUtil::unpackTo(typed_config, out_proto);
} else {
ProtobufWkt::Struct struct_config;
MessageUtil::unpackTo(typed_config, struct_config);
MessageUtil::jsonConvert(struct_config, validation_visitor, out_proto);
}
}
}
source/common/common/regex.cc 66行
EnginePtr GoogleReEngineFactory::createEngine(const Protobuf::Message&,
Server::Configuration::ServerFactoryContext&) {
return std::make_shared<GoogleReEngine>();//创建正则引擎
}
stats相关
source/common/config/utility.h 217行
Stats::TagProducerPtr
Utility::createTagProducer(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
const Stats::TagVector& cli_tags) {//床键tag生产者
return std::make_unique<Stats::TagProducerImpl>(bootstrap.stats_config(), cli_tags);
}
source/common/stats/tag_producer_impl.cc 17行
TagProducerImpl::TagProducerImpl(const envoy::config::metrics::v3::StatsConfig& config,
const Stats::TagVector& cli_tags) {
// To check name conflict.
reserveResources(config);
absl::node_hash_set<std::string> names = addDefaultExtractors(config);//添加默认抽取器
for (const auto& cli_tag : cli_tags) {//遍历命令行tag
if (!names.emplace(cli_tag.name_).second) {//追加名称
throw EnvoyException(fmt::format("Tag name '{}' specified twice.", cli_tag.name_));
}
default_tags_.emplace_back(cli_tag);//最佳默认tag
}
for (const auto& tag_specifier : config.stats_tags()) {//遍历统计tag
const std::string& name = tag_specifier.tag_name(); //获取tag名称
if (!names.emplace(name).second) {//追加名称
throw EnvoyException(fmt::format("Tag name '{}' specified twice.", name));
}
// If no tag value is found, fallback to default regex to keep backward compatibility.
if (tag_specifier.tag_value_case() ==
envoy::config::metrics::v3::TagSpecifier::TagValueCase::TAG_VALUE_NOT_SET ||
tag_specifier.tag_value_case() ==
envoy::config::metrics::v3::TagSpecifier::TagValueCase::kRegex) {//如果tag值case没有设置或者是正则
if (tag_specifier.regex().empty()) {//如果tag正则为空
if (addExtractorsMatching(name) == 0) {//添加匹配的抽取器
throw EnvoyException(fmt::format(
"No regex specified for tag specifier and no default regex for name: '{}'", name));
}
} else {
addExtractor(TagExtractorImplBase::createTagExtractor(name, tag_specifier.regex()));//添加抽取器
}
} else if (tag_specifier.tag_value_case() ==
envoy::config::metrics::v3::TagSpecifier::TagValueCase::kFixedValue) {//如果tag值case是固定值
default_tags_.emplace_back(Tag{name, tag_specifier.fixed_value()});//添加默认tag
}
}
}
source/common/stats/tag_producer_impl.cc 119行
absl::node_hash_set<std::string>
TagProducerImpl::addDefaultExtractors(const envoy::config::metrics::v3::StatsConfig& config) {
absl::node_hash_set<std::string> names;
if (!config.has_use_all_default_tags() || config.use_all_default_tags().value()) {//不使用所有默认tag,或者使用所有默认tag有值
for (const auto& desc : Config::TagNames::get().descriptorVec()) {//遍历描述vec
names.emplace(desc.name_);//追加名称
addExtractor(TagExtractorImplBase::createTagExtractor(desc.name_, desc.regex_, desc.substr_,
desc.negative_match_, desc.re_type_));//添加抽取器
}
for (const auto& desc : Config::TagNames::get().tokenizedDescriptorVec()) {//遍历令牌化的描述vec
names.emplace(desc.name_);//追加名称
addExtractor(std::make_unique<TagExtractorTokensImpl>(desc.name_, desc.pattern_));//添加抽取器
}
}
return names;
}
source/common/stats/tag_extractor_impl.cc 61行
TagExtractorPtr TagExtractorImplBase::createTagExtractor(absl::string_view name,
absl::string_view regex,
absl::string_view substr,
absl::string_view negative_match,
Regex::Type re_type) {
if (name.empty()) {//如果名称为空,则返回异常
throw EnvoyException("tag_name cannot be empty");
}
if (regex.empty()) {//如果正则色空,抛异常
throw EnvoyException(fmt::format(
"No regex specified for tag specifier and no default regex for name: '{}'", name));
}
switch (re_type) {//判断正则类型
case Regex::Type::Re2://google re2正则
return std::make_unique<TagExtractorRe2Impl>(name, regex, substr, negative_match);//创建抽取器
case Regex::Type::StdRegex://标准正则
ASSERT(negative_match.empty(), "Not supported");
return std::make_unique<TagExtractorStdRegexImpl>(name, regex, substr);//创建抽取器
}
PANIC_DUE_TO_CORRUPT_ENUM;
}
source/common/stats/tag_producer_impl.cc 75行
void TagProducerImpl::addExtractor(TagExtractorPtr extractor) {//添加抽取器
const absl::string_view prefix = extractor->prefixToken();//获取前缀令牌
if (prefix.empty()) {//如果前缀为空
tag_extractors_without_prefix_.emplace_back(std::move(extractor));//插入抽取器
} else {
tag_extractor_prefix_map_[prefix].emplace_back(std::move(extractor));//插入抽取器map
}
}
source/common/stats/tag_producer_impl.cc 57行
int TagProducerImpl::addExtractorsMatching(absl::string_view name) {
int num_found = 0;
for (const auto& desc : Config::TagNames::get().descriptorVec()) {//遍历tag名称秒速vec
if (desc.name_ == name) {//如果名称匹配
addExtractor(TagExtractorImplBase::createTagExtractor(desc.name_, desc.regex_, desc.substr_,
desc.negative_match_, desc.re_type_));//添加抽取器
++num_found;//找到加1
}
}
for (const auto& desc : Config::TagNames::get().tokenizedDescriptorVec()) {//遍历tag名称令牌描述vec
if (desc.name_ == name) {//如果名称匹配
addExtractor(std::make_unique<TagExtractorTokensImpl>(desc.name_, desc.pattern_));//添加抽取器
++num_found;
}
}
return num_found;
}
source/common/config/utility.h 223行
Stats::StatsMatcherPtr
Utility::createStatsMatcher(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
Stats::SymbolTable& symbol_table) {
return std::make_unique<Stats::StatsMatcherImpl>(bootstrap.stats_config(), symbol_table);//创建统计匹配器
}
source/common/stats/stats_matcher_impl.cc 17行
StatsMatcherImpl::StatsMatcherImpl(const envoy::config::metrics::v3::StatsConfig& config,
SymbolTable& symbol_table)
: symbol_table_(symbol_table), stat_name_pool_(std::make_unique<StatNamePool>(symbol_table)) {
switch (config.stats_matcher().stats_matcher_case()) {//判断统计匹配器case
case envoy::config::metrics::v3::StatsMatcher::StatsMatcherCase::kRejectAll://如果是rejectAll类型
// In this scenario, there are no matchers to store.
is_inclusive_ = !config.stats_matcher().reject_all();//调用拒绝所有
break;
case envoy::config::metrics::v3::StatsMatcher::StatsMatcherCase::kInclusionList://如果是inclusionList
// If we have an inclusion list, we are being default-exclusive.
for (const auto& stats_matcher : config.stats_matcher().inclusion_list().patterns()) {//遍历统计匹配器包含列表模式
matchers_.push_back(Matchers::StringMatcherImpl(stats_matcher));//追加模式
optimizeLastMatcher();//优化最后个匹配器
}
is_inclusive_ = false;
break;
case envoy::config::metrics::v3::StatsMatcher::StatsMatcherCase::kExclusionList://如果是排除列表
// If we have an exclusion list, we are being default-inclusive.
for (const auto& stats_matcher : config.stats_matcher().exclusion_list().patterns()) {//遍历统计匹配器排除列表模式
matchers_.push_back(Matchers::StringMatcherImpl(stats_matcher));//匹配器追加
optimizeLastMatcher();//优化最后匹配器
}
FALLTHRU;
default:
// No matcher was supplied, so we default to inclusion.
is_inclusive_ = true;
break;
}
}
source/common/config/utility.h 229行
Stats::HistogramSettingsConstPtr
Utility::createHistogramSettings(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
return std::make_unique<Stats::HistogramSettingsImpl>(bootstrap.stats_config());//创建HistogramSettingsImpl
}
source/common/stats/histogram_impl.cc 101行
HistogramSettingsImpl::HistogramSettingsImpl(const envoy::config::metrics::v3::StatsConfig& config)
: configs_([&config]() {
std::vector<Config> configs;
for (const auto& matcher : config.histogram_bucket_settings()) {//遍历设置
std::vector<double> buckets{matcher.buckets().begin(), matcher.buckets().end()};//创建buckets
std::sort(buckets.begin(), buckets.end());//排序buckets
configs.emplace_back(matcher.match(), std::move(buckets));//统计配置追加匹配和buckets
}
return configs;
}()) {}
source/server/server.h 70行
#define ALL_SERVER_STATS(COUNTER, GAUGE, HISTOGRAM) \
COUNTER(debug_assertion_failures) \
COUNTER(envoy_bug_failures) \
COUNTER(dynamic_unknown_fields) \
COUNTER(static_unknown_fields) \
COUNTER(wip_protos) \
COUNTER(dropped_stat_flushes) \
GAUGE(concurrency, NeverImport) \
GAUGE(days_until_first_cert_expiring, NeverImport) \
GAUGE(seconds_until_first_ocsp_response_expiring, NeverImport) \
GAUGE(hot_restart_epoch, NeverImport) \
/* hot_restart_generation is an Accumulate gauge; we omit it here for testing dynamics. */ \
GAUGE(live, NeverImport) \
GAUGE(memory_allocated, Accumulate) \
GAUGE(memory_heap_size, Accumulate) \
GAUGE(memory_physical_size, Accumulate) \
GAUGE(parent_connections, Accumulate) \
GAUGE(state, NeverImport) \
GAUGE(stats_recent_lookups, NeverImport) \
GAUGE(total_connections, Accumulate) \
GAUGE(uptime, Accumulate) \
GAUGE(version, NeverImport) \
HISTOGRAM(initialization_time_ms, Milliseconds)
struct ServerStats {
ALL_SERVER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT, GENERATE_HISTOGRAM_STRUCT)
};
source/server/server.h 58行
#define ALL_SERVER_COMPILATION_SETTINGS_STATS(COUNTER, GAUGE, HISTOGRAM) \
GAUGE(fips_mode, NeverImport)
struct ServerCompilationSettingsStats {
ALL_SERVER_COMPILATION_SETTINGS_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT,
GENERATE_HISTOGRAM_STRUCT)
};
source/common/protobuf/message_validator_impl.h
void setCounters(Stats::Counter& static_unknown_counter, Stats::Counter& dynamic_unknown_counter,
Stats::Counter& wip_counter) {//设置计数器
strict_validation_visitor_.setCounters(wip_counter);//谁知严格校验防蚊器计数器
static_warning_validation_visitor_.setCounters(static_unknown_counter, wip_counter);//设置静态告警校验防蚊器计数器
dynamic_warning_validation_visitor_.setCounters(dynamic_unknown_counter, wip_counter);//设置动态告警校验访问器计数器
}
source/common/stats/timespan_impl.cc 9行
HistogramCompletableTimespanImpl::HistogramCompletableTimespanImpl(Histogram& histogram,
TimeSource& time_source)
: time_source_(time_source), histogram_(histogram), start_(time_source.monotonicTime()) {
ensureTimeHistogram(histogram);//确保时间
}
source/common/stats/timespan_impl.cc 21行
void HistogramCompletableTimespanImpl::ensureTimeHistogram(const Histogram& histogram) const {
switch (histogram.unit()) {//判断时间单位
case Histogram::Unit::Null:
case Histogram::Unit::Microseconds:
case Histogram::Unit::Milliseconds:
return;
case Histogram::Unit::Unspecified:
case Histogram::Unit::Bytes:
case Histogram::Unit::Percent:
RELEASE_ASSERT(
false,
fmt::format("Cannot create a timespan flushing the duration to histogram '{}' because "
"it does not measure time. This is a programming error, either pass a "
"histogram measuring time or fix the unit of the passed histogram.",
histogram.name()));
}
}
source/server/server.cc 181行
void InstanceImpl::failHealthcheck(bool fail) {//失败健康监测
live_.store(!fail);//保存状态
server_stats_->live_.set(live_.load());//设置活跃状态
}
source/common/local_info/local_info_impl.h 46行
LocalInfoImpl(Stats::SymbolTable& symbol_table, const envoy::config::core::v3::Node& node,
const Protobuf::RepeatedPtrField<std::string>& node_context_params,
const Network::Address::InstanceConstSharedPtr& address,
absl::string_view zone_name, absl::string_view cluster_name,
absl::string_view node_name)//构造本地信息实例
: node_(buildLocalNode(node, zone_name, cluster_name, node_name)), address_(address),
context_provider_(node_, node_context_params),
zone_stat_name_storage_(getZoneName(node_, zone_name), symbol_table),
zone_stat_name_(zone_stat_name_storage_.statName()),
dynamic_update_callback_handle_(context_provider_.addDynamicContextUpdateCallback(
[this](absl::string_view resource_type_url) {
(*node_.mutable_dynamic_parameters())
[toStdStringView(resource_type_url)] // NOLINT(std::string_view)
.CopyFrom(context_provider_.dynamicContext(resource_type_url));
})) {}
source/server/configuration_impl.cc 199行
InitialImpl::InitialImpl(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {//构造初始化器
const auto& admin = bootstrap.admin();//获取admin配置
admin_.profile_path_ =
admin.profile_path().empty() ? "/var/log/envoy/envoy.prof" : admin.profile_path();//获取性能日志路径
if (admin.has_address()) {//如果有管理地址
admin_.address_ = Network::Address::resolveProtoAddress(admin.address());//获取管理地址
}
admin_.socket_options_ = std::make_shared<std::vector<Network::Socket::OptionConstSharedPtr>>();//创建管理socket选项
if (!admin.socket_options().empty()) {//如果管理选项不为空
Network::Socket::appendOptions(
admin_.socket_options_,
Network::SocketOptionFactory::buildLiteralOptions(admin.socket_options()));//设置管理选项
}
admin_.ignore_global_conn_limit_ = admin.ignore_global_conn_limit();//设置管理是否忽略全局连接限制
if (!bootstrap.flags_path().empty()) {//如果启动参数路径不为空
flags_path_ = bootstrap.flags_path();//获取启动参数路径
}
if (bootstrap.has_layered_runtime()) {//如果启动配置有运行时
layered_runtime_.MergeFrom(bootstrap.layered_runtime());//合并运行时
if (layered_runtime_.layers().empty()) {//如果运行时为空
layered_runtime_.add_layers()->mutable_admin_layer();//添加管理运行时
}
}
}
source/server/admin/admin.cc 79行
AdminImpl::AdminImpl(const std::string& profile_path, Server::Instance& server,
bool ignore_global_conn_limit)//创建管理对象
: server_(server),
request_id_extension_(Extensions::RequestId::UUIDRequestIDExtension::defaultInstance(
server_.api().randomGenerator())),
profile_path_(profile_path),
stats_(Http::ConnectionManagerImpl::generateStats("http.admin.", server_.stats())),
null_overload_manager_(server_.threadLocal()),
tracing_stats_(
Http::ConnectionManagerImpl::generateTracingStats("http.admin.", no_op_store_)),
route_config_provider_(server.timeSource()),
scoped_route_config_provider_(server.timeSource()), clusters_handler_(server),
config_dump_handler_(config_tracker_, server), init_dump_handler_(server),
stats_handler_(server), logs_handler_(server), profiling_handler_(profile_path),
runtime_handler_(server), listeners_handler_(server), server_cmd_handler_(server),
server_info_handler_(server),
// TODO(jsedgwick) add /runtime_reset endpoint that removes all admin-set values
handlers_{
makeHandler("/", "Admin home page", MAKE_ADMIN_HANDLER(handlerAdminHome), false, false),
makeHandler("/certs", "print certs on machine",
MAKE_ADMIN_HANDLER(server_info_handler_.handlerCerts), false, false),
makeHandler("/clusters", "upstream cluster status",
MAKE_ADMIN_HANDLER(clusters_handler_.handlerClusters), false, false),
makeHandler("/config_dump", "dump current Envoy configs (experimental)",
MAKE_ADMIN_HANDLER(config_dump_handler_.handlerConfigDump), false, false),
makeHandler("/init_dump", "dump current Envoy init manager information (experimental)",
MAKE_ADMIN_HANDLER(init_dump_handler_.handlerInitDump), false, false),
makeHandler("/contention", "dump current Envoy mutex contention stats (if enabled)",
MAKE_ADMIN_HANDLER(stats_handler_.handlerContention), false, false),
makeHandler("/cpuprofiler", "enable/disable the CPU profiler",
MAKE_ADMIN_HANDLER(profiling_handler_.handlerCpuProfiler), false, true),
makeHandler("/heapprofiler", "enable/disable the heap profiler",
MAKE_ADMIN_HANDLER(profiling_handler_.handlerHeapProfiler), false, true),
makeHandler("/healthcheck/fail", "cause the server to fail health checks",
MAKE_ADMIN_HANDLER(server_cmd_handler_.handlerHealthcheckFail), false, true),
makeHandler("/healthcheck/ok", "cause the server to pass health checks",
MAKE_ADMIN_HANDLER(server_cmd_handler_.handlerHealthcheckOk), false, true),
makeHandler("/help", "print out list of admin commands", MAKE_ADMIN_HANDLER(handlerHelp),
false, false),
makeHandler("/hot_restart_version", "print the hot restart compatibility version",
MAKE_ADMIN_HANDLER(server_info_handler_.handlerHotRestartVersion), false,
false),
makeHandler("/logging", "query/change logging levels",
MAKE_ADMIN_HANDLER(logs_handler_.handlerLogging), false, true),
makeHandler("/memory", "print current allocation/heap usage",
MAKE_ADMIN_HANDLER(server_info_handler_.handlerMemory), false, false),
makeHandler("/quitquitquit", "exit the server",
MAKE_ADMIN_HANDLER(server_cmd_handler_.handlerQuitQuitQuit), false, true),
makeHandler("/reset_counters", "reset all counters to zero",
MAKE_ADMIN_HANDLER(stats_handler_.handlerResetCounters), false, true),
makeHandler("/drain_listeners", "drain listeners",
MAKE_ADMIN_HANDLER(listeners_handler_.handlerDrainListeners), false, true),
makeHandler("/server_info", "print server version/status information",
MAKE_ADMIN_HANDLER(server_info_handler_.handlerServerInfo), false, false),
makeHandler("/ready", "print server state, return 200 if LIVE, otherwise return 503",
MAKE_ADMIN_HANDLER(server_info_handler_.handlerReady), false, false),
makeStreamingHandler("/stats", "print server stats", stats_handler_, false, false),
makeHandler("/stats/prometheus", "print server stats in prometheus format",
MAKE_ADMIN_HANDLER(stats_handler_.handlerPrometheusStats), false, false),
makeHandler("/stats/recentlookups", "Show recent stat-name lookups",
MAKE_ADMIN_HANDLER(stats_handler_.handlerStatsRecentLookups), false, false),
makeHandler("/stats/recentlookups/clear", "clear list of stat-name lookups and counter",
MAKE_ADMIN_HANDLER(stats_handler_.handlerStatsRecentLookupsClear), false,
true),
makeHandler(
"/stats/recentlookups/disable", "disable recording of reset stat-name lookup names",
MAKE_ADMIN_HANDLER(stats_handler_.handlerStatsRecentLookupsDisable), false, true),
makeHandler(
"/stats/recentlookups/enable", "enable recording of reset stat-name lookup names",
MAKE_ADMIN_HANDLER(stats_handler_.handlerStatsRecentLookupsEnable), false, true),
makeHandler("/listeners", "print listener info",
MAKE_ADMIN_HANDLER(listeners_handler_.handlerListenerInfo), false, false),
makeHandler("/runtime", "print runtime values",
MAKE_ADMIN_HANDLER(runtime_handler_.handlerRuntime), false, false),
makeHandler("/runtime_modify", "modify runtime values",
MAKE_ADMIN_HANDLER(runtime_handler_.handlerRuntimeModify), false, true),
makeHandler("/reopen_logs", "reopen access logs",
MAKE_ADMIN_HANDLER(logs_handler_.handlerReopenLogs), false, true),
},
date_provider_(server.dispatcher().timeSource()),
admin_filter_chain_(std::make_shared<AdminFilterChain>()),
local_reply_(LocalReply::Factory::createDefault()),
ignore_global_conn_limit_(ignore_global_conn_limit) {}
source/server/server.cc 818行
void InstanceImpl::loadServerFlags(const absl::optional<std::string>& flags_path) {//加载服务器参数
if (!flags_path) {//如果服务器参数路径为空
return;
}
ENVOY_LOG(info, "server flags path: {}", flags_path.value());
if (api_->fileSystem().fileExists(flags_path.value() + "/drain")) {//如果路径存在
ENVOY_LOG(info, "starting server in drain mode");
InstanceImpl::failHealthcheck(true);//调用失败健康监测
}
}
source/common/filesystem/posix/filesystem_impl.cc 95行
bool InstanceImplPosix::fileExists(const std::string& path) {//判断文件是否存在
std::ifstream input_file(path);//创建输入文件流
return input_file.is_open();//判断文件是否打开
}
source/common/secret/secret_manager_impl.cc 20行
SecretManagerImpl::SecretManagerImpl(Server::ConfigTracker& config_tracker)//创建secret管理器
: config_tracker_entry_(
config_tracker.add("secrets", [this](const Matchers::StringMatcher& name_matcher) {
return dumpSecretConfigs(name_matcher);
})) {}
负载管理
source/server/overload_manager_impl.cc 302行
OverloadManagerImpl::OverloadManagerImpl(Event::Dispatcher& dispatcher, Stats::Scope& stats_scope,
ThreadLocal::SlotAllocator& slot_allocator,
const envoy::config::overload::v3::OverloadManager& config,
ProtobufMessage::ValidationVisitor& validation_visitor,
Api::Api& api, const Server::Options& options)//创建负载管理器
: started_(false), dispatcher_(dispatcher), tls_(slot_allocator),
refresh_interval_(
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, refresh_interval, 1000))),
proactive_resources_(
std::make_unique<
absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>()) {
Configuration::ResourceMonitorFactoryContextImpl context(dispatcher, options, api,
validation_visitor);//创建资源监测工厂上下文
// We should hide impl details from users, for them there should be no distinction between
// proactive and regular resource monitors in configuration API. But internally we will maintain
// two distinct collections of proactive and regular resources. Proactive resources are not
// subject to periodic flushes and can be recalculated/updated on demand by invoking
// `tryAllocateResource/tryDeallocateResource` via thread local overload state.
for (const auto& resource : config.resource_monitors()) {//遍历资源监测器
const auto& name = resource.name();//获取资源名称
// Check if it is a proactive resource.
auto proactive_resource_it =
OverloadProactiveResources::get().proactive_action_name_to_resource_.find(name);//根据名称获取资源
ENVOY_LOG(debug, "Evaluating resource {}", name);
bool result = false;
if (proactive_resource_it !=
OverloadProactiveResources::get().proactive_action_name_to_resource_.end()) {//找到资源
ENVOY_LOG(debug, "Adding proactive resource monitor for {}", name);
auto& factory =
Config::Utility::getAndCheckFactory<Configuration::ProactiveResourceMonitorFactory>(
resource);//创建工厂
auto config =
Config::Utility::translateToFactoryConfig(resource, validation_visitor, factory);//生成工厂配置
auto monitor = factory.createProactiveResourceMonitor(*config, context);//生成监视器
result =
proactive_resources_
->try_emplace(proactive_resource_it->second, name, std::move(monitor), stats_scope)
.second;//追加主动资源
} else {
ENVOY_LOG(debug, "Adding resource monitor for {}", name);
auto& factory =
Config::Utility::getAndCheckFactory<Configuration::ResourceMonitorFactory>(resource);//创建工厂
auto config =
Config::Utility::translateToFactoryConfig(resource, validation_visitor, factory);//生成工厂配置
auto monitor = factory.createResourceMonitor(*config, context);//创建监视器
result = resources_.try_emplace(name, name, std::move(monitor), *this, stats_scope).second;//追加资源
}
if (!result) {
throw EnvoyException(absl::StrCat("Duplicate resource monitor ", name));
}
}
for (const auto& action : config.actions()) {//遍历动作
const auto& name = action.name();//获取动作名称
const auto symbol = action_symbol_table_.get(name);//根据名称获取动作符号
ENVOY_LOG(debug, "Adding overload action {}", name);
// TODO: use in place construction once https://github.com/abseil/abseil-cpp/issues/388 is
// addressed
// We cannot currently use in place construction as the OverloadAction constructor may throw,
// causing an inconsistent internal state of the actions_ map, which on destruction results in
// an invalid free.
auto result = actions_.try_emplace(symbol, OverloadAction(action, stats_scope));//尝试追加动作
if (!result.second) {//如果追加失败则抛异常
throw EnvoyException(absl::StrCat("Duplicate overload action ", name));
}
if (name == OverloadActionNames::get().ReduceTimeouts) {//如果动作名称是减少时间
timer_minimums_ = std::make_shared<const Event::ScaledTimerTypeMap>(
parseTimerMinimums(action.typed_config(), validation_visitor));//创建时间减少
} else if (name == OverloadActionNames::get().ResetStreams) {//如果动作是重置流
if (!config.has_buffer_factory_config()) {//如果没有缓存工厂配置
throw EnvoyException(
fmt::format("Overload action \"{}\" requires buffer_factory_config.", name));
}
makeCounter(api.rootScope(), OverloadActionStatsNames::get().ResetStreamsCount);
} else if (action.has_typed_config()) {//如果动作有typed_config
throw EnvoyException(fmt::format(
"Overload action \"{}\" has an unexpected value for the typed_config field", name));
}
for (const auto& trigger : action.triggers()) {//遍历触发器
const std::string& resource = trigger.name();//获取触发器名称
auto proactive_resource_it =
OverloadProactiveResources::get().proactive_action_name_to_resource_.find(resource);//查找主动资源
if (resources_.find(resource) == resources_.end() &&
proactive_resource_it ==
OverloadProactiveResources::get().proactive_action_name_to_resource_.end()) {//如果没有找到资源
throw EnvoyException(
fmt::format("Unknown trigger resource {} for overload action {}", resource, name));
}
resource_to_actions_.insert(std::make_pair(resource, symbol));//往资源动作里插入数据
}
}
}
source/server/resource_monitor_config_impl.h 11行
ResourceMonitorFactoryContextImpl(Event::Dispatcher& dispatcher, const Server::Options& options,
Api::Api& api,
ProtobufMessage::ValidationVisitor& validation_visitor)
: dispatcher_(dispatcher), options_(options), api_(api),
validation_visitor_(validation_visitor) {}
source/server/overload_manager_impl.cc 242行
OverloadAction::OverloadAction(const envoy::config::overload::v3::OverloadAction& config,
Stats::Scope& stats_scope)//构造负载动作
: state_(OverloadActionState::inactive()),
active_gauge_(
makeGauge(stats_scope, config.name(), "active", Stats::Gauge::ImportMode::Accumulate)),
scale_percent_gauge_(makeGauge(stats_scope, config.name(), "scale_percent",
Stats::Gauge::ImportMode::Accumulate)) {
for (const auto& trigger_config : config.triggers()) {//遍历触发器
TriggerPtr trigger;
switch (trigger_config.trigger_oneof_case()) {//判断触发器类型
case envoy::config::overload::v3::Trigger::TriggerOneofCase::kThreshold://如果是threshold触发器
trigger = std::make_unique<ThresholdTriggerImpl>(trigger_config.threshold());//创建ThresholdTriggerImpl
break;
case envoy::config::overload::v3::Trigger::TriggerOneofCase::kScaled://如果是scaled触发器
trigger = std::make_unique<ScaledTriggerImpl>(trigger_config.scaled());//创建ScaledTriggerImpl
break;
case envoy::config::overload::v3::Trigger::TriggerOneofCase::TRIGGER_ONEOF_NOT_SET://如果是ONEOF_NOT_SET
throw EnvoyException(absl::StrCat("action not set for trigger ", trigger_config.name()));
}
if (!triggers_.try_emplace(trigger_config.name(), std::move(trigger)).second) {//如果触发器已经存在
throw EnvoyException(
absl::StrCat("Duplicate trigger resource for overload action ", config.name()));
}
}
active_gauge_.set(0);//初始化active_gauge_
scale_percent_gauge_.set(0);//初始化scale_percent_gauge_
}
source/common/momery/heap_shrinker.cc 14行
HeapShrinker::HeapShrinker(Event::Dispatcher& dispatcher, Server::OverloadManager& overload_manager,
Stats::Scope& stats)//创建HeapShrinker动作
: active_(false) {
const auto action_name = Server::OverloadActionNames::get().ShrinkHeap;//获取动作名称
if (overload_manager.registerForAction(
action_name, dispatcher,
[this](Server::OverloadActionState state) { active_ = state.isSaturated(); })) {//注册动作
Envoy::Stats::StatNameManagedStorage stat_name(
absl::StrCat("overload.", action_name, ".shrink_count"), stats.symbolTable());//统计管理存储
shrink_counter_ = &stats.counterFromStatName(stat_name.statName());//计数器
timer_ = dispatcher.createTimer([this] {
shrinkHeap();
timer_->enableTimer(kTimerInterval);
});//创建定时任务
timer_->enableTimer(kTimerInterval);//启用定时任务
}
}
void HeapShrinker::shrinkHeap() {
if (active_) {//如果处于活跃状态
Utils::releaseFreeMemory();//释放内存
shrink_counter_->inc();//增加计数器
}
}
source/server/overload_manager_impl.cc 441行
bool OverloadManagerImpl::registerForAction(const std::string& action,
Event::Dispatcher& dispatcher,
OverloadActionCb callback) {//注册动作
ASSERT(!started_);
const auto symbol = action_symbol_table_.get(action);//获取symbol
if (actions_.find(symbol) == actions_.end()) {//如果没找到symbol
ENVOY_LOG(debug, "No overload action is configured for {}.", action);
return false;
}
action_to_callbacks_.emplace(std::piecewise_construct, std::forward_as_tuple(symbol),
std::forward_as_tuple(dispatcher, callback));//追加动作回调
return true;
}
source/common/signal/fatal_error_handler.cc 152行
void registerFatalActions(FatalAction::FatalActionPtrList safe_actions,
FatalAction::FatalActionPtrList unsafe_actions,
Thread::ThreadFactory& thread_factory) {//注册致命动作
// Create a FatalActionManager and store it.
if (!fatal_action_manager) {//如果不存在致命动作管理器
fatal_action_manager.exchange(new FatalAction::FatalActionManager(
std::move(safe_actions), std::move(unsafe_actions), thread_factory));//创建致命动作管理器
}
}
监听管理器
source/server/listener_manager_impl.cc 278行
ListenerManagerImpl::ListenerManagerImpl(Instance& server,
ListenerComponentFactory& listener_factory,
WorkerFactory& worker_factory,
bool enable_dispatcher_stats,
Quic::QuicStatNames& quic_stat_names)//创建监听管理器
: server_(server), factory_(listener_factory),
scope_(server.stats().createScope("listener_manager.")), stats_(generateStats(*scope_)),
config_tracker_entry_(server.admin().getConfigTracker().add(
"listeners",
[this](const Matchers::StringMatcher& name_matcher) {
return dumpListenerConfigs(name_matcher);
})),
enable_dispatcher_stats_(enable_dispatcher_stats), quic_stat_names_(quic_stat_names) {
for (uint32_t i = 0; i < server.options().concurrency(); i++) {//循环并发次数
workers_.emplace_back(
worker_factory.createWorker(i, server.overloadManager(), absl::StrCat("worker_", i)));//创建工作线程
}
}
source/server/worker_impl.cc 17行
WorkerPtr ProdWorkerFactory::createWorker(uint32_t index, OverloadManager& overload_manager,
const std::string& worker_name) {//创建工作线程
Event::DispatcherPtr dispatcher(
api_.allocateDispatcher(worker_name, overload_manager.scaledTimerFactory()));//创建dispatcher
auto conn_handler = std::make_unique<ConnectionHandlerImpl>(*dispatcher, index);//创建连接处理器
return std::make_unique<WorkerImpl>(tls_, hooks_, std::move(dispatcher), std::move(conn_handler),
overload_manager, api_, stat_names_);//创建worker
}
source/server/connection_handler_impl.cc 18行
ConnectionHandlerImpl::ConnectionHandlerImpl(Event::Dispatcher& dispatcher,
absl::optional<uint32_t> worker_index)//创建连接处理器
: worker_index_(worker_index), dispatcher_(dispatcher),
per_handler_stat_prefix_(dispatcher.name() + "."), disable_listeners_(false) {}
source/server/worker_impl.cc 26行
WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, ListenerHooks& hooks,
Event::DispatcherPtr&& dispatcher, Network::ConnectionHandlerPtr handler,
OverloadManager& overload_manager, Api::Api& api,
WorkerStatNames& stat_names)//创建worker
: tls_(tls), hooks_(hooks), dispatcher_(std::move(dispatcher)), handler_(std::move(handler)),
api_(api), reset_streams_counter_(
api_.rootScope().counterFromStatName(stat_names.reset_high_memory_stream_)) {
tls_.registerThread(*dispatcher_, false);//注册线程
overload_manager.registerForAction(
OverloadActionNames::get().StopAcceptingConnections, *dispatcher_,
[this](OverloadActionState state) { stopAcceptingConnectionsCb(state); });//注册负载管理动作
overload_manager.registerForAction(
OverloadActionNames::get().RejectIncomingConnections, *dispatcher_,
[this](OverloadActionState state) { rejectIncomingConnectionsCb(state); });//注册负载管理动作
overload_manager.registerForAction(
OverloadActionNames::get().ResetStreams, *dispatcher_,
[this](OverloadActionState state) { resetStreamsUsingExcessiveMemory(state); });//注册负载管理动作
}
source/common/thread_local/thread_local_impl.cc 107行
void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {//注册线程
ASSERT_IS_MAIN_OR_TEST_THREAD();
ASSERT(!shutdown_);
if (main_thread) {//如果是主线程
main_thread_dispatcher_ = &dispatcher;
thread_local_data_.dispatcher_ = &dispatcher;
} else {
ASSERT(!containsReference(registered_threads_, dispatcher));
registered_threads_.push_back(dispatcher);
dispatcher.post([&dispatcher] { thread_local_data_.dispatcher_ = &dispatcher; });
}
}
初始化线程和stats
source/common/stats/thread_local_store.cc 192行
void ThreadLocalStoreImpl::initializeThreading(Event::Dispatcher& main_thread_dispatcher,
ThreadLocal::Instance& tls) {//初始化线程
threading_ever_initialized_ = true;
main_thread_dispatcher_ = &main_thread_dispatcher;//设置主线程dispatcher
tls_cache_ = ThreadLocal::TypedSlot<TlsCache>::makeUnique(tls);//创建tlsCache
tls_cache_->set(
[](Event::Dispatcher&) -> std::shared_ptr<TlsCache> { return std::make_shared<TlsCache>(); });//设置tlscache
tls_ = tls;//设置tls
}
source/common/event/dispatcher_impl.cc 103行
void DispatcherImpl::initializeStats(Stats::Scope& scope,
const absl::optional<std::string>& prefix) {//初始化stats
const std::string effective_prefix = prefix.has_value() ? *prefix : absl::StrCat(name_, ".");//获取prefix
// This needs to be run in the dispatcher's thread, so that we have a thread id to log.
post([this, &scope, effective_prefix] {//后置回调
stats_prefix_ = effective_prefix + "dispatcher";//获取stats prefix
stats_ = std::make_unique<DispatcherStats>(
DispatcherStats{ALL_DISPATCHER_STATS(POOL_HISTOGRAM_PREFIX(scope, stats_prefix_ + "."))});//创建stats
base_scheduler_.initializeStats(stats_.get());//初始化stats
ENVOY_LOG(debug, "running {} on thread {}", stats_prefix_, run_tid_.debugString());
});
}
运行时
source/exec/main_common.cc 40行
Runtime::LoaderPtr ProdComponentFactory::createRuntime(Server::Instance& server,
Server::Configuration::Initial& config) {
return Server::InstanceUtil::createRuntime(server, config);//创建运行时
}
Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server,
Server::Configuration::Initial& config) {//创建海南运行时
ENVOY_LOG(info, "runtime: {}", MessageUtil::getYamlStringFromMessage(config.runtime()));
return std::make_unique<Runtime::LoaderImpl>(
server.dispatcher(), server.threadLocal(), config.runtime(), server.localInfo(),
server.stats(), server.api().randomGenerator(),
server.messageValidationContext().dynamicValidationVisitor(), server.api());
}
source/common/runtime/runtime_impl.cc 404行
LoaderImpl::LoaderImpl(Event::Dispatcher& dispatcher, ThreadLocal::SlotAllocator& tls,
const envoy::config::bootstrap::v3::LayeredRuntime& config,
const LocalInfo::LocalInfo& local_info, Stats::Store& store,
Random::RandomGenerator& generator,
ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api)
: generator_(generator), stats_(generateStats(store)), tls_(tls.allocateSlot()),
config_(config), service_cluster_(local_info.clusterName()), api_(api),
init_watcher_("RTDS", [this]() { onRtdsReady(); }), store_(store) {
absl::node_hash_set<std::string> layer_names;//层名称集合
for (const auto& layer : config_.layers()) {//遍历配置
auto ret = layer_names.insert(layer.name());//插入层名称
if (!ret.second) {//插入失败
throw EnvoyException(absl::StrCat("Duplicate layer name: ", layer.name()));
}
switch (layer.layer_specifier_case()) {//判断层类型
case envoy::config::bootstrap::v3::RuntimeLayer::LayerSpecifierCase::kStaticLayer://静态层
// Nothing needs to be done here.
break;
case envoy::config::bootstrap::v3::RuntimeLayer::LayerSpecifierCase::kAdminLayer://管理层
if (admin_layer_ != nullptr) {//如果管理层不为空,管理层只能有一个
throw EnvoyException(
"Too many admin layers specified in LayeredRuntime, at most one may be specified");
}
admin_layer_ = std::make_unique<AdminLayer>(layer.name(), stats_);//创建管理层
break;
case envoy::config::bootstrap::v3::RuntimeLayer::LayerSpecifierCase::kDiskLayer://磁盘层
if (watcher_ == nullptr) {//如果监视器为空
watcher_ = dispatcher.createFilesystemWatcher();//创建监视器
}
watcher_->addWatch(layer.disk_layer().symlink_root(), Filesystem::Watcher::Events::MovedTo,
[this](uint32_t) -> void { loadNewSnapshot(); });//添加监视
break;
case envoy::config::bootstrap::v3::RuntimeLayer::LayerSpecifierCase::kRtdsLayer://运行时发现层
subscriptions_.emplace_back(//添加订阅
std::make_unique<RtdsSubscription>(*this, layer.rtds_layer(), store, validation_visitor));
init_manager_.add(subscriptions_.back()->init_target_);//订阅初始化魔表,添加到初始化管理器
break;
case envoy::config::bootstrap::v3::RuntimeLayer::LayerSpecifierCase::LAYER_SPECIFIER_NOT_SET://层类型错误
throw EnvoyException("layer specifier not set");
}
}
loadNewSnapshot();//加载新快照
}
source/common/event/dispatcher_impl.cc 189行
Filesystem::WatcherPtr DispatcherImpl::createFilesystemWatcher() {//创建文件监视器
ASSERT(isThreadSafe());
return Filesystem::WatcherPtr{new Filesystem::WatcherImpl(*this, file_system_)};
}
source/common/filesystem/kqueue/watcher_impl.cc 19行
WatcherImpl::WatcherImpl(Event::Dispatcher& dispatcher, Filesystem::Instance& file_system)//监视器实现
: file_system_(file_system), queue_(kqueue()),
kqueue_event_(dispatcher.createFileEvent(//文件事件
queue_,
[this](uint32_t events) -> void {
if (events & Event::FileReadyType::Read) {
onKqueueEvent();
}
},
Event::FileTriggerType::Edge, Event::FileReadyType::Read)) {}
source/common/filesystem/kqueue/watcher_impl.cc 35行
void WatcherImpl::addWatch(absl::string_view path, uint32_t events, Watcher::OnChangedCb cb) {//添加监视
FileWatchPtr watch = addWatch(path, events, cb, false);//添加监视
if (watch == nullptr) {
throw EnvoyException(absl::StrCat("invalid watch path ", path));
}
}
WatcherImpl::FileWatchPtr WatcherImpl::addWatch(absl::string_view path, uint32_t events,
Watcher::OnChangedCb cb, bool path_must_exist) {
bool watching_dir = false;
std::string pathname(path);//路径
int watch_fd = open(pathname.c_str(), O_SYMLINK);//打开文件
if (watch_fd == -1) {//打开失败
if (path_must_exist) {//路径必须存在
return nullptr;
}
watch_fd = open(std::string(file_system_.splitPathFromFilename(path).directory_).c_str(), 0);//打开拆分文件路径的目录
if (watch_fd == -1) {//打开失败
return nullptr;
}
watching_dir = true;//监听目录
}
FileWatchPtr watch(new FileWatch());//创建文件监视指针
watch->fd_ = watch_fd;//设置文件描述符
watch->file_ = pathname;//设置路径
watch->events_ = events;//设置事件
watch->callback_ = cb;//设置回调
watch->watching_dir_ = watching_dir;//设置是否监听目录
u_int flags = NOTE_DELETE | NOTE_RENAME | NOTE_WRITE;//标志位
struct kevent event;
EV_SET(&event, watch_fd, EVFILT_VNODE, EV_ADD | EV_CLEAR, flags, 0,
reinterpret_cast<void*>(watch_fd));//上树
if (kevent(queue_, &event, 1, nullptr, 0, nullptr) == -1 || event.flags & EV_ERROR) {
throw EnvoyException(
fmt::format("unable to add filesystem watch for file {}: {}", path, errorDetails(errno)));
}
ENVOY_LOG(debug, "added watch for file: '{}' fd: {}", path, watch_fd);
watches_[watch_fd] = watch;//把文件监视设置到map中
return watch;
}
source/common/runtime/runtime_impl.cc 545行
void LoaderImpl::loadNewSnapshot() {//加载快照
std::shared_ptr<SnapshotImpl> ptr = createNewSnapshot();//创建新快照
tls_->set([ptr](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {//设置tls
return std::static_pointer_cast<ThreadLocal::ThreadLocalObject>(ptr);
});
refreshReloadableFlags(ptr->values());//刷新可重载的flag
{
absl::MutexLock lock(&snapshot_mutex_);//创建锁
thread_safe_snapshot_ = ptr;//设置线程安全快照
}
}
source/common/runtime/runtime_impl.cc 595行
SnapshotImplPtr LoaderImpl::createNewSnapshot() {//创建新快照
std::vector<Snapshot::OverrideLayerConstPtr> layers;//层数组
uint32_t disk_layers = 0;//磁盘层数量计数
uint32_t error_layers = 0;//错误层数量计数
uint32_t rtds_layer = 0;//运行时发现层数量计数
for (const auto& layer : config_.layers()) {//遍历层配置
switch (layer.layer_specifier_case()) {//判断层类型
case envoy::config::bootstrap::v3::RuntimeLayer::LayerSpecifierCase::kStaticLayer://静态层
layers.emplace_back(std::make_unique<const ProtoLayer>(layer.name(), layer.static_layer()));//创建proto层并追加到层数组中
break;
case envoy::config::bootstrap::v3::RuntimeLayer::LayerSpecifierCase::kDiskLayer: {//磁盘层
std::string path =
layer.disk_layer().symlink_root() + "/" + layer.disk_layer().subdirectory();//拼接路径
if (layer.disk_layer().append_service_cluster()) {//如果追加服务集群
path += "/" + service_cluster_;//拼接路径
}
if (api_.fileSystem().directoryExists(path)) {//如果路径存在
TRY_ASSERT_MAIN_THREAD {
layers.emplace_back(std::make_unique<DiskLayer>(layer.name(), path, api_));//创建磁盘层,并追加到层数组
++disk_layers;//磁盘层技术加一
}
END_TRY
catch (EnvoyException& e) {
// TODO(htuch): Consider latching here, rather than ignoring the
// layer. This would be consistent with filesystem RTDS.
++error_layers;//层错误加1
ENVOY_LOG(debug, "error loading runtime values for layer {} from disk: {}",
layer.DebugString(), e.what());
}
}
break;
}
case envoy::config::bootstrap::v3::RuntimeLayer::LayerSpecifierCase::kAdminLayer://管理层
layers.push_back(std::make_unique<AdminLayer>(*admin_layer_));//创建管理层并追加到层数组
break;
case envoy::config::bootstrap::v3::RuntimeLayer::LayerSpecifierCase::kRtdsLayer: {//运行时发现层
auto* subscription = subscriptions_[rtds_layer++].get();//获取订阅
layers.emplace_back(std::make_unique<const ProtoLayer>(layer.name(), subscription->proto_));//创建proto层并追加到层数组中
break;
}
case envoy::config::bootstrap::v3::RuntimeLayer::LayerSpecifierCase::LAYER_SPECIFIER_NOT_SET://类型错误
PANIC_DUE_TO_PROTO_UNSET;
}
}
stats_.num_layers_.set(layers.size());//设置统计
if (error_layers == 0) {
stats_.load_success_.inc();//设置统计
} else {
stats_.load_error_.inc();//设置统计
}
if (disk_layers > 1) {
stats_.override_dir_exists_.inc();//设置统计
} else {
stats_.override_dir_not_exists_.inc();//设置统计
}
return std::make_unique<SnapshotImpl>(generator_, stats_, std::move(layers));//创建快照指针并返回
}
source/common/runtime/runtime_impl.cc 45行
void refreshReloadableFlags(const Snapshot::EntryMap& flag_map) {
absl::flat_hash_map<std::string, bool> quiche_flags_override;// flags map
for (const auto& it : flag_map) {//遍历flags
#ifdef ENVOY_ENABLE_QUIC//如果定义了ENVOY_ENABLE_QUIC
if (absl::StartsWith(it.first, quiche::EnvoyQuicheReloadableFlagPrefix) &&
it.second.bool_value_.has_value()) {//如果flag匹配
quiche_flags_override[it.first.substr(quiche::EnvoyFeaturePrefix.length())] =
it.second.bool_value_.value();//设置quiche flags map值
}
#endif
if (it.second.bool_value_.has_value() && isRuntimeFeature(it.first)) {//如果是运行时属性flag
maybeSetRuntimeGuard(it.first, it.second.bool_value_.value());//设置运行时属性
}
}
#ifdef ENVOY_ENABLE_QUIC//如果定义了ENVOY_ENABLE_QUIC
quiche::FlagRegistry::getInstance().updateReloadableFlags(quiche_flags_override);//更新quiche可重载flags
#endif
// Make sure ints are parsed after the flag allowing deprecated ints is parsed.
for (const auto& it : flag_map) {//遍历flag
if (it.second.uint_value_.has_value()) {
maybeSetDeprecatedInts(it.first, it.second.uint_value_.value());//设置过期属性
}
}
markRuntimeInitialized();//标记运行时初始化完成
}
source/common/runtime/runtime_impl.cc 467行
RtdsSubscription::RtdsSubscription(//运行时发现服务订阅
LoaderImpl& parent, const envoy::config::bootstrap::v3::RuntimeLayer::RtdsLayer& rtds_layer,
Stats::Store& store, ProtobufMessage::ValidationVisitor& validation_visitor)
: Envoy::Config::SubscriptionBase<envoy::service::runtime::v3::Runtime>(validation_visitor,
"name"),
parent_(parent), config_source_(rtds_layer.rtds_config()), store_(store),
stats_scope_(store_.createScope("runtime")), resource_name_(rtds_layer.name()),
init_target_("RTDS " + resource_name_, [this]() { start(); }) {}
source/common/runtime/runtime_impl.cc 520行
void RtdsSubscription::start() { subscription_->start({resource_name_}); }//开始订阅
source/common/config/grpc_subscription_impl.cc 155行
void GrpcCollectionSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resource_names) {
ASSERT(resource_names.empty());
GrpcSubscriptionImpl::start({XdsResourceIdentifier::encodeUrl(collection_locator_)});//开始grpc订阅的
}
source/common/config/grpc_subscription_impl.cc 30行
void GrpcSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resources) {
if (init_fetch_timeout_.count() > 0) {//如果初始抓取超时大于0
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {//创建定时器
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, nullptr);
});
init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);//启用定时器
}
watch_ = grpc_mux_->addWatch(type_url_, resources, *this, resource_decoder_, options_);//添加监视器
// The attempt stat here is maintained for the purposes of having consistency between ADS and
// gRPC/filesystem/REST Subscriptions. Since ADS is push based and muxed, the notion of an
// "attempt" for a given xDS API combined by ADS is not really that meaningful.
stats_.update_attempt_.inc();//更新统计
// ADS initial request batching relies on the users of the GrpcMux *not* calling start on it,
// whereas non-ADS xDS users must call it themselves.
if (!is_aggregated_) {//如果不是聚集的
grpc_mux_->start();//启动grpc
}
}
source/common/config/grpc_mux_impl.cc 106行
GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url,
const absl::flat_hash_set<std::string>& resources,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder,
const SubscriptionOptions&) {
auto watch =
std::make_unique<GrpcMuxWatchImpl>(resources, callbacks, resource_decoder, type_url, *this);//创建watch实现
ENVOY_LOG(debug, "gRPC mux addWatch for " + type_url);
// Lazily kick off the requests based on first subscription. This has the
// convenient side-effect that we order messages on the channel based on
// Envoy's internal dependency ordering.
// TODO(gsagula): move TokenBucketImpl params to a config.
if (!apiStateFor(type_url).subscribed_) {//url如果没有订阅
apiStateFor(type_url).request_.set_type_url(type_url);//设置rul
apiStateFor(type_url).request_.mutable_node()->MergeFrom(local_info_.node());//合并node
apiStateFor(type_url).subscribed_ = true;//设置订阅
subscriptions_.emplace_back(type_url);//订阅中追加url
}
// This will send an updated request on each subscription.
// TODO(htuch): For RDS/EDS, this will generate a new DiscoveryRequest on each resource we added.
// Consider in the future adding some kind of collation/batching during CDS/LDS updates so that we
// only send a single RDS/EDS update after the CDS/LDS update.
queueDiscoveryRequest(type_url);//服务发现请求排队
return watch;
}
source/common/config/grpc_mux_impl.cc 315行
void GrpcMuxImpl::queueDiscoveryRequest(absl::string_view queue_item) {
if (!grpc_stream_.grpcStreamAvailable()) {//如果grpc流不可用
ENVOY_LOG(debug, "No stream available to queueDiscoveryRequest for {}", queue_item);
return; // Drop this request; the reconnect will enqueue a new one.
}
ApiState& api_state = apiStateFor(queue_item);//获取url api状态
if (api_state.paused()) {//如果状态是暂停
ENVOY_LOG(trace, "API {} paused during queueDiscoveryRequest(), setting pending.", queue_item);
api_state.pending_ = true;
return; // Drop this request; the unpause will enqueue a new one.
}
request_queue_->emplace(std::string(queue_item));//添加请求队列
drainRequests();//排水请求
}
source/common/config/grpc_mux_impl.cc 365行
void GrpcMuxImpl::drainRequests() {
while (!request_queue_->empty() && grpc_stream_.checkRateLimitAllowsDrain()) {//如果请求队列不为空,并且不限流
// Process the request, if rate limiting is not enabled at all or if it is under rate limit.
sendDiscoveryRequest(request_queue_->front());//发送发现请求
request_queue_->pop();//弹出请求队列头部
}
grpc_stream_.maybeUpdateQueueSizeStat(request_queue_->size());//更新grpc流队列大小统计
}
admin相关
source/server/configuration_impl.cc 227行
void InitialImpl::initAdminAccessLog(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
Instance& server) {//初始化管理日志
const auto& admin = bootstrap.admin();//获取admin配置
for (const auto& access_log : admin.access_log()) {//遍历管理日志配置
AccessLog::InstanceSharedPtr current_access_log =
AccessLog::AccessLogFactory::fromProto(access_log, server.serverFactoryContext());//创建管理日志
admin_.access_logs_.emplace_back(current_access_log);//追加管理日志
}
if (!admin.access_log_path().empty()) {//如果管理日志路径不为空
Filesystem::FilePathAndType file_info{Filesystem::DestinationType::File,
admin.access_log_path()};//创建文件
admin_.access_logs_.emplace_back(new Extensions::AccessLoggers::File::FileAccessLog(
file_info, {}, Formatter::SubstitutionFormatUtils::defaultSubstitutionFormatter(),
server.accessLogManager()));//管理日志追加文件日志
}
}
source/server/admin/admin.cc 52行
void AdminImpl::startHttpListener(const std::list<AccessLog::InstanceSharedPtr>& access_logs,
const std::string& address_out_path,
Network::Address::InstanceConstSharedPtr address,
const Network::Socket::OptionsSharedPtr& socket_options,
Stats::ScopeSharedPtr&& listener_scope) {//开启管理监听
for (const auto& access_log : access_logs) {//遍历日志
access_logs_.emplace_back(access_log);//追加日志
}
null_overload_manager_.start();//开始null负载管理器
socket_ = std::make_shared<Network::TcpListenSocket>(address, socket_options, true);//创建tcp监听套接字
RELEASE_ASSERT(0 == socket_->ioHandle().listen(ENVOY_TCP_BACKLOG_SIZE).return_value_,
"listen() failed on admin listener");//启用监听
socket_factories_.emplace_back(std::make_unique<AdminListenSocketFactory>(socket_));//创建socket工厂并追加到socket工厂中
listener_ = std::make_unique<AdminListener>(*this, std::move(listener_scope));//创建admin监听器
ENVOY_LOG(info, "admin address: {}",
socket().connectionInfoProvider().localAddress()->asString());
if (!address_out_path.empty()) {//如果地址输出路径不为空
std::ofstream address_out_file(address_out_path);//创建输出文件流
if (!address_out_file) {//如果输出文件流打开失败
ENVOY_LOG(critical, "cannot open admin address output file {} for writing.",
address_out_path);
} else {
address_out_file << socket_->connectionInfoProvider().localAddress()->asString();//把套接字本地地址写入输出文件流
}
}
}
source/common/network/listen_socket_impl.cc 60行
NetworkListenSocket(const Address::InstanceConstSharedPtr& address,
const Network::Socket::OptionsSharedPtr& options, bool bind_to_port,
const SocketCreationOptions& creation_options = {})
: ListenSocketImpl(bind_to_port ? Network::ioHandleForAddr(T::type, address, creation_options)
: nullptr,
address) {//网络监听套接字
// Prebind is applied if the socket is bind to port.
if (bind_to_port) {//如果绑定端口
RELEASE_ASSERT(io_handle_->isOpen(), "");//io处理器打开
setPrebindSocketOptions();//设置预绑定套接字选项
setupSocket(options);//设置套接字
} else {
// If the tcp listener does not bind to port, we test that the ip family is supported.
if (auto ip = address->ip(); ip != nullptr) {//如果ip不为空
RELEASE_ASSERT(
Network::SocketInterfaceSingleton::get().ipFamilySupported(ip->ipv4() ? AF_INET
: AF_INET6),
fmt::format("Creating listen socket address {} but the address family is not supported",
address->asStringView()));//判断地址是否支持
}
}
}
source/common/network/listen_socket_impl.cc 42行
void ListenSocketImpl::setupSocket(const Network::Socket::OptionsSharedPtr& options) {//设置套接字
setListenSocketOptions(options);//设置套接字选项
bind(connection_info_provider_->localAddress());//绑定套接字
}
source/common/network/listen_socket_impl.cc 35行
void ListenSocketImpl::setListenSocketOptions(const Network::Socket::OptionsSharedPtr& options) {
if (!Network::Socket::applyOptions(options, *this,
envoy::config::core::v3::SocketOption::STATE_PREBIND)) {//设置套接字选项
throw SocketOptionException("ListenSocket: Setting socket options failed");
}
}
source/common/network/listen_socket_impl.cc 21行
Api::SysCallIntResult ListenSocketImpl::bind(Network::Address::InstanceConstSharedPtr address) {//绑定套接字
connection_info_provider_->setLocalAddress(address);//设置本地地址
const Api::SysCallIntResult result = SocketImpl::bind(connection_info_provider_->localAddress());//绑定套接字
if (SOCKET_FAILURE(result.return_value_)) {//绑定失败
close();
throw SocketBindException(fmt::format("cannot bind '{}': {}",
connection_info_provider_->localAddress()->asString(),
errorDetails(result.errno_)),
result.errno_);
}
return {0, 0};
}
source/common/network/socket_impl.cc 52行
Api::SysCallIntResult SocketImpl::bind(Network::Address::InstanceConstSharedPtr address) {//绑定套接字
Api::SysCallIntResult bind_result;
if (address->type() == Address::Type::Pipe) {//如果是管道地址
const Address::Pipe* pipe = address->pipe();//获取地址
const auto* pipe_sa = reinterpret_cast<const sockaddr_un*>(address->sockAddr());//获取地址
bool abstract_namespace = address->pipe()->abstractNamespace();
if (!abstract_namespace) {
// Try to unlink an existing filesystem object at the requested path. Ignore
// errors -- it's fine if the path doesn't exist, and if it exists but can't
// be unlinked then `::bind()` will generate a reasonable errno.
unlink(pipe_sa->sun_path);//删除路径
}
// Not storing a reference to syscalls singleton because of unit test mocks
bind_result = io_handle_->bind(address);//绑定地址
if (pipe->mode() != 0 && !abstract_namespace && bind_result.return_value_ == 0) {
auto set_permissions = Api::OsSysCallsSingleton::get().chmod(pipe_sa->sun_path, pipe->mode());//设置权限
if (set_permissions.return_value_ != 0) {//设置权限失败
throw EnvoyException(fmt::format("Failed to create socket with mode {}: {}",
std::to_string(pipe->mode()),
errorDetails(set_permissions.errno_)));
}
}
return bind_result;
}
bind_result = io_handle_->bind(address);//绑定地址
if (bind_result.return_value_ == 0 && address->ip()->port() == 0) {
connection_info_provider_->setLocalAddress(io_handle_->localAddress());
}
return bind_result;
}
source/common/network/io_socket_handler_impl.cc 460行
Api::SysCallIntResult IoSocketHandleImpl::bind(Address::InstanceConstSharedPtr address) {
return Api::OsSysCallsSingleton::get().bind(fd_, address->sockAddr(), address->sockAddrLen());//绑定地址
}
source/common/network/io_socket_handler_impl.cc 464行
Api::SysCallIntResult IoSocketHandleImpl::listen(int backlog) {//设置监听
return Api::OsSysCallsSingleton::get().listen(fd_, backlog);
}
source/server/admin.cc 399行
void AdminImpl::addListenerToHandler(Network::ConnectionHandler* handler) {
if (listener_) {
handler->addListener(absl::nullopt, *listener_, server_.runtime());//把监听器添加到连接处理器中
}
}
source/server/connection_handler_impl.cc 30行
void ConnectionHandlerImpl::addListener(absl::optional<uint64_t> overridden_listener,
Network::ListenerConfig& config, Runtime::Loader& runtime) {//添加监听器
if (overridden_listener.has_value()) {
ActiveListenerDetailsOptRef listener_detail =
findActiveListenerByTag(overridden_listener.value());//通过标签获取活跃监听器
ASSERT(listener_detail.has_value());
listener_detail->get().invokeListenerMethod(
[&config](Network::ConnectionHandler::ActiveListener& listener) {
listener.updateListenerConfig(config);
});//调用监听器方法
return;
}
auto details = std::make_unique<ActiveListenerDetails>();//创建活跃监听器详情
if (config.internalListenerConfig().has_value()) {//如果有内部监听器配置
// Ensure the this ConnectionHandlerImpl link to the thread local registry. Ideally this step
// should be done only once. However, an extra phase and interface is overkill.
Network::InternalListenerRegistry& internal_listener_registry =
config.internalListenerConfig()->internalListenerRegistry();//获取内部监听器注册
Network::LocalInternalListenerRegistry* local_registry =
internal_listener_registry.getLocalRegistry();//获取本地内部监听器注册
RELEASE_ASSERT(local_registry != nullptr, "Failed to get local internal listener registry.");
local_registry->setInternalListenerManager(*this);//本地注册设置内部监听器管理器
if (overridden_listener.has_value()) {
if (auto iter = listener_map_by_tag_.find(overridden_listener.value());
iter != listener_map_by_tag_.end()) {//如果能通过tag找到监听器
iter->second->invokeListenerMethod(
[&config](Network::ConnectionHandler::ActiveListener& listener) {
listener.updateListenerConfig(config);
});//调用监听器方法
return;
}
IS_ENVOY_BUG("unexpected");
}
auto internal_listener =
local_registry->createActiveInternalListener(*this, config, dispatcher());//通过本地注册创建活跃内部监听器
// TODO(soulxu): support multiple internal addresses in listener in the future.
ASSERT(config.listenSocketFactories().size() == 1);
details->addActiveListener(config, config.listenSocketFactories()[0]->localAddress(),
listener_reject_fraction_, disable_listeners_,
std::move(internal_listener));//添加活跃监听器
} else if (config.listenSocketFactories()[0]->socketType() == Network::Socket::Type::Stream) {//获取配置的套接字类型是流
for (auto& socket_factory : config.listenSocketFactories()) {//遍历配置的监听器套接字工厂
auto address = socket_factory->localAddress();//获取工厂的本地地址
// worker_index_ doesn't have a value on the main thread for the admin server.
details->addActiveListener(
config, address, listener_reject_fraction_, disable_listeners_,
std::make_unique<ActiveTcpListener>(
*this, config, runtime,
socket_factory->getListenSocket(worker_index_.has_value() ? *worker_index_ : 0),
address, config.connectionBalancer(*address)));//添加活跃监听器
}
} else {
ASSERT(config.udpListenerConfig().has_value(), "UDP listener factory is not initialized.");
ASSERT(worker_index_.has_value());
for (auto& socket_factory : config.listenSocketFactories()) {//遍历配置监听器套接字工厂
auto address = socket_factory->localAddress();//获取工厂本地地址
details->addActiveListener(
config, address, listener_reject_fraction_, disable_listeners_,
config.udpListenerConfig()->listenerFactory().createActiveUdpListener(
runtime, *worker_index_, *this, socket_factory->getListenSocket(*worker_index_),
dispatcher_, config));//添加活跃监听器
}
}
ASSERT(!listener_map_by_tag_.contains(config.listenerTag()));
for (const auto& per_address_details : details->per_address_details_list_) {//遍历监听器详情的每地址详情列表
// This map only stores the new listener.
if (absl::holds_alternative<std::reference_wrapper<ActiveTcpListener>>(
per_address_details->typed_listener_)) {
tcp_listener_map_by_address_.insert_or_assign(per_address_details->address_->asStringView(),
per_address_details);//通过地址添加tcp监听器
auto& address = per_address_details->address_;//获取地址
// If the address is Ipv6 and isn't v6only, parse out the ipv4 compatible address from the
// Ipv6 address and put an item to the map. Then this allows the `getBalancedHandlerByAddress`
// can match the Ipv4 request to Ipv4-mapped address also.
if (address->type() == Network::Address::Type::Ip &&
address->ip()->version() == Network::Address::IpVersion::v6 &&
!address->ip()->ipv6()->v6only()) {//地址类型是ip,并且地址类型是ipv6,并且不是仅ipv6
if (address->ip()->isAnyAddress()) {//如果是任意地址
// Since both "::" with ipv4_compat and "0.0.0.0" can be supported.
// Only override the listener when this is an update of the existing listener by
// checking the address, this ensures the Ipv4 address listener won't be override
// by the listener which has the same IPv4-mapped address.
auto ipv4_any_address = Network::Address::Ipv4Instance(address->ip()->port()).asString();//获取ipv4地址
auto ipv4_any_listener = tcp_listener_map_by_address_.find(ipv4_any_address);//通过ipv4地址获取tcp监听器
if (ipv4_any_listener == tcp_listener_map_by_address_.end() ||
*ipv4_any_listener->second->address_ == *address) {//没找到监听器,或者找到的监听器地址相同
tcp_listener_map_by_address_.insert_or_assign(ipv4_any_address, per_address_details);//通过地址追加tcp监听器
}
} else {
auto v4_compatible_addr = address->ip()->ipv6()->v4CompatibleAddress();//获取ipv6兼容的v4地址
// Remove this check when runtime flag
// `envoy.reloadable_features.strict_check_on_ipv4_compat` deprecated.
// If this isn't a valid Ipv4-mapped address, then do nothing.
if (v4_compatible_addr != nullptr) {//地址不为空
tcp_listener_map_by_address_.insert_or_assign(v4_compatible_addr->asStringView(),
per_address_details);//通过地址添加tcp监听器
}
}
}
} else if (absl::holds_alternative<std::reference_wrapper<Network::InternalListener>>(
per_address_details->typed_listener_)) {
internal_listener_map_by_address_.insert_or_assign(
per_address_details->address_->asStringView(), per_address_details);//通过地址添加内部监听器
}
}
listener_map_by_tag_.emplace(config.listenerTag(), std::move(details));//通过标签添加监听器
}
source/server/connection_handler_impl.cc 306行
ConnectionHandlerImpl::ActiveListenerDetailsOptRef
ConnectionHandlerImpl::findActiveListenerByTag(uint64_t listener_tag) {
if (auto iter = listener_map_by_tag_.find(listener_tag); iter != listener_map_by_tag_.end()) {
return *iter->second;
}
return absl::nullopt;
}
source/server/connection_handler_impl.cc 102行
void invokeListenerMethod(ListenerMethodFn fn) {
std::for_each(per_address_details_list_.begin(), per_address_details_list_.end(),
[&fn](std::shared_ptr<PerAddressActiveListenerDetails>& details) {
fn(*details->listener_);
});
}
source/server/connection_handler_impl.cc 112行
template <class ActiveListener>
void addActiveListener(Network::ListenerConfig& config,
const Network::Address::InstanceConstSharedPtr& address,
UnitFloat& listener_reject_fraction, bool disable_listeners,
ActiveListener&& listener) {//添加活跃监听器
auto per_address_details = std::make_shared<PerAddressActiveListenerDetails>();//每地址活跃监听器详情
per_address_details->typed_listener_ = *listener;//设置typed监听器
per_address_details->listener_ = std::move(listener);//设置监听器
per_address_details->address_ = address;//设置地址
if (disable_listeners) {//禁用监听器
per_address_details->listener_->pauseListening();//暂停监听
}
if (auto* listener = per_address_details->listener_->listener(); listener != nullptr) {//监听
listener->setRejectFraction(listener_reject_fraction);//设置监听器拒绝分数
}
per_address_details->listener_tag_ = config.listenerTag();//设置监听标签
per_address_details_list_.emplace_back(per_address_details);//往列表里追加
}
source/server/ssl_context_manager.cc 53行
Ssl::ContextManagerPtr createContextManager(const std::string& factory_name,
TimeSource& time_source) {//创建上下文管理器
Ssl::ContextManagerFactory* factory =
Registry::FactoryRegistry<Ssl::ContextManagerFactory>::getFactory(factory_name);//获取工厂
if (factory != nullptr) {
return factory->createContextManager(time_source);//创建上下文管理器
}
return std::make_unique<SslContextManagerNoTlsStub>();//创建上下文管理器
}
dns
source/common/network/dns_resolver/dns_factory_util.h 130行
template <class ConfigType>
Network::DnsResolverFactory& createDnsResolverFactoryFromProto(
const ConfigType& config,
envoy::config::core::v3::TypedExtensionConfig& typed_dns_resolver_config) {//创建dns解析器工厂
ASSERT_IS_MAIN_OR_TEST_THREAD();
typed_dns_resolver_config = makeDnsResolverConfig(config);//获取dns解析配置
return createDnsResolverFactoryFromTypedConfig(typed_dns_resolver_config);//创建dns解析器工厂
}
source/common/network/dns_resolver/dns_factory_util.h 90行
envoy::config::core::v3::TypedExtensionConfig makeDnsResolverConfig(const ConfigType& config) {
envoy::config::core::v3::TypedExtensionConfig typed_dns_resolver_config;
// typed_dns_resolver_config takes precedence
if (checkTypedDnsResolverConfigExist(config, typed_dns_resolver_config)) {//判断dns解析配置是否存在
return typed_dns_resolver_config;
}
// If use apple API for DNS lookups, create an AppleDnsResolverConfig typed config.
if (checkUseAppleApiForDnsLookups(typed_dns_resolver_config)) {//检查使用apple api 为dns查询
return typed_dns_resolver_config;
}
// If dns_resolution_config exits, create a CaresDnsResolverConfig typed config based on it.
if (checkDnsResolutionConfigExist(config, typed_dns_resolver_config)) {//判断dns resolution配置是否存在
return typed_dns_resolver_config;
}
// Handle legacy DNS resolver fields for backward compatibility.
// Different config type has different fields to copy.
handleLegacyDnsResolverData(config, typed_dns_resolver_config);//处理遗留配置
return typed_dns_resolver_config;
}
source/common/network/dns_resolver/dns_factory_util.h 34行
template <class ConfigType>
bool checkTypedDnsResolverConfigExist(
const ConfigType& config,
envoy::config::core::v3::TypedExtensionConfig& typed_dns_resolver_config) {
if (config.has_typed_dns_resolver_config()) {//如果配置中包含typed配置
typed_dns_resolver_config.MergeFrom(config.typed_dns_resolver_config());//合并配置
return true;
}
return false;
}
source/common/network/dns_resolver/dns_factory_util.cc 35行
bool checkUseAppleApiForDnsLookups(
envoy::config::core::v3::TypedExtensionConfig& typed_dns_resolver_config) {
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.use_apple_api_for_dns_lookups")) {//判断特性是否启用
if (Config::Utility::getAndCheckFactoryByName<Network::DnsResolverFactory>(
std::string(AppleDnsResolver), true) != nullptr) {//获取工厂
makeDefaultAppleDnsResolverConfig(typed_dns_resolver_config);//创建默认apple dns解析配置
ENVOY_LOG_MISC(debug, "create Apple DNS resolver type: {} in MacOS.",
typed_dns_resolver_config.name());
return true;
}
#ifdef __APPLE__
RELEASE_ASSERT(false,
"In MacOS, if run-time flag 'use_apple_api_for_dns_lookups' is enabled, "
"but the envoy.network.dns_resolver.apple extension is not included in Envoy "
"build file. This is wrong. Abort Envoy.");
#endif
}
return false;
}
source/common/network/dns_resolver/dns_factory_util.h 47行
template <class ConfigType>
bool checkDnsResolutionConfigExist(
const ConfigType& config,
envoy::config::core::v3::TypedExtensionConfig& typed_dns_resolver_config) {
if (config.has_dns_resolution_config()) {//如果配置中有dns resolution配置
envoy::extensions::network::dns_resolver::cares::v3::CaresDnsResolverConfig cares;
if (!config.dns_resolution_config().resolvers().empty()) {//如果解析器不为空
cares.mutable_resolvers()->MergeFrom(config.dns_resolution_config().resolvers());//合并解析器
}
cares.mutable_dns_resolver_options()->MergeFrom(
config.dns_resolution_config().dns_resolver_options());//合并dns解析选项
typed_dns_resolver_config.mutable_typed_config()->PackFrom(cares);
typed_dns_resolver_config.set_name(std::string(CaresDnsResolver));
return true;
}
return false;
}
source/common/network/dns_resolver/dns_factory_util.h 67行
template <class ConfigType>
void handleLegacyDnsResolverData(
const ConfigType& config,
envoy::config::core::v3::TypedExtensionConfig& typed_dns_resolver_config) {
envoy::extensions::network::dns_resolver::cares::v3::CaresDnsResolverConfig cares;
cares.mutable_dns_resolver_options()->set_use_tcp_for_dns_lookups(
config.use_tcp_for_dns_lookups());//设置use tcp选项
typed_dns_resolver_config.mutable_typed_config()->PackFrom(cares);
typed_dns_resolver_config.set_name(std::string(CaresDnsResolver));
}
source/common/network/dns_resolver/dns_factory_util.cc 405行
DnsResolverSharedPtr createDnsResolver(Event::Dispatcher& dispatcher, Api::Api& api,
const envoy::config::core::v3::TypedExtensionConfig&
typed_dns_resolver_config) const override {
ASSERT(dispatcher.isThreadSafe());
envoy::extensions::network::dns_resolver::apple::v3::AppleDnsResolverConfig apple;
Envoy::MessageUtil::unpackTo(typed_dns_resolver_config.typed_config(), apple);
return std::make_shared<Network::AppleDnsResolverImpl>(apple, dispatcher, api.rootScope());
}
clusterManager
source/server/configuration_impl.cc 78行
void MainImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
Instance& server,
Upstream::ClusterManagerFactory& cluster_manager_factory) {
// In order to support dynamic configuration of tracing providers,
// a former server-wide HttpTracer singleton has been replaced by
// an HttpTracer instance per "envoy.filters.network.http_connection_manager" filter.
// Tracing configuration as part of bootstrap config is still supported,
// however, it's become mandatory to process it prior to static Listeners.
// Otherwise, static Listeners will be configured in assumption that
// tracing configuration is missing from the bootstrap config.
initializeTracers(bootstrap.tracing(), server);//初始化追踪器
const auto& secrets = bootstrap.static_resources().secrets();//获取静态secrets
ENVOY_LOG(info, "loading {} static secret(s)", secrets.size());
for (ssize_t i = 0; i < secrets.size(); i++) {//遍历secrets
ENVOY_LOG(debug, "static secret #{}: {}", i, secrets[i].name());
server.secretManager().addStaticSecret(secrets[i]);//把secret添加到secret管理器中
}
ENVOY_LOG(info, "loading {} cluster(s)", bootstrap.static_resources().clusters().size());
cluster_manager_ = cluster_manager_factory.clusterManagerFromProto(bootstrap);//创建集群管理器
const auto& listeners = bootstrap.static_resources().listeners();//获取静态监听器
ENVOY_LOG(info, "loading {} listener(s)", listeners.size());
for (ssize_t i = 0; i < listeners.size(); i++) {//遍历静态监听器
ENVOY_LOG(debug, "listener #{}:", i);
server.listenerManager().addOrUpdateListener(listeners[i], "", false);//把监听器添加到监听器管理器中
}
initializeWatchdogs(bootstrap, server);//初始化开门狗
initializeStatsConfig(bootstrap, server);//初始化统计配置
}
source/server/configuration_impl.cc 129行
void MainImpl::initializeTracers(const envoy::config::trace::v3::Tracing& configuration,
Instance& server) {
ENVOY_LOG(info, "loading tracing configuration");
// Default tracing configuration must be set prior to processing of static Listeners begins.
server.setDefaultTracingConfig(configuration);//设置默认追踪配置
if (!configuration.has_http()) {//如果没有http配置
return;
}
// Validating tracing configuration (minimally).
ENVOY_LOG(info, " validating default server-wide tracing driver: {}",
configuration.http().name());
// Now see if there is a factory that will accept the config.
auto& factory = Config::Utility::getAndCheckFactory<TracerFactory>(configuration.http());//获取工厂
ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig(
configuration.http(), server.messageValidationContext().staticValidationVisitor(), factory);//把工厂转换为配置
// Notice that the actual HttpTracer instance will be created on demand
// in the context of "envoy.filters.network.http_connection_manager" filter.
// The side effect of this is that provider-specific configuration
// is no longer validated in this step.
}
source/server/configuration_impl.cc 155行
void MainImpl::initializeWatchdogs(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
Instance& server) {
if (bootstrap.has_watchdog() && bootstrap.has_watchdogs()) {//如果同时配了watchdog和watchdogs
throw EnvoyException("Only one of watchdog or watchdogs should be set!");
}
if (bootstrap.has_watchdog()) {//如果配置了watchdog
main_thread_watchdog_ = std::make_unique<WatchdogImpl>(bootstrap.watchdog(), server);//创建主开门狗
worker_watchdog_ = std::make_unique<WatchdogImpl>(bootstrap.watchdog(), server);//创建工作线程开门狗
} else {
main_thread_watchdog_ =
std::make_unique<WatchdogImpl>(bootstrap.watchdogs().main_thread_watchdog(), server);//创建主线程开门狗
worker_watchdog_ =
std::make_unique<WatchdogImpl>(bootstrap.watchdogs().worker_watchdog(), server);//创建工作线程开门狗
}
}
source/server/configuration_impl.cc 111行
void MainImpl::initializeStatsConfig(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
Instance& server) {
ENVOY_LOG(info, "loading stats configuration");
// stats_config_ should be set before populating the sinks so that it is available
// from the ServerFactoryContext when creating the stats sinks.
stats_config_ = std::make_unique<StatsConfigImpl>(bootstrap);//创建统计配置
for (const envoy::config::metrics::v3::StatsSink& sink_object : bootstrap.stats_sinks()) {//遍历统计槽
// Generate factory and translate stats sink custom config.
auto& factory = Config::Utility::getAndCheckFactory<StatsSinkFactory>(sink_object);//获取工厂
ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig(
sink_object, server.messageValidationContext().staticValidationVisitor(), factory);//把工厂转换为配置
stats_config_->addSink(factory.createStatsSink(*message, server.serverFactoryContext()));//添加槽
}
}
lds
source/server/listener_manager_impl.h 195行
void createLdsApi(const envoy::config::core::v3::ConfigSource& lds_config,
const xds::core::v3::ResourceLocator* lds_resources_locator) override {//创建lds api
ASSERT(lds_api_ == nullptr);
lds_api_ = factory_.createLdsApi(lds_config, lds_resources_locator);//创建lds api
}
source/server/listener_manager_impl.h 72行
LdsApiPtr createLdsApi(const envoy::config::core::v3::ConfigSource& lds_config,
const xds::core::v3::ResourceLocator* lds_resources_locator) override {
return std::make_unique<LdsApiImpl>(
lds_config, lds_resources_locator, server_.clusterManager(), server_.initManager(),
server_.stats(), server_.listenerManager(),
server_.messageValidationContext().dynamicValidationVisitor());
}
source/server/lds_api.cc 22行
LdsApiImpl::LdsApiImpl(const envoy::config::core::v3::ConfigSource& lds_config,
const xds::core::v3::ResourceLocator* lds_resources_locator,
Upstream::ClusterManager& cm, Init::Manager& init_manager,
Stats::Scope& scope, ListenerManager& lm,
ProtobufMessage::ValidationVisitor& validation_visitor)
: Envoy::Config::SubscriptionBase<envoy::config::listener::v3::Listener>(validation_visitor,
"name"),
listener_manager_(lm), scope_(scope.createScope("listener_manager.lds.")), cm_(cm),
init_target_("LDS", [this]() { subscription_->start({}); }) {
const auto resource_name = getResourceName();//获取资源名称
if (lds_resources_locator == nullptr) {//如果资源locator为空
subscription_ = cm.subscriptionFactory().subscriptionFromConfigSource(
lds_config, Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, {});//创建订阅
} else {
subscription_ = cm.subscriptionFactory().collectionSubscriptionFromUrl(
*lds_resources_locator, lds_config, resource_name, *scope_, *this, resource_decoder_);//创建订阅
}
init_manager.add(init_target_);
}