This section contains the generic implementation of RFC7662. OAuth 2.0 Token Introspection lets resource servers query the authorization server for the state and metadata of a token.
Changed in version v1.0.
Authlib is designed to be extensible: with the .register_endpoint method on AuthorizationServer, it is easy to add the introspection endpoint to the authorization server. This works on both Flask OAuth 2.0 Server and Django OAuth 2.0 Server. But first, we need to implement the missing methods:
from authlib.oauth2.rfc7662 import IntrospectionEndpoint

class MyIntrospectionEndpoint(IntrospectionEndpoint):
    def query_token(self, token, token_type_hint):
        if token_type_hint == 'access_token':
            tok = Token.query.filter_by(access_token=token).first()
        elif token_type_hint == 'refresh_token':
            tok = Token.query.filter_by(refresh_token=token).first()
        else:
            # without token_type_hint
            tok = Token.query.filter_by(access_token=token).first()
            if not tok:
                tok = Token.query.filter_by(refresh_token=token).first()
        return tok

    def introspect_token(self, token):
        return {
            'active': True,
            'client_id': token.client_id,
            'token_type': token.token_type,
            'username': get_token_username(token),
            'scope': token.get_scope(),
            'sub': get_token_user_sub(token),
            'aud': token.client_id,
            'iss': 'https://server.example.com/',
            'exp': token.expires_at,
            'iat': token.issued_at,
        }

    def check_permission(self, token, client, request):
        # for example, we only allow internal client to access introspection endpoint
        return client.client_type == 'internal'

# register it to authorization server
server.register_endpoint(MyIntrospectionEndpoint)
After the registration, we can create a response with:
@app.route('/oauth/introspect', methods=['POST'])
def introspect_token():
    return server.create_endpoint_response(MyIntrospectionEndpoint.ENDPOINT_NAME)
New in version v1.0.
When the resource server has no access to the token database, it can use the introspection endpoint to validate a given token. Here is how:
import requests
from authlib.oauth2.rfc7662 import IntrospectTokenValidator
from your_project import secrets

class MyIntrospectTokenValidator(IntrospectTokenValidator):
    def introspect_token(self, token_string):
        url = 'https://example.com/oauth/introspect'
        data = {'token': token_string, 'token_type_hint': 'access_token'}
        auth = (secrets.internal_client_id, secrets.internal_client_secret)
        resp = requests.post(url, data=data, auth=auth)
        resp.raise_for_status()
        return resp.json()
We can then register this token validator with the resource protector:
require_oauth = ResourceProtector()
require_oauth.register_token_validator(MyIntrospectTokenValidator())
Please note that when using IntrospectTokenValidator, the current_token will be a dict.
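Because the token is a plain dict here, its fields are read by key rather than by attribute. The following is a minimal sketch of that access pattern, assuming the dict carries the RFC7662 response fields shown earlier; the helper name `describe_current_token` is hypothetical and not part of Authlib.

```python
def describe_current_token(token):
    """Summarize an introspection response dict (hypothetical helper).

    `token` is assumed to be the JSON object returned by the
    introspection endpoint, already parsed into a dict.
    """
    return {
        # Prefer the subject identifier, fall back to the username field.
        "user": token.get("sub") or token.get("username"),
        "client": token.get("client_id"),
        # `scope` is a space-separated string; it may be missing entirely.
        "scopes": (token.get("scope") or "").split(),
    }


summary = describe_current_token(
    {"active": True, "sub": "user-1", "client_id": "c1", "scope": "profile email"}
)
print(summary)
```

Using `.get()` keeps the helper tolerant of optional fields, since only `active` is required in an introspection response.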
Implementation of the introspection endpoint described in RFC7662.
Endpoint name to be registered
The protected resource calls the introspection endpoint using an HTTP POST request with parameters sent as “application/x-www-form-urlencoded” data. The protected resource sends a parameter representing the token along with optional parameters representing additional context that is known by the protected resource to aid the authorization server in its response.
token: REQUIRED. The string value of the token. For access tokens, this is the access_token value returned from the token endpoint defined in OAuth 2.0. For refresh tokens, this is the refresh_token value returned from the token endpoint as defined in OAuth 2.0.
token_type_hint: OPTIONAL. A hint about the type of the token submitted for introspection.
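To make the request format concrete, here is a minimal standard-library sketch that builds the form-encoded body and headers for such a POST. It assumes the caller authenticates with HTTP Basic credentials, which is only one possible client authentication method; the function name `build_introspection_request` and the sample credentials are hypothetical.

```python
import base64
from urllib.parse import urlencode


def build_introspection_request(token, client_id, client_secret, token_type_hint=None):
    """Return (headers, body) for an introspection POST (illustrative only)."""
    params = {"token": token}
    # token_type_hint is optional, so it is only sent when provided.
    if token_type_hint is not None:
        params["token_type_hint"] = token_type_hint
    body = urlencode(params)

    # Assumption: the client authenticates with HTTP Basic credentials.
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii"),
    }
    return headers, body


headers, body = build_introspection_request(
    "example-token", "internal-client", "internal-secret", "access_token"
)
print(body)
```

The returned pair can be handed to any HTTP client; nothing is sent over the network by this sketch.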
Validate introspection request and create the response.
(status_code, body, headers)
Check if the request has permission to introspect the token. Developers MUST implement this method:
def check_permission(self, token, client, request):
    # only allow a special client to introspect the token
    return client.client_id == 'introspection_client'
bool
Get the token from database/storage by the given token string. Developers should implement this method:
def query_token(self, token_string, token_type_hint):
    if token_type_hint == 'access_token':
        tok = Token.query_by_access_token(token_string)
    elif token_type_hint == 'refresh_token':
        tok = Token.query_by_refresh_token(token_string)
    else:
        tok = Token.query_by_access_token(token_string)
        if not tok:
            tok = Token.query_by_refresh_token(token_string)
    return tok
Read given token and return its introspection metadata as a dictionary following Section 2.2:
def introspect_token(self, token):
    return {
        'active': True,
        'client_id': token.client_id,
        'token_type': token.token_type,
        'username': get_token_username(token),
        'scope': token.get_scope(),
        'sub': get_token_user_sub(token),
        'aud': token.client_id,
        'iss': 'https://server.example.com/',
        'exp': token.expires_at,
        'iat': token.issued_at,
    }
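RFC7662 Section 2.2 also covers tokens that are unknown, expired, or revoked: for those, the response carries only an "active" value of false and no other metadata. The standalone sketch below illustrates that rule outside of Authlib. `DemoToken` and `build_introspection_payload` are hypothetical names, and the fixed "Bearer" token type is an assumption made for the example.

```python
import time
from dataclasses import dataclass


@dataclass
class DemoToken:
    """Hypothetical stand-in for a stored token model."""
    client_id: str
    scope: str
    issued_at: int
    expires_in: int
    revoked: bool = False

    @property
    def expires_at(self) -> int:
        return self.issued_at + self.expires_in


def build_introspection_payload(token, now=None):
    """Return introspection metadata in the shape of RFC7662 Section 2.2."""
    now = int(time.time()) if now is None else now
    # Unknown, revoked, or expired tokens are reported as inactive,
    # without disclosing any further metadata.
    if token is None or token.revoked or token.expires_at <= now:
        return {"active": False}
    return {
        "active": True,
        "client_id": token.client_id,
        "token_type": "Bearer",  # assumption for this sketch
        "scope": token.scope,
        "exp": token.expires_at,
        "iat": token.issued_at,
    }


demo = DemoToken(client_id="c1", scope="profile", issued_at=1000, expires_in=3600)
print(build_introspection_payload(demo, now=2000))
print(build_introspection_payload(demo, now=5000))
```

Passing `now` explicitly keeps the function deterministic, which makes the expiry branch easy to exercise in tests.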
Authenticate the client for the endpoint using CLIENT_AUTH_METHODS.
A method to query the token from the database with the given token string. Developers MUST re-implement this method. For instance:
def authenticate_token(self, token_string):
    return get_token_from_database(token_string)
token_string – A string to represent the access_token.
token
Request the token introspection endpoint with the given token string; the authorization server returns the token information in JSON format. Developers MUST implement this method before using it:
def introspect_token(self, token_string):
    # for example, introspection token endpoint has limited
    # internal IPs to access, so there is no need to add
    # authentication.
    url = 'https://example.com/oauth/introspect'
    resp = requests.post(url, data={'token': token_string})
    resp.raise_for_status()
    return resp.json()
A method to validate that the authorized token is valid and has permission for the given scopes. Developers MUST re-implement this method, e.g. to check whether the token is expired or revoked:
def validate_token(self, token, scopes, request):
    if not token:
        raise InvalidTokenError()
    if token.is_expired() or token.is_revoked():
        raise InvalidTokenError()
    if not match_token_scopes(token, scopes):
        raise InsufficientScopeError()
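When the token is the dict returned by an introspection endpoint, the same checks can be written against its keys instead of model methods. The following is a minimal standalone sketch under that assumption. The exception classes are local stand-ins rather than Authlib's own errors, and it assumes every requested scope must be granted, which may differ from the matching rule a given deployment uses.

```python
import time


class InactiveTokenError(Exception):
    """Hypothetical stand-in for an invalid-token error."""


class MissingScopeError(Exception):
    """Hypothetical stand-in for an insufficient-scope error."""


def validate_introspected_token(token, required_scopes, now=None):
    """Validate an introspection response dict (illustrative only)."""
    now = time.time() if now is None else now
    # A missing response or an inactive token is rejected outright.
    if not token or not token.get("active"):
        raise InactiveTokenError("token is missing or inactive")
    # `exp` is optional in the response; check it only when present.
    exp = token.get("exp")
    if exp is not None and exp <= now:
        raise InactiveTokenError("token has expired")
    # Assumption: all required scopes must appear in the granted scope string.
    granted = set((token.get("scope") or "").split())
    if not set(required_scopes).issubset(granted):
        raise MissingScopeError("token lacks a required scope")
    return token


ok = validate_introspected_token(
    {"active": True, "exp": 2000, "scope": "profile email"}, ["profile"], now=1000
)
print(ok["scope"])
```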