"""
NUC tokens.
"""
import json
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List
from .policy import Policy
_DID_PATTERN = re.compile("did:nil:([a-zA-Z0-9]{66})")
_HEX_PATTERN = re.compile("[a-zA-Z0-9]+")
[docs]
@dataclass(frozen=True)
class Did:
"""
A class representing a Decentralized Identifier (DID).
"""
public_key: bytes
[docs]
@staticmethod
def nil(public_key: bytes) -> "Did":
"""
Construct a new DID for the "nil" method.
Arguments
---------
public_key
The public key in compressed form.
"""
return Did(public_key)
[docs]
@staticmethod
def parse(data: str) -> "Did":
"""
Parse a DID from a string.
"""
matches = _DID_PATTERN.findall(data)
if not matches:
raise MalformedDidException("invalid DID")
public_key = matches[0]
try:
public_key = bytes.fromhex(public_key)
except Exception as ex:
raise MalformedDidException("invalid hex public key") from ex
return Did(public_key)
[docs]
def __str__(self) -> str:
return f"did:nil:{self.public_key.hex()}"
[docs]
@dataclass
class Command:
"""
A command to be invoked.
"""
segments: List[str]
[docs]
@staticmethod
def parse(data: str) -> "Command":
"""
Parse a command from a string.
Example
-------
.. code-block:: py3
from nuc.token import Command
command = Command.parse("/nil/db/read")
"""
if not data.startswith("/"):
raise MalformedCommandException("commands must start with '/'")
data = data[1:]
if not data:
return Command([])
segments = []
for segment in data.split("/"):
if not segment:
raise MalformedCommandException("empty segment")
segments.append(segment)
return Command(segments)
[docs]
def is_attenuation_of(self, other: "Command") -> bool:
"""
Check if this command is an attenuation of another one.
Example
-------
.. code-block:: py3
from nuc.token import Command
parent = Command.parse("/nil/db")
child = Command.parse("/nil/db/read")
assert child.is_attenuation_of(parent)
"""
if len(self.segments) < len(other.segments):
return False
our_segments = self.segments[: len(other.segments)]
return other.segments == our_segments
[docs]
def __str__(self) -> str:
return "/" + "/".join(self.segments)
[docs]
@dataclass
class InvocationBody:
"""
The body of an invocation.
"""
args: Dict[str, Any]
[docs]
@dataclass
class DelegationBody:
"""
The body of a delegation.
"""
policies: List[Policy]
# pylint: disable=R0902
[docs]
@dataclass
class NucToken:
"""
A class representing a NUC token.
"""
issuer: Did
audience: Did
subject: Did
not_before: datetime | None
expires_at: datetime | None
command: Command
body: InvocationBody | DelegationBody
meta: Dict[str, Any] | None
nonce: bytes
proofs: List[bytes]
[docs]
@staticmethod
def parse(raw_json: str | bytes) -> "NucToken":
"""
Parse a NUC token from a string.
Arguments
---------
raw_json
The raw JSON to be parsed.
Example
-------
.. code-block:: py3
from nuc.token import NucToken
token = NucToken.parse(
{
"iss": "did:nil:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"aud": "did:nil:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
"sub": "did:nil:cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
"cmd": "/nil/db/read",
"pol": [["==", ".foo", 42]],
"nonce": "beef",
}
)
"""
try:
data = json.loads(raw_json)
except Exception as ex:
raise MalformedNucTokenException(f"invalid JSON: {ex}") from ex
token = _safe_cast(data, dict)
issuer = Did.parse(_safe_get(token, "iss", str))
audience = Did.parse(_safe_get(token, "aud", str))
subject = Did.parse(_safe_get(token, "sub", str))
not_before = _safe_optional_get(token, "nbf", int)
expires_at = _safe_optional_get(token, "exp", int)
command = Command.parse(_safe_get(token, "cmd", str))
args = _safe_optional_get(token, "args", dict)
policies = _safe_optional_get(token, "pol", list)
match (args, policies):
case (None, None):
raise MalformedNucTokenException("one of 'args' and 'pol' must bet set")
case (dict(), None):
body = InvocationBody(args)
case (None, list()):
body = DelegationBody([Policy.parse(p) for p in policies])
case (dict(), list()):
raise MalformedNucTokenException("'args' and 'pol' can't both be set")
meta = _safe_optional_get(token, "meta", dict)
nonce = _parse_hex(_safe_get(token, "nonce", str), "nonce")
proofs = [
_parse_hex(p, "prf") for p in _safe_optional_get(token, "prf", list) or []
]
if data:
raise MalformedNucTokenException(
f"unexpected keys found in token: {data.keys()}"
)
return NucToken(
issuer,
audience,
subject,
datetime.fromtimestamp(not_before, timezone.utc) if not_before else None,
datetime.fromtimestamp(expires_at, timezone.utc) if expires_at else None,
command,
body,
meta,
nonce,
proofs,
)
[docs]
def to_json(self) -> Dict[str, Any]:
"""
Convert this token into JSON.
"""
match self.body:
case InvocationBody(args):
body = {"args": args}
case DelegationBody(policies):
body = {"pol": [policy.serialize() for policy in policies]}
return {
"iss": str(self.issuer),
"aud": str(self.audience),
"sub": str(self.subject),
**({"nbf": int(self.not_before.timestamp())} if self.not_before else {}),
**({"exp": int(self.expires_at.timestamp())} if self.expires_at else {}),
"cmd": str(self.command),
**body,
**({"meta": self.meta} if self.meta else {}),
"nonce": self.nonce.hex(),
**({"prf": [proof.hex() for proof in self.proofs]} if self.proofs else {}),
}
[docs]
def __str__(self) -> str:
return json.dumps(self.to_json())
def _safe_cast[T](data: Any, ty: type[T]) -> T:
if not isinstance(data, ty):
raise MalformedNucTokenException(f"expected {ty.__name__}")
return data
def _safe_optional_get[T](d: Dict, key: str, ty: type[T]) -> T | None:
value = d.pop(key, None)
if value is None:
return None
if not isinstance(value, ty):
raise MalformedNucTokenException(f"expected {ty.__name__} for key {key}")
return value
def _safe_get[T](d: Dict, key: str, ty: type[T]) -> T:
value = _safe_optional_get(d, key, ty)
if value is None:
raise MalformedNucTokenException(f"expected {ty.__name__} for key {key}")
return value
def _parse_hex(data: str, field: str) -> bytes:
# bytes.fromhex accepts newlines and whitespaces...
if not _HEX_PATTERN.match(data):
raise MalformedNucTokenException(f"invalid hex in {field}")
try:
return bytes.fromhex(data)
except Exception as ex:
raise MalformedNucTokenException(f"invalid hex in {field}") from ex