Source code for nuc.validate

"""
NUC validation.
"""

import itertools
from enum import Enum
from datetime import datetime, timezone
from dataclasses import dataclass
from typing import Any, Dict, List

from nuc.envelope import DecodedNucToken, InvalidSignatureException, NucTokenEnvelope
from nuc.policy import (
    AndConnector,
    AnyOfOperator,
    EqualsOperator,
    NotConnector,
    NotEqualsOperator,
    OperatorPolicy,
    OrConnector,
    Policy,
)
from nuc.selector import SelectorContext
from nuc.token import Command, DelegationBody, Did, InvocationBody, NucToken

_REVOCATION_COMMAND: Command = Command(["nuc", "revoke"])


[docs] @dataclass class InvocationRequirement: """ Require an invocation. """ audience: Did
[docs] @dataclass class DelegationRequirement: """ Require a delegation. """ audience: Did
[docs] @dataclass class ValidationParameters: """ Parameters used during token validation. """ max_chain_length: int max_policy_width: int max_policy_depth: int token_requirements: InvocationRequirement | DelegationRequirement | None
[docs] @staticmethod def default() -> "ValidationParameters": """ Build the default validation parameters. """ return ValidationParameters( max_chain_length=5, max_policy_width=10, max_policy_depth=5, token_requirements=None, )
[docs] class NucTokenValidator: """ A validator for NUC tokens. Example ------- .. code-block:: py3 from nuc.validate import NucTokenValidator from nuc.token import NucToken token = NucToken.parse( { "iss": "did:nil:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "aud": "did:nil:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", "sub": "did:nil:cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc", "cmd": "/nil/db/read", "pol": [["==", ".foo", 42]], "nonce": "beef", } ) validator = NucTokenValidator([]) validator.validate(token) """ def __init__(self, root_issuers: List[Did]) -> None: """ Construct a new token validator. Arguments --------- root_issuers The list of entities that can issue root tokens. """ self._root_issuers = set(root_issuers) self._time_provider = lambda: datetime.now(timezone.utc)
[docs] def validate( self, envelope: NucTokenEnvelope, context: SelectorContext, parameters: ValidationParameters = ValidationParameters.default(), ) -> None: """ Validate a NUC token using the given parameters. This will raise an exception if validation fails. Arguments --------- envelope The token to be validated. parameters The validation parameters. """ if len(envelope.proofs) + 1 > parameters.max_chain_length: raise ValidationException(ValidationKind.CHAIN_TOO_LONG) token = envelope.token.token match token.proofs: case []: proofs = [] case [proof_hash]: proofs = self._sort_proofs(proof_hash, envelope.proofs) case _: raise ValidationException(ValidationKind.TOO_MANY_PROOFS) # Build a chain from root token up to the token itself token_chain = [token, *proofs] token_chain.reverse() now = self._time_provider() self._validate_proofs(token, proofs) self._validate_token_chain(token_chain, parameters, now) self._validate_token(token, proofs, parameters.token_requirements, context) try: envelope.validate_signatures() except InvalidSignatureException as ex: raise ValidationException(ValidationKind.INVALID_SIGNATURES) from ex
def _validate_proofs(self, token: NucToken, proofs: List[NucToken]) -> None: if self._root_issuers: # The root issuer of this token is either the last proof issuer or the issuer of the # token itself if we have no proofs. root = proofs[-1] if proofs else token if root.issuer not in self._root_issuers: raise ValidationException(ValidationKind.ROOT_KEY_SIGNATURE_MISSING) for proof in proofs: match proof.body: case DelegationBody(): pass case InvocationBody(): raise ValidationException(ValidationKind.PROOFS_MUST_BE_DELEGATIONS) @staticmethod def _validate_token_chain( tokens: List[NucToken], parameters: ValidationParameters, now: datetime ) -> None: for previous, current in itertools.pairwise(tokens): NucTokenValidator._validate_relationship_properties(previous, current) for token in tokens: NucTokenValidator._validate_temporal_properties(token, now) for token in tokens: if isinstance(token.body, DelegationBody): NucTokenValidator._validate_policies_properties( token.body.policies, parameters ) if len(tokens) >= 2: token = tokens[1] _validate( token.issuer == token.subject, ValidationKind.SUBJECT_NOT_IN_CHAIN ) @staticmethod def _validate_relationship_properties( previous: NucToken, current: NucToken ) -> None: _validate( previous.audience == current.issuer, ValidationKind.ISSUER_AUDIENCE_MISMATCH ) _validate( previous.subject == current.subject, ValidationKind.DIFFERENT_SUBJECTS ) _validate( current.command.is_attenuation_of(previous.command) or current.command == _REVOCATION_COMMAND, ValidationKind.COMMAND_NOT_ATTENUATED, ) if previous.not_before and current.not_before: _validate( previous.not_before <= current.not_before, ValidationKind.NOT_BEFORE_BACKWARDS, ) @staticmethod def _validate_temporal_properties(token: NucToken, current_time: datetime) -> None: if token.expires_at: _validate(token.expires_at >= current_time, ValidationKind.TOKEN_EXPIRED) if token.not_before: _validate( current_time >= token.not_before, ValidationKind.NOT_BEFORE_NOT_MET ) @staticmethod def _validate_policies_properties( policies: List[Policy], parameters: ValidationParameters ) -> None: _validate( len(policies) <= parameters.max_policy_width, ValidationKind.POLICY_TOO_WIDE ) for policy in policies: properties = PolicyTreeProperties.from_policy(policy) _validate( properties.max_policy_width <= parameters.max_policy_width, ValidationKind.POLICY_TOO_WIDE, ) _validate( properties.max_depth <= parameters.max_policy_depth, ValidationKind.POLICY_TOO_DEEP, ) @staticmethod def _validate_token( token: NucToken, proofs: List[NucToken], token_requirements: InvocationRequirement | DelegationRequirement | None, context: SelectorContext, ) -> None: match token.body: case DelegationBody(): match token_requirements: case InvocationRequirement(): raise ValidationException(ValidationKind.NEED_INVOCATION) case DelegationRequirement(audience): _validate( token.audience == audience, ValidationKind.INVALID_AUDIENCE ) case None: pass case InvocationBody(): match token_requirements: case InvocationRequirement(audience): _validate( token.audience == audience, ValidationKind.INVALID_AUDIENCE ) case DelegationRequirement(audience): raise ValidationException(ValidationKind.NEED_DELEGATION) case None: pass token_json = token.to_json() for proof in proofs: NucTokenValidator._validate_policy_matches( proof, token_json, context ) @staticmethod def _validate_policy_matches( proof: NucToken, token_json: Dict[str, Any], context: SelectorContext ) -> None: match proof.body: case DelegationBody(policies): for policy in policies: if not policy.matches(token_json, context): raise ValidationException(ValidationKind.POLICY_NOT_MET) case InvocationBody(): raise ValidationException(ValidationKind.PROOFS_MUST_BE_DELEGATIONS) @staticmethod def _sort_proofs( starting_hash: bytes, proofs: List[DecodedNucToken] ) -> List[NucToken]: indexed_proofs = dict((proof.compute_hash(), proof.token) for proof in proofs) sorted_proofs: List[NucToken] = [] next_hash = starting_hash while True: next_proof = indexed_proofs.get(next_hash) if not next_proof: raise ValidationException(ValidationKind.MISSING_PROOF) indexed_proofs.pop(next_hash) sorted_proofs.append(next_proof) match next_proof.proofs: case []: break case [proof_hash]: next_hash = proof_hash case _: raise ValidationException(ValidationKind.TOO_MANY_PROOFS) if indexed_proofs: raise ValidationException(ValidationKind.UNCHAINED_PROOFS) return sorted_proofs
[docs] class ValidationKind(Enum): """ The kind of validation that failed. """ CHAIN_TOO_LONG = "token chain is too long" COMMAND_NOT_ATTENUATED = "command is not an attenuation" DIFFERENT_SUBJECTS = "different subjects in chain" INVALID_AUDIENCE = "invalid audience" INVALID_SIGNATURES = "invalid signatures" ISSUER_AUDIENCE_MISMATCH = "issuer/audience mismatch" MISSING_PROOF = "proof is missing" NEED_DELEGATION = "token must be a delegation" NEED_INVOCATION = "token must be an invocation" NOT_BEFORE_BACKWARDS = "`not before` cannot move backwards" NOT_BEFORE_NOT_MET = "`not before` date not met" POLICY_NOT_MET = "policy not met" POLICY_TOO_DEEP = "policy is too deep" POLICY_TOO_WIDE = "policy is too wide" PROOFS_MUST_BE_DELEGATIONS = "proofs must be delegations" ROOT_KEY_SIGNATURE_MISSING = "root NUC is not signed by root keypair" SUBJECT_NOT_IN_CHAIN = "subject not in chain" TOKEN_EXPIRED = "token is expired" TOO_MANY_PROOFS = "up to one `prf` in a token is allowed" UNCHAINED_PROOFS = "extra proofs not part of chain provided"
[docs] class ValidationException(Exception): """ Token validation failed. """ def __init__(self, kind: ValidationKind) -> None: super().__init__(self, f"validation failed: {kind}") self.kind = kind
def _validate(condition: bool, validation: ValidationKind) -> None: if not condition: raise ValidationException(validation)
[docs] @dataclass class PolicyTreeProperties: """ The properties of a policy tree. """ max_depth: int max_policy_width: int
[docs] @staticmethod def from_policy(root_policy: Policy) -> "PolicyTreeProperties": """ Construct a policy tree properties object from a policy. """ match root_policy.body: case AndConnector() | OrConnector(): properties = PolicyTreeProperties( max_depth=0, max_policy_width=len(root_policy.body.policies) ) for policy in root_policy.body.policies: inner_properties = PolicyTreeProperties.from_policy(policy) properties.max_depth = max( properties.max_depth, inner_properties.max_depth ) properties.max_policy_width = max( properties.max_policy_width, inner_properties.max_policy_width ) properties.max_depth += 1 return properties case NotConnector(policy): properties = PolicyTreeProperties.from_policy(policy) properties.max_depth += 1 return properties case OperatorPolicy(): match root_policy.body.operator: case EqualsOperator() | NotEqualsOperator(): return PolicyTreeProperties(max_depth=1, max_policy_width=1) case AnyOfOperator(choices): return PolicyTreeProperties( max_depth=1, max_policy_width=len(choices) )