diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile new file mode 100644 index 0000000..6d2e586 --- /dev/null +++ b/.devcontainer/Dockerfile @@ -0,0 +1,22 @@ +FROM verilator/verilator:latest + +ENV DEBIAN_FRONTEND=noninteractive + +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + python3 \ + python3-venv \ + python3-pip \ + python3-dev \ + build-essential \ + pkg-config \ + git \ + curl \ + ca-certificates && \ + rm -rf /var/lib/apt/lists/* + +ENV UV_INSTALL_DIR=/usr/local/bin +ENV UV_LINK_MODE=copy +# Install uv globally so both VS Code and terminals can use it +RUN curl -LsSf https://astral.sh/uv/install.sh | sh +RUN uv --version diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..ec3e3a9 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,33 @@ +{ + "name": "PeakRDL BusDecoder", + "build": { + "dockerfile": "Dockerfile" + }, + "runArgs": [ + "--init" + ], + "features": { + "ghcr.io/devcontainers/features/common-utils:2": { + "username": "vscode", + "uid": "1000", + "gid": "1000", + "installZsh": "false", + "installOhMyZsh": "false" + } + }, + "remoteUser": "vscode", + "postCreateCommand": "uv sync --frozen --all-extras --group tools --group test", + "customizations": { + "vscode": { + "settings": { + "python.defaultInterpreterPath": ".venv/bin/python", + "terminal.integrated.shell.linux": "/bin/bash" + }, + "extensions": [ + "ms-python.python", + "ms-python.vscode-pylance", + "ms-vscode.cpptools" + ] + } + } +} \ No newline at end of file diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5231d35..ed90c13 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,6 +14,8 @@ on: jobs: test: runs-on: ubuntu-latest + container: + image: verilator/verilator:latest permissions: contents: read strategy: @@ -27,19 +29,21 @@ jobs: uses: astral-sh/setup-uv@v3 with: python-version: ${{ matrix.python-version }} + enable-cache: true - - name: Install Verilator + - name: Check Verilator version + run: verilator --version + + - name: Install Python development packages run: | - sudo apt-get update - sudo apt-get install -y verilator - verilator --version + apt-get update && apt-get install -y python3-dev libpython3-dev - name: Install dependencies run: | uv sync --all-extras --group test - name: Run tests - run: uv run pytest tests/ -v --cov=peakrdl_busdecoder --cov-report=xml --cov-report=term + run: uv run pytest tests/ --cov=peakrdl_busdecoder --cov-report=xml --cov-report=term - name: Upload coverage to Codecov uses: codecov/codecov-action@v4 diff --git a/pyproject.toml b/pyproject.toml index 8cd9acb..fb41594 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,11 +4,11 @@ build-backend = "setuptools.build_meta" [project] name = "peakrdl-busdecoder" -version = "0.4.0" +version = "0.5.0" requires-python = ">=3.10" dependencies = ["jinja2>=3.1.6", "systemrdl-compiler~=1.30.1"] -authors = [{ name = "Alex Mykyta" }] +authors = [{ name = "Arnav Sacheti" }] description = "Generate a SystemVerilog bus decoder from SystemRDL for splitting CPU interfaces to multiple sub-address spaces" readme = "README.md" license = { text = "LGPLv3" } @@ -114,3 +114,4 @@ markers = [ "simulation: marks tests as requiring cocotb simulation (deselect with '-m \"not simulation\"')", "verilator: marks tests as requiring verilator simulator (deselect with '-m \"not verilator\"')", ] +filterwarnings = ["error", "ignore::UserWarning"] diff --git a/src/peakrdl_busdecoder/cpuif/apb3/apb3_cpuif_flat.py b/src/peakrdl_busdecoder/cpuif/apb3/apb3_cpuif_flat.py index 2620ccf..944d2ad 100644 --- a/src/peakrdl_busdecoder/cpuif/apb3/apb3_cpuif_flat.py +++ b/src/peakrdl_busdecoder/cpuif/apb3/apb3_cpuif_flat.py @@ -35,15 +35,15 @@ class APB3CpuifFlat(BaseCpuif): def fanout(self, node: AddressableNode) -> str: fanout: dict[str, str] = {} - fanout[self.signal("PSEL", node)] = ( - f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'i')}|cpuif_rd_sel.{get_indexed_path(self.exp.ds.top_node, node, 'i')}" + fanout[self.signal("PSEL", node, "gi")] = ( + f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'gi')}|cpuif_rd_sel.{get_indexed_path(self.exp.ds.top_node, node, 'gi')}" ) - fanout[self.signal("PENABLE", node)] = self.signal("PENABLE") - fanout[self.signal("PWRITE", node)] = ( - f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'i')}" + fanout[self.signal("PENABLE", node, "gi")] = self.signal("PENABLE") + fanout[self.signal("PWRITE", node, "gi")] = ( + f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'gi')}" ) - fanout[self.signal("PADDR", node)] = self.signal("PADDR") - fanout[self.signal("PWDATA", node)] = "cpuif_wr_data" + fanout[self.signal("PADDR", node, "gi")] = self.signal("PADDR") + fanout[self.signal("PWDATA", node, "gi")] = "cpuif_wr_data" return "\n".join(map(lambda kv: f"assign {kv[0]} = {kv[1]};", fanout.items())) @@ -53,8 +53,8 @@ class APB3CpuifFlat(BaseCpuif): fanin["cpuif_rd_ack"] = "'0" fanin["cpuif_rd_err"] = "'0" else: - fanin["cpuif_rd_ack"] = self.signal("PREADY", node) - fanin["cpuif_rd_err"] = self.signal("PSLVERR", node) + fanin["cpuif_rd_ack"] = self.signal("PREADY", node, "i") + fanin["cpuif_rd_err"] = self.signal("PSLVERR", node, "i") return "\n".join(map(lambda kv: f"{kv[0]} = {kv[1]};", fanin.items())) @@ -63,6 +63,6 @@ class APB3CpuifFlat(BaseCpuif): if node is None: fanin["cpuif_rd_data"] = "'0" else: - fanin["cpuif_rd_data"] = self.signal("PRDATA", node) + fanin["cpuif_rd_data"] = self.signal("PRDATA", node, "i") return "\n".join(map(lambda kv: f"{kv[0]} = {kv[1]};", fanin.items())) diff --git a/src/peakrdl_busdecoder/cpuif/apb4/apb4_cpuif_flat.py b/src/peakrdl_busdecoder/cpuif/apb4/apb4_cpuif_flat.py index 357c46b..17ec392 100644 --- a/src/peakrdl_busdecoder/cpuif/apb4/apb4_cpuif_flat.py +++ b/src/peakrdl_busdecoder/cpuif/apb4/apb4_cpuif_flat.py @@ -35,17 +35,17 @@ class APB4CpuifFlat(BaseCpuif): def fanout(self, node: AddressableNode) -> str: fanout: dict[str, str] = {} - fanout[self.signal("PSEL", node)] = ( - f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'i')}|cpuif_rd_sel.{get_indexed_path(self.exp.ds.top_node, node, 'i')}" + fanout[self.signal("PSEL", node, "gi")] = ( + f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'gi')}|cpuif_rd_sel.{get_indexed_path(self.exp.ds.top_node, node, 'gi')}" ) - fanout[self.signal("PENABLE", node)] = self.signal("PENABLE") - fanout[self.signal("PWRITE", node)] = ( - f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'i')}" + fanout[self.signal("PENABLE", node, "gi")] = self.signal("PENABLE") + fanout[self.signal("PWRITE", node, "gi")] = ( + f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'gi')}" ) - fanout[self.signal("PADDR", node)] = self.signal("PADDR") - fanout[self.signal("PPROT", node)] = self.signal("PPROT") - fanout[self.signal("PWDATA", node)] = "cpuif_wr_data" - fanout[self.signal("PSTRB", node)] = "cpuif_wr_byte_en" + fanout[self.signal("PADDR", node, "gi")] = self.signal("PADDR") + fanout[self.signal("PPROT", node, "gi")] = self.signal("PPROT") + fanout[self.signal("PWDATA", node, "gi")] = "cpuif_wr_data" + fanout[self.signal("PSTRB", node, "gi")] = "cpuif_wr_byte_en" return "\n".join(map(lambda kv: f"assign {kv[0]} = {kv[1]};", fanout.items())) @@ -55,8 +55,8 @@ class APB4CpuifFlat(BaseCpuif): fanin["cpuif_rd_ack"] = "'0" fanin["cpuif_rd_err"] = "'0" else: - fanin["cpuif_rd_ack"] = self.signal("PREADY", node) - fanin["cpuif_rd_err"] = self.signal("PSLVERR", node) + fanin["cpuif_rd_ack"] = self.signal("PREADY", node, "i") + fanin["cpuif_rd_err"] = self.signal("PSLVERR", node, "i") return "\n".join(map(lambda kv: f"{kv[0]} = {kv[1]};", fanin.items())) @@ -65,6 +65,6 @@ class APB4CpuifFlat(BaseCpuif): if node is None: fanin["cpuif_rd_data"] = "'0" else: - fanin["cpuif_rd_data"] = self.signal("PRDATA", node) + fanin["cpuif_rd_data"] = self.signal("PRDATA", node, "i") return "\n".join(map(lambda kv: f"{kv[0]} = {kv[1]};", fanin.items())) diff --git a/src/peakrdl_busdecoder/cpuif/interface.py b/src/peakrdl_busdecoder/cpuif/interface.py index 8971aa8..6c27451 100644 --- a/src/peakrdl_busdecoder/cpuif/interface.py +++ b/src/peakrdl_busdecoder/cpuif/interface.py @@ -1,10 +1,13 @@ """Interface abstraction for handling flat and non-flat signal declarations.""" +import re from abc import ABC, abstractmethod from typing import TYPE_CHECKING from systemrdl.node import AddressableNode +from ..utils import get_indexed_path + if TYPE_CHECKING: from .base_cpuif import BaseCpuif @@ -93,7 +96,6 @@ class SVInterface(Interface): indexer: str | int | None = None, ) -> str: """Generate SystemVerilog interface signal reference.""" - from ..utils import get_indexed_path # SVInterface only supports string indexers (loop variable names like "i", "gi") if indexer is not None and not isinstance(indexer, str): @@ -166,6 +168,13 @@ class FlatInterface(Interface): # Is an array if indexer is not None: + if isinstance(indexer, str): + indexed_path = get_indexed_path(node.parent, node, indexer, skip_kw_filter=True) + pattern = r"\[.*?\]" + indexes = re.findall(pattern, indexed_path) + + return f"{base}_{signal}{''.join(indexes)}" + return f"{base}_{signal}[{indexer}]" return f"{base}_{signal}[N_{node.inst_name.upper()}S]" diff --git a/src/peakrdl_busdecoder/decode_logic_gen.py b/src/peakrdl_busdecoder/decode_logic_gen.py index 90ff59f..3180f6e 100644 --- a/src/peakrdl_busdecoder/decode_logic_gen.py +++ b/src/peakrdl_busdecoder/decode_logic_gen.py @@ -70,7 +70,9 @@ class DecodeLogicGenerator(BusDecoderListener): # Avoid generating a redundant >= 0 comparison, which triggers Verilator warnings. if not (l_bound.value == 0 and len(l_bound_comp) == 1): predicates.append(lower_expr) - predicates.append(upper_expr) + # Avoid generating a redundant full-width < max comparison, which triggers Verilator warnings. + if not (u_bound.value == (1 << addr_width) and len(u_bound_comp) == 1): + predicates.append(upper_expr) return predicates diff --git a/src/peakrdl_busdecoder/sv_int.py b/src/peakrdl_busdecoder/sv_int.py index 96d47ec..4b7ccb8 100644 --- a/src/peakrdl_busdecoder/sv_int.py +++ b/src/peakrdl_busdecoder/sv_int.py @@ -1,3 +1,6 @@ +from typing import Literal + + class SVInt: def __init__(self, value: int, width: int | None = None) -> None: self.value = value @@ -19,3 +22,27 @@ class SVInt: return SVInt(self.value + other.value, max(self.width, other.width)) else: return SVInt(self.value + other.value, None) + + def __sub__(self, other: "SVInt") -> "SVInt": + if self.width is not None and other.width is not None: + return SVInt(self.value - other.value, max(self.width, other.width)) + else: + return SVInt(self.value - other.value, None) + + def __len__(self) -> int: + if self.width is not None: + return self.width + else: + return self.value.bit_length() + + def to_bytes(self, byteorder: Literal["little", "big"] = "little") -> bytes: + byte_length = (self.value.bit_length() + 7) // 8 + return self.value.to_bytes(byte_length, byteorder) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, SVInt): + return NotImplemented + return self.value == other.value and self.width == other.width + + def __hash__(self) -> int: + return hash((self.value, self.width)) diff --git a/tests/cocotb/apb3/smoke/test_register_access.py b/tests/cocotb/apb3/smoke/test_register_access.py index be4873f..faeac39 100644 --- a/tests/cocotb/apb3/smoke/test_register_access.py +++ b/tests/cocotb/apb3/smoke/test_register_access.py @@ -1,15 +1,19 @@ -"""APB3 smoke tests for generated multi-register design.""" +"""APB3 smoke tests generated from SystemRDL sources.""" + +from __future__ import annotations + +import json +import os +from typing import Any, Iterable import cocotb from cocotb.triggers import Timer -WRITE_ADDR = 0x0 -READ_ADDR = 0x8 -WRITE_DATA = 0xCAFEBABE -READ_DATA = 0x0BAD_F00D - +from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle class _Apb3SlaveShim: + """Accessor for the APB3 slave signals on the DUT.""" + def __init__(self, dut): prefix = "s_apb" self.PSEL = getattr(dut, f"{prefix}_PSEL") @@ -22,102 +26,161 @@ class _Apb3SlaveShim: self.PSLVERR = getattr(dut, f"{prefix}_PSLVERR") -class _Apb3MasterShim: - def __init__(self, dut, base: str): - self.PSEL = getattr(dut, f"{base}_PSEL") - self.PENABLE = getattr(dut, f"{base}_PENABLE") - self.PWRITE = getattr(dut, f"{base}_PWRITE") - self.PADDR = getattr(dut, f"{base}_PADDR") - self.PWDATA = getattr(dut, f"{base}_PWDATA") - self.PRDATA = getattr(dut, f"{base}_PRDATA") - self.PREADY = getattr(dut, f"{base}_PREADY") - self.PSLVERR = getattr(dut, f"{base}_PSLVERR") +def _load_config() -> dict[str, Any]: + payload = os.environ.get("RDL_TEST_CONFIG") + if payload is None: + raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided") + return json.loads(payload) -def _apb3_slave(dut): - return getattr(dut, "s_apb", None) or _Apb3SlaveShim(dut) +def _resolve(handle, indices: Iterable[int]): + return resolve_handle(handle, indices) -def _apb3_master(dut, base: str): - return getattr(dut, base, None) or _Apb3MasterShim(dut, base) +def _set_value(handle, indices: Iterable[int], value: int) -> None: + _resolve(handle, indices).value = value + + +def _get_int(handle, indices: Iterable[int]) -> int: + return int(_resolve(handle, indices).value) + + +def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: + table: dict[str, dict[str, Any]] = {} + for master in masters_cfg: + prefix = master["port_prefix"] + entry = { + "indices": [tuple(idx) for idx in master["indices"]] or [tuple()], + "outputs": { + "PSEL": SignalHandle(dut, f"{prefix}_PSEL"), + "PENABLE": SignalHandle(dut, f"{prefix}_PENABLE"), + "PWRITE": SignalHandle(dut, f"{prefix}_PWRITE"), + "PADDR": SignalHandle(dut, f"{prefix}_PADDR"), + "PWDATA": SignalHandle(dut, f"{prefix}_PWDATA"), + }, + "inputs": { + "PRDATA": SignalHandle(dut, f"{prefix}_PRDATA"), + "PREADY": SignalHandle(dut, f"{prefix}_PREADY"), + "PSLVERR": SignalHandle(dut, f"{prefix}_PSLVERR"), + }, + } + table[master["inst_name"]] = entry + return table + + +def _all_index_pairs(table: dict[str, dict[str, Any]]): + for name, entry in table.items(): + for idx in entry["indices"]: + yield name, idx + + +def _write_pattern(address: int, width: int) -> int: + mask = (1 << width) - 1 + return ((address * 0x2041) ^ 0xCAFEBABE) & mask + + +def _read_pattern(address: int, width: int) -> int: + mask = (1 << width) - 1 + return ((address ^ 0x0BAD_F00D) + width) & mask @cocotb.test() -async def test_apb3_read_write_paths(dut): - """Exercise APB3 slave interface and observe master fanout.""" - s_apb = _apb3_slave(dut) - masters = { - "reg1": _apb3_master(dut, "m_apb_reg1"), - "reg2": _apb3_master(dut, "m_apb_reg2"), - "reg3": _apb3_master(dut, "m_apb_reg3"), - } +async def test_apb3_address_decoding(dut) -> None: + """Exercise the APB3 slave interface against sampled register addresses.""" + config = _load_config() + slave = _Apb3SlaveShim(dut) + masters = _build_master_table(dut, config["masters"]) - s_apb.PSEL.value = 0 - s_apb.PENABLE.value = 0 - s_apb.PWRITE.value = 0 - s_apb.PADDR.value = 0 - s_apb.PWDATA.value = 0 + slave.PSEL.value = 0 + slave.PENABLE.value = 0 + slave.PWRITE.value = 0 + slave.PADDR.value = 0 + slave.PWDATA.value = 0 - for master in masters.values(): - master.PRDATA.value = 0 - master.PREADY.value = 0 - master.PSLVERR.value = 0 + for master_name, idx in _all_index_pairs(masters): + entry = masters[master_name] + _set_value(entry["inputs"]["PRDATA"], idx, 0) + _set_value(entry["inputs"]["PREADY"], idx, 0) + _set_value(entry["inputs"]["PSLVERR"], idx, 0) await Timer(1, units="ns") - # Write to reg1 - masters["reg1"].PREADY.value = 1 - s_apb.PADDR.value = WRITE_ADDR - s_apb.PWDATA.value = WRITE_DATA - s_apb.PWRITE.value = 1 - s_apb.PSEL.value = 1 - s_apb.PENABLE.value = 1 + addr_mask = (1 << config["address_width"]) - 1 - await Timer(1, units="ns") + for txn in config["transactions"]: + master_name = txn["master"] + index = tuple(txn["index"]) + entry = masters[master_name] - assert int(masters["reg1"].PSEL.value) == 1, "reg1 should be selected for write" - assert int(masters["reg1"].PWRITE.value) == 1, "Write should propagate to master" - assert int(masters["reg1"].PADDR.value) == WRITE_ADDR, "Address should reach selected master" - assert int(masters["reg1"].PWDATA.value) == WRITE_DATA, "Write data should fan out" + address = txn["address"] & addr_mask + write_data = _write_pattern(address, config["data_width"]) - for name, master in masters.items(): - if name != "reg1": - assert int(master.PSEL.value) == 0, f"{name} must idle during reg1 write" + _set_value(entry["inputs"]["PREADY"], index, 1) + _set_value(entry["inputs"]["PSLVERR"], index, 0) - assert int(s_apb.PREADY.value) == 1, "Ready must reflect selected master" - assert int(s_apb.PSLVERR.value) == 0, "Write should not signal error" + slave.PADDR.value = address + slave.PWDATA.value = write_data + slave.PWRITE.value = 1 + slave.PSEL.value = 1 + slave.PENABLE.value = 1 - s_apb.PSEL.value = 0 - s_apb.PENABLE.value = 0 - s_apb.PWRITE.value = 0 - masters["reg1"].PREADY.value = 0 - await Timer(1, units="ns") + await Timer(1, units="ns") - # Read from reg3 - masters["reg3"].PRDATA.value = READ_DATA - masters["reg3"].PREADY.value = 1 - masters["reg3"].PSLVERR.value = 0 + assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} should assert PSEL for write" + assert _get_int(entry["outputs"]["PWRITE"], index) == 1, f"{master_name} should see write direction" + assert _get_int(entry["outputs"]["PADDR"], index) == address, f"{master_name} must receive write address" + assert _get_int(entry["outputs"]["PWDATA"], index) == write_data, f"{master_name} must receive write data" - s_apb.PADDR.value = READ_ADDR - s_apb.PSEL.value = 1 - s_apb.PENABLE.value = 1 - s_apb.PWRITE.value = 0 + for other_name, other_idx in _all_index_pairs(masters): + if other_name == master_name and other_idx == index: + continue + other_entry = masters[other_name] + assert ( + _get_int(other_entry["outputs"]["PSEL"], other_idx) == 0 + ), f"{other_name}{other_idx} should remain idle during {txn['label']}" - await Timer(1, units="ns") + assert int(slave.PREADY.value) == 1, "Slave ready should mirror selected master" + assert int(slave.PSLVERR.value) == 0, "Write should complete without error" - assert int(masters["reg3"].PSEL.value) == 1, "reg3 should be selected for read" - assert int(masters["reg3"].PWRITE.value) == 0, "Read should clear write" - assert int(masters["reg3"].PADDR.value) == READ_ADDR, "Address should reach read target" + slave.PSEL.value = 0 + slave.PENABLE.value = 0 + slave.PWRITE.value = 0 + _set_value(entry["inputs"]["PREADY"], index, 0) + await Timer(1, units="ns") - for name, master in masters.items(): - if name != "reg3": - assert int(master.PSEL.value) == 0, f"{name} must idle during reg3 read" + # ------------------------------------------------------------------ + # Read phase + # ------------------------------------------------------------------ + read_data = _read_pattern(address, config["data_width"]) + _set_value(entry["inputs"]["PRDATA"], index, read_data) + _set_value(entry["inputs"]["PREADY"], index, 1) + _set_value(entry["inputs"]["PSLVERR"], index, 0) - assert int(s_apb.PRDATA.value) == READ_DATA, "Read data should return to slave" - assert int(s_apb.PREADY.value) == 1, "Read should acknowledge" - assert int(s_apb.PSLVERR.value) == 0, "Read should not signal error" + slave.PADDR.value = address + slave.PWRITE.value = 0 + slave.PSEL.value = 1 + slave.PENABLE.value = 1 - s_apb.PSEL.value = 0 - s_apb.PENABLE.value = 0 - masters["reg3"].PREADY.value = 0 - await Timer(1, units="ns") + await Timer(1, units="ns") + + assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must assert PSEL for read" + assert _get_int(entry["outputs"]["PWRITE"], index) == 0, f"{master_name} should clear write during read" + assert _get_int(entry["outputs"]["PADDR"], index) == address, f"{master_name} must receive read address" + + for other_name, other_idx in _all_index_pairs(masters): + if other_name == master_name and other_idx == index: + continue + other_entry = masters[other_name] + assert ( + _get_int(other_entry["outputs"]["PSEL"], other_idx) == 0 + ), f"{other_name}{other_idx} must stay idle during read of {txn['label']}" + + assert int(slave.PRDATA.value) == read_data, "Read data should propagate back to the slave" + assert int(slave.PREADY.value) == 1, "Slave ready should acknowledge the read" + assert int(slave.PSLVERR.value) == 0, "Read should not signal an error" + + slave.PSEL.value = 0 + slave.PENABLE.value = 0 + _set_value(entry["inputs"]["PREADY"], index, 0) + _set_value(entry["inputs"]["PRDATA"], index, 0) + await Timer(1, units="ns") diff --git a/tests/cocotb/apb3/smoke/test_runner.py b/tests/cocotb/apb3/smoke/test_runner.py index 210338e..9b11bb9 100644 --- a/tests/cocotb/apb3/smoke/test_runner.py +++ b/tests/cocotb/apb3/smoke/test_runner.py @@ -1,5 +1,8 @@ -"""Pytest wrapper launching the APB3 cocotb smoke test.""" +"""Pytest wrapper launching the APB3 cocotb smoke tests.""" +from __future__ import annotations + +import json from pathlib import Path import pytest @@ -11,20 +14,25 @@ try: # pragma: no cover - optional dependency shim except ImportError: # pragma: no cover from cocotb_tools.runner import get_runner -from tests.cocotb_lib.utils import compile_rdl_and_export, get_verilog_sources +from tests.cocotb_lib import RDL_CASES +from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case @pytest.mark.simulation @pytest.mark.verilator -def test_apb3_smoke(tmp_path: Path) -> None: - """Compile the APB3 design and execute the cocotb smoke test.""" +@pytest.mark.parametrize(("rdl_file", "top_name"), RDL_CASES, ids=[case[1] for case in RDL_CASES]) +def test_apb3_smoke(tmp_path: Path, rdl_file: str, top_name: str) -> None: + """Compile each APB3 design variant and execute the cocotb smoke test.""" repo_root = Path(__file__).resolve().parents[4] + rdl_path = repo_root / "tests" / "cocotb_lib" / "rdl" / rdl_file + build_root = tmp_path / top_name - module_path, package_path = compile_rdl_and_export( - str(repo_root / "tests" / "cocotb_lib" / "multiple_reg.rdl"), - "multi_reg", - tmp_path, + module_path, package_path, config = prepare_cpuif_case( + str(rdl_path), + top_name, + build_root, cpuif_cls=APB3CpuifFlat, + control_signal="PSEL", ) sources = get_verilog_sources( @@ -34,17 +42,18 @@ def test_apb3_smoke(tmp_path: Path) -> None: ) runner = get_runner("verilator") - build_dir = tmp_path / "sim_build" + sim_build = build_root / "sim_build" runner.build( sources=sources, hdl_toplevel=module_path.stem, - build_dir=build_dir, + build_dir=sim_build, ) runner.test( hdl_toplevel=module_path.stem, test_module="tests.cocotb.apb3.smoke.test_register_access", - build_dir=build_dir, - log_file=str(tmp_path / "sim.log"), + build_dir=sim_build, + log_file=str(build_root / "simulation.log"), + extra_env={"RDL_TEST_CONFIG": json.dumps(config)}, ) diff --git a/tests/cocotb/apb4/smoke/test_register_access.py b/tests/cocotb/apb4/smoke/test_register_access.py index d27078b..6dac661 100644 --- a/tests/cocotb/apb4/smoke/test_register_access.py +++ b/tests/cocotb/apb4/smoke/test_register_access.py @@ -1,15 +1,19 @@ -"""APB4 smoke tests using generated multi-register design.""" +"""APB4 smoke tests generated from SystemRDL sources.""" + +from __future__ import annotations + +import json +import os +from typing import Any, Iterable import cocotb from cocotb.triggers import Timer -WRITE_ADDR = 0x4 -READ_ADDR = 0x8 -WRITE_DATA = 0x1234_5678 -READ_DATA = 0x89AB_CDEF - +from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle class _Apb4SlaveShim: + """Lightweight accessor for the APB4 slave side of the DUT.""" + def __init__(self, dut): prefix = "s_apb" self.PSEL = getattr(dut, f"{prefix}_PSEL") @@ -24,115 +28,175 @@ class _Apb4SlaveShim: self.PSLVERR = getattr(dut, f"{prefix}_PSLVERR") -class _Apb4MasterShim: - def __init__(self, dut, base: str): - self.PSEL = getattr(dut, f"{base}_PSEL") - self.PENABLE = getattr(dut, f"{base}_PENABLE") - self.PWRITE = getattr(dut, f"{base}_PWRITE") - self.PADDR = getattr(dut, f"{base}_PADDR") - self.PPROT = getattr(dut, f"{base}_PPROT") - self.PWDATA = getattr(dut, f"{base}_PWDATA") - self.PSTRB = getattr(dut, f"{base}_PSTRB") - self.PRDATA = getattr(dut, f"{base}_PRDATA") - self.PREADY = getattr(dut, f"{base}_PREADY") - self.PSLVERR = getattr(dut, f"{base}_PSLVERR") +def _load_config() -> dict[str, Any]: + """Read the JSON payload describing the generated register topology.""" + payload = os.environ.get("RDL_TEST_CONFIG") + if payload is None: + raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided") + return json.loads(payload) -def _apb4_slave(dut): - return getattr(dut, "s_apb", None) or _Apb4SlaveShim(dut) +def _resolve(handle, indices: Iterable[int]): + """Index into hierarchical cocotb handles.""" + return resolve_handle(handle, indices) -def _apb4_master(dut, base: str): - return getattr(dut, base, None) or _Apb4MasterShim(dut, base) +def _set_value(handle, indices: Iterable[int], value: int) -> None: + _resolve(handle, indices).value = value + + +def _get_int(handle, indices: Iterable[int]) -> int: + return int(_resolve(handle, indices).value) + + +def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: + table: dict[str, dict[str, Any]] = {} + for master in masters_cfg: + port_prefix = master["port_prefix"] + entry = { + "port_prefix": port_prefix, + "indices": [tuple(idx) for idx in master["indices"]] or [tuple()], + "outputs": { + "PSEL": SignalHandle(dut, f"{port_prefix}_PSEL"), + "PENABLE": SignalHandle(dut, f"{port_prefix}_PENABLE"), + "PWRITE": SignalHandle(dut, f"{port_prefix}_PWRITE"), + "PADDR": SignalHandle(dut, f"{port_prefix}_PADDR"), + "PPROT": SignalHandle(dut, f"{port_prefix}_PPROT"), + "PWDATA": SignalHandle(dut, f"{port_prefix}_PWDATA"), + "PSTRB": SignalHandle(dut, f"{port_prefix}_PSTRB"), + }, + "inputs": { + "PRDATA": SignalHandle(dut, f"{port_prefix}_PRDATA"), + "PREADY": SignalHandle(dut, f"{port_prefix}_PREADY"), + "PSLVERR": SignalHandle(dut, f"{port_prefix}_PSLVERR"), + }, + } + table[master["inst_name"]] = entry + return table + + +def _all_index_pairs(table: dict[str, dict[str, Any]]): + for name, entry in table.items(): + for idx in entry["indices"]: + yield name, idx + + +def _write_pattern(address: int, width: int) -> int: + mask = (1 << width) - 1 + return ((address * 0x1021) ^ 0x1357_9BDF) & mask + + +def _read_pattern(address: int, width: int) -> int: + mask = (1 << width) - 1 + return ((address ^ 0xDEAD_BEE5) + width) & mask @cocotb.test() -async def test_apb4_read_write_paths(dut): - """Drive APB4 slave signals and observe master activity.""" - s_apb = _apb4_slave(dut) - masters = { - "reg1": _apb4_master(dut, "m_apb_reg1"), - "reg2": _apb4_master(dut, "m_apb_reg2"), - "reg3": _apb4_master(dut, "m_apb_reg3"), - } +async def test_apb4_address_decoding(dut) -> None: + """Drive the APB4 slave interface and verify master fanout across all sampled registers.""" + config = _load_config() + slave = _Apb4SlaveShim(dut) + masters = _build_master_table(dut, config["masters"]) - # Default slave side inputs - s_apb.PSEL.value = 0 - s_apb.PENABLE.value = 0 - s_apb.PWRITE.value = 0 - s_apb.PADDR.value = 0 - s_apb.PWDATA.value = 0 - s_apb.PPROT.value = 0 - s_apb.PSTRB.value = 0 + slave.PSEL.value = 0 + slave.PENABLE.value = 0 + slave.PWRITE.value = 0 + slave.PADDR.value = 0 + slave.PPROT.value = 0 + slave.PWDATA.value = 0 + slave.PSTRB.value = 0 - for master in masters.values(): - master.PRDATA.value = 0 - master.PREADY.value = 0 - master.PSLVERR.value = 0 + for master_name, idx in _all_index_pairs(masters): + entry = masters[master_name] + _set_value(entry["inputs"]["PRDATA"], idx, 0) + _set_value(entry["inputs"]["PREADY"], idx, 0) + _set_value(entry["inputs"]["PSLVERR"], idx, 0) await Timer(1, units="ns") - # ------------------------------------------------------------------ - # Write transfer to reg2 - # ------------------------------------------------------------------ - masters["reg2"].PREADY.value = 1 - s_apb.PADDR.value = WRITE_ADDR - s_apb.PWDATA.value = WRITE_DATA - s_apb.PSTRB.value = 0xF - s_apb.PPROT.value = 0 - s_apb.PWRITE.value = 1 - s_apb.PSEL.value = 1 - s_apb.PENABLE.value = 1 + addr_mask = (1 << config["address_width"]) - 1 + strobe_mask = (1 << config["byte_width"]) - 1 - await Timer(1, units="ns") + for txn in config["transactions"]: + master_name = txn["master"] + index = tuple(txn["index"]) + entry = masters[master_name] - assert int(masters["reg2"].PSEL.value) == 1, "reg2 must be selected for write" - assert int(masters["reg2"].PWRITE.value) == 1, "Write strobes should propagate" - assert int(masters["reg2"].PADDR.value) == WRITE_ADDR, "Address should fan out" - assert int(masters["reg2"].PWDATA.value) == WRITE_DATA, "Write data should fan out" + address = txn["address"] & addr_mask + write_data = _write_pattern(address, config["data_width"]) - for name, master in masters.items(): - if name != "reg2": - assert int(master.PSEL.value) == 0, f"{name} should remain idle on write" + # Prime master-side inputs for the write phase + _set_value(entry["inputs"]["PREADY"], index, 1) + _set_value(entry["inputs"]["PSLVERR"], index, 0) - assert int(s_apb.PREADY.value) == 1, "Ready should mirror selected master" - assert int(s_apb.PSLVERR.value) == 0, "No error expected on successful write" + slave.PADDR.value = address + slave.PWDATA.value = write_data + slave.PSTRB.value = strobe_mask + slave.PPROT.value = 0 + slave.PWRITE.value = 1 + slave.PSEL.value = 1 + slave.PENABLE.value = 1 - # Return to idle - s_apb.PSEL.value = 0 - s_apb.PENABLE.value = 0 - s_apb.PWRITE.value = 0 - masters["reg2"].PREADY.value = 0 - await Timer(1, units="ns") + await Timer(1, units="ns") - # ------------------------------------------------------------------ - # Read transfer from reg3 - # ------------------------------------------------------------------ - masters["reg3"].PRDATA.value = READ_DATA - masters["reg3"].PREADY.value = 1 - masters["reg3"].PSLVERR.value = 0 + assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} should assert PSEL for write" + assert _get_int(entry["outputs"]["PWRITE"], index) == 1, f"{master_name} should see write intent" + assert _get_int(entry["outputs"]["PADDR"], index) == address, f"{master_name} must receive write address" + assert _get_int(entry["outputs"]["PWDATA"], index) == write_data, f"{master_name} must receive write data" + assert _get_int(entry["outputs"]["PSTRB"], index) == strobe_mask, f"{master_name} must receive full strobes" - s_apb.PADDR.value = READ_ADDR - s_apb.PSEL.value = 1 - s_apb.PENABLE.value = 1 - s_apb.PWRITE.value = 0 + for other_name, other_idx in _all_index_pairs(masters): + if other_name == master_name and other_idx == index: + continue + other_entry = masters[other_name] + assert ( + _get_int(other_entry["outputs"]["PSEL"], other_idx) == 0 + ), f"{other_name}{other_idx} should remain idle during {txn['label']}" - await Timer(1, units="ns") + assert int(slave.PREADY.value) == 1, "Slave ready should reflect selected master" + assert int(slave.PSLVERR.value) == 0, "No error expected during write" - assert int(masters["reg3"].PSEL.value) == 1, "reg3 must be selected for read" - assert int(masters["reg3"].PWRITE.value) == 0, "Read should deassert write" - assert int(masters["reg3"].PADDR.value) == READ_ADDR, "Read address should propagate" + # Return to idle for next transaction + slave.PSEL.value = 0 + slave.PENABLE.value = 0 + slave.PWRITE.value = 0 + _set_value(entry["inputs"]["PREADY"], index, 0) + await Timer(1, units="ns") - for name, master in masters.items(): - if name != "reg3": - assert int(master.PSEL.value) == 0, f"{name} should remain idle on read" + # ------------------------------------------------------------------ + # Read phase + # ------------------------------------------------------------------ + read_data = _read_pattern(address, config["data_width"]) + _set_value(entry["inputs"]["PRDATA"], index, read_data) + _set_value(entry["inputs"]["PREADY"], index, 1) + _set_value(entry["inputs"]["PSLVERR"], index, 0) - assert int(s_apb.PRDATA.value) == READ_DATA, "Read data should return from master" - assert int(s_apb.PREADY.value) == 1, "Ready must follow selected master" - assert int(s_apb.PSLVERR.value) == 0, "No error expected on successful read" + slave.PADDR.value = address + slave.PWRITE.value = 0 + slave.PSEL.value = 1 + slave.PENABLE.value = 1 - # Back to idle - s_apb.PSEL.value = 0 - s_apb.PENABLE.value = 0 - masters["reg3"].PREADY.value = 0 - await Timer(1, units="ns") + await Timer(1, units="ns") + + assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must assert PSEL for read" + assert _get_int(entry["outputs"]["PWRITE"], index) == 0, f"{master_name} should deassert write for reads" + assert _get_int(entry["outputs"]["PADDR"], index) == address, f"{master_name} must receive read address" + + for other_name, other_idx in _all_index_pairs(masters): + if other_name == master_name and other_idx == index: + continue + other_entry = masters[other_name] + assert ( + _get_int(other_entry["outputs"]["PSEL"], other_idx) == 0 + ), f"{other_name}{other_idx} must stay idle during read of {txn['label']}" + + assert int(slave.PRDATA.value) == read_data, "Slave should observe readback data from master" + assert int(slave.PREADY.value) == 1, "Slave ready should follow responding master" + assert int(slave.PSLVERR.value) == 0, "Read should complete without error" + + # Reset to idle before progressing + slave.PSEL.value = 0 + slave.PENABLE.value = 0 + _set_value(entry["inputs"]["PREADY"], index, 0) + _set_value(entry["inputs"]["PRDATA"], index, 0) + await Timer(1, units="ns") diff --git a/tests/cocotb/apb4/smoke/test_runner.py b/tests/cocotb/apb4/smoke/test_runner.py index 69f4003..04440ec 100644 --- a/tests/cocotb/apb4/smoke/test_runner.py +++ b/tests/cocotb/apb4/smoke/test_runner.py @@ -1,7 +1,10 @@ -"""Pytest wrapper launching the APB4 cocotb smoke test.""" +"""Pytest wrapper launching the APB4 cocotb smoke tests.""" +from __future__ import annotations + +import json from pathlib import Path - +import logging import pytest from peakrdl_busdecoder.cpuif.apb4.apb4_cpuif_flat import APB4CpuifFlat @@ -11,20 +14,25 @@ try: # pragma: no cover - optional dependency shim except ImportError: # pragma: no cover from cocotb_tools.runner import get_runner -from tests.cocotb_lib.utils import compile_rdl_and_export, get_verilog_sources +from tests.cocotb_lib import RDL_CASES +from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case @pytest.mark.simulation @pytest.mark.verilator -def test_apb4_smoke(tmp_path: Path) -> None: - """Compile the APB4 design and execute the cocotb smoke test.""" +@pytest.mark.parametrize(("rdl_file", "top_name"), RDL_CASES, ids=[case[1] for case in RDL_CASES]) +def test_apb4_smoke(tmp_path: Path, rdl_file: str, top_name: str) -> None: + """Compile each APB4 design variant and execute the cocotb smoke test.""" repo_root = Path(__file__).resolve().parents[4] + rdl_path = repo_root / "tests" / "cocotb_lib" / "rdl" / rdl_file + build_root = tmp_path / top_name - module_path, package_path = compile_rdl_and_export( - str(repo_root / "tests" / "cocotb_lib" / "multiple_reg.rdl"), - "multi_reg", - tmp_path, + module_path, package_path, config = prepare_cpuif_case( + str(rdl_path), + top_name, + build_root, cpuif_cls=APB4CpuifFlat, + control_signal="PSEL", ) sources = get_verilog_sources( @@ -34,17 +42,39 @@ def test_apb4_smoke(tmp_path: Path) -> None: ) runner = get_runner("verilator") - build_dir = tmp_path / "sim_build" + sim_build = build_root / "sim_build" + + try: + runner.build( + sources=sources, + hdl_toplevel=module_path.stem, + build_dir=sim_build, + log_file=str(build_root / "build.log"), + ) + except SystemExit as e: + # Print build log on failure for easier debugging + log_path = build_root / "build.log" + if log_path.exists(): + logging.error("\n\n=== Build Log ===\n") + logging.error(log_path.read_text()) + logging.error("\n=== End Build Log ===\n") + if e.code != 0: + raise - runner.build( - sources=sources, - hdl_toplevel=module_path.stem, - build_dir=build_dir, - ) - - runner.test( - hdl_toplevel=module_path.stem, - test_module="tests.cocotb.apb4.smoke.test_register_access", - build_dir=build_dir, - log_file=str(tmp_path / "sim.log"), - ) + try: + runner.test( + hdl_toplevel=module_path.stem, + test_module="tests.cocotb.apb4.smoke.test_register_access", + build_dir=sim_build, + log_file=str(build_root / "simulation.log"), + extra_env={"RDL_TEST_CONFIG": json.dumps(config)}, + ) + except SystemExit as e: + # Print simulation log on failure for easier debugging + log_path = build_root / "simulation.log" + if log_path.exists(): + logging.error("\n\n=== Simulation Log ===\n") + logging.error(log_path.read_text()) + logging.error("\n=== End Simulation Log ===\n") + if e.code != 0: + raise diff --git a/tests/cocotb/axi4lite/smoke/test_register_access.py b/tests/cocotb/axi4lite/smoke/test_register_access.py index d9f20bb..573beda 100644 --- a/tests/cocotb/axi4lite/smoke/test_register_access.py +++ b/tests/cocotb/axi4lite/smoke/test_register_access.py @@ -1,15 +1,19 @@ -"""AXI4-Lite smoke test ensuring address decode fanout works.""" +"""AXI4-Lite smoke test driven from SystemRDL-generated register maps.""" + +from __future__ import annotations + +import json +import os +from typing import Any, Iterable import cocotb from cocotb.triggers import Timer -WRITE_ADDR = 0x4 -READ_ADDR = 0x8 -WRITE_DATA = 0x1357_9BDF -READ_DATA = 0x2468_ACED - +from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle class _AxilSlaveShim: + """Accessor for AXI4-Lite slave ports on the DUT.""" + def __init__(self, dut): prefix = "s_axil" self.AWREADY = getattr(dut, f"{prefix}_AWREADY") @@ -33,129 +37,177 @@ class _AxilSlaveShim: self.RRESP = getattr(dut, f"{prefix}_RRESP") -class _AxilMasterShim: - def __init__(self, dut, base: str): - self.AWREADY = getattr(dut, f"{base}_AWREADY") - self.AWVALID = getattr(dut, f"{base}_AWVALID") - self.AWADDR = getattr(dut, f"{base}_AWADDR") - self.AWPROT = getattr(dut, f"{base}_AWPROT") - self.WREADY = getattr(dut, f"{base}_WREADY") - self.WVALID = getattr(dut, f"{base}_WVALID") - self.WDATA = getattr(dut, f"{base}_WDATA") - self.WSTRB = getattr(dut, f"{base}_WSTRB") - self.BREADY = getattr(dut, f"{base}_BREADY") - self.BVALID = getattr(dut, f"{base}_BVALID") - self.BRESP = getattr(dut, f"{base}_BRESP") - self.ARREADY = getattr(dut, f"{base}_ARREADY") - self.ARVALID = getattr(dut, f"{base}_ARVALID") - self.ARADDR = getattr(dut, f"{base}_ARADDR") - self.ARPROT = getattr(dut, f"{base}_ARPROT") - self.RREADY = getattr(dut, f"{base}_RREADY") - self.RVALID = getattr(dut, f"{base}_RVALID") - self.RDATA = getattr(dut, f"{base}_RDATA") - self.RRESP = getattr(dut, f"{base}_RRESP") +def _load_config() -> dict[str, Any]: + payload = os.environ.get("RDL_TEST_CONFIG") + if payload is None: + raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided") + return json.loads(payload) -def _axil_slave(dut): - return getattr(dut, "s_axil", None) or _AxilSlaveShim(dut) +def _resolve(handle, indices: Iterable[int]): + return resolve_handle(handle, indices) -def _axil_master(dut, base: str): - return getattr(dut, base, None) or _AxilMasterShim(dut, base) +def _set_value(handle, indices: Iterable[int], value: int) -> None: + _resolve(handle, indices).value = value + + +def _get_int(handle, indices: Iterable[int]) -> int: + return int(_resolve(handle, indices).value) + + +def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: + table: dict[str, dict[str, Any]] = {} + for master in masters_cfg: + prefix = master["port_prefix"] + entry = { + "indices": [tuple(idx) for idx in master["indices"]] or [tuple()], + "outputs": { + "AWVALID": SignalHandle(dut, f"{prefix}_AWVALID"), + "AWADDR": SignalHandle(dut, f"{prefix}_AWADDR"), + "AWPROT": SignalHandle(dut, f"{prefix}_AWPROT"), + "WVALID": SignalHandle(dut, f"{prefix}_WVALID"), + "WDATA": SignalHandle(dut, f"{prefix}_WDATA"), + "WSTRB": SignalHandle(dut, f"{prefix}_WSTRB"), + "ARVALID": SignalHandle(dut, f"{prefix}_ARVALID"), + "ARADDR": SignalHandle(dut, f"{prefix}_ARADDR"), + "ARPROT": SignalHandle(dut, f"{prefix}_ARPROT"), + }, + "inputs": { + "AWREADY": SignalHandle(dut, f"{prefix}_AWREADY"), + "WREADY": SignalHandle(dut, f"{prefix}_WREADY"), + "BVALID": SignalHandle(dut, f"{prefix}_BVALID"), + "BRESP": SignalHandle(dut, f"{prefix}_BRESP"), + "ARREADY": SignalHandle(dut, f"{prefix}_ARREADY"), + "RVALID": SignalHandle(dut, f"{prefix}_RVALID"), + "RDATA": SignalHandle(dut, f"{prefix}_RDATA"), + "RRESP": SignalHandle(dut, f"{prefix}_RRESP"), + }, + } + table[master["inst_name"]] = entry + return table + + +def _all_index_pairs(table: dict[str, dict[str, Any]]): + for name, entry in table.items(): + for idx in entry["indices"]: + yield name, idx + + +def _write_pattern(address: int, width: int) -> int: + mask = (1 << width) - 1 + return ((address * 0x3105) ^ 0x1357_9BDF) & mask + + +def _read_pattern(address: int, width: int) -> int: + mask = (1 << width) - 1 + return ((address ^ 0x2468_ACED) + width) & mask @cocotb.test() -async def test_axi4lite_read_write_paths(dut): - """Drive AXI4-Lite slave channels and validate master side wiring.""" - s_axil = _axil_slave(dut) - masters = { - "reg1": _axil_master(dut, "m_axil_reg1"), - "reg2": _axil_master(dut, "m_axil_reg2"), - "reg3": _axil_master(dut, "m_axil_reg3"), - } +async def test_axi4lite_address_decoding(dut) -> None: + """Stimulate AXI4-Lite slave channels and verify master port selection.""" + config = _load_config() + slave = _AxilSlaveShim(dut) + masters = _build_master_table(dut, config["masters"]) - # Default slave-side inputs - s_axil.AWVALID.value = 0 - s_axil.AWADDR.value = 0 - s_axil.AWPROT.value = 0 - s_axil.WVALID.value = 0 - s_axil.WDATA.value = 0 - s_axil.WSTRB.value = 0 - s_axil.BREADY.value = 0 - s_axil.ARVALID.value = 0 - s_axil.ARADDR.value = 0 - s_axil.ARPROT.value = 0 - s_axil.RREADY.value = 0 + slave.AWVALID.value = 0 + slave.AWADDR.value = 0 + slave.AWPROT.value = 0 + slave.WVALID.value = 0 + slave.WDATA.value = 0 + slave.WSTRB.value = 0 + slave.BREADY.value = 0 + slave.ARVALID.value = 0 + slave.ARADDR.value = 0 + slave.ARPROT.value = 0 + slave.RREADY.value = 0 - for master in masters.values(): - master.AWREADY.value = 0 - master.WREADY.value = 0 - master.BVALID.value = 0 - master.BRESP.value = 0 - master.ARREADY.value = 0 - master.RVALID.value = 0 - master.RDATA.value = 0 - master.RRESP.value = 0 + for master_name, idx in _all_index_pairs(masters): + entry = masters[master_name] + _set_value(entry["inputs"]["AWREADY"], idx, 0) + _set_value(entry["inputs"]["WREADY"], idx, 0) + _set_value(entry["inputs"]["BVALID"], idx, 0) + _set_value(entry["inputs"]["BRESP"], idx, 0) + _set_value(entry["inputs"]["ARREADY"], idx, 0) + _set_value(entry["inputs"]["RVALID"], idx, 0) + _set_value(entry["inputs"]["RDATA"], idx, 0) + _set_value(entry["inputs"]["RRESP"], idx, 0) await Timer(1, units="ns") - # -------------------------------------------------------------- - # Write transaction targeting reg2 - # -------------------------------------------------------------- - s_axil.AWADDR.value = WRITE_ADDR - s_axil.AWPROT.value = 0 - s_axil.AWVALID.value = 1 - s_axil.WDATA.value = WRITE_DATA - s_axil.WSTRB.value = 0xF - s_axil.WVALID.value = 1 - s_axil.BREADY.value = 1 + addr_mask = (1 << config["address_width"]) - 1 + strobe_mask = (1 << config["byte_width"]) - 1 - await Timer(1, units="ns") + for txn in config["transactions"]: + master_name = txn["master"] + index = tuple(txn["index"]) + entry = masters[master_name] - assert int(masters["reg2"].AWVALID.value) == 1, "reg2 AWVALID should follow slave" - assert int(masters["reg2"].WVALID.value) == 1, "reg2 WVALID should follow slave" - assert int(masters["reg2"].AWADDR.value) == WRITE_ADDR, "AWADDR should fan out" - assert int(masters["reg2"].WDATA.value) == WRITE_DATA, "WDATA should fan out" - assert int(masters["reg2"].WSTRB.value) == 0xF, "WSTRB should propagate" + address = txn["address"] & addr_mask + write_data = _write_pattern(address, config["data_width"]) - for name, master in masters.items(): - if name != "reg2": - assert int(master.AWVALID.value) == 0, f"{name} AWVALID should stay low" - assert int(master.WVALID.value) == 0, f"{name} WVALID should stay low" + slave.AWADDR.value = address + slave.AWPROT.value = 0 + slave.AWVALID.value = 1 + slave.WDATA.value = write_data + slave.WSTRB.value = strobe_mask + slave.WVALID.value = 1 + slave.BREADY.value = 1 - # Release write channel - s_axil.AWVALID.value = 0 - s_axil.WVALID.value = 0 - s_axil.BREADY.value = 0 - await Timer(1, units="ns") + await Timer(1, units="ns") - # -------------------------------------------------------------- - # Read transaction targeting reg3 - # -------------------------------------------------------------- - masters["reg3"].RVALID.value = 1 - masters["reg3"].RDATA.value = READ_DATA - masters["reg3"].RRESP.value = 0 + assert _get_int(entry["outputs"]["AWVALID"], index) == 1, f"{master_name} should see AWVALID asserted" + assert _get_int(entry["outputs"]["AWADDR"], index) == address, f"{master_name} must receive AWADDR" + assert _get_int(entry["outputs"]["WVALID"], index) == 1, f"{master_name} should see WVALID asserted" + assert _get_int(entry["outputs"]["WDATA"], index) == write_data, f"{master_name} must receive WDATA" + assert _get_int(entry["outputs"]["WSTRB"], index) == strobe_mask, f"{master_name} must receive WSTRB" - s_axil.ARADDR.value = READ_ADDR - s_axil.ARPROT.value = 0 - s_axil.ARVALID.value = 1 - s_axil.RREADY.value = 1 + for other_name, other_idx in _all_index_pairs(masters): + if other_name == master_name and other_idx == index: + continue + other_entry = masters[other_name] + assert ( + _get_int(other_entry["outputs"]["AWVALID"], other_idx) == 0 + ), f"{other_name}{other_idx} AWVALID should remain low during {txn['label']}" + assert ( + _get_int(other_entry["outputs"]["WVALID"], other_idx) == 0 + ), f"{other_name}{other_idx} WVALID should remain low during {txn['label']}" - await Timer(1, units="ns") + slave.AWVALID.value = 0 + slave.WVALID.value = 0 + slave.BREADY.value = 0 + await Timer(1, units="ns") - assert int(masters["reg3"].ARVALID.value) == 1, "reg3 ARVALID should follow slave" - assert int(masters["reg3"].ARADDR.value) == READ_ADDR, "ARADDR should fan out" + read_data = _read_pattern(address, config["data_width"]) + _set_value(entry["inputs"]["RVALID"], index, 1) + _set_value(entry["inputs"]["RDATA"], index, read_data) + _set_value(entry["inputs"]["RRESP"], index, 0) - for name, master in masters.items(): - if name != "reg3": - assert int(master.ARVALID.value) == 0, f"{name} ARVALID should stay low" + slave.ARADDR.value = address + slave.ARPROT.value = 0 + slave.ARVALID.value = 1 + slave.RREADY.value = 1 - assert int(s_axil.RVALID.value) == 1, "Slave should raise RVALID when master responds" - assert int(s_axil.RDATA.value) == READ_DATA, "Read data should return to slave" - assert int(s_axil.RRESP.value) == 0, "No error expected for read" + await Timer(1, units="ns") - # Return to idle - s_axil.ARVALID.value = 0 - s_axil.RREADY.value = 0 - masters["reg3"].RVALID.value = 0 - await Timer(1, units="ns") + assert _get_int(entry["outputs"]["ARVALID"], index) == 1, f"{master_name} should assert ARVALID" + assert _get_int(entry["outputs"]["ARADDR"], index) == address, f"{master_name} must receive ARADDR" + + for other_name, other_idx in _all_index_pairs(masters): + if other_name == master_name and other_idx == index: + continue + other_entry = masters[other_name] + assert ( + _get_int(other_entry["outputs"]["ARVALID"], other_idx) == 0 + ), f"{other_name}{other_idx} ARVALID should remain low during read of {txn['label']}" + + assert int(slave.RVALID.value) == 1, "Slave should observe RVALID when master responds" + assert int(slave.RDATA.value) == read_data, "Read data must fold back to slave" + assert int(slave.RRESP.value) == 0, "Read response should indicate success" + + slave.ARVALID.value = 0 + slave.RREADY.value = 0 + _set_value(entry["inputs"]["RVALID"], index, 0) + _set_value(entry["inputs"]["RDATA"], index, 0) + await Timer(1, units="ns") diff --git a/tests/cocotb/axi4lite/smoke/test_runner.py b/tests/cocotb/axi4lite/smoke/test_runner.py index 6b1982f..1360d50 100644 --- a/tests/cocotb/axi4lite/smoke/test_runner.py +++ b/tests/cocotb/axi4lite/smoke/test_runner.py @@ -1,5 +1,8 @@ -"""Pytest wrapper launching the AXI4-Lite cocotb smoke test.""" +"""Pytest wrapper launching the AXI4-Lite cocotb smoke tests.""" +from __future__ import annotations + +import json from pathlib import Path import pytest @@ -11,20 +14,25 @@ try: # pragma: no cover - optional dependency shim except ImportError: # pragma: no cover from cocotb_tools.runner import get_runner -from tests.cocotb_lib.utils import compile_rdl_and_export, get_verilog_sources +from tests.cocotb_lib import RDL_CASES +from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case @pytest.mark.simulation @pytest.mark.verilator -def test_axi4lite_smoke(tmp_path: Path) -> None: - """Compile the AXI4-Lite design and execute the cocotb smoke test.""" +@pytest.mark.parametrize(("rdl_file", "top_name"), RDL_CASES, ids=[case[1] for case in RDL_CASES]) +def test_axi4lite_smoke(tmp_path: Path, rdl_file: str, top_name: str) -> None: + """Compile each AXI4-Lite design variant and execute the cocotb smoke test.""" repo_root = Path(__file__).resolve().parents[4] + rdl_path = repo_root / "tests" / "cocotb_lib" / "rdl" / rdl_file + build_root = tmp_path / top_name - module_path, package_path = compile_rdl_and_export( - str(repo_root / "tests" / "cocotb_lib" / "multiple_reg.rdl"), - "multi_reg", - tmp_path, + module_path, package_path, config = prepare_cpuif_case( + str(rdl_path), + top_name, + build_root, cpuif_cls=AXI4LiteCpuifFlat, + control_signal="AWVALID", ) sources = get_verilog_sources( @@ -34,17 +42,18 @@ def test_axi4lite_smoke(tmp_path: Path) -> None: ) runner = get_runner("verilator") - build_dir = tmp_path / "sim_build" + sim_build = build_root / "sim_build" runner.build( sources=sources, hdl_toplevel=module_path.stem, - build_dir=build_dir, + build_dir=sim_build, ) runner.test( hdl_toplevel=module_path.stem, test_module="tests.cocotb.axi4lite.smoke.test_register_access", - build_dir=build_dir, - log_file=str(tmp_path / "sim.log"), + build_dir=sim_build, + log_file=str(build_root / "simulation.log"), + extra_env={"RDL_TEST_CONFIG": json.dumps(config)}, ) diff --git a/tests/cocotb_lib/__init__.py b/tests/cocotb_lib/__init__.py index 3b33873..91bea50 100644 --- a/tests/cocotb_lib/__init__.py +++ b/tests/cocotb_lib/__init__.py @@ -1,3 +1,10 @@ -from pathlib import Path +"""Manifest of SystemRDL sources used by the cocotb simulations.""" -rdls = map(Path, ["simple.rdl", "multiple_reg.rdl"]) +RDL_CASES: list[tuple[str, str]] = [ + ("simple.rdl", "simple_test"), + ("multiple_reg.rdl", "multi_reg"), + ("deep_hierarchy.rdl", "deep_hierarchy"), + ("wide_status.rdl", "wide_status"), + ("variable_layout.rdl", "variable_layout"), + ("asymmetric_bus.rdl", "asymmetric_bus"), +] diff --git a/tests/cocotb_lib/handle_utils.py b/tests/cocotb_lib/handle_utils.py new file mode 100644 index 0000000..e29f79e --- /dev/null +++ b/tests/cocotb_lib/handle_utils.py @@ -0,0 +1,69 @@ +"""Utilities for resolving cocotb signal handles across simulators.""" + +from __future__ import annotations + +from typing import Any, Iterable + + +class SignalHandle: + """ + Wrapper that resolves array elements even when the simulator does not expose + unpacked arrays through ``handle[idx]``. + """ + + def __init__(self, dut, name: str) -> None: + self._dut = dut + self._name = name + self._base = getattr(dut, name, None) + self._cache: dict[tuple[int, ...], Any] = {} + + def resolve(self, indices: tuple[int, ...]): + if not indices: + return self._base if self._base is not None else self._lookup(tuple()) + + if indices not in self._cache: + self._cache[indices] = self._direct_or_lookup(indices) + return self._cache[indices] + + def _direct_or_lookup(self, indices: tuple[int, ...]): + if self._base is not None: + ref = self._base + try: + for idx in indices: + ref = ref[idx] + return ref + except (IndexError, TypeError, AttributeError): + pass + + return self._lookup(indices) + + def _lookup(self, indices: tuple[int, ...]): + suffix = "".join(f"[{idx}]" for idx in indices) + path = f"{self._name}{suffix}" + + try: + return getattr(self._dut, path) + except AttributeError: + pass + + errors: list[Exception] = [] + for extended in (False, True): + try: + return self._dut._id(path, extended=extended) + except (AttributeError, ValueError) as exc: + errors.append(exc) + + raise AttributeError(f"Unable to resolve handle '{path}' via dut._id") from errors[-1] + + +def resolve_handle(handle, indices: Iterable[int]): + """Resolve either a regular cocotb handle or a ``SignalHandle`` wrapper.""" + index_tuple = tuple(indices) + + if isinstance(handle, SignalHandle): + return handle.resolve(index_tuple) + + ref = handle + for idx in index_tuple: + ref = ref[idx] + return ref diff --git a/tests/cocotb_lib/rdl/asymmetric_bus.rdl b/tests/cocotb_lib/rdl/asymmetric_bus.rdl new file mode 100644 index 0000000..8a730f4 --- /dev/null +++ b/tests/cocotb_lib/rdl/asymmetric_bus.rdl @@ -0,0 +1,105 @@ +regfile port_rf { + reg { + field { + sw = rw; + hw = rw; + reset = 0x0; + } port_enable[0:0]; + + field { + sw = rw; + hw = rw; + reset = 0x0; + } port_speed[3:1]; + + field { + sw = rw; + hw = rw; + reset = 0x0; + } port_width[8:4]; + } control @ 0x0; + + reg { + field { + sw = r; + hw = w; + reset = 0x0; + } error_count[15:0]; + + field { + sw = r; + hw = w; + reset = 0x0; + } retry_count[31:16]; + } counters @ 0x4; + + reg { + field { + sw = rw; + hw = rw; + reset = 0x0; + } qos[7:0]; + + field { + sw = rw; + hw = rw; + reset = 0x0; + } virtual_channel[9:8]; + } qos @ 0x8; +}; + +addrmap asymmetric_bus { + reg { + field { + sw = rw; + hw = rw; + reset = 0x0; + } control[3:0]; + + field { + sw = rw; + hw = rw; + reset = 0x0; + } id[15:4]; + } control @ 0x0; + + reg { + field { + sw = r; + hw = w; + reset = 0x0; + } status_flags[19:0]; + } status @ 0x4; + + reg { + regwidth = 64; + field { + sw = rw; + hw = rw; + reset = 0x00abcdef; + } timestamp_low[31:0]; + + field { + sw = rw; + hw = rw; + reset = 0x00123456; + } timestamp_high[55:32]; + } timestamp @ 0x8; + + reg { + regwidth = 128; + field { + sw = rw; + hw = rw; + reset = 0x0; + } extended_id[63:0]; + + field { + sw = rw; + hw = rw; + reset = 0x1; + } parity[64:64]; + } extended @ 0x10; + + port_rf port[6] @ 0x40 += 0x20; +}; diff --git a/tests/cocotb_lib/rdl/deep_hierarchy.rdl b/tests/cocotb_lib/rdl/deep_hierarchy.rdl new file mode 100644 index 0000000..f1c3527 --- /dev/null +++ b/tests/cocotb_lib/rdl/deep_hierarchy.rdl @@ -0,0 +1,115 @@ +addrmap deep_hierarchy { + regfile context_rf { + reg { + field { + sw = rw; + hw = r; + reset = 0x1; + } enable[7:0]; + + field { + sw = r; + hw = w; + onread = rclr; + reset = 0x0; + } status[15:8]; + + field { + sw = rw; + hw = rw; + reset = 0x55; + } mode[23:16]; + } command @ 0x0; + + reg { + field { + sw = rw; + hw = rw; + reset = 0x1234; + } threshold[15:0]; + } threshold @ 0x4; + + reg { + field { + sw = rw; + hw = rw; + reset = 0x0; + } counter[31:0]; + } counter @ 0x8; + }; + + regfile engine_rf { + context_rf context[3] @ 0x0; + + reg { + field { + sw = rw; + hw = rw; + reset = 0x0; + } timeout[15:0]; + + field { + sw = rw; + hw = rw; + reset = 0x1; + } priority[19:16]; + } config @ 0x30; + + reg { + field { + sw = r; + hw = w; + onread = rclr; + reset = 0x0; + } error[31:0]; + } error_log @ 0x34; + }; + + addrmap fabric_slice { + engine_rf engines[4] @ 0x0; + + regfile monitor_rf { + reg { + field { + sw = r; + hw = w; + reset = 0x0; + } perf_count[31:0]; + } perf @ 0x0; + + reg { + field { + sw = r; + hw = w; + reset = 0x0; + } last_error[31:0]; + } last_error @ 0x4; + }; + + monitor_rf monitor @ 0x400; + + reg { + field { + sw = rw; + hw = rw; + reset = 0xdeadbeef; + } fabric_ctrl[31:0]; + } fabric_ctrl @ 0x500; + }; + + fabric_slice slices[2] @ 0x0 += 0x800; + + reg { + field { + sw = rw; + hw = rw; + reset = 0x1; + } global_enable[0:0]; + + field { + sw = rw; + hw = rw; + reset = 0x4; + } debug_level[3:1]; + } global_control @ 0x1000; +}; diff --git a/tests/cocotb_lib/multiple_reg.rdl b/tests/cocotb_lib/rdl/multiple_reg.rdl similarity index 100% rename from tests/cocotb_lib/multiple_reg.rdl rename to tests/cocotb_lib/rdl/multiple_reg.rdl diff --git a/tests/cocotb_lib/simple.rdl b/tests/cocotb_lib/rdl/simple.rdl similarity index 100% rename from tests/cocotb_lib/simple.rdl rename to tests/cocotb_lib/rdl/simple.rdl diff --git a/tests/cocotb_lib/rdl/variable_layout.rdl b/tests/cocotb_lib/rdl/variable_layout.rdl new file mode 100644 index 0000000..2cac1e8 --- /dev/null +++ b/tests/cocotb_lib/rdl/variable_layout.rdl @@ -0,0 +1,156 @@ +reg ctrl_reg_t { + desc = "Control register shared across channels."; + + field { + sw = rw; + hw = rw; + reset = 0x1; + } enable[0:0]; + + field { + sw = rw; + hw = rw; + reset = 0x2; + } mode[3:1]; + + field { + sw = rw; + hw = rw; + reset = 0x0; + } prescale[11:4]; +}; + +regfile channel_rf { + ctrl_reg_t control @ 0x0; + + reg { + field { + sw = rw; + hw = rw; + reset = 0x0; + } gain[11:0]; + + field { + sw = rw; + hw = rw; + reset = 0x200; + } offset[23:12]; + } calibrate @ 0x4; + + reg { + field { + sw = rw; + hw = rw; + reset = 0x0; + } sample_count[15:0]; + + field { + sw = rw; + hw = rw; + reset = 0x0; + } error_count[31:16]; + } counters @ 0x8; +}; + +regfile slice_rf { + reg { + field { + sw = rw; + hw = rw; + reset = 0x0; + } slope[15:0]; + + field { + sw = rw; + hw = rw; + reset = 0x0; + } intercept[31:16]; + } calibration @ 0x0; + + reg { + regwidth = 64; + field { + sw = r; + hw = w; + reset = 0x0; + } min_val[31:0]; + + field { + sw = r; + hw = w; + reset = 0x0; + } max_val[63:32]; + } range @ 0x4; +}; + +regfile tile_rf { + channel_rf channel[3] @ 0x0; + + reg { + field { + sw = rw; + hw = rw; + reset = 0x0; + } tile_mode[1:0]; + + field { + sw = rw; + hw = rw; + reset = 0x0; + } tile_enable[2:2]; + } tile_ctrl @ 0x100; + + slice_rf slice[2] @ 0x200; +}; + +regfile summary_rf { + reg { + field { + sw = r; + hw = w; + reset = 0x0; + } total_errors[31:0]; + } errors @ 0x0; + + reg { + field { + sw = r; + hw = w; + reset = 0x0; + } total_samples[31:0]; + } samples @ 0x4; + + reg { + field { + sw = rw; + hw = rw; + reset = 0x0; + } interrupt_enable[7:0]; + } interrupt_enable @ 0x8; +}; + +addrmap variable_layout { + tile_rf tiles[2] @ 0x0; + + reg { + field { + sw = rw; + hw = rw; + reset = 0x0; + } watchdog_enable[0:0]; + + field { + sw = rw; + hw = rw; + reset = 0x100; + } watchdog_timeout[16:1]; + + field { + sw = rw; + hw = rw; + reset = 0x0; + } watchdog_mode[18:17]; + } watchdog @ 0x2000; + + summary_rf summary @ 0x3000; +}; diff --git a/tests/cocotb_lib/rdl/wide_status.rdl b/tests/cocotb_lib/rdl/wide_status.rdl new file mode 100644 index 0000000..4b34c2a --- /dev/null +++ b/tests/cocotb_lib/rdl/wide_status.rdl @@ -0,0 +1,69 @@ +reg status_reg_t { + regwidth = 64; + desc = "Status register capturing wide flags and sticky bits."; + + field { + sw = r; + hw = w; + onread = rclr; + reset = 0x0; + } flags[62:0]; + + field { + sw = rw; + hw = r; + reset = 0x1; + } sticky_bit[63:63]; +}; + +reg metrics_reg_t { + regwidth = 64; + desc = "Metrics register pairing counters with thresholds."; + + field { + sw = r; + hw = w; + reset = 0x0; + } count[31:0]; + + field { + sw = rw; + hw = rw; + reset = 0x0; + } threshold[63:32]; +}; + +addrmap wide_status { + status_reg_t status_blocks[16] @ 0x0; + + metrics_reg_t metrics[4] @ 0x400; + + reg { + regwidth = 128; + field { + sw = rw; + hw = rw; + reset = 0x0; + } configuration[127:0]; + } configuration @ 0x800; + + reg { + field { + sw = rw; + hw = rw; + reset = 0x0; + } version_major[7:0]; + + field { + sw = rw; + hw = rw; + reset = 0x1; + } version_minor[15:8]; + + field { + sw = rw; + hw = rw; + reset = 0x0100; + } build[31:16]; + } version @ 0x900; +}; diff --git a/tests/cocotb_lib/utils.py b/tests/cocotb_lib/utils.py index cd9f7a7..1df6b2e 100644 --- a/tests/cocotb_lib/utils.py +++ b/tests/cocotb_lib/utils.py @@ -1,9 +1,13 @@ """Common utilities for cocotb testbenches.""" +from __future__ import annotations + +from collections import defaultdict from pathlib import Path from typing import Any from systemrdl import RDLCompiler +from systemrdl.node import AddressableNode, AddrmapNode, RegNode from peakrdl_busdecoder.cpuif.base_cpuif import BaseCpuif from peakrdl_busdecoder.exporter import BusDecoderExporter @@ -65,3 +69,206 @@ def get_verilog_sources(module_path: Path, package_path: Path, intf_files: list[ # Add module file sources.append(str(module_path)) return sources + + +def prepare_cpuif_case( + rdl_source: str, + top_name: str, + output_dir: Path, + cpuif_cls: type[BaseCpuif], + *, + control_signal: str, + max_samples_per_master: int = 3, + exporter_kwargs: dict[str, Any] | None = None, +) -> tuple[Path, Path, dict[str, Any]]: + """ + Compile SystemRDL, export the CPUIF, and build a configuration payload for cocotb tests. + + Parameters + ---------- + rdl_source: + Path to the SystemRDL source file. + top_name: + Name of the top-level addrmap to elaborate. + output_dir: + Directory where generated HDL will be written. + cpuif_cls: + CPUIF implementation class to use during export. + control_signal: + Name of the control signal used to identify master ports + (``"PSEL"`` for APB, ``"AWVALID"`` for AXI4-Lite, etc.). + max_samples_per_master: + Limit for the number of register addresses sampled per master in the test matrix. + exporter_kwargs: + Optional keyword overrides passed through to :class:`BusDecoderExporter`. + + Returns + ------- + tuple + ``(module_path, package_path, config_dict)``, where the configuration dictionary + is JSON-serializable and describes masters, indices, and sampled transactions. + """ + compiler = RDLCompiler() + compiler.compile_file(rdl_source) + root = compiler.elaborate(top_name) + top_node = root.top # type: ignore[assignment] + + export_kwargs: dict[str, Any] = {"cpuif_cls": cpuif_cls} + if exporter_kwargs: + export_kwargs.update(exporter_kwargs) + + exporter = BusDecoderExporter() + exporter.export(root, str(output_dir), **export_kwargs) + + module_name = export_kwargs.get("module_name", top_name) + package_name = export_kwargs.get("package_name", f"{top_name}_pkg") + + module_path = Path(output_dir) / f"{module_name}.sv" + package_path = Path(output_dir) / f"{package_name}.sv" + + config = _build_case_config( + top_node, + exporter.cpuif, + control_signal, + max_samples_per_master=max_samples_per_master, + ) + + config["address_width"] = exporter.cpuif.addr_width + config["data_width"] = exporter.cpuif.data_width + config["byte_width"] = exporter.cpuif.data_width // 8 + + return module_path, package_path, config + + +def _build_case_config( + top_node: AddrmapNode, + cpuif: BaseCpuif, + control_signal: str, + *, + max_samples_per_master: int, +) -> dict[str, Any]: + master_entries: dict[str, dict[str, Any]] = {} + + for child in cpuif.addressable_children: + signal = cpuif.signal(control_signal, child) + # Example: m_apb_tiles_PSEL[N_TILESS] -> m_apb_tiles + base = signal.split("[", 1)[0] + suffix = f"_{control_signal}" + if not base.endswith(suffix): + raise ValueError(f"Unable to derive port prefix from '{signal}'") + port_prefix = base[: -len(suffix)] + + master_entries[child.inst_name] = { + "inst_name": child.inst_name, + "port_prefix": port_prefix, + "is_array": bool(child.is_array), + "dimensions": list(child.array_dimensions or []), + "indices": set(), + } + + # Map each register to its top-level master and collect addresses + groups: dict[tuple[str, tuple[int, ...]], list[tuple[int, str]]] = defaultdict(list) + + def visit(node: AddressableNode) -> None: + if isinstance(node, RegNode): + master = node # type: AddressableNode + while master.parent is not top_node: + parent = master.parent + if not isinstance(parent, AddressableNode): + raise RuntimeError("Encountered unexpected hierarchy while resolving master node") + master = parent + + inst_name = master.inst_name + if inst_name not in master_entries: + # Handles cases where the register itself is the master (direct child of top) + signal = cpuif.signal(control_signal, master) + base = signal.split("[", 1)[0] + suffix = f"_{control_signal}" + if not base.endswith(suffix): + raise ValueError(f"Unable to derive port prefix from '{signal}'") + port_prefix = base[: -len(suffix)] + master_entries[inst_name] = { + "inst_name": inst_name, + "port_prefix": port_prefix, + "is_array": bool(master.is_array), + "dimensions": list(master.array_dimensions or []), + "indices": set(), + } + + idx_tuple = tuple(master.current_idx or []) + master_entries[inst_name]["indices"].add(idx_tuple) + + relative_addr = int(node.absolute_address) - int(top_node.absolute_address) + full_path = node.get_path() + label = full_path.split(".", 1)[1] if "." in full_path else full_path + groups[(inst_name, idx_tuple)].append((relative_addr, label)) + + for child in node.children(unroll=True): + if isinstance(child, AddressableNode): + visit(child) + + visit(top_node) + + masters_list = [] + for entry in master_entries.values(): + indices = entry["indices"] or {()} + entry["indices"] = [list(idx) for idx in sorted(indices)] + masters_list.append( + { + "inst_name": entry["inst_name"], + "port_prefix": entry["port_prefix"], + "is_array": entry["is_array"], + "dimensions": entry["dimensions"], + "indices": entry["indices"], + } + ) + + transactions = [] + for (inst_name, idx_tuple), items in groups.items(): + addresses = sorted({addr for addr, _ in items}) + samples = _sample_addresses(addresses, max_samples_per_master) + for addr in samples: + label = next(lbl for candidate, lbl in items if candidate == addr) + transactions.append( + { + "address": addr, + "master": inst_name, + "index": list(idx_tuple), + "label": label, + } + ) + + transactions.sort(key=lambda item: (item["master"], item["index"], item["address"])) + + masters_list.sort(key=lambda item: item["inst_name"]) + + return { + "masters": masters_list, + "transactions": transactions, + } + + +def _sample_addresses(addresses: list[int], max_samples: int) -> list[int]: + if len(addresses) <= max_samples: + return addresses + + samples: list[int] = [] + samples.append(addresses[0]) + if len(addresses) > 1: + samples.append(addresses[-1]) + + if len(addresses) > 2: + mid = addresses[len(addresses) // 2] + if mid not in samples: + samples.append(mid) + + idx = 1 + while len(samples) < max_samples: + pos = (len(addresses) * idx) // max_samples + candidate = addresses[min(pos, len(addresses) - 1)] + if candidate not in samples: + samples.append(candidate) + idx += 1 + + samples.sort() + return samples diff --git a/uv.lock b/uv.lock index 214ef75..e5ff9b8 100644 --- a/uv.lock +++ b/uv.lock @@ -608,7 +608,7 @@ wheels = [ [[package]] name = "peakrdl-busdecoder" -version = "0.4.0" +version = "0.5.0" source = { editable = "." } dependencies = [ { name = "jinja2" },