Files
PeakRDL-BusDecoder/tests/cocotb_lib/protocol_utils.py
Arnav Sacheti bad845d15e Fix/better spec enforcing (#41)
* revamp

* consolidate

* version bump
2026-02-03 00:03:04 -08:00

102 lines
2.9 KiB
Python

"""Shared helpers for cocotb smoke tests."""
from __future__ import annotations
import json
import os
from collections.abc import Iterable
from typing import Any
import cocotb
from cocotb.clock import Clock
from cocotb.triggers import RisingEdge, Timer
from tests.cocotb_lib.handle_utils import resolve_handle
def load_config() -> dict[str, Any]:
"""Read the JSON payload describing the generated register topology."""
payload = os.environ.get("RDL_TEST_CONFIG")
if payload is None:
raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided")
return json.loads(payload)
def resolve(handle, indices: Iterable[int]):
"""Index into hierarchical cocotb handles."""
return resolve_handle(handle, indices)
def set_value(handle, indices: Iterable[int], value: int) -> None:
resolve(handle, indices).value = value
def get_int(handle, indices: Iterable[int]) -> int:
return int(resolve(handle, indices).value)
def all_index_pairs(table: dict[str, dict[str, Any]]):
for name, entry in table.items():
for idx in entry["indices"]:
yield name, idx
def find_invalid_address(config: dict[str, Any]) -> int | None:
"""Return an address outside any master/array span, or None if fully covered."""
addr_width = config["address_width"]
max_addr = 1 << addr_width
ranges: list[tuple[int, int]] = []
for master in config["masters"]:
inst_address = master["inst_address"]
inst_size = master["inst_size"]
n_elems = 1
if master.get("is_array"):
for dim in master.get("dimensions", []):
n_elems *= dim
span = inst_size * n_elems
ranges.append((inst_address, inst_address + span))
ranges.sort()
cursor = 0
for start, end in ranges:
if cursor < start:
return cursor
cursor = max(cursor, end)
if cursor < max_addr:
return cursor
return None
async def start_clock(clk_handle, period_ns: int = 2) -> None:
"""Start a simple clock if handle is present."""
if clk_handle is None:
return
clk_handle.value = 0
cocotb.start_soon(Clock(clk_handle, period_ns, unit="ns").start())
await RisingEdge(clk_handle)
async def apb_setup(slave, addr: int, write: bool, data: int, *, strobe_mask: int | None = None) -> None:
"""APB setup phase helper."""
if hasattr(slave, "PPROT"):
slave.PPROT.value = 0
if hasattr(slave, "PSTRB"):
if strobe_mask is None:
strobe_mask = (1 << len(slave.PSTRB)) - 1
slave.PSTRB.value = strobe_mask
slave.PADDR.value = addr
slave.PWDATA.value = data
slave.PWRITE.value = 1 if write else 0
slave.PSEL.value = 1
slave.PENABLE.value = 0
await Timer(1, unit="ns")
async def apb_access(slave) -> None:
"""APB access phase helper."""
slave.PENABLE.value = 1
await Timer(1, unit="ns")