RFC7662: OAuth 2.0 Token Introspection

This section contains the generic implementation of RFC7662.

Register Introspection Endpoint

With the register_endpoint method offered by Flask OAuth 2.0 Server, we can easily add an introspection endpoint to the authorization server. First, though, we need to implement the missing methods:

from authlib.oauth2.rfc7662 import IntrospectionEndpoint

class MyIntrospectionEndpoint(IntrospectionEndpoint):
    def query_token(self, token, token_type_hint, client):
        if token_type_hint == 'access_token':
            tok = Token.query.filter_by(access_token=token).first()
        elif token_type_hint == 'refresh_token':
            tok = Token.query.filter_by(refresh_token=token).first()
        else:
            # without token_type_hint
            tok = Token.query.filter_by(access_token=token).first()
            if not tok:
                tok = Token.query.filter_by(refresh_token=token).first()
        if tok:
            if tok.client_id == client.client_id:
                return tok
            if has_introspect_permission(client):
                return tok

    def introspect_token(self, token):
        return {
            'active': True,
            'client_id': token.client_id,
            'token_type': token.token_type,
            'username': get_token_username(token),
            'scope': token.get_scope(),
            'sub': get_token_user_sub(token),
            'aud': token.client_id,
            'iss': 'https://server.example.com/',
            'exp': token.expires_at,
            'iat': token.issued_at,
        }

# register it to authorization server
server.register_endpoint(MyIntrospectionEndpoint)

After the registration, we can create a response with:

@app.route('/oauth/introspect', methods=['POST'])
def introspect_token():
    return server.create_endpoint_response(MyIntrospectionEndpoint.ENDPOINT_NAME)

API Reference

class authlib.oauth2.rfc7662.IntrospectionEndpoint(request, server)

Implementation of the introspection endpoint described in RFC7662.

ENDPOINT_NAME = 'introspection'

The name under which the endpoint is registered.

validate_endpoint_request()

The protected resource calls the introspection endpoint using an HTTP POST request with parameters sent as “application/x-www-form-urlencoded” data. The protected resource sends a parameter representing the token along with optional parameters representing additional context that is known by the protected resource to aid the authorization server in its response.

token
    REQUIRED. The string value of the token. For access tokens, this is the access_token value returned from the token endpoint defined in OAuth 2.0. For refresh tokens, this is the refresh_token value returned from the token endpoint as defined in OAuth 2.0.
token_type_hint
    OPTIONAL. A hint about the type of the token submitted for introspection.
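
The request described above can be sketched from the protected resource's side. This is a minimal illustration using only the standard library; it builds the request without sending it. The endpoint URL, client credentials, and the helper name build_introspection_request are assumptions made for the example, and it assumes the endpoint accepts HTTP Basic client authentication.

```python
import base64
import urllib.parse
import urllib.request


def build_introspection_request(endpoint_url, token, client_id, client_secret,
                                token_type_hint=None):
    """Build (but do not send) a form-encoded introspection POST request."""
    # "token" is required; "token_type_hint" is only added when provided.
    params = {'token': token}
    if token_type_hint:
        params['token_type_hint'] = token_type_hint
    body = urllib.parse.urlencode(params).encode('ascii')

    # Assumption: the client authenticates with HTTP Basic credentials.
    credentials = f'{client_id}:{client_secret}'.encode('utf-8')
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Authorization': 'Basic ' + base64.b64encode(credentials).decode('ascii'),
    }
    return urllib.request.Request(
        endpoint_url, data=body, headers=headers, method='POST'
    )


req = build_introspection_request(
    'https://server.example.com/oauth/introspect',  # hypothetical endpoint URL
    token='2YotnFZFEjr1zCsicMWpAA',
    client_id='resource-server',   # hypothetical client credentials
    client_secret='s3cret',
    token_type_hint='access_token',
)
```

Passing req to urllib.request.urlopen would send it; the response body is the JSON document produced by introspect_token.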

create_endpoint_response()

Validate introspection request and create the response.

Returns: (status_code, body, headers)

query_token(token, token_type_hint, client)

Get the token from database/storage by the given token string. Developers should implement this method:

def query_token(self, token, token_type_hint, client):
    if token_type_hint == 'access_token':
        tok = Token.query_by_access_token(token)
    elif token_type_hint == 'refresh_token':
        tok = Token.query_by_refresh_token(token)
    else:
        tok = Token.query_by_access_token(token)
        if not tok:
            tok = Token.query_by_refresh_token(token)

    # tok is None when no matching token was found; return None in that case
    if tok and check_client_permission(client, tok):
        return tok
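
The check_client_permission helper is not provided by the library. One possible sketch, mirroring the ownership and permission checks used in the registration example, is shown here. The INTROSPECTION_CLIENTS allow-list and the client IDs in it are hypothetical; a real application would likely read this from its client model or configuration.

```python
from types import SimpleNamespace

# Hypothetical allow-list of clients (e.g. resource servers) that may
# introspect tokens issued to other clients.
INTROSPECTION_CLIENTS = {'resource-server'}


def check_client_permission(client, tok):
    """Return True if `client` may introspect `tok`."""
    if tok is None:
        return False
    # A client may always introspect tokens that were issued to it.
    if tok.client_id == client.client_id:
        return True
    # Otherwise the client needs explicit introspection permission.
    return client.client_id in INTROSPECTION_CLIENTS


# Stand-ins for the real client and token models.
owner = SimpleNamespace(client_id='web-app')
stranger = SimpleNamespace(client_id='other-app')
resource_server = SimpleNamespace(client_id='resource-server')
tok = SimpleNamespace(client_id='web-app')
```

With these stand-ins, owner and resource_server are allowed to introspect tok, while stranger is not.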

introspect_token(token)

Read the given token and return its introspection metadata as a dictionary, following Section 2.2 of RFC7662:

def introspect_token(self, token):
    active = is_token_active(token)
    return {
        'active': active,
        'client_id': token.client_id,
        'token_type': token.token_type,
        'username': get_token_username(token),
        'scope': token.get_scope(),
        'sub': get_token_user_sub(token),
        'aud': token.client_id,
        'iss': 'https://server.example.com/',
        'exp': token.expires_at,
        'iat': token.issued_at,
    }
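
The is_token_active helper is left to the application. A minimal sketch follows, assuming the token model stores expires_at as a Unix timestamp and exposes a boolean revoked attribute; both attribute conventions are assumptions, not part of the library. It also shows the reduced response for inactive tokens: RFC7662 Section 2.2 says an inactive token should be reported with "active" set to false and no additional details.

```python
import time
from types import SimpleNamespace


def is_token_active(token, now=None):
    """Return True if the token is neither revoked nor expired."""
    now = time.time() if now is None else now
    # Assumption: `revoked` is a boolean flag on the token model.
    if getattr(token, 'revoked', False):
        return False
    # Assumption: `expires_at` is a Unix timestamp in seconds.
    return token.expires_at > now


def introspection_payload(token, now=None):
    """Build a response dict that reveals nothing about inactive tokens."""
    if not is_token_active(token, now):
        return {'active': False}
    return {
        'active': True,
        'client_id': token.client_id,
        'exp': token.expires_at,
    }


# Stand-in for the real token model.
tok = SimpleNamespace(client_id='web-app', expires_at=2000, revoked=False)
```

The now parameter exists only to make the check easy to exercise with fixed timestamps; in normal use it can be omitted.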

authenticate_endpoint_client()

Authenticate the client for this endpoint using CLIENT_AUTH_METHODS.