This section contains the generic implementation of RFC7662.
With the help of register_endpoint
offered by Flask OAuth 2.0 Server,
we can easily add introspection endpoint to the authorization server. But
first, we need to implement the missing methods:
from authlib.oauth2.rfc7662 import IntrospectionEndpoint
class MyIntrospectionEndpoint(IntrospectionEndpoint):
def query_token(self, token, token_type_hint, client):
if token_type_hint == 'access_token':
tok = Token.query.filter_by(access_token=token).first()
elif token_type_hint == 'refresh_token':
tok = Token.query.filter_by(refresh_token=token).first()
else:
# without token_type_hint
tok = Token.query.filter_by(access_token=token).first()
if not tok:
tok = Token.query.filter_by(refresh_token=token).first()
if tok:
if tok.client_id == client.client_id:
return tok
if has_introspect_permission(client):
return tok
def introspect_token(self, token):
return {
'active': True,
'client_id': token.client_id,
'token_type': token.token_type,
'username': get_token_username(token),
'scope': token.get_scope(),
'sub': get_token_user_sub(token),
'aud': token.client_id,
'iss': 'https://server.example.com/',
'exp': token.expires_at,
'iat': token.issued_at,
}
# register it to authorization server
server.register_endpoint(MyIntrospectionEndpoint)
After the registration, we can create a response with:
@app.route('/oauth/introspect', methods=['POST'])
def introspect_token():
return server.create_endpoint_response(MyIntrospectionEndpoint.ENDPOINT_NAME)
authlib.oauth2.rfc7662.
IntrospectionEndpoint
(request, server)¶Implementation of introspection endpoint which is described in RFC7662.
ENDPOINT_NAME
= 'introspection'¶Endpoint name to be registered
validate_endpoint_request
()¶The protected resource calls the introspection endpoint using an HTTP
POST
request with parameters sent as
“application/x-www-form-urlencoded” data. The protected resource sends a
parameter representing the token along with optional parameters
representing additional context that is known by the protected resource
to aid the authorization server in its response.
access_token
value returned from the token endpoint
defined in OAuth 2.0. For refresh tokens, this is the
refresh_token
value returned from the token endpoint as defined
in OAuth 2.0.create_endpoint_response
()¶Validate introspection request and create the response.
Returns: | (status_code, body, headers) |
---|
query_token
(token, token_type_hint, client)¶Get the token from database/storage by the given token string. Developers should implement this method:
def query_token(self, token, token_type_hint, client):
if token_type_hint == 'access_token':
tok = Token.query_by_access_token(token)
elif token_type_hint == 'refresh_token':
tok = Token.query_by_refresh_token(token)
else:
tok = Token.query_by_access_token(token)
if not tok:
tok = Token.query_by_refresh_token(token)
if check_client_permission(client, tok):
return tok
introspect_token
(token)¶Read given token and return its introspection metadata as a dictionary following Section 2.2:
def introspect_token(self, token):
active = is_token_active(token)
return {
'active': active,
'client_id': token.client_id,
'token_type': token.token_type,
'username': get_token_username(token),
'scope': token.get_scope(),
'sub': get_token_user_sub(token),
'aud': token.client_id,
'iss': 'https://server.example.com/',
'exp': token.expires_at,
'iat': token.issued_at,
}
authenticate_endpoint_client
()¶Authentication client for endpoint with CLIENT_AUTH_METHODS
.