<paste_deploy function="业务流程分析">
<keystone-paste.ini path="/etc/keystone/">
# 我们分析admin
[composite:admin]
use = egg:Paste#urlmap
/v2.0 = admin_api
/v3 = api_v3
/ = public_version_api
[pipeline:admin_api]
pipeline = sizelimit url_normalize request_id build_auth_context token_auth
admin_token_auth json_body ec2_extension s3_extension crud_extension admin_service
# 分析/v2.0 入口处理业务
1-[filter:sizelimit]
paste.filter_factory = oslo_middleware.sizelimit:RequestBodySizeLimiter.factory
<RequestBodySizeLimiter path="oslo_middleware.sizelimit">
_opts = [
cfg.IntOpt('max_request_body_size',
default=114688, # 默认为112k
help='...',
deprecated_opts=_oldopts)
]
class RequestBodySizeLimiter(base.Middleware):
# 限制进入的多个请求body大小
def __init__(self, application, conf=None):
super(RequestBodySizeLimiter, self).__init__(application, conf)
<base.Middleware values="" path="oslo_middleware.base">
# 最后会调用其factory方法
class Middleware(object):
    """Base class for the WSGI middlewares in this pipeline.

    The paste ``filter_factory`` entries point at ``factory``; the callable it
    returns wraps the next application in an instance of the subclass. Each
    request then goes through ``__call__``, which runs ``process_request``,
    the wrapped application and ``process_response`` in that order.
    """

    @classmethod
    def factory(cls, global_conf, **local_conf):
        """Return a filter that wraps an application in ``cls``.

        :param global_conf: shared configuration mapping (may be empty/None)
        :param local_conf: per-filter options; they override ``global_conf``
        :returns: ``middleware_filter(app)`` which builds ``cls(app, conf)``
        """
        conf = global_conf.copy() if global_conf else {}
        conf.update(local_conf)

        def middleware_filter(app):
            return cls(app, conf)

        return middleware_filter

    def __init__(self, application, conf=None):
        """Store the wrapped application and resolve the configuration.

        :param application: the next WSGI application in the pipeline
        :param conf: either a ``cfg.ConfigOpts`` instance or a plain mapping
            of options coming from the paste configuration
        """
        # Fix: __call__ forwards to self.application, so it must be stored.
        self.application = application
        if isinstance(conf, cfg.ConfigOpts):
            # An oslo.config object was handed in directly: use it as is.
            self.conf = []
            self.oslo_conf = conf
        else:
            self.conf = conf or []
            if 'oslo_config_project' in self.conf:
                if 'oslo_config_file' in self.conf:
                    default_config_files = [self.conf['oslo_config_file']]
                else:
                    default_config_files = None
                self.oslo_conf = cfg.ConfigOpts()
                # Calling the ConfigOpts instance invokes its __call__.
                self.oslo_conf([], project=self.conf['oslo_config_project'],
                               default_config_files=default_config_files,
                               validate_default_values=True)
            else:
                # No project given: fall back to the global cfg.CONF object.
                self.oslo_conf = cfg.CONF

    def _conf_get(self, key, group='oslo_middleware'):
        """Return option ``key`` of ``group``.

        A value supplied through the paste configuration is first pushed into
        the oslo.config object as an override so that it takes effect.
        """
        if key in self.conf:
            self.oslo_conf.set_override(key, self.conf[key], group=group,
                                        enforce_type=True)
        return getattr(getattr(self.oslo_conf, group), key)

    @staticmethod
    def process_request(req):
        """Hook run for every request; a truthy return short-circuits."""
        return None

    @staticmethod
    def process_response(response, request=None):
        """Hook run on the response of the wrapped application."""
        return response

    @webob.dec.wsgify
    def __call__(self, req):
        response = self.process_request(req)
        if response:
            return response
        # get_response is called on the request object and is given the
        # wrapped application.
        response = req.get_response(self.application)
        # getargspec (from the inspect module) reports the hook's parameters,
        # e.g. ArgSpec(args=['response', 'request'], varargs=None,
        # keywords=None, defaults=(None,)).
        (args, varargs, varkw, defaults) = getargspec(self.process_response)
        # Only pass the request to hooks that declare a 'request' parameter.
        if 'request' in args:
            return self.process_response(response, request=req)
        return self.process_response(response)
</base.Middleware>
# 继承的是base.Middleware
self.oslo_conf.register_opts(_opts, group='oslo_middleware')
# 将上面的选项注册进ConfigOpts实例中
@webob.dec.wsgify
# 这个装饰器会将func转变为app
def __call__(self, req):
max_size = self._conf_get('max_request_body_size')
if (req.content_length is not None and req.content_length > max_size):
msg = _('Request is too large')
raise webob.exc.HTTPRequestEntityTooLarge(explanation=msg)
if req.content_length is None and req.is_body_readable:
limiter = LimitingReader(req.body_file, max_size)
<LimitingRead values="(req.body_file, max_size)" path="." function="对单个请求大小进行限制">
class LimitingReader(object):
    """Wrap a request body and stop once more than ``limit`` bytes are read.

    ``bytes_read`` accumulates the size of everything handed out, whether the
    body is consumed by iteration or through ``read``.
    """

    def __init__(self, data, limit):
        """
        :param data: the underlying body (iterable of chunks / file-like)
        :param limit: maximum number of bytes allowed to be read
        """
        self.data = data
        self.limit = limit
        self.bytes_read = 0

    def __iter__(self):
        """Yield chunks of the body, raising once the limit is exceeded."""
        for chunk in self.data:
            self.bytes_read += len(chunk)
            if self.bytes_read > self.limit:
                msg = _('Request too large')
                raise webob.exc.HTTPRequestEntityTooLarge(explanation=msg)
            yield chunk

    def read(self, i=None):
        """Read up to ``i`` bytes (everything when ``i`` is None).

        :raises webob.exc.HTTPRequestEntityTooLarge: when the running total
            of bytes read exceeds ``limit``
        """
        # mod_wsgi and eventlet differ here, so the argument cannot simply be
        # passed through: read() is called without one when i is None.
        if i is None:
            result = self.data.read()
        else:
            result = self.data.read(i)
        # Fix: count what was just read; without this the check below could
        # never trigger because bytes_read stayed at 0.
        self.bytes_read += len(result)
        if self.bytes_read > self.limit:
            msg = _('Request too large')
            raise webob.exc.HTTPRequestEntityTooLarge(explanation=msg)
        return result
</LimitingRead>
req.body_file = limiter
return self.application
1.1流程->paste先调用父类Middleware的factory,其返回的middleware_filter(app)初始化RequestBodySizeLimiter,之后每个请求调用RequestBodySizeLimiter的__call__
</RequestBodySizeLimiter>
2-[filter:url_normalize]
paste.filter_factory = keystone.middleware:NormalizingFilter.factory
<NormalizingFIlter.factory values="" path="keystone.middleware.core">
class NormalizingFilter(wsgi.Middleware):
<wsgi.Middleware path="keystone.common.wsgi.core">
class Middleware(Application):
<Application path=".">
</Application>
# 基础WSGI 中间件
# 初始化需要app,然后才调用factory,只会调用__call__
@classmethod
def factory(cls, global_config, **local_config):
    """Paste filter factory: return a callable that wraps an app in ``cls``.

    With a configuration such as::

        [filter:analytics]
        redis_host = 127.0.0.1
        paste.filter_factory = keystone.analytics:Analytics.factory

    the returned callable amounts to::

        import keystone.analytics
        keystone.analytics.Analytics(app, redis_host='127.0.0.1')

    after which requests go through the instance's ``__call__``.
    """
    def _factory(app):
        # The merged mapping is built here but only local_config is passed
        # on to the constructor.
        conf = global_config.copy()
        # Fix: the original referenced the undefined name `local`.
        conf.update(local_config)
        return cls(app, **local_config)
    return _factory
def __init__(self, application):
    """Keep a reference to the WSGI application this middleware wraps."""
    super(Middleware, self).__init__()
    self.application = application
def process_request(self, request):
    """Per-request hook; the base implementation does nothing.

    Named ``process_request`` because that is what ``__call__`` invokes and
    what the subclasses in these notes override. A truthy return value is
    sent back as the response instead of calling the wrapped application.
    """
    return None
def process_response(self, request, response):
return response
@webob.dec.wsgify()
# 初始化后调用wsgi().__call__
def __call__(self, request):
try:
response = self.process_request(request)
if response:
return response
response = request.get_response(self.application)
# 通过get_response 可以返回一个webob response object
return self.process_response(request, response)
except
....
# 这个__call__调用下面的process_request
</wsgi.Middleware>
# 中间件过滤处理url正常化
def process_request(self, request):
    """Normalize ``PATH_INFO`` in the request's WSGI environ.

    A single trailing slash is removed from any path longer than one
    character, and an empty path is rewritten to the root ``'/'``.
    Nothing is returned, so processing continues down the pipeline.
    """
    path = request.environ['PATH_INFO']
    if len(path) > 1 and path[-1] == '/':
        # Strip the trailing slash ('/v2.0/' -> '/v2.0').
        request.environ['PATH_INFO'] = path[:-1]
    elif not path:
        # No path at all: treat it as a request for the root.
        request.environ['PATH_INFO'] = '/'
</NormalizingFIlter.factory>
3-[filter:request_id]
paste.filter_factory = oslo_middleware:RequestId.factory
<RequestId.factory path="oslo_middleware.request_id">
ENV_REQUEST_ID = 'openstack.request_id'
HTTP_RESP_HEADER_REQUEST_ID = 'x-openstack-request-id'
class RequestId(base.Middleware):
# 中间件确保request ID
# 确保分配ID对应每个API请求,将其置为request env,request ID同样被添加到API response
@webob.dec.wsgify
def __call__(self, req):
req_id = context.generate_request_id()
<get_response_id path="oslo_context.context.generate_request_id">
def generate_request_id():
return b'req-' + str(uuid.uuid4()).encode('ascii')
</get_response_id>
req.environ[ENV_REQUEST_ID] = req_id
response = req.get_response(self.application)
if HTTP_RESP_HEADER_REQUEST_ID not in response.headers:
response.headers.add(HTTP_RESP_HEADER_REQUEST_ID, req_id)
return response
</RequestId.factory>
4-[filter:build_auth_context]
paste.filter_factory = keystone.middleware:AuthContextMiddleware.factory
<AuthContextMiddleware.factory path="keystone.middleware">
class AuthContextMiddleware(wsgi.Middleware):
# 从 request auth token 中构建 authentication context
def _build_auth_context(self, request):
token_id = request.headers.get(AUTH_TOKEN_HEADER).strip()
if token_id == CONF.admin_token:
return {}
context = {'token_id': token_id}
context['environment'] = request.environ
try:
token_ref = token_model.KeystoneToken(
token_id=token_id,
token_data=self.token_provider_api.validate_token(token_id) # Application 有token_provider_api
)
<token_model.KeystoneToken values="(token_id, token_data)" path="keystone.models.token_model">
class KeystoneToken(dict):
    """Dict-based wrapper around validated token data."""

    def __init__(self, token_id, token_data):
        """Load the dict from the token payload and record the token ids.

        :param token_id: the token identifier
        :param token_data: validated token data; must contain either an
            'access' entry or a 'token' entry that has 'methods'
        :raises exception.UnsupportedTokenVersionException: for any other
            payload layout
        :raises exception.UnexpectedError: if the token is scoped to both a
            project and a domain
        """
        self.token_data = token_data
        # Fix: the original tested the undefined name `token_access`.
        # NOTE(review): 'access' is presumably the v2.0 layout and 'token'
        # the v3 one; confirm against the token provider.
        if 'access' in token_data:
            super(KeystoneToken, self).__init__(**token_data['access'])
        elif 'token' in token_data and 'methods' in token_data['token']:
            super(KeystoneToken, self).__init__(**token_data['token'])
        else:
            raise exception.UnsupportedTokenVersionException()
        self.token_id = token_id
        # Hash of the token id using the configured algorithm (the notes
        # mention mode='md5' as an example).
        self.short_id = cms.cms_hash_token(token_id,
                                           mode=CONF.token.hash_algorithm)
        # project_scoped / domain_scoped are not defined in this excerpt.
        if self.project_scoped and self.domain_scoped:
            raise exception.UnexpectedError(_('Found invalid token: scoped to '
                                              'both project and domain'))
</token_model.KeystoneToken>
wsgi.validate_token_bind(context, token_ref)
<wsgi.validate_token_bind values="(context, token_ref)" path="keystone.common.wsgi">
CONTEXT_ENV = 'openstack.context'
PARAMS_ENV = 'openstack.params'
JSON_ENCODE_CONTENT_TYPES = set(['application/json', 'application/json-home'])
def validate_token_bind(context, token_ref):
    """Check the token's bind information against the request environment.

    :param context: dict whose 'environment' entry is the WSGI environ
    :param token_ref: the ``token_model.KeystoneToken`` being validated
    :raises exception.UnexpectedError: ``token_ref`` is not a KeystoneToken
    :raises exception.Unauthorized: bind information is missing where it is
        required, or a bind cannot be verified
    """
    # The notes annotate this setting with the value 'permissive'.
    bind_mode = CONF.token.enforce_token_bind
    if not isinstance(token_ref, token_model.KeystoneToken):
        raise exception.UnexpectedError(
            _('token reference must be a KeystoneToken type, got: %s')
            % type(token_ref))
    bind = token_ref.bind

    # 'permissive' and 'strict' accept a token that carries no bind at all.
    permissive = bind_mode in ('permissive', 'strict')
    # Any mode other than those two and 'required' is taken as the name of
    # the bind type that has to be present.
    # Fix: the original referenced the undefined name `bind_name`.
    name = None if permissive or bind_mode == 'required' else bind_mode

    if not bind:
        if permissive:
            # No bind in the token and the mode does not demand one.
            return
        # Fix: LOG is a logger, not a callable; use LOG.info.
        LOG.info(_LI('No bind information present in token'))
        raise exception.Unauthorized()

    if name and name not in bind:
        # A specific bind type is required but the token does not have it.
        raise exception.Unauthorized()

    for bind_type, identifier in bind.items():
        if bind_type == 'kerberos':
            if not (context['environment'].get('AUTH_TYPE', '').lower()
                    == 'negotiate'):
                LOG.info(_LI("Kerberos credentials required and not present"))
                raise exception.Unauthorized()
            if not context['environment'].get('REMOTE_USER') == identifier:
                LOG.info(_LI("Kerberos credentials do not match "
                             "those in bind"))
                raise exception.Unauthorized()
            LOG.info(_LI("Kerberos bind authentication successful"))
        elif bind_mode == 'permissive':
            # Unknown bind types are tolerated only in permissive mode.
            LOG.debug(("Ignoring unknown bind for permissive mode: "
                       "{%(bind_type)s: %(identifier)s}"),
                      {'bind_type': bind_type, 'identifier': identifier})
        else:
            LOG.info(_LI("Couldn't verify unknown bind: "
                         "{%(bind_type)s: %(identifier)s}"),
                     {'bind_type': bind_type, 'identifier': identifier})
            raise exception.Unauthorized()
</wsgi.validate_token_bind>
return authorization.token_to_auth_context(token_ref)
<authorization.token_to_auth_context values="token_ref" path="keystone.common.authorization">
AUTH_CONTEXT_ENV = 'KEYSTONE_AUTH_CONTEXT'
def token_to_auth_context(token):
    """Build the auth context dict from a KeystoneToken.

    :param token: a ``token_model.KeystoneToken``
    :returns: dict with 'token', 'is_delegated_auth', 'user_id', the trust
        ids (None when not trust scoped) and, when applicable, project or
        domain scope, 'roles', OAuth ids and federation 'group_ids'
    :raises exception.UnexpectedError: ``token`` is not a KeystoneToken
    :raises exception.Unauthorized: the token has no usable user id
    """
    if not isinstance(token, token_model.KeystoneToken):
        # Fix: message typo ('refernce'); now matches validate_token_bind.
        raise exception.UnexpectedError(
            _('token reference must be a KeystoneToken type, got: %s')
            % type(token))
    auth_context = {'token': token, 'is_delegated_auth': False}
    try:
        auth_context['user_id'] = token.user_id
    except KeyError:
        LOG.warning(_LW('RBAC:Invalid user data in token'))
        raise exception.Unauthorized()

    if token.project_scoped:
        auth_context['project_id'] = token.project_id
    elif token.domain_scoped:
        auth_context['domain_id'] = token.domain_id
        auth_context['domain_name'] = token.domain_name
    else:
        LOG.debug('RBAC: proceeding without project or domain scope')

    if token.trust_scoped:
        auth_context['is_delegated_auth'] = True
        auth_context['trust_id'] = token.trust_id
        auth_context['trustor_id'] = token.trustor_user_id
        # Fix: the original assigned to the misspelled `auth_conetxt`.
        auth_context['trustee_id'] = token.trustee_user_id
    else:
        auth_context['trust_id'] = None
        auth_context['trustor_id'] = None
        auth_context['trustee_id'] = None

    roles = token.role_names
    if roles:
        auth_context['roles'] = roles

    if token.oauth_scoped:
        auth_context['is_delegated_auth'] = True
        auth_context['consumer_id'] = token.oauth_consumer_id
        auth_context['access_token_id'] = token.oauth_access_token_id

    if token.is_federated_user:
        auth_context['group_ids'] = token.federation_group_ids

    return auth_context
</authorization.token_to_auth_context>
except exception.TokenNotFound:
LOG.warning(_LW('RBAC:Invalid token'))
raise exception.Unauthorized
def process_request(self, request):
    """Store an auth context in the request environ when a token is sent.

    Does nothing when the auth token header is absent or when the environ
    already holds an auth context.
    """
    if AUTH_TOKEN_HEADER not in request.headers:
        # Fix: the two literals were joined without a separating space.
        LOG.debug(('Auth token not in the request header. '
                   'will not build auth context'))
        return
    if authorization.AUTH_CONTEXT_ENV in request.environ:
        # A context is already present: leave it untouched.
        return
    auth_context = self._build_auth_context(request)
    request.environ[authorization.AUTH_CONTEXT_ENV] = auth_context
# request.environ['KEYSTONE_AUTH_CONTEXT'] = auth_context
</AuthContextMiddleware.factory>
5-[filter:token_auth]
paste.filter_factory = keystone.middleware:TokenAuthMiddleware.factory
<TokenAuthMiddleware.factory path="keystone.middleware.core">
AUTH_TOKEN_HEADER = 'X-Auth-Token'
CONTEXT_ENV = 'openstack.context'
PARAMS_ENV = 'openstack.params'
SUBJECT_TOKEN_HEADER = 'X-Subject-Token'
class TokenAuthMiddleware(wsgi.Middleware):
    """Copy the token headers of the request into the request context.

    The class name matches the ``TokenAuthMiddleware.factory`` entry used by
    the paste configuration (the original spelled it ``TokenAuthMIddleware``).
    """

    def process_request(self, request):
        """Record 'token_id' (and 'subject_token_id') under CONTEXT_ENV."""
        token = request.headers.get(AUTH_TOKEN_HEADER)
        # Reuse the context already stored in the environ, if any.
        context = request.environ.get(CONTEXT_ENV, {})
        context['token_id'] = token
        if SUBJECT_TOKEN_HEADER in request.headers:
            context['subject_token_id'] = request.headers[SUBJECT_TOKEN_HEADER]
        request.environ[CONTEXT_ENV] = context
</TokenAuthMiddleware.factory>
6-[filter:admin_token_auth]
paste.filter_factory = keystone.middleware:AdminTokenAuthMiddleware.factory
<AdminTokenAuthMIddleware.factory path="keystone.middleware.core">
class AdminTokenAuthMiddleware(wsgi.Middleware):
    """Flag requests that present the configured admin token.

    The class name matches the ``AdminTokenAuthMiddleware.factory`` entry
    used by the paste configuration (the original omitted ``Token``).
    """

    def process_request(self, request):
        """Set context['is_admin'] to whether the token is CONF.admin_token."""
        token = request.headers.get(AUTH_TOKEN_HEADER)
        context = request.environ.get(CONTEXT_ENV, {})
        # Boolean result of comparing the header value with the admin token.
        context['is_admin'] = (token == CONF.admin_token)
        request.environ[CONTEXT_ENV] = context
</AdminTokenAuthMIddleware.factory>
7-[filter:json_body]
paste.filter_factory = keystone.middleware:JsonBodyMiddleware.factory
<JsonBodyMiddleware.factory path="keystone.middleware.core">
class JsonBodyMiddleware(wsgi.Middleware):
    """Allow method arguments to be passed as a serialized JSON body.

    The decoded body is stored as a dict in ``request.environ[PARAMS_ENV]``
    (i.e. under 'openstack.params').
    """

    def process_request(self, request):
        """Decode the JSON body into the request environ.

        :returns: None on success or when the body is empty; otherwise the
            rendered ``ValidationError`` response for an unsupported content
            type, invalid JSON, or a JSON value that is not an object
        """
        # Fix: the original had a stray ':' after this assignment.
        params_json = request.body
        # Nothing to decode: finish early.
        if not params_json:
            return

        # Only JSON (or an unset content type) is accepted.
        # Fix: the attribute is content_type, not context_type.
        if request.content_type not in ('application/json', ''):
            e = exception.ValidationError(attribute='application/json',
                                          target='Content-Type header')
            return wsgi.render_exception(e, request=request)

        params_parsed = {}
        try:
            params_parsed = jsonutils.loads(params_json)
        except ValueError:
            e = exception.ValidationError(attribute='valid JSON',
                                          target='request body')
            return wsgi.render_exception(e, request=request)
        finally:
            # Normalize any falsy decode result to an empty dict.
            if not params_parsed:
                params_parsed = {}

        if not isinstance(params_parsed, dict):
            e = exception.ValidationError(attribute='valid JSON object',
                                          target='request body')
            return wsgi.render_exception(e, request=request)

        params = {}
        for k, v in params_parsed.items():
            # Fix: compare against the string 'self', not the instance.
            if k in ('self', 'context'):
                continue
            # Keys starting with an underscore are dropped as well.
            if k.startswith('_'):
                continue
            params[k] = v

        request.environ[PARAMS_ENV] = params
</JsonBodyMiddleware.factory>
8-[filter:ec2_extension]
paste.filter_factory = keystone.contrib.ec2:Ec2Extension.factory
<EC2Extension.factory path="keystone.contrib/ec2/routers">
build_resource_relation = functools.partial(
json_home.build_v3_extension_resource_relation, extension_name='OS-EC2',
extension_version='1.0')
# Same pre-binding as build_resource_relation, for parameter relations.
# Fix: the function and its first keyword were joined by '.' instead of ','.
build_parameter_relation = functools.partial(
    json_home.build_v3_extension_parameter_relation, extension_name='OS-EC2',
    extension_version='1.0')
class Ec2Extension(wsgi.ExtensionRouter):
<wsgi.ExtensionRouter values="" path="keystone.common.wsgi">
class ExtensionRouter(Router):
<Router path="keystone.common.wsgi">
class Router(object):
    """WSGI application that dispatches requests through a routes.Mapper."""

    def __init__(self, mapper):
        """Create a router for the given ``routes.Mapper``.

        Every route in the mapper must name a 'controller', which is the
        WSGI app that gets called; 'action' is the method of that controller.
        """
        self.map = mapper
        # Fix: the class in routes.middleware is RoutesMiddleware.
        self._router = routes.middleware.RoutesMiddleware(self._dispatch,
                                                          self.map)

    @webob.dec.wsgify()
    def __call__(self, req):
        """Hand the request to the routes middleware built in __init__."""
        return self._router

    @staticmethod
    @webob.dec.wsgify()
    def _dispatch(req):
        """Forward the request to the matched controller.

        Called by ``self._router`` after it has matched the request and put
        the routing information into ``req.environ``.
        """
        match = req.environ['wsgiorg.routing_args'][1]
        if not match:
            msg = _('The resource could not be found.')
            # Fix: the keyword is user_locale (was user_local).
            return render_exception(exception.NotFound(msg),
                                    request=req,
                                    user_locale=best_match_language(req))
        app = match['controller']
        return app
</Router>
def __init__(self, application, mapper=None):
if mapper is None:
mapper = routes.Mapper()
self.application = application
self.add_routes(mapper)
mapper.connect('/{path_info:.*}', controller=self.application)
super(ExtensionRouter, self).__init__(mapper)
def add_routes(self, mapper):
    """Register extension routes on ``mapper``; subclasses override this.

    Named ``add_routes`` because that is what ``__init__`` calls and what
    the EC2 extension overrides. The base implementation adds nothing.
    """
    pass
@classmethod
def factory(cls, global_config, **local_config):
def _factory(app):
conf = global_config.copy()
conf.update(local_config)
return cls(app, **local_config) # 调用类初始化
return _factory
</wsgi.ExtensionRouter>
def add_routes(self, mapper):
ec2_controller = controllers.EC2Controller()
<EC2Controller values="" path="keystone.contrib.ec2.controller">
@dependency.requires('policy_api', 'token_provider_api')
class EC2Controller(Ec2ControllerCommon, controller.V2Controller):
@controller.v2_deprecated
def authenticate(self, context, credentials=None, ec2Credentials=None):
(user_ref, tenant_ref, metadata_ref, role_ref, catalog_ref) = (
self._authenticate(credentials=credentials,
ec2credentials=ec2Credentials))
# 继承EC2ControllerCommon._authenticate
<Ec2ControllerCommon._authenticate path="">
def _authenticate(self, credentials=None, ec2credentials=None):
# credentials: ec2 签名字典
# ec2credentials : 丢弃的ec2 签名字典
if not credentials and ec2credentials: # 只提供了ec2credentials时,用它代替credentials
credentials = ec2credentials
if 'access' not in credentials:
raise exception.Unauthorized(message='EC2 signature not supplied.')
creds_ref = self._get_credentials(credentials['access']) # credentials['access'] 是ID
<EC2ControllerCommon._get_credentials values="(self, credential_id)" path="keystone.contrib.ec2.controller">
def _get_credentials(self, credential_id):
# return: credentials ec2 credentials 字典
# utils 在 keystone.common
ec2_credential_id = utils.hash_access_key(credential_id) # 对credential_id进行hash
creds = self.credential_api.get_credential(ec2_credential_id)
<credential_api.get_credential values="ec2_credential_id" path="keystone.credential.Credential">
self.credential_api 就是下面的类的实例
# Manager->Driver的过程
# Credential driver; the surrounding notes describe self.credential_api as
# resolving to an instance of this class (Manager -> Driver).
class Credential(credential.Driver):
# Return the credential with the given id as a dict.
def get_credential(self, credential_id):
session = sql.get_session()
return self._get_credential(session, credential_id).to_dict()
# self._get_credential(session, credential_id) returns the CredentialModel
# object; raises exception.CredentialNotFound when there is no such row.
def _get_credential(self, session, credential_id):
ref = session.query(CredentialModel).get(credential_id)
# Looks the id up via a query on CredentialModel.
if ref is None:
raise exception.CredentialNotFound(credential_id=credential_id)
return ref
</credential_api.get_credential>
if not creds:
raise exception.Unauthorized(message='EC2 access key not found.')
return self._convert_v3_to_ec2_credential(creds)
</EC2ControllerCommon._get_credentials>
self.check_signature(creds_ref, credentials)
tenant_ref = self.resource_api.get_project(creds_ref['tenant_id'])
user_ref = self.identity_api.get_user(creds_ref['user_id'])
metadata_ref = {}
metadata_ref['roles'] = (
self.assignment_api.get_roles_for_user_and_project(user_ref['id'], tenant_ref['id']))
trust_id = creds_ref.get('trust_id')
if trust_id:
metadata_ref['trust_id'] = trust_id
metadata_ref['trustee_user_id'] = user_ref['id']
try:
self.identity_api.assert_user_enabled(
user_id=user_ref['id'], user=user_ref)
self.resource_api.assert_domain_enabled(
domain_id=user_ref['domain_id'])
self.resource_api.assert_project_enabled(
project_id=tenant_ref['id'], project=tenant_ref)
except AssertionError as e:
six.reraise(exception.Unauthorized, exception.Unauthorized(e),
sys.exc_info()[2])
roles = metadata_ref.get('roles', [])
if not roles:
raise exception.Unauthorized(message='User not valid for tenant.')
roles_ref = [self.role_api.get_role(role_id) for role_id in roles]
catalog_ref = self.catalog_api.get_catalog(user_ref['id'], tenant_ref['id'])
return user_ref, tenant_ref, metadata_ref, roles_ref, catalog_ref
</Ec2ControllerCommon._authenticate>
</EC2Controller>
# 生效
# Token validation: POST /ec2tokens -> ec2_controller.authenticate.
mapper.connect(
    '/ec2tokens',
    # Open question from the notes: is the URL assembled as
    # /v2.0/ec2tokens or used directly as /ec2tokens?
    controller=ec2_controller,
    action='authenticate',
    conditions=dict(method=['POST'])
)
# CRUD on a user's EC2 credentials.
# Fix: the keyword is `controller` (Router._dispatch reads
# match['controller']); the original passed `controllers`.
mapper.connect(
    '/users/{user_id}/credentials/OS-EC2',
    controller=ec2_controller,
    action='create_credential',
    conditions=dict(method=['POST']))
# Fix: the collection URL has no {credential_id}, so it maps to the listing
# action rather than the single-credential one.
# NOTE(review): assumes the controller exposes get_credentials; confirm.
mapper.connect(
    '/users/{user_id}/credentials/OS-EC2',
    controller=ec2_controller,
    action='get_credentials',
    conditions=dict(method=['GET']))
mapper.connect(
    '/users/{user_id}/credentials/OS-EC2/{credential_id}',
    controller=ec2_controller,
    action='get_credential',
    conditions=dict(method=['GET']))
mapper.connect(
    '/users/{user_id}/credentials/OS-EC2/{credential_id}',
    controller=ec2_controller,
    action='delete_credential',
    conditions=dict(method=['DELETE']))
</EC2Extension.factory>
9-[filter:s3_extension]
paste.filter_factory = keystone.contrib.s3:S3Extension.factory
10-[filter:crud_extension]
paste.filter_factory = keystone.contrib.admin_crud:CrudExtension.factory
11-[filter:admin_service]
paste.app_factory = keystone.service:admin_app_factory
</keystone-paste.ini>
</paste_deploy>