<paste_deploy function="业务流程分析">
<keystone-paste.ini path="/etc/keystone/">
# 我们分析admin
[composite: admin]
use = egg:Pate#urlmap
/v2.0 = admin_api
/v3 = api_v3
/ = public_version_api
[pipeline:admin_api]
pipeline = sizelimit url_normalize request_id build_auth_context token_auth
admin_token_auth json_body ec2_extension s3_extension crud_extension admin_service
# 分析/v2.0 入口处理业务
1-[filter:sizelimit]
paste.filter_factory = oslo_middleware.sizelimit:RequestBodySizeLimiter.factory
<RequestBodySizeLimiter path="oslo_middle.sizelimit">
_opts = [
cfg.IntOpt('max_request_body_size',
default=114688, # 默认为112k
help='...',
deprecated_opts=_oldopts)
]
class RequestBodySizeLimiter(base.Middleware):
# 限制进入的多个请求body大小
def __init__(self, application, conf=None):
super(RequestBodySizeLimiter, self).__init__(application, conf)
<base.Middleware values="" path="oslo_middleware.base">
# 最后会调用其factory方法
class Middleware(object):
# factory会调用子类的__call__方法,在调用前需要初始化
@classmethod
def factory(cls, global_conf, **local_conf):
conf = global_conf.copy() if global_conf else {}
conf.update(local_conf)
def middleware_filter(app):
return cls(app, conf)
return middleware_filter
def __init__(self, application, conf=None):
if isinstance(conf, cfg.COnfigOPts):
self.conf = []
self.oslo_conf = conf
else:
self.conf = conf or []
if 'oslo_config_project' in self.conf:
if 'oslo_config_file' in self.conf:
default_config_files = [self.conf['oslo_config_fil']]
else:
default_config_files = None
self.oslo_conf = cfg.ConfigOpts()
self.oslo_conf([], project=self.conf['oslo_config_project'],
default_config_files=default_config_files,
validate_default_values=True)
# 上面的调用 ConfigOpts实例的__call__
else:
# 直接调用cfg.CONF
self.oslo_conf = cfg.CONF
def _conf_get(self, key, group='oslo_middleware'):
if key in self.conf:
self.oslo_conf.set_override(key, self.conf[key], group=group,
enfore_type=True)
# 是配置有效
return getattr(getattr(self.oslo_conf, group), key)
@staticmethod
def process_request(req):
# 对每个请求进行处理
return None
@staticmethod
def process_response(response, request=None):
return response
@webob.dec.wsgif
def __call__(self, req):
response = self.process_request(req)
if response:
return response
response = req.get_response(self.application)
# 这个get_response(self.application)不知道是哪里的
(args, varargs, varkw, defaults) = getargspec(self.process_response)
# getargsepc 是inspec库的函数,用来检测函数的参数
# ArgSpec(args=['response', 'request'], varargs=None, keywords=None, defaults=(None,))
if 'request' in args:
return self.process_response(response, request=req)
return self.process_response(response)
</base.Middleware>
# 继承的是base.Middleware
self.oslo_conf.register_opts(_opts, group='oslo_middleware')
# 将上面的选项注册进ConfigOpts实例中
@webob.dec.wsgif
# 这个装饰器会将func转变为app
def __call__(self, req):
max_size = self._conf_get('max_request_body_size')
if (req.content_length is not None and req.content_length > max_size):
msg = _('Request is too large')
raise webob.exc.HTTPRequestEntityTooLarge(explanation=msg)
if req.content_length is None and req.is_body_readable:
limiter = LimitingRead(req.body_file, max_size)
<LimitingRead values="(req.body_file, max_size)" path="." function="对单个请求大小进行限制">
class LimitingReader(object):
def __init__(self, data, limit):
self.data = data
self.limit = limit
self.bytes_read = 0
def __iter__(self):
for chunk in self.data:
self.bytes_read += len(chunk)
if self.bytes_read > self.limit:
msg = _('Request too large')
raise webob.exc.HTTPRequestEntityTooLarge(explanation=msg)
else:
yield chunk
def read(self, i=None):
# mod_wsgi 与 eventlet不同,所以不能简单的提高一个read
if i is None:
result = self.data.read()
else:
result = self.data.read(i)
if self.bytes_read > self.limit:
# read 加上limit判断
msg = _('Request too large')
raise webob.exc.HTTPRequestEntityTooLarge(explanation=msg)
return result
</LimitingRead>
req.body_file = limiter
return self.application
1.1流程->RequestBodySizeLimiter初始化后调用父类Middleware的factory函数继续调用RequestBodySizeLimter的__call__
</RequestBodySizeLimiter>
2-[filter:url_normalize]
paste.filter_factory = keystone.middleware:NormalizingFilter.factory
<NormalizingFIlter.factory values="" path="keystone.middleware.core">
class NormalizingFilter(wsgi.Middleware):
<wsgi.Middleware path="keystone.common.wsgi.core">
class Middleware(Application):
<Application path=".">
</Application>
# 基础WSGI 中间件
# 初始化需要app,然后才调用factory,只会调用__call__
@classmethod
def factory(cls, global_config, **local_config):
# 配置文件如下
# [filter:analytics]
# redis_host = 127.0.0.1
# paste.filter_factory = keystone.analytics:Analytics.factory
# 相当于如下,后返回call调用
# import keystone.analytics
# keystone.analytics.Analytics(app, redis_host='127.0.0.1')
def _factory(app):
conf = global_config.copy()
conf.update(local)
return cls(app, **local_config)
return _factory
def __init__(self, application)
super(Middleware, self).__init__()
self.application = application
def process_requires(self, request):
return None
def process_response(self, request, response):
return response
@webob.dec.wsgify()
# 初始化后调用wsgi().__call__
def __call__(self, request):
try:
response = self.process_request(self.application)
if response:
return response
response = request.get_response(self.application)
# 通过get_response 可以返回一个webob response object
return self.process_response(request, response)
except
....
# 这个__call__调用下面的process_request
</wsgi.Middleware>
# 中间件过滤处理url正常化
def process_request(request):
# 正常化 url
# 移除多余反斜杠
if (len(request.environ['PATH_INFO'])>1 and
request.environ['PATH_INFO'][-1] == '/'):
request.environ['PATH_INFO'] = request.environ['PATH_INFO'][:-1]
# 重写path,将其置为root
elif not request.environ['PATH_INFO']:
request.environ['PATH_INFO'] = '/'
</NormalizingFIlter.factory>
3-[filter:request_id]
paste.filter_factory = oslo_middleware:RequestId.factory
<RequestId.factory path="oslo_middleware.request_id">
ENV_REQUEST_ID = 'openstack.request_id'
HTTP_RESP_HEADER_ID = 'x-openstack-request-id'
class RequestId(base.Middleware):
# 中间件确保request ID
# 确保分配ID对应每个API请求,将其置为request env,request ID同样被添加到API response
@webob.dec.wsgify
def __call__(self, req):
req_id = context.generate_request_id()
<get_response_id path="oslo_context.context.generate_request_id">
def generate_request_id():
return b'req-' + str(uuid.uuid4()).encode('ascii')
</get_response_id>
req.environ[ENV_REQUEST_ID] = req.id
response = req.get_response(self.application)
if HTTP_RESP_HEADER_REQUEST_ID not in response.headers:
response.headers.add(HTTP_RESP_HEADER_REQUEST_ID, req_id)
return response
</RequestId.factory>
4-[filter:build_auth_context]
paste.filter_factory = keystone.middleware:AuthContextMiddleware.factory
<AuthContextMiddleware.factory path="keystone.middleware">
class AuthContextMiddleware(wsgi.Middleware):
# 从 request auth token 中构建 authentication context
def _build_auth_context(self, request):
token_id = request.headers.get(AUTH_TOKEN_HEADER).strip()
if token_id == CONF.admin_token:
return {}
context = {'token_id': token_id}
context['environment'] = request.environ
try:
token_ref = token_model.KeystoneToken(
token_id=token_id,
token_data=self.token_provider_api.validate_token(token_id) # Application 有token_provider_api
)
<token_model.KeystoneToken values="(token_id, token_data)" path="keystone.model.token_model">
class KeystoneToken(dict):
def __init__(self, token_id, token_data):
self.token_data = token_data
if 'access' in token_access:
super(KeystoneToken, self).__init__(**token_data['access'])
elif 'token' in token_data and 'methods' in token_data['token']:
super(KeystoneToken, self).__init__(**token_data['token'])
else:
raise exception.UnsupportedTokenVersionException()
self.token_id = token_id
self.short_id = cms.cms_has_token(token_id, mode=CONF.token.has_algorithm) # mode = 'md5' return hash code
if self.project_scoped and self.domain_scoped:
raise exception.UnexpectedError(_('Found invalid token: scoped to '
'both project and domain'))
</token_model.KeystoneToken>
wsgi.validate_token_bind(context, token_ref)
<wsgi.validate_token_bind values="(context, token_ref)" path="keystone.common.wsgi">
CONTEXT_ENV = 'openstack.context'
PARAMS_ENV = 'openstack.params'
JSON_ENCODE_CONTENT_TYPES = set(['application/json', 'application/json-home'])
def validate_token_bind(context, token_ref):
bind_mode = CONF.token.enforce_token_bind # permissive
if not isinstance(token_ref, token_model.KeystoneToken):
raise exception.UnexpectedError(_('token reference must be a KeystoneToken type, got: %s') % type(token_ref))
bind = token_ref.bind
permissive = bind_mode in ('permissive', 'strict')
name = None if permissive or bind_name == 'required' else bind_mode
if not bind:
if permissive:
# 需要绑定而且permissive不为空
return
else:
LOG(_LI('No bind information present in token'))
raise exception.Unauthorized()
if name and name not in bind:
raise exception.Unauthorized()
for bind_type, identifier in bind.items():
if bind_type == 'kerberos':
if not (context['environment'].get('AUTH_TYPE', '').lower()
== 'negotiate'):
LOG.info(_LI("Kerberos credentials required and not present"))
raise exception.Unauthorized()
if not context['environment'].get('REMOTE_USER') == identifier:
LOG.info(_LI("Kerberos credentials do not match "
"those in bind"))
raise exception.Unauthorized()
LOG.info(_LI("Kerberos bind authentication successful"))
elif bind_mode == 'permissive':
LOG.debug(("Ignoring unknown bind for permissive mode: "
"{%(bind_type)s: %(identifier)s}"),
{'bind_type': bind_type, 'identifier': identifier})
else:
LOG.info(_LI("Couldn't verify unknown bind: "
"{%(bind_type)s: %(identifier)s}"),
{'bind_type': bind_type, 'identifier': identifier})
raise exception.Unauthorized()
</wsgi.validate_token_bind>
return authorization.token_to_auth_context(token_ref)
<authorization.token_to_auth_context values="token_ref" path="keystone.common.authorization">
AUTH_CONTEXT_ENV = 'KEYSTONE_AUTH_CONTEXT'
def token_to_auth_context(token):
if not isinstance(token, token_model.KeystoneToken):
raise exception.UnexpectedError(_('token refernce must be a KeystoneToken type, got: %s') % type(token))
auth_context = {'token': token, 'is_delegated_auth': False}
try:
auth_context['user_id'] = token.user_id
except KeyError:
LOG.warning(_LW('RBAC:Invalid user data in token'))
raise exception.Unauthorized()
if token.project_scoped:
auth_context['project_id'] = token.project_id
elif token.domain_scoped:
auth_context['domain_id'] = token.domain_id
auth_context['domain_name'] = token.domain_name
else:
LOG.debug('RBAC: proceeding without project or domain scope')
if token.trust_scoped:
auth_context['is_delegated_auth'] = True
auth_context['trust_id'] = token.trust_id
auth_context['trustor_id'] = token.trustor_user_id
auth_conetxt['trustee_id'] = token.trustee_user_id
else:
auth_context['trust_id'] = None
auth_context['trustor_id'] = None
auth_context['trustee_id'] = None
roles = token.role_names
if roles:
auth_context['roles'] = roles
if token.oauth_scoped:
auth_context['is_delegated_auth'] = True
auth_context['consumer_id'] = token.oauth_consumer_id
auth_context['access_token_id'] = token.oauth_access_token_id
if token.is_federated_user:
auth_context['group_ids'] = token.federation_group_ids
return auth_context
</authorization.token_to_auth_context>
except exception.TokenNotFound:
LOG.warning(_LW('RBAC:Invalid token'))
raise exception.Unauthorized
def process_request(self, request):
if AUTH_TOKEN_HEADER not in request.headers:
LOG.debug(('Auth token not in the request header.'
'will not build auth context'))
return
if authorization.AUTH_CONTEXT_ENV in request.environ:
# 已经存在,返回
return
auth_context = self._build_auth_context(request)
request.environ[authorization.AUTH_CONTEXT_ENV] = auth_context
# request.environ['KEYSTONE_AUTH_CONTEXT'] = auth_context
</AuthContextMiddleware.factory>
5-[filter:token_auth]
paste.filter_factory = keystone.middleware:TokenAuthMiddleware.factory
<TokenAuthMiddleware.factory path="keystone.middleware.core">
AUTH_TOKEN_HEADER = 'X-Auth-Token'
CONTEXT_ENV = 'openstack.context'
PARAMS_ENV = 'openstack.params'
SUBJECT_TOKEN_HEADER = 'X-Subject-Token'
class TokenAuthMIddleware(wsgi.Middleware):
def process_request(self, request):
token = request.headers.get(AUTH_TOKEN_HEADER)
context = request.environ.get(CONTEXT_ENV, {})
context['token_id'] = token
if SUBJECT_TOKEN_HEADER in request.headers:
context['subject_token_id'] = request.headers[SUBJECT_TOKEN_HEADER]
request.environ[CONTEXT_ENV] = context
</TokenAuthMiddleware.factory>
6-[filter:admin_token_auth]
paste.filter_factory = keystone.middleware:AdminTokenAuthMiddleware.factory
<AdminTokenAuthMIddleware.factory path="keystone.middleware.core">
class AdminAuthMiddleware(wsgi.Middleware):
def process_request(self, request):
token = request.headers.get(AUTH_TOKEN_HEADER)
context = request.environ.get(CONTEXT_ENV, {})
context['is_admin'] = (token == CONF.admin_token)
request.environ[CONTEXT_ENV] = context
# 验证token == CONF.admin_token,置is_admin的布尔值
</AdminTokenAuthMIddleware.factory>
7-[filter:json_body]
paste.filter_factory = keystone.middleware:JsonBodyMiddleware.factory
<JsonBodyMiddleware.factory path="keystone.middleware.core">
class JsonBodyMiddleware(wsgi.Middleware):
# 允许方法参数以序列化json形式通过
def process_request(self, request):
params_json = request.body:
# 如果为空,早早结束
if not params_json:
return
# 识别context_type是否为json
if request.context_type not in ('application/json', ''):
e = exception.ValidationError(attribute='application/json',
target='Content-Type header')
return wsgi.render_exception(e, request=request)
params_parsed = {}
try:
params_parsed = jsonutils.loads(params_json)
except ValueError:
e = exception.ValidationError(attribute='valid JSON',
target='request body')
return wsgi.render_exception(e, request=request)
finally:
if not params_parsed:
params_parsed = {}
if not isinstance(params_parsed, dict):
e = exception.ValidationError(attribute='valid JSON object',
target='request body')
return wsgi.render_exception(e, request=request)
params = {}
for k, v in params_parsed.items():
if k in (self, 'context'):
continue
if k.startswith('_'):
continue
params[k] = v
request.environ[PARAMS_ENV] = params
# 将request.body json化 ,然后以字典形式 放到 request.environ['openstack.params']
</JsonBodyMiddleware.factory>
8-[filter:ec2_extension]
paste.filter_factory = keystone.contrib.ec2:Ec2Extension.factory
<EC2Extension.factory path="keystone.contrib/ec2/routers">
build_resource_relation = functools.partial(
json_home.build_v3_extension_resource_relation, extension_name='OS-EC2',
extension_version='1.0')
build_parameter_relation = functools.partial(
json_home.build_v3_extension_parameter_relation.extension_name='OS-EC2',
extension_version='1.0')
class EC2Extension(wsgi.ExtensionRouter):
<wsgi.ExtensionRouter values="" path="keystone.common.wsgi">
class ExtensionRouter(Router):
<Router path="keystone.common.wsgi">
class Router(object):
def __init__(self, mapper):
# 为给定的routes.Mapper创建一个router
# 每个在Mapper里的router都需要指定一个controller,这就是要被调用的WSGI app
# action 就是controller里面的方法
self.map = mapper
self._router = routes.middleware.RouterMiddleware(self._dispatch, self.map)
@webob.dec.wsgify() # 下面就是调用的wsgfiy().__call__
def __call__(self, req):
@staticmethod
@webob.dec.wsgify()
def _dispatch(req):
# 将请求转发到匹配的controller
# self._router调用它, 匹配request,并将信息装入req.environ
match = req.environ['wsgiorg.routing_args'][1]
if not match:
msg = _('The resource could not be found.')
return render_exception(exception.NotFound(msg),
request=req,
user_local=best_match_language(req))
app = match['controller']
return app
return self._router
</Router>
def __init__(self, application, mapper=None):
if mapper is None:
mapper = routes.Mapper()
self.application = application
self.add_routes(mapper)
mapper.connect('/{path_info:.*}', controller=self.application)
super(ExtensionRouter, self).__init__(mapper)
def add_router(self, mapper)# 需要覆写
return pass
@classmethod
def factory(cls, global_config, **local_config):
def _factory(app):
conf = global_config.copy()
conf.update(local_config)
return cls(app, **local_config) # 调用类初始化
return _factory
</wsgi.ExtensionRouter>
def add_routes(self, mapper):
ec2_controller = controllers.EC2Controller()
<EC2Controller values="" path="keystone.contrib.ec2.controller">
@dependency.requires('policy_api', 'token_provider_api')
class EC2Controller(Ec2ControllerCommon, controller.V2Controller):
@controller.v2_deprecated
def authenticate(self, context, credentials=None, ec2Credentials=None):
(user_ref, tenant_ref, metadata_ref, role_ref, catalog_ref) =
self._authenticate(credentials=credentials,
ec2credentials=ec2credentials)
# 继承EC2ControllerCommon._authenticate
<Ec2ControllerCommon._authenticate path="">
def _authenticate(self, credentials=None, ec2credentials=None):
# credentials: ec2 签名字典
# ec2credentials : 丢弃的ec2 签名字典
if not credentials and ec2credentials: 只要ec2credentials
credentials = ec2credentials
if 'access' not in credentials:
raise exception.Unauthorized(message='EC2 signature not supplied.')
creds_ref = self._get_credentials(credentials['access']) # credentials['access'] 是ID
<EC2ControllerCommon._get_credentials values="(self, credential_id)" path="keystone.contrib.ec2.controller">
def _get_credentials(self, credentials):
# return: credentials ec2 credentials 字典
# utils 在 keystone.common
ec2_credentials_id = utils.hash_access_key(credential_id) # 对credential_id进行hash
creds = self.credential_api.get_credential(ec2_credential_id)
<credential_api.get_credential values="ec2_credential_id" path="keystone.credential.Credential">
self.credential_api 就是下面的类的实例
# Manager->Driver的过程
class Credential(credential.Driver):
def get_credential(self, credential_id):
session = sql.get_session()
return self._get_credential(session, credential_id).to_dict()
# self._get_credential(session, credential_id)返回的是CredentialModel
def _get_credential(self, session, credential_id):
ref = session.query(CredentialModel).get(credential_id)
# 查询表CredentialModel
if ref is None:
raise exception.CredentialNotFound(credential_id=credential_id)
return ref
</credential_api.get_credential>
if not creds:
raise exception.Unauthorized(message='EC2 access key not found.')
return self._convert_v3_to_ec2_credential(creds)
<self._convert_v3_to_ec2_credential values="creds" path="keystone.contrib.ec2.controllers.EC2ControllerCommon">
@staticmethod
def _convert_v3_to_ec2_credential(credential):
try:
blob = jsonutils.loads(credential['blob'])
except TYpeError:
blob = credential['blob']
return {'user_id': credential.get('user_id'),
'tenant_id': credential.get('project_id'),
'access': blob.get('access'),
'secret': blob.get('secret'),
'trust_id': blob.get('trust_id')}
</self._convert_v3_to_ec2_credential>
</EC2ControllerCommon._get_credentials>
self.check_signature(creds_ref, credentials)
<self.check_signature values="(creds_ref, credentials)" path="keystone.contrib.ec2.controllers.EC2ControllerCommon">
signer = ec2_utils.Ec2Signer(creds_ref['secret'])
...
</self.check_signature>
tenant_ref = self.resource_api.get_project(creds_ref['tenant_id'])
<self.resource_api.get_project values="(creds_ref['tenant_id'])" path="keystone.resource.core.Manager">
@dependency.provider('resource_api')
@dependency.requires('assignment_api', 'credential_api', 'domain_config_qpi', 'identity_api', 'revoke_api')
class Manager(manager.Manager):
# main entry point into the resource service
_DOMAIN = 'domain'
_PROJECT = 'project'
def __init__(self):
# 没特别指定就用assignment的驱动
resource_driver = CONF.resource.driver
if resource_driver is None:
assignment_manager = dependency.get_provider('assignment_api')
resource_driver = assignment_manager.default_resource_driver()
# 返回的是'keystone.resource.backends.sql.Resource.'
super(Manger, self).__init__(resource_driver)
# 初始化驱动 传递的是entry_point
@MEMOIZE
def get_project(self, project_id):
return self.driver.get_project(project)
<self.driver.get_project values="project" path="keystone.resource.backends.sql.Resource">
# from keystone import resource as keystone_resource
class Resource(keystone_resource.Driver):
def get_project(self, tenant_id):
with sql.transaction() as session:
<sql.transaction values="" path="keystone.common.sql.core">
@contextlib.contextmanager
# contextlib.contextmanager 是装饰函数级别的with,保存上下文
def transaction(expire_on_commit=False):
# 返回SQLAlchemy session
session = get_session(expire_on_commit=expire_on_commit):
with session.begin():
yield session
</sql.transaction>
return self._get_project(session, tenant_id).to_dict()
# _get_project 见下面
# 每次sql返回值都会进行一下to_dict()操作
def default_assignment_driver(self):
# 上面找驱动的返回 就是这里返回的字符串
return 'keystone.assignment.backends.sql.Assignment'
def _get_project(self, session, project):
project_ref = session.query(Project).get(project_id)
# 表Project
if project_ref is None:
raise exception.ProjectNotFound(project_id=project_id)
return project_ref
</self.driver.get_project>
</self.resource_api.get_project>
user_ref = self.identity_api.get_user(creds_ref['user_id'])
<self.identity_api.get_user values="(creds_ref['user_id'])" path="keystone.identity.core.Manager">
MEMOIZE = cache.get_memoization_decorator(section='identity')
DOMAIN_CONF = 'keystone.'
DOMAIN_CONF_FIAIL = '.conf'
@dependency.provider('identity_api')
@dependency.requires('assignment_api', 'credential_api', 'id_mapping_api', 'resource_api', 'revoke_api')
class Manager(manager.Mnager):
_USER = 'user'
_GROUP = 'group'
def __init__(self):
super(Manager, self).__init__(CONF.identity.driver)
self.domain_configs = DomainConfigs()
self.event_callbacks = {
notification.ACTIONS.deleted: {
'domain': [self._domain_deleted]
}
}
@domain_configured
<domain_configured path="keystone.identity.core">
# 这边会启动domain的配置加载
def domains_configured(f):
@functools.wraps(f):
def wrapper(self, *args, **kwargs):
if (not self.domain_configs.configured and CONF.identity.domain_specific_drivers_enabled):
self.domain_configs.setup_domain_drivers(self.driver, self.resource_api)
return f(self, *args, **kwargs)
return wrapper
</domain_configured>
@exception_translated('user')
@MEMOIZE
def get_user(self, user_id):
domain_id, driver, entity_id = (self._get_domain_driver_and_entity_id(user_id))
<self._get_domain_driver_and_entity_id values="user_id" path="">
def _get_domain_driver_and_entity_id(self, public_id):
# 查询public_id详情
# 返回: domain_id ,可为空,支持多domain, driver根据domain获取,entity_id根据driver
# 使用映射表查找domain,driver以及local entity
# 单一driver无需映射表
conf = CONF.identiy
# 因为不知道entity,所以需要做映射
if conf.domain_specific_drivers_enabled:
local_id_ref = self.id_mapping_api.get_id_mapping(public_id)
return (local_id_ref['domain_id'], self._select_identity_driver(local_id_ref['domain_id']),
local_id_ref['local_id'])
driver = self.driver
if driver.generates_uuids():
if driver.is_domain_aware:
return (None, driver, public_id)
else:
return (conf.default_domain_id, driver, public_id)
if not CONF.identity_mapping.backward_compatible_ids:
local_id_ref = self.id_mapping_api.get_id_mapping(public)
if local_id_ref:
return (
local_id_ref['domain_id'],
driver,
local_id_ref['local_id'],
)
else:
raise exception.PublicDNotFound(id=public_id)
return (conf.default_domain_id, driver, public_id)
</self._get_domain_driver_and_entity_id>
ref = driver.get_user(entity_id)
return self._set_domain_id_and_mapping(ref, domain_id, driver, mapping.EntityType.USER)
</self.identity_api.get_user>
metadata_ref = {}
metadata_ref['roles'] = (
self.assignment_api.get_roles_for_user_and_project(user_ref['id'], tenant_ref['id'])
<self.assignment_api.get_roles_for_user_and_project values="(user_ref['id'], tenant_ref['id'])" path="keystone.assignment.core">
@dependency.provider('assignment_api')
@dependency.requires('credential_api', 'identity_api', 'resource_api', 'revoke_api', 'role_api')
class Manager(manager.Manager):
_PROJECT = 'project'
_ROLE_REMOVED_FROM_USER = 'role_removed_from_user'
_INVALIDATION_USER_PROJECT_TOKENS = 'invalidate_user_project_tokens'
def __init__(self):
assignemnt = CONF.assignment.driver
# 以后identity将包括 identity、resource、assignment
super(Manager, self).__init__(assignment_driver)
def get_roles_for_user_and_project(self, user_id, tenant_id):
# 如果OS-INHERIT拓展可用,那么domain将包含role inherited
project_ref = self.resource_api.get_project(tenant_id)
user_role_list = _get_user_project_roles(user_id, project_ref)
group_role_list = _get_group_project_roles(user_id, project_ref)
return list(set(user_role_list + group_role_list))
</self.assignment_api.get_roles_for_user_and_project>
trust_id = creds_ref.get('trust_id')
if trust_id:
metadata_trf['trust_id'] = trust_id
metadata_ref['trustee_user_id'] = user_ref['id']
try:
self.identity_api.assert_user_enable(
user_id=user_ref['id'], user=user_ref)
<self.identity_api.assert_user_enable values="(user_id=user_ref['id'], user=user_ref)" path="keystone.identity.core">
def assert_user_enabled(self, user_id, user=None):
if user is None:
user = self.get_user(user_id)
self.resource_api.assert_domain_enabled(user['domain_id'])
<self.reouce_api.assert_domain_enbaled values="(user['domain_id'])" path="keystone.resource.core.Manager">
def assert_domain_enabled(self, domain_id, domain=None):
if domain is None:
domain = self.get_domain(domain_id)
<self.get_domain values="domain_id" path=".">
@MEMOIZE
def get_domain(self, domain_id):
return self.driver.get_domain(domain_id)
# 调用sql等后端驱动处理
</self.get_domain>
if not domain.get('enabled', True):
raise AssertionError(_('Domain is disabled: %s') % domain_id)
</self.reouce_api.assert_domain_enbaled>
if not user.get('enabled', True):
raise AssertionError(_('User is disabled: %s') % user_id)
# identity_api.assert_user_enabled(user_id, user)--->resource_api.assert_domain_enabled(user['domain_id'])
</self.identity_api.assert_user_enable>
self.resource_api.assert_domain_enabled(
domain_id=user_ref['domain_id'])
<self.resource_api.assert_domain_enabled values="(domain_id=user_ref['domain_id'])" path="keystone.resource.core">
# 就是上面的
</self.resource_api.assert_domain_enabled>
self.resource_api.assert_project_enabled(
project_id=tenant_ref['id'], project=tenant_ref)
# 跟assert_project_enabled 类似
except AssertionError as e:
six.reraise(exception.Unauthorized, exception.Unauthorized(e),
sys.exc_info()[2])
roles = metadata_ref.get('roles', [])
if not roles:
raise exception.Unauthorized(message='User not valid for tenant.')
roles_ref = [self.role_api.get_role(role_id) for role_id in roles]
<self.role_api.get_role values="(role_id)" path="keystone.assignment.core.RoleManager">
@dependency.provider('role_api')
@dependency.requires('assignment_api')
class RoleManager(manager.Manager):
_ROLE = 'role'
def __init__(self):
role_driver = CONF.role.driver
if rle_driver is None:
assignment_manager = dependency.get_provider('assignment_api')
role_driver = assignment_manager.default_role_driver()
super(RoleManager, self).__init__(role_driver)
@MEMOIZE
def get_role(self, role_id):
return self.driver.get_role(role_id)
</self.role_api.get_role>
catalog_ref = self.catalog_api.get_catalog(user_ref['id'], tenant_ref['id'])
return user_ref, tenant_ref, metadata_ref, roles_ref, catalog_ref
</Ec2ControllerCommon._authenticate>
user_ref = self.v3_to_v2_user(user_ref)
<self.v3_to_v2_user values="user_ref" path="keystone.common.controller.v2Controller">
# 上面EC2Controller类的继承
@staticmethod
def v3_to_v2_user(ref):
# v3 转变为v2
# v2 users 没有domain
# v2 user 需要tenantId 而不是 default_project_id
# v2 user 有username 属性
def _format_default_project_id(ref):
# 将default_project_id 转变为 tenantId
default_project_id = ref.pop('default_project_id', None)
if default_project_id is not None:
ref['tenantId'] = default_project_id
elif 'tenantId' in ref:
del ref['tenantId']
def _normalize_and_filter_user_properties(ref):
_format_default_project_ids(ref)
V2Controller.filter_domain(ref)
<V2Controller.filter_domain values="ref" path="keystone.contrib.ec2.core.V2Controller">
@staticmethod
def filter_domain(ref):
# 移除domain,这个方法确保v3 user创建时候属于默认domain
if 'domain' in ref:
if ref['domain'].get('id') != CONF.identity.default_domain_id:
# CONF.identity.default_domain_id = default 应该是008
raise exception.Unauthorized(
_('Non-default domain is not supported'))
del ref['domain']
return ref
</V2Controller.filter_domain>
V2Controller.filter_domain_id(ref)
<V2Controller.filter_domain_id values="ref" path=".">
@staticmethod
def filter_domain_id(ref):
# 因为V2没有domain_id 所以移除
if 'domain_id' in ref:
if ref['domain_id'] != CONF.identity.default_domain_id:
raise exception.Unauthorized(
_('Non-default domain is not supported'))
def ref['domain_id']
return ref
</V2Controller.filter_domain_id>
V2Controller.normalize_username_in_response(ref)
<V2Controller.normalize_username_in_response values="ref" path="">
@staticmethod
def normalize_username_in_response(ref):
# 在输出的ref中添加username
if 'username' not in ref nad 'name' in ref:
ref['username'] = ref['name']
return ref
</V2Controller.normalize_username_in_response>
return ref
if isinstance(ref, dict):
return _normalize_and_filter_user_properties(ref)
elif isinstance(ref, list):
[_normalize_and_filter_user_properties(x) for x in ref]
else:
raise ValueError(_('Expected dict or list: %s') % type(ref))
</self.v3_to_v2_user>
auth_token_data = dict(user=user_ref, tenant_ref, metadata=metadata_ref, id='placeholder')
(token_id, token_data) = self.token_provider_api.issue_v2_token(auth_token_data, roles_ref, catalog_ref)
<self.token_provider_api.issue_v2_token values="(auth_token_data, roles_ref, catalog_ref)" path="keystone.token.provider.Manger">
# token 总共会用到token_api , token_provider_api, 前者依赖于后者
MEMOIZE = cache.get_memoization_decorator(section='token')
UnsupportedTokenVersionException = exception.UnsupportedTokenVersionException
# 防止某些依赖于旧版本的UnsupportedTokenVersionException,增强兼容
V2 = token_model.V2
V3 = token_model.V3
VERSIONS = token_model.VERSIONS
@dependency.provider(‘token_provider_api’)
@dependency.requires('assignment_api', 'revoke_api')
class Manager(manager.Manager):
V2 = V2
V3 = V3
VERSIONS = VERSIONS
INVALIDATE_PROJECT_TOKEN_PERSISTENCE = 'invalidate_project_tokens'
INVALIDATE_USER_TOKEN_PERSISTENCE = 'invalidate_user_tokens'
_persistence_manager = None
def __init__(self):
super(Manager, self).__init__(CONF.token.provider)
self._register_callback_listeners()
def issue_v2_token(self, token_ref, roles_ref=None, catalog_ref=None):
token_id, token_data = self.driver.issue_v2_token(token_ref, roles_ref, catalog_ref)
<self.driver.issue_v2_token values="(token_ref, roles_ref, catalog_ref)" path="keystone.token.providers.uuid">
# 看keystone.conf可知 provider使用uuid
# persistent使用sql
# uuid 继承common.BaseProvider
class Provider(common.BaseProvider):
def __init__(self, *args, **kwargs):
super(Provider, self).__init__(*args, **kwargs)
def _get_token_id(self, token_data):
return uuid.uuid4().hex
def needs_persistence(self):
return True
@dependency.requires('catalog_api', 'identity_api', 'oauth_api', 'resource_api', 'role_api', 'trust_api')
class BaseProvider(provider.Provider):
def __init__(self, *args, **kwargs):
super(BaeProvider, self).__init__(*args, **kwargs)
self.v3_token_data_helper = V3TokenDataHelper()
self.v2_token_data_helper = V2TokenDataHelper()
def issue_v2_token(self, token_ref, reolse_ref=None, catalog_ref=None):
metadata_ref = token_ref['metadata']
trust_ref = None
if CONF.trust.enabled and metadata_ref and 'trust_id' in metadata_ref:
trust_ref = self.trust_api.get_trust(metadata_ref['trust_id'])
token_data = self.v2_token_data_helper.format_token(token_ref, roles_ref, catalog_ref, trust_ref)
<self.v2_token_data_helper.format_token values="(token_ref, roles_ref, catalog_ref, trust_ref)" path=".">
@dependency.requires('catalog_api', 'resource_api')
class V2TokenDataHelper(object):
@classmethod
def format_token(cls, token_ref, roles_ref=None, catalog_ref=None):
audit_info =None
user_ref = token_ref['user']
metadata_ref = token_ref['metadata']
if roles_ref is None:
roles_ref = []
expires = token_ref.get('expires', provider.default_expire_time())
# provider.default_expire_time() 是调用的keystone.token.provider
<provider.default_expire_time values="" path="keystone.token.provider">
def default_expire_time():
expire_delta = datetime.timedelta(seconds=CONF.token.expiration)
return timeutils.utcnow() + expire_delta
</provider.default_expire_time>
if expires is not None:
if not isinstance(expires, six.text_type):
expires = timeutils.isotime(expires)
token_data = token_ref.get('token_data')
if token_data:
token_audit = token_data.get('access', token_data).get('token', {}).get('audit_ids')
audit_info = token_audit
if audit_info is None:
audit_info = provider.audit_info(token_ref.get('parent_audit_id'))
<provider.audit_info values="token_ref.get('parent_audit_id')" path="keystone.token.provider">
def random_urlsafe_str():
return base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2]
# 生成随机的url
def audit_info(parent_audit_id):
audit_id = random_urlsafe_str()
if parent_audit_id is not None:
return [audit_id, parent_audit_id]
return [audit_id]
</provider.audit_info>
o = {'access': {'id': token_ref['id'],
'expires': expires,
'issued_at': timeutils.strtime(),
'audit_ids': audit_info
},
'user': {'id': user_ref['id'],
'name': user_ref['name'],
'username': user_ref['name'],
'roles': roles_ref,
'roles_links': metadata_ref.get('roles_links', [])
}
}
if 'bind' in token_ref:
o['access']['token']['bind'] = token_ref['bind']
if 'tenant' in token_ref and token_ref['tenant']:
token_ref['tenant']['enabled'] = True
o['access']['token']['tenant'] = token_ref['tenant']
if catalog_ref is not None:
o['access']['serviceCatalog'] = V2TokenDataHelper.format_catalog(catalog_ref)
if metadata_ref:
if 'is_admin' in metadata_ref:
o['access']['metadata'] = {'is_admin': metadata_ref['is_admin']}
else:
o['access']['metadata'] = {'is_admin': 0}
if 'roles' in metadata_ref:
o['access']['metadata']['roles'] = metadata_ref['roles']
if CONF.trust.enabled and trust_ref:
o['access']['trust'] = {'trustee_user_id':
trust_ref['trustee_user_id'],
'id':trust_ref['id'],
'trustor_user_id':
trust_ref['trustor_user_id'],
'impersonation':
trust_ref['impersonation']
}
return o
@classmethod
def format_catalog(cls, catalog_ref):
<!--# 输出形式:
# {$REGION: {
# {$SERVICE: {
# $key1: $value1,
#
# }
# }
# }
# 输入形式:
# [{'name': $SERVICE[name],
#'type': $SERVICE,
# 'endpoints': [{
# 'tenantId': $tenant_id,
# ...
# 'region': $REGION,
# }],
# 'endpoints_links': [],
# }]
-->
if not catalog_ref:
services = {}
for region, region_ref in six.iteritems(catalog_ref):
for service, service_ref in six.iteritems(region_ref):
new_service_ref = services.get(service, {})
new_service_ref['name'] = service_ref.pop('name')
new_service_ref['type'] = service
new_service_ref['endpoints_links'] = []
service_ref['region'] = region
endpoints_ref = new_service_ref.get('endpoints', [])
endpoints_ref.append(service_ref)
services[service] = new_service_ref
return services.values()
</self.v2_token_data_helper.format_token>
token_id = self._get_token_id(token_data)
token_data['access']['token']['id'] = token_id
return token_id, token_data
</self.driver.issue_v2_token>
if self._needs_persistence:
<self._needs_persistence path=".">
@property
def _needs_persistence:
return self.driver.needs_persistence()
<self.driver.needs_persistence values="" path="keystone.token.provider.uuid.Provider">
class Provider(common.BaseProvider):
def __init__(self, *args, **kwargs):
super(Provider, self).__init__(*args, **kwargs)
def _get_token_id(self, token_data):
return uuid.uuid4().hex
def needs_persistence(self):
return True
# 这就是说token需要被后端存储
</self.driver.needs_persistence>
</self._needs_persistence>
data = dict(
id=token_id,
expires=token_data['access']['token']['expires'],
user=token_ref['user']