Source code for nuc.policy

"""
NUC policy definitions.
"""

from dataclasses import dataclass
from typing import Any, List

from .selector import Selector, SelectorContext


@dataclass
class EqualsOperator:
    """
    Operator that accepts a value when it is equal to ``arg``.
    """

    # Reference value the selected value is compared against.
    arg: Any
@dataclass
class NotEqualsOperator:
    """
    Operator that accepts a value when it differs from ``arg``.
    """

    # Reference value the selected value must not be equal to.
    arg: Any
@dataclass
class AnyOfOperator:
    """
    Operator that accepts a value when it equals at least one entry of ``arg``.
    """

    # Candidate values; a match against any single entry is sufficient.
    arg: List[Any]
@dataclass
class OperatorPolicy:
    """
    A leaf policy: a selector extracts a value from the NUC token and an
    operator decides whether that value is acceptable.
    """

    selector: Selector
    operator: EqualsOperator | NotEqualsOperator | AnyOfOperator

    @staticmethod
    def parse(operator: str, data: Any) -> "OperatorPolicy":
        """
        Build an operator policy from its raw list form.

        Arguments
        ---------

        operator
            The operator name: ``"=="``, ``"!="`` or ``"anyOf"``.
        data
            The remaining policy elements: the selector string followed by
            the operator's value.

        Raises
        ------

        MalformedPolicyException
            If ``data`` is not a list, an element is missing, the selector is
            not a string, ``"anyOf"`` is given a non-list value, or the
            operator name is unknown.
        """
        remaining = _ensure_list(data)

        selector_text = _pop_next(remaining, "selector")
        if not isinstance(selector_text, str):
            raise MalformedPolicyException("selector must be a string")
        parsed_selector = Selector.parse(selector_text)

        operand = _pop_next(remaining, "value")

        # The operator name is validated last, after selector and value have
        # been consumed, so the earlier errors take precedence.
        if operator == "==":
            return OperatorPolicy(parsed_selector, EqualsOperator(operand))
        if operator == "!=":
            return OperatorPolicy(parsed_selector, NotEqualsOperator(operand))
        if operator == "anyOf":
            if not isinstance(operand, list):
                raise MalformedPolicyException("'anyOf' expects list as value")
            return OperatorPolicy(parsed_selector, AnyOfOperator(operand))
        raise MalformedPolicyException(f"invalid operator '{operator}'")

    def serialize(self) -> List[Any]:
        """
        Serialize this policy as ``[operator, selector, value]``.
        """
        selector_text = str(self.selector)
        op = self.operator
        if isinstance(op, EqualsOperator):
            return ["==", selector_text, op.arg]
        if isinstance(op, NotEqualsOperator):
            return ["!=", selector_text, op.arg]
        if isinstance(op, AnyOfOperator):
            return ["anyOf", selector_text, op.arg]

    def matches(self, value: Any, context: SelectorContext) -> bool:
        """
        Check whether the value picked by the selector satisfies the operator.

        Arguments
        ---------

        value
            The value the selector is applied to.
        context
            The selector context, handed to the selector unchanged.
        """
        selected = self.selector.apply(value, context)
        op = self.operator
        if isinstance(op, EqualsOperator):
            return op.arg == selected
        if isinstance(op, NotEqualsOperator):
            return op.arg != selected
        if isinstance(op, AnyOfOperator):
            for candidate in op.arg:
                if selected == candidate:
                    return True
            return False
@dataclass
class AndConnector:
    """
    Connector that requires every policy in a sequence to be valid.
    """

    # Sub-policies that must all hold.
    policies: List["Policy"]
@dataclass
class OrConnector:
    """
    Connector that requires at least one policy in a sequence to be valid.
    """

    # Sub-policies of which at least one must hold.
    policies: List["Policy"]
@dataclass
class NotConnector:
    """
    Connector that requires a policy to not be valid.
    """

    # The sub-policy whose result is negated.
    policy: "Policy"
# Union of the connector kinds, i.e. the policies that combine or negate other policies.
type ConnectorPolicy = AndConnector | OrConnector | NotConnector
[docs] @dataclass class Policy: """ A policy that restricts how a NUC can be used. """ body: OperatorPolicy | ConnectorPolicy
[docs] @staticmethod def parse(data: Any) -> "Policy": """ Parse a policy. Arguments --------- data The raw policy to be parsed. Example ------- .. code-block:: py3 from nuc.policy import Policy policy = Policy.parse(["eq", ".foo", 42]) """ keys = _ensure_list(data) op = _pop_next(keys, "operand") body = None match op: case "==" | "!=" | "anyOf": body = OperatorPolicy.parse(op, keys) case "and": body = AndConnector(_parse_policies(_pop_next(keys, "policies"))) case "or": body = OrConnector(_parse_policies(_pop_next(keys, "policies"))) case "not": body = NotConnector(Policy.parse(_pop_next(keys, "policy"))) case _: raise MalformedPolicyException(f"invalid operator '{op}'") return Policy(body)
[docs] def serialize(self) -> List[Any]: """ Serialize this policy as a list. """ match self.body: case OperatorPolicy(): return self.body.serialize() case AndConnector(policies): return ["and", [policy.serialize() for policy in policies]] case OrConnector(policies): return ["or", [policy.serialize() for policy in policies]] case NotConnector(policy): return ["not", policy.serialize()]
[docs] def matches(self, value: Any, context: SelectorContext) -> bool: """ Checks whether this policy matches a value. Arguments --------- value The value to be matched. Example ------- .. code-block:: py3 from nuc.policy import Policy # Parse a policy policy = Policy.parse(["eq", ".foo", 42]) # Ensure it matches a given value. assert policy.matches({ "foo": 42 }) """ match self.body: case OperatorPolicy(): return self.body.matches(value, context) case AndConnector(policies): return bool(policies) and all( policy.matches(value, context) for policy in policies ) case OrConnector(policies): return any(policy.matches(value, context) for policy in policies) case NotConnector(policy): return not policy.matches(value, context)
[docs] @staticmethod def equals(selector: str, value: Any) -> "Policy": """ Create a policy that expects a selected value to equal another. Arguments --------- selector A jq-like selector. value The value that the value pointed to by the selector should match. Example ------- .. code-block:: py3 from nuc.policy import Policy policy = Policy.equals(".foo", 42) assert policy.matches({ "foo": 42 }) """ return Policy(OperatorPolicy(Selector.parse(selector), EqualsOperator(value)))
[docs] @staticmethod def not_equals(selector: str, value: Any) -> "Policy": """ Create a policy that expects a selected value to be distinct from another. Arguments --------- selector A jq-like selector. value The value that the value pointed to by the selector should not match. Example ------- .. code-block:: py3 from nuc.policy import Policy policy = Policy.not_equals(".foo", 42) assert policy.matches({ "foo": 1337 }) """ return Policy( OperatorPolicy(Selector.parse(selector), NotEqualsOperator(value)) )
[docs] @staticmethod def any_of(selector: str, values: List[Any]) -> "Policy": """ Create a policy that expects a selected value to match an element from a list. Arguments --------- selector A jq-like selector. values The values to be checked. Example ------- .. code-block:: py3 from nuc.policy import Policy policy = Policy.any_of(".foo", [42, 1337]) assert policy.matches({ "foo": 42 }) assert policy.matches({ "foo": 1337 }) """ return Policy(OperatorPolicy(Selector.parse(selector), AnyOfOperator(values)))
[docs] @staticmethod def and_(policies: List["Policy"]) -> "Policy": """ Create a policy that expects all sub-policies to be valid. Arguments --------- policies The policies that must be valid. Example ------- .. code-block:: py3 from nuc.policy import Policy policy = Policy.and_([ Policy.equals(".foo", 42), Policy.equals(".bar", 1337) ]) assert policy.matches({ "foo": 42, "bar": 1337 }) """ return Policy(AndConnector(policies))
[docs] @staticmethod def or_(policies: List["Policy"]) -> "Policy": """ Create a policy that expects at least one sub-policy to be valid. Arguments --------- policies The policies to be checked. Example ------- .. code-block:: py3 from nuc.policy import Policy policy = Policy.or_([ Policy.equals(".foo", 42), Policy.equals(".bar", 1337) ]) assert policy.matches({ "foo": 42, "bar": 100 }) """ return Policy(OrConnector(policies))
[docs] @staticmethod def not_(policy: "Policy") -> "Policy": """ Create a policy that expects a policy to be invalid. Arguments --------- policy The policy to be checked. Example ------- .. code-block:: py3 from nuc.policy import Policy policy = Policy.not_(Policy.equals(".foo", 42)) assert policy.matches({ "foo": 1337 }) """ return Policy(NotConnector(policy))
def _ensure_list(data: Any) -> List[Any]:
    """
    Validate that ``data`` is a list and return a shallow copy of it.

    The parsers consume the returned list destructively through
    ``_pop_next``; returning a copy keeps the caller's original list intact.

    Raises ``MalformedPolicyException`` if ``data`` is not a list.
    """
    if not isinstance(data, list):
        raise MalformedPolicyException("expected list")
    return list(data)


def _pop_next(keys: Any, expected: str) -> Any:
    """
    Remove and return the first element of ``keys``.

    ``expected`` names the element being read and is only used in the error
    message. Raises ``MalformedPolicyException`` if ``keys`` is not a list or
    is empty.
    """
    if not isinstance(keys, list):
        raise MalformedPolicyException("policy must be a list")
    if not keys:
        raise MalformedPolicyException(f"invalid policy: expected {expected}")
    return keys.pop(0)


def _parse_policies(keys: Any) -> List["Policy"]:
    """
    Parse a list of raw policies into ``Policy`` objects, in order.

    Raises ``MalformedPolicyException`` if ``keys`` is not a list or any of
    its elements is not itself a list.
    """
    if not isinstance(keys, list):
        raise MalformedPolicyException("expected a list of policies")
    policies = []
    for element in keys:
        if not isinstance(element, list):
            raise MalformedPolicyException("expected a policy list")
        policies.append(Policy.parse(element))
    return policies
class MalformedPolicyException(Exception):
    """
    Raised when a raw policy does not have the expected structure.
    """