RFC7662: OAuth 2.0 Token Introspection

This section contains the generic implementation of RFC7662. OAuth 2.0 Token Introspection lets a resource server query the authorization server to determine whether a token is active and to read its metadata.

Register Introspection Endpoint

Authlib is designed to be extensible: the .register_endpoint method on AuthorizationServer makes it easy to add the introspection endpoint to the authorization server. This works with both the Flask OAuth 2.0 Server and the Django OAuth 2.0 Server. First, though, we need to implement the missing methods:

from authlib.oauth2.rfc7662 import IntrospectionEndpoint

class MyIntrospectionEndpoint(IntrospectionEndpoint):
    def query_token(self, token, token_type_hint, client):
        if token_type_hint == 'access_token':
            tok = Token.query.filter_by(access_token=token).first()
        elif token_type_hint == 'refresh_token':
            tok = Token.query.filter_by(refresh_token=token).first()
        else:
            # without token_type_hint
            tok = Token.query.filter_by(access_token=token).first()
            if not tok:
                tok = Token.query.filter_by(refresh_token=token).first()
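        if tok:
            # the client the token was issued to may always introspect it
            if tok.client_id == client.client_id:
                return tok
            # any other client needs explicit introspection permission
            # (has_introspect_permission is an application-defined helper)
            if has_introspect_permission(client):
                return tok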

    def introspect_token(self, token):
        return {
            'active': True,
            'client_id': token.client_id,
            'token_type': token.token_type,
            'username': get_token_username(token),
            'scope': token.get_scope(),
            'sub': get_token_user_sub(token),
            'aud': token.client_id,
            'iss': 'https://server.example.com/',
            'exp': token.expires_at,
            'iat': token.issued_at,
        }

# register it with the authorization server
server.register_endpoint(MyIntrospectionEndpoint)

After the registration, we can create a response with:

@app.route('/oauth/introspect', methods=['POST'])
def introspect_token():
    return server.create_endpoint_response(MyIntrospectionEndpoint.ENDPOINT_NAME)
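
On the resource server side, the JSON returned by this route can be used to decide whether a request is allowed. The following is a minimal sketch, not part of Authlib: token_grants is a hypothetical helper, and it assumes the response carries the 'active' and 'scope' fields shown in introspect_token above, with 'scope' as a space-separated string.

```python
def token_grants(introspection, required_scope):
    """Return True if an introspection response allows `required_scope`."""
    # An inactive (expired, revoked, or unknown) token grants nothing
    if not introspection.get("active", False):
        return False
    # 'scope' is assumed to be a space-separated string of scope values
    granted = (introspection.get("scope") or "").split()
    return required_scope in granted


response = {"active": True, "scope": "profile email", "client_id": "abc"}
print(token_grants(response, "email"))           # True
print(token_grants({"active": False}, "email"))  # False
```

Checking 'active' first matters because an inactive response may contain no other fields at all.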

API Reference

class authlib.oauth2.rfc7662.IntrospectionEndpoint(server)

Implementation of the introspection endpoint described in RFC7662.

ENDPOINT_NAME = 'introspection'

Endpoint name to be registered

authenticate_endpoint_credential(request, client)

The protected resource calls the introspection endpoint using an HTTP POST request with parameters sent as “application/x-www-form-urlencoded” data. The protected resource sends a parameter representing the token along with optional parameters representing additional context that is known by the protected resource to aid the authorization server in its response.

token
    REQUIRED. The string value of the token. For access tokens, this is the access_token value returned from the token endpoint defined in OAuth 2.0. For refresh tokens, this is the refresh_token value returned from the token endpoint as defined in OAuth 2.0.

token_type_hint
    OPTIONAL. A hint about the type of the token submitted for introspection.
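
The request described above could be assembled as in the following sketch. It uses only the Python standard library and does not send anything. build_introspection_request is a hypothetical helper, and authenticating the protected resource with HTTP Basic credentials is an assumption; the server may be configured for a different method.

```python
import base64
from urllib.parse import urlencode


def build_introspection_request(token, client_id, client_secret, token_type_hint=None):
    """Return (headers, body) for a POST to the introspection endpoint."""
    params = {"token": token}
    if token_type_hint is not None:
        # OPTIONAL: only sent when the caller knows the token type
        params["token_type_hint"] = token_type_hint

    # Assumption: the protected resource authenticates with HTTP Basic
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii"),
    }
    return headers, urlencode(params)


headers, body = build_introspection_request(
    "2YotnFZFEjr1zCsicMWpAA", "resource-server", "s3cret",
    token_type_hint="access_token",
)
print(body)
```

The returned headers and body can be passed to any HTTP client that supports POST.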

create_endpoint_response(request)

Validate introspection request and create the response.

Returns: (status_code, body, headers)

query_token(token, token_type_hint, client)

Get the token from the database or other storage using the given token string. Developers should implement this method:

def query_token(self, token, token_type_hint, client):
    if token_type_hint == 'access_token':
        tok = Token.query_by_access_token(token)
    elif token_type_hint == 'refresh_token':
        tok = Token.query_by_refresh_token(token)
    else:
        tok = Token.query_by_access_token(token)
        if not tok:
            tok = Token.query_by_refresh_token(token)

    if check_client_permission(client, tok):
        return tok
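
check_client_permission is left to the application. One possible shape, mirroring the rule used in the registration example (the owning client, or a client with explicit permission), is sketched below. The Client and Token classes and the allowed_to_introspect flag are hypothetical stand-ins for the application's own models.

```python
from dataclasses import dataclass


@dataclass
class Client:
    client_id: str
    # Hypothetical flag, e.g. set for trusted resource servers
    allowed_to_introspect: bool = False


@dataclass
class Token:
    client_id: str


def check_client_permission(client, tok):
    """Return True if `client` may introspect `tok`."""
    if tok is None:
        # Unknown token: there is nothing to return
        return False
    if tok.client_id == client.client_id:
        # The client the token was issued to may always introspect it
        return True
    return client.allowed_to_introspect


owner = Client("abc")
other = Client("xyz")
resource_server = Client("rs", allowed_to_introspect=True)
tok = Token(client_id="abc")
print(check_client_permission(owner, tok))            # True
print(check_client_permission(other, tok))            # False
print(check_client_permission(resource_server, tok))  # True
```

Handling a missing token inside the helper keeps query_token from touching attributes of None when no row matches.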

introspect_token(token)

Read the given token and return its introspection metadata as a dictionary, following Section 2.2 of RFC7662:

def introspect_token(self, token):
    active = is_token_active(token)
    return {
        'active': active,
        'client_id': token.client_id,
        'token_type': token.token_type,
        'username': get_token_username(token),
        'scope': token.get_scope(),
        'sub': get_token_user_sub(token),
        'aud': token.client_id,
        'iss': 'https://server.example.com/',
        'exp': token.expires_at,
        'iat': token.issued_at,
    }
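
is_token_active is not provided by Authlib in this example and has to be written by the application. The sketch below shows one way it might look, assuming the token model has a boolean revoked attribute and that expires_at is a Unix timestamp; both are assumptions about the application's schema. It also returns only the active field for inactive tokens, since Section 2.2 of RFC7662 advises against disclosing further details about them.

```python
import time
from types import SimpleNamespace


def is_token_active(token, now=None):
    """Return True if the token is neither revoked nor expired."""
    if now is None:
        now = time.time()
    # Assumption: the model exposes a boolean `revoked` attribute
    if getattr(token, "revoked", False):
        return False
    # Assumption: `expires_at` is a Unix timestamp in seconds
    return token.expires_at > now


def introspection_payload(token, now=None):
    """Build a reduced introspection response for `token`."""
    if not is_token_active(token, now):
        # Do not reveal metadata about an inactive token
        return {"active": False}
    return {"active": True, "client_id": token.client_id, "exp": token.expires_at}


token = SimpleNamespace(client_id="abc", expires_at=2000, revoked=False)
print(introspection_payload(token, now=1000))  # {'active': True, 'client_id': 'abc', 'exp': 2000}
print(introspection_payload(token, now=3000))  # {'active': False}
```

The optional now argument exists only to make the check easy to test with fixed timestamps.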

authenticate_endpoint_client(request)

Authenticate the client for this endpoint using CLIENT_AUTH_METHODS.