Tests/cocotb (#19)

* wip

* reorg

* update sv int

* apb4 working

* apb3 working

* version bump + ignore runner warning

* remove redundant check

* adding log on failure

* cleaning up verilator version issue

* devcontainer

* Fix missing libpython in GitHub Actions CI environment (#21)

* Initial plan

* Install libpython in GitHub Actions for cocotb tests

Co-authored-by: arnavsacheti <36746504+arnavsacheti@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: arnavsacheti <36746504+arnavsacheti@users.noreply.github.com>

---------

Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
This commit is contained in:
Arnav Sacheti
2025-11-10 23:00:28 -08:00
committed by GitHub
parent d7481e71ba
commit a9653c8497
25 changed files with 1417 additions and 364 deletions

22
.devcontainer/Dockerfile Normal file
View File

@@ -0,0 +1,22 @@
FROM verilator/verilator:latest
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && \
apt-get install -y --no-install-recommends \
python3 \
python3-venv \
python3-pip \
python3-dev \
build-essential \
pkg-config \
git \
curl \
ca-certificates && \
rm -rf /var/lib/apt/lists/*
ENV UV_INSTALL_DIR=/usr/local/bin
ENV UV_LINK_MODE=copy
# Install uv globally so both VS Code and terminals can use it
RUN curl -LsSf https://astral.sh/uv/install.sh | sh
RUN uv --version

View File

@@ -0,0 +1,33 @@
{
"name": "PeakRDL BusDecoder",
"build": {
"dockerfile": "Dockerfile"
},
"runArgs": [
"--init"
],
"features": {
"ghcr.io/devcontainers/features/common-utils:2": {
"username": "vscode",
"uid": "1000",
"gid": "1000",
"installZsh": "false",
"installOhMyZsh": "false"
}
},
"remoteUser": "vscode",
"postCreateCommand": "uv sync --frozen --all-extras --group tools --group test",
"customizations": {
"vscode": {
"settings": {
"python.defaultInterpreterPath": ".venv/bin/python",
"terminal.integrated.shell.linux": "/bin/bash"
},
"extensions": [
"ms-python.python",
"ms-python.vscode-pylance",
"ms-vscode.cpptools"
]
}
}
}

View File

@@ -14,6 +14,8 @@ on:
jobs: jobs:
test: test:
runs-on: ubuntu-latest runs-on: ubuntu-latest
container:
image: verilator/verilator:latest
permissions: permissions:
contents: read contents: read
strategy: strategy:
@@ -27,19 +29,21 @@ jobs:
uses: astral-sh/setup-uv@v3 uses: astral-sh/setup-uv@v3
with: with:
python-version: ${{ matrix.python-version }} python-version: ${{ matrix.python-version }}
enable-cache: true
- name: Install Verilator - name: Check Verilator version
run: verilator --version
- name: Install Python development packages
run: | run: |
sudo apt-get update apt-get update && apt-get install -y python3-dev libpython3-dev
sudo apt-get install -y verilator
verilator --version
- name: Install dependencies - name: Install dependencies
run: | run: |
uv sync --all-extras --group test uv sync --all-extras --group test
- name: Run tests - name: Run tests
run: uv run pytest tests/ -v --cov=peakrdl_busdecoder --cov-report=xml --cov-report=term run: uv run pytest tests/ --cov=peakrdl_busdecoder --cov-report=xml --cov-report=term
- name: Upload coverage to Codecov - name: Upload coverage to Codecov
uses: codecov/codecov-action@v4 uses: codecov/codecov-action@v4

View File

@@ -4,11 +4,11 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "peakrdl-busdecoder" name = "peakrdl-busdecoder"
version = "0.4.0" version = "0.5.0"
requires-python = ">=3.10" requires-python = ">=3.10"
dependencies = ["jinja2>=3.1.6", "systemrdl-compiler~=1.30.1"] dependencies = ["jinja2>=3.1.6", "systemrdl-compiler~=1.30.1"]
authors = [{ name = "Alex Mykyta" }] authors = [{ name = "Arnav Sacheti" }]
description = "Generate a SystemVerilog bus decoder from SystemRDL for splitting CPU interfaces to multiple sub-address spaces" description = "Generate a SystemVerilog bus decoder from SystemRDL for splitting CPU interfaces to multiple sub-address spaces"
readme = "README.md" readme = "README.md"
license = { text = "LGPLv3" } license = { text = "LGPLv3" }
@@ -114,3 +114,4 @@ markers = [
"simulation: marks tests as requiring cocotb simulation (deselect with '-m \"not simulation\"')", "simulation: marks tests as requiring cocotb simulation (deselect with '-m \"not simulation\"')",
"verilator: marks tests as requiring verilator simulator (deselect with '-m \"not verilator\"')", "verilator: marks tests as requiring verilator simulator (deselect with '-m \"not verilator\"')",
] ]
filterwarnings = ["error", "ignore::UserWarning"]

View File

@@ -35,15 +35,15 @@ class APB3CpuifFlat(BaseCpuif):
def fanout(self, node: AddressableNode) -> str: def fanout(self, node: AddressableNode) -> str:
fanout: dict[str, str] = {} fanout: dict[str, str] = {}
fanout[self.signal("PSEL", node)] = ( fanout[self.signal("PSEL", node, "gi")] = (
f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'i')}|cpuif_rd_sel.{get_indexed_path(self.exp.ds.top_node, node, 'i')}" f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'gi')}|cpuif_rd_sel.{get_indexed_path(self.exp.ds.top_node, node, 'gi')}"
) )
fanout[self.signal("PENABLE", node)] = self.signal("PENABLE") fanout[self.signal("PENABLE", node, "gi")] = self.signal("PENABLE")
fanout[self.signal("PWRITE", node)] = ( fanout[self.signal("PWRITE", node, "gi")] = (
f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'i')}" f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'gi')}"
) )
fanout[self.signal("PADDR", node)] = self.signal("PADDR") fanout[self.signal("PADDR", node, "gi")] = self.signal("PADDR")
fanout[self.signal("PWDATA", node)] = "cpuif_wr_data" fanout[self.signal("PWDATA", node, "gi")] = "cpuif_wr_data"
return "\n".join(map(lambda kv: f"assign {kv[0]} = {kv[1]};", fanout.items())) return "\n".join(map(lambda kv: f"assign {kv[0]} = {kv[1]};", fanout.items()))
@@ -53,8 +53,8 @@ class APB3CpuifFlat(BaseCpuif):
fanin["cpuif_rd_ack"] = "'0" fanin["cpuif_rd_ack"] = "'0"
fanin["cpuif_rd_err"] = "'0" fanin["cpuif_rd_err"] = "'0"
else: else:
fanin["cpuif_rd_ack"] = self.signal("PREADY", node) fanin["cpuif_rd_ack"] = self.signal("PREADY", node, "i")
fanin["cpuif_rd_err"] = self.signal("PSLVERR", node) fanin["cpuif_rd_err"] = self.signal("PSLVERR", node, "i")
return "\n".join(map(lambda kv: f"{kv[0]} = {kv[1]};", fanin.items())) return "\n".join(map(lambda kv: f"{kv[0]} = {kv[1]};", fanin.items()))
@@ -63,6 +63,6 @@ class APB3CpuifFlat(BaseCpuif):
if node is None: if node is None:
fanin["cpuif_rd_data"] = "'0" fanin["cpuif_rd_data"] = "'0"
else: else:
fanin["cpuif_rd_data"] = self.signal("PRDATA", node) fanin["cpuif_rd_data"] = self.signal("PRDATA", node, "i")
return "\n".join(map(lambda kv: f"{kv[0]} = {kv[1]};", fanin.items())) return "\n".join(map(lambda kv: f"{kv[0]} = {kv[1]};", fanin.items()))

View File

@@ -35,17 +35,17 @@ class APB4CpuifFlat(BaseCpuif):
def fanout(self, node: AddressableNode) -> str: def fanout(self, node: AddressableNode) -> str:
fanout: dict[str, str] = {} fanout: dict[str, str] = {}
fanout[self.signal("PSEL", node)] = ( fanout[self.signal("PSEL", node, "gi")] = (
f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'i')}|cpuif_rd_sel.{get_indexed_path(self.exp.ds.top_node, node, 'i')}" f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'gi')}|cpuif_rd_sel.{get_indexed_path(self.exp.ds.top_node, node, 'gi')}"
) )
fanout[self.signal("PENABLE", node)] = self.signal("PENABLE") fanout[self.signal("PENABLE", node, "gi")] = self.signal("PENABLE")
fanout[self.signal("PWRITE", node)] = ( fanout[self.signal("PWRITE", node, "gi")] = (
f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'i')}" f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'gi')}"
) )
fanout[self.signal("PADDR", node)] = self.signal("PADDR") fanout[self.signal("PADDR", node, "gi")] = self.signal("PADDR")
fanout[self.signal("PPROT", node)] = self.signal("PPROT") fanout[self.signal("PPROT", node, "gi")] = self.signal("PPROT")
fanout[self.signal("PWDATA", node)] = "cpuif_wr_data" fanout[self.signal("PWDATA", node, "gi")] = "cpuif_wr_data"
fanout[self.signal("PSTRB", node)] = "cpuif_wr_byte_en" fanout[self.signal("PSTRB", node, "gi")] = "cpuif_wr_byte_en"
return "\n".join(map(lambda kv: f"assign {kv[0]} = {kv[1]};", fanout.items())) return "\n".join(map(lambda kv: f"assign {kv[0]} = {kv[1]};", fanout.items()))
@@ -55,8 +55,8 @@ class APB4CpuifFlat(BaseCpuif):
fanin["cpuif_rd_ack"] = "'0" fanin["cpuif_rd_ack"] = "'0"
fanin["cpuif_rd_err"] = "'0" fanin["cpuif_rd_err"] = "'0"
else: else:
fanin["cpuif_rd_ack"] = self.signal("PREADY", node) fanin["cpuif_rd_ack"] = self.signal("PREADY", node, "i")
fanin["cpuif_rd_err"] = self.signal("PSLVERR", node) fanin["cpuif_rd_err"] = self.signal("PSLVERR", node, "i")
return "\n".join(map(lambda kv: f"{kv[0]} = {kv[1]};", fanin.items())) return "\n".join(map(lambda kv: f"{kv[0]} = {kv[1]};", fanin.items()))
@@ -65,6 +65,6 @@ class APB4CpuifFlat(BaseCpuif):
if node is None: if node is None:
fanin["cpuif_rd_data"] = "'0" fanin["cpuif_rd_data"] = "'0"
else: else:
fanin["cpuif_rd_data"] = self.signal("PRDATA", node) fanin["cpuif_rd_data"] = self.signal("PRDATA", node, "i")
return "\n".join(map(lambda kv: f"{kv[0]} = {kv[1]};", fanin.items())) return "\n".join(map(lambda kv: f"{kv[0]} = {kv[1]};", fanin.items()))

View File

@@ -1,10 +1,13 @@
"""Interface abstraction for handling flat and non-flat signal declarations.""" """Interface abstraction for handling flat and non-flat signal declarations."""
import re
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from systemrdl.node import AddressableNode from systemrdl.node import AddressableNode
from ..utils import get_indexed_path
if TYPE_CHECKING: if TYPE_CHECKING:
from .base_cpuif import BaseCpuif from .base_cpuif import BaseCpuif
@@ -93,7 +96,6 @@ class SVInterface(Interface):
indexer: str | int | None = None, indexer: str | int | None = None,
) -> str: ) -> str:
"""Generate SystemVerilog interface signal reference.""" """Generate SystemVerilog interface signal reference."""
from ..utils import get_indexed_path
# SVInterface only supports string indexers (loop variable names like "i", "gi") # SVInterface only supports string indexers (loop variable names like "i", "gi")
if indexer is not None and not isinstance(indexer, str): if indexer is not None and not isinstance(indexer, str):
@@ -166,6 +168,13 @@ class FlatInterface(Interface):
# Is an array # Is an array
if indexer is not None: if indexer is not None:
if isinstance(indexer, str):
indexed_path = get_indexed_path(node.parent, node, indexer, skip_kw_filter=True)
pattern = r"\[.*?\]"
indexes = re.findall(pattern, indexed_path)
return f"{base}_{signal}{''.join(indexes)}"
return f"{base}_{signal}[{indexer}]" return f"{base}_{signal}[{indexer}]"
return f"{base}_{signal}[N_{node.inst_name.upper()}S]" return f"{base}_{signal}[N_{node.inst_name.upper()}S]"

View File

@@ -70,7 +70,9 @@ class DecodeLogicGenerator(BusDecoderListener):
# Avoid generating a redundant >= 0 comparison, which triggers Verilator warnings. # Avoid generating a redundant >= 0 comparison, which triggers Verilator warnings.
if not (l_bound.value == 0 and len(l_bound_comp) == 1): if not (l_bound.value == 0 and len(l_bound_comp) == 1):
predicates.append(lower_expr) predicates.append(lower_expr)
predicates.append(upper_expr) # Avoid generating a redundant full-width < max comparison, which triggers Verilator warnings.
if not (u_bound.value == (1 << addr_width) and len(u_bound_comp) == 1):
predicates.append(upper_expr)
return predicates return predicates

View File

@@ -1,3 +1,6 @@
from typing import Literal
class SVInt: class SVInt:
def __init__(self, value: int, width: int | None = None) -> None: def __init__(self, value: int, width: int | None = None) -> None:
self.value = value self.value = value
@@ -19,3 +22,27 @@ class SVInt:
return SVInt(self.value + other.value, max(self.width, other.width)) return SVInt(self.value + other.value, max(self.width, other.width))
else: else:
return SVInt(self.value + other.value, None) return SVInt(self.value + other.value, None)
def __sub__(self, other: "SVInt") -> "SVInt":
if self.width is not None and other.width is not None:
return SVInt(self.value - other.value, max(self.width, other.width))
else:
return SVInt(self.value - other.value, None)
def __len__(self) -> int:
if self.width is not None:
return self.width
else:
return self.value.bit_length()
def to_bytes(self, byteorder: Literal["little", "big"] = "little") -> bytes:
byte_length = (self.value.bit_length() + 7) // 8
return self.value.to_bytes(byte_length, byteorder)
def __eq__(self, other: object) -> bool:
if not isinstance(other, SVInt):
return NotImplemented
return self.value == other.value and self.width == other.width
def __hash__(self) -> int:
return hash((self.value, self.width))

View File

@@ -1,15 +1,19 @@
"""APB3 smoke tests for generated multi-register design.""" """APB3 smoke tests generated from SystemRDL sources."""
from __future__ import annotations
import json
import os
from typing import Any, Iterable
import cocotb import cocotb
from cocotb.triggers import Timer from cocotb.triggers import Timer
WRITE_ADDR = 0x0 from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle
READ_ADDR = 0x8
WRITE_DATA = 0xCAFEBABE
READ_DATA = 0x0BAD_F00D
class _Apb3SlaveShim: class _Apb3SlaveShim:
"""Accessor for the APB3 slave signals on the DUT."""
def __init__(self, dut): def __init__(self, dut):
prefix = "s_apb" prefix = "s_apb"
self.PSEL = getattr(dut, f"{prefix}_PSEL") self.PSEL = getattr(dut, f"{prefix}_PSEL")
@@ -22,102 +26,161 @@ class _Apb3SlaveShim:
self.PSLVERR = getattr(dut, f"{prefix}_PSLVERR") self.PSLVERR = getattr(dut, f"{prefix}_PSLVERR")
class _Apb3MasterShim: def _load_config() -> dict[str, Any]:
def __init__(self, dut, base: str): payload = os.environ.get("RDL_TEST_CONFIG")
self.PSEL = getattr(dut, f"{base}_PSEL") if payload is None:
self.PENABLE = getattr(dut, f"{base}_PENABLE") raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided")
self.PWRITE = getattr(dut, f"{base}_PWRITE") return json.loads(payload)
self.PADDR = getattr(dut, f"{base}_PADDR")
self.PWDATA = getattr(dut, f"{base}_PWDATA")
self.PRDATA = getattr(dut, f"{base}_PRDATA")
self.PREADY = getattr(dut, f"{base}_PREADY")
self.PSLVERR = getattr(dut, f"{base}_PSLVERR")
def _apb3_slave(dut): def _resolve(handle, indices: Iterable[int]):
return getattr(dut, "s_apb", None) or _Apb3SlaveShim(dut) return resolve_handle(handle, indices)
def _apb3_master(dut, base: str): def _set_value(handle, indices: Iterable[int], value: int) -> None:
return getattr(dut, base, None) or _Apb3MasterShim(dut, base) _resolve(handle, indices).value = value
def _get_int(handle, indices: Iterable[int]) -> int:
return int(_resolve(handle, indices).value)
def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
table: dict[str, dict[str, Any]] = {}
for master in masters_cfg:
prefix = master["port_prefix"]
entry = {
"indices": [tuple(idx) for idx in master["indices"]] or [tuple()],
"outputs": {
"PSEL": SignalHandle(dut, f"{prefix}_PSEL"),
"PENABLE": SignalHandle(dut, f"{prefix}_PENABLE"),
"PWRITE": SignalHandle(dut, f"{prefix}_PWRITE"),
"PADDR": SignalHandle(dut, f"{prefix}_PADDR"),
"PWDATA": SignalHandle(dut, f"{prefix}_PWDATA"),
},
"inputs": {
"PRDATA": SignalHandle(dut, f"{prefix}_PRDATA"),
"PREADY": SignalHandle(dut, f"{prefix}_PREADY"),
"PSLVERR": SignalHandle(dut, f"{prefix}_PSLVERR"),
},
}
table[master["inst_name"]] = entry
return table
def _all_index_pairs(table: dict[str, dict[str, Any]]):
for name, entry in table.items():
for idx in entry["indices"]:
yield name, idx
def _write_pattern(address: int, width: int) -> int:
mask = (1 << width) - 1
return ((address * 0x2041) ^ 0xCAFEBABE) & mask
def _read_pattern(address: int, width: int) -> int:
mask = (1 << width) - 1
return ((address ^ 0x0BAD_F00D) + width) & mask
@cocotb.test() @cocotb.test()
async def test_apb3_read_write_paths(dut): async def test_apb3_address_decoding(dut) -> None:
"""Exercise APB3 slave interface and observe master fanout.""" """Exercise the APB3 slave interface against sampled register addresses."""
s_apb = _apb3_slave(dut) config = _load_config()
masters = { slave = _Apb3SlaveShim(dut)
"reg1": _apb3_master(dut, "m_apb_reg1"), masters = _build_master_table(dut, config["masters"])
"reg2": _apb3_master(dut, "m_apb_reg2"),
"reg3": _apb3_master(dut, "m_apb_reg3"),
}
s_apb.PSEL.value = 0 slave.PSEL.value = 0
s_apb.PENABLE.value = 0 slave.PENABLE.value = 0
s_apb.PWRITE.value = 0 slave.PWRITE.value = 0
s_apb.PADDR.value = 0 slave.PADDR.value = 0
s_apb.PWDATA.value = 0 slave.PWDATA.value = 0
for master in masters.values(): for master_name, idx in _all_index_pairs(masters):
master.PRDATA.value = 0 entry = masters[master_name]
master.PREADY.value = 0 _set_value(entry["inputs"]["PRDATA"], idx, 0)
master.PSLVERR.value = 0 _set_value(entry["inputs"]["PREADY"], idx, 0)
_set_value(entry["inputs"]["PSLVERR"], idx, 0)
await Timer(1, units="ns") await Timer(1, units="ns")
# Write to reg1 addr_mask = (1 << config["address_width"]) - 1
masters["reg1"].PREADY.value = 1
s_apb.PADDR.value = WRITE_ADDR
s_apb.PWDATA.value = WRITE_DATA
s_apb.PWRITE.value = 1
s_apb.PSEL.value = 1
s_apb.PENABLE.value = 1
await Timer(1, units="ns") for txn in config["transactions"]:
master_name = txn["master"]
index = tuple(txn["index"])
entry = masters[master_name]
assert int(masters["reg1"].PSEL.value) == 1, "reg1 should be selected for write" address = txn["address"] & addr_mask
assert int(masters["reg1"].PWRITE.value) == 1, "Write should propagate to master" write_data = _write_pattern(address, config["data_width"])
assert int(masters["reg1"].PADDR.value) == WRITE_ADDR, "Address should reach selected master"
assert int(masters["reg1"].PWDATA.value) == WRITE_DATA, "Write data should fan out"
for name, master in masters.items(): _set_value(entry["inputs"]["PREADY"], index, 1)
if name != "reg1": _set_value(entry["inputs"]["PSLVERR"], index, 0)
assert int(master.PSEL.value) == 0, f"{name} must idle during reg1 write"
assert int(s_apb.PREADY.value) == 1, "Ready must reflect selected master" slave.PADDR.value = address
assert int(s_apb.PSLVERR.value) == 0, "Write should not signal error" slave.PWDATA.value = write_data
slave.PWRITE.value = 1
slave.PSEL.value = 1
slave.PENABLE.value = 1
s_apb.PSEL.value = 0 await Timer(1, units="ns")
s_apb.PENABLE.value = 0
s_apb.PWRITE.value = 0
masters["reg1"].PREADY.value = 0
await Timer(1, units="ns")
# Read from reg3 assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} should assert PSEL for write"
masters["reg3"].PRDATA.value = READ_DATA assert _get_int(entry["outputs"]["PWRITE"], index) == 1, f"{master_name} should see write direction"
masters["reg3"].PREADY.value = 1 assert _get_int(entry["outputs"]["PADDR"], index) == address, f"{master_name} must receive write address"
masters["reg3"].PSLVERR.value = 0 assert _get_int(entry["outputs"]["PWDATA"], index) == write_data, f"{master_name} must receive write data"
s_apb.PADDR.value = READ_ADDR for other_name, other_idx in _all_index_pairs(masters):
s_apb.PSEL.value = 1 if other_name == master_name and other_idx == index:
s_apb.PENABLE.value = 1 continue
s_apb.PWRITE.value = 0 other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["PSEL"], other_idx) == 0
), f"{other_name}{other_idx} should remain idle during {txn['label']}"
await Timer(1, units="ns") assert int(slave.PREADY.value) == 1, "Slave ready should mirror selected master"
assert int(slave.PSLVERR.value) == 0, "Write should complete without error"
assert int(masters["reg3"].PSEL.value) == 1, "reg3 should be selected for read" slave.PSEL.value = 0
assert int(masters["reg3"].PWRITE.value) == 0, "Read should clear write" slave.PENABLE.value = 0
assert int(masters["reg3"].PADDR.value) == READ_ADDR, "Address should reach read target" slave.PWRITE.value = 0
_set_value(entry["inputs"]["PREADY"], index, 0)
await Timer(1, units="ns")
for name, master in masters.items(): # ------------------------------------------------------------------
if name != "reg3": # Read phase
assert int(master.PSEL.value) == 0, f"{name} must idle during reg3 read" # ------------------------------------------------------------------
read_data = _read_pattern(address, config["data_width"])
_set_value(entry["inputs"]["PRDATA"], index, read_data)
_set_value(entry["inputs"]["PREADY"], index, 1)
_set_value(entry["inputs"]["PSLVERR"], index, 0)
assert int(s_apb.PRDATA.value) == READ_DATA, "Read data should return to slave" slave.PADDR.value = address
assert int(s_apb.PREADY.value) == 1, "Read should acknowledge" slave.PWRITE.value = 0
assert int(s_apb.PSLVERR.value) == 0, "Read should not signal error" slave.PSEL.value = 1
slave.PENABLE.value = 1
s_apb.PSEL.value = 0 await Timer(1, units="ns")
s_apb.PENABLE.value = 0
masters["reg3"].PREADY.value = 0 assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must assert PSEL for read"
await Timer(1, units="ns") assert _get_int(entry["outputs"]["PWRITE"], index) == 0, f"{master_name} should clear write during read"
assert _get_int(entry["outputs"]["PADDR"], index) == address, f"{master_name} must receive read address"
for other_name, other_idx in _all_index_pairs(masters):
if other_name == master_name and other_idx == index:
continue
other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["PSEL"], other_idx) == 0
), f"{other_name}{other_idx} must stay idle during read of {txn['label']}"
assert int(slave.PRDATA.value) == read_data, "Read data should propagate back to the slave"
assert int(slave.PREADY.value) == 1, "Slave ready should acknowledge the read"
assert int(slave.PSLVERR.value) == 0, "Read should not signal an error"
slave.PSEL.value = 0
slave.PENABLE.value = 0
_set_value(entry["inputs"]["PREADY"], index, 0)
_set_value(entry["inputs"]["PRDATA"], index, 0)
await Timer(1, units="ns")

View File

@@ -1,5 +1,8 @@
"""Pytest wrapper launching the APB3 cocotb smoke test.""" """Pytest wrapper launching the APB3 cocotb smoke tests."""
from __future__ import annotations
import json
from pathlib import Path from pathlib import Path
import pytest import pytest
@@ -11,20 +14,25 @@ try: # pragma: no cover - optional dependency shim
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
from cocotb_tools.runner import get_runner from cocotb_tools.runner import get_runner
from tests.cocotb_lib.utils import compile_rdl_and_export, get_verilog_sources from tests.cocotb_lib import RDL_CASES
from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case
@pytest.mark.simulation @pytest.mark.simulation
@pytest.mark.verilator @pytest.mark.verilator
def test_apb3_smoke(tmp_path: Path) -> None: @pytest.mark.parametrize(("rdl_file", "top_name"), RDL_CASES, ids=[case[1] for case in RDL_CASES])
"""Compile the APB3 design and execute the cocotb smoke test.""" def test_apb3_smoke(tmp_path: Path, rdl_file: str, top_name: str) -> None:
"""Compile each APB3 design variant and execute the cocotb smoke test."""
repo_root = Path(__file__).resolve().parents[4] repo_root = Path(__file__).resolve().parents[4]
rdl_path = repo_root / "tests" / "cocotb_lib" / "rdl" / rdl_file
build_root = tmp_path / top_name
module_path, package_path = compile_rdl_and_export( module_path, package_path, config = prepare_cpuif_case(
str(repo_root / "tests" / "cocotb_lib" / "multiple_reg.rdl"), str(rdl_path),
"multi_reg", top_name,
tmp_path, build_root,
cpuif_cls=APB3CpuifFlat, cpuif_cls=APB3CpuifFlat,
control_signal="PSEL",
) )
sources = get_verilog_sources( sources = get_verilog_sources(
@@ -34,17 +42,18 @@ def test_apb3_smoke(tmp_path: Path) -> None:
) )
runner = get_runner("verilator") runner = get_runner("verilator")
build_dir = tmp_path / "sim_build" sim_build = build_root / "sim_build"
runner.build( runner.build(
sources=sources, sources=sources,
hdl_toplevel=module_path.stem, hdl_toplevel=module_path.stem,
build_dir=build_dir, build_dir=sim_build,
) )
runner.test( runner.test(
hdl_toplevel=module_path.stem, hdl_toplevel=module_path.stem,
test_module="tests.cocotb.apb3.smoke.test_register_access", test_module="tests.cocotb.apb3.smoke.test_register_access",
build_dir=build_dir, build_dir=sim_build,
log_file=str(tmp_path / "sim.log"), log_file=str(build_root / "simulation.log"),
extra_env={"RDL_TEST_CONFIG": json.dumps(config)},
) )

View File

@@ -1,15 +1,19 @@
"""APB4 smoke tests using generated multi-register design.""" """APB4 smoke tests generated from SystemRDL sources."""
from __future__ import annotations
import json
import os
from typing import Any, Iterable
import cocotb import cocotb
from cocotb.triggers import Timer from cocotb.triggers import Timer
WRITE_ADDR = 0x4 from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle
READ_ADDR = 0x8
WRITE_DATA = 0x1234_5678
READ_DATA = 0x89AB_CDEF
class _Apb4SlaveShim: class _Apb4SlaveShim:
"""Lightweight accessor for the APB4 slave side of the DUT."""
def __init__(self, dut): def __init__(self, dut):
prefix = "s_apb" prefix = "s_apb"
self.PSEL = getattr(dut, f"{prefix}_PSEL") self.PSEL = getattr(dut, f"{prefix}_PSEL")
@@ -24,115 +28,175 @@ class _Apb4SlaveShim:
self.PSLVERR = getattr(dut, f"{prefix}_PSLVERR") self.PSLVERR = getattr(dut, f"{prefix}_PSLVERR")
class _Apb4MasterShim: def _load_config() -> dict[str, Any]:
def __init__(self, dut, base: str): """Read the JSON payload describing the generated register topology."""
self.PSEL = getattr(dut, f"{base}_PSEL") payload = os.environ.get("RDL_TEST_CONFIG")
self.PENABLE = getattr(dut, f"{base}_PENABLE") if payload is None:
self.PWRITE = getattr(dut, f"{base}_PWRITE") raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided")
self.PADDR = getattr(dut, f"{base}_PADDR") return json.loads(payload)
self.PPROT = getattr(dut, f"{base}_PPROT")
self.PWDATA = getattr(dut, f"{base}_PWDATA")
self.PSTRB = getattr(dut, f"{base}_PSTRB")
self.PRDATA = getattr(dut, f"{base}_PRDATA")
self.PREADY = getattr(dut, f"{base}_PREADY")
self.PSLVERR = getattr(dut, f"{base}_PSLVERR")
def _apb4_slave(dut): def _resolve(handle, indices: Iterable[int]):
return getattr(dut, "s_apb", None) or _Apb4SlaveShim(dut) """Index into hierarchical cocotb handles."""
return resolve_handle(handle, indices)
def _apb4_master(dut, base: str): def _set_value(handle, indices: Iterable[int], value: int) -> None:
return getattr(dut, base, None) or _Apb4MasterShim(dut, base) _resolve(handle, indices).value = value
def _get_int(handle, indices: Iterable[int]) -> int:
return int(_resolve(handle, indices).value)
def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
table: dict[str, dict[str, Any]] = {}
for master in masters_cfg:
port_prefix = master["port_prefix"]
entry = {
"port_prefix": port_prefix,
"indices": [tuple(idx) for idx in master["indices"]] or [tuple()],
"outputs": {
"PSEL": SignalHandle(dut, f"{port_prefix}_PSEL"),
"PENABLE": SignalHandle(dut, f"{port_prefix}_PENABLE"),
"PWRITE": SignalHandle(dut, f"{port_prefix}_PWRITE"),
"PADDR": SignalHandle(dut, f"{port_prefix}_PADDR"),
"PPROT": SignalHandle(dut, f"{port_prefix}_PPROT"),
"PWDATA": SignalHandle(dut, f"{port_prefix}_PWDATA"),
"PSTRB": SignalHandle(dut, f"{port_prefix}_PSTRB"),
},
"inputs": {
"PRDATA": SignalHandle(dut, f"{port_prefix}_PRDATA"),
"PREADY": SignalHandle(dut, f"{port_prefix}_PREADY"),
"PSLVERR": SignalHandle(dut, f"{port_prefix}_PSLVERR"),
},
}
table[master["inst_name"]] = entry
return table
def _all_index_pairs(table: dict[str, dict[str, Any]]):
for name, entry in table.items():
for idx in entry["indices"]:
yield name, idx
def _write_pattern(address: int, width: int) -> int:
mask = (1 << width) - 1
return ((address * 0x1021) ^ 0x1357_9BDF) & mask
def _read_pattern(address: int, width: int) -> int:
mask = (1 << width) - 1
return ((address ^ 0xDEAD_BEE5) + width) & mask
@cocotb.test() @cocotb.test()
async def test_apb4_read_write_paths(dut): async def test_apb4_address_decoding(dut) -> None:
"""Drive APB4 slave signals and observe master activity.""" """Drive the APB4 slave interface and verify master fanout across all sampled registers."""
s_apb = _apb4_slave(dut) config = _load_config()
masters = { slave = _Apb4SlaveShim(dut)
"reg1": _apb4_master(dut, "m_apb_reg1"), masters = _build_master_table(dut, config["masters"])
"reg2": _apb4_master(dut, "m_apb_reg2"),
"reg3": _apb4_master(dut, "m_apb_reg3"),
}
# Default slave side inputs slave.PSEL.value = 0
s_apb.PSEL.value = 0 slave.PENABLE.value = 0
s_apb.PENABLE.value = 0 slave.PWRITE.value = 0
s_apb.PWRITE.value = 0 slave.PADDR.value = 0
s_apb.PADDR.value = 0 slave.PPROT.value = 0
s_apb.PWDATA.value = 0 slave.PWDATA.value = 0
s_apb.PPROT.value = 0 slave.PSTRB.value = 0
s_apb.PSTRB.value = 0
for master in masters.values(): for master_name, idx in _all_index_pairs(masters):
master.PRDATA.value = 0 entry = masters[master_name]
master.PREADY.value = 0 _set_value(entry["inputs"]["PRDATA"], idx, 0)
master.PSLVERR.value = 0 _set_value(entry["inputs"]["PREADY"], idx, 0)
_set_value(entry["inputs"]["PSLVERR"], idx, 0)
await Timer(1, units="ns") await Timer(1, units="ns")
# ------------------------------------------------------------------ addr_mask = (1 << config["address_width"]) - 1
# Write transfer to reg2 strobe_mask = (1 << config["byte_width"]) - 1
# ------------------------------------------------------------------
masters["reg2"].PREADY.value = 1
s_apb.PADDR.value = WRITE_ADDR
s_apb.PWDATA.value = WRITE_DATA
s_apb.PSTRB.value = 0xF
s_apb.PPROT.value = 0
s_apb.PWRITE.value = 1
s_apb.PSEL.value = 1
s_apb.PENABLE.value = 1
await Timer(1, units="ns") for txn in config["transactions"]:
master_name = txn["master"]
index = tuple(txn["index"])
entry = masters[master_name]
assert int(masters["reg2"].PSEL.value) == 1, "reg2 must be selected for write" address = txn["address"] & addr_mask
assert int(masters["reg2"].PWRITE.value) == 1, "Write strobes should propagate" write_data = _write_pattern(address, config["data_width"])
assert int(masters["reg2"].PADDR.value) == WRITE_ADDR, "Address should fan out"
assert int(masters["reg2"].PWDATA.value) == WRITE_DATA, "Write data should fan out"
for name, master in masters.items(): # Prime master-side inputs for the write phase
if name != "reg2": _set_value(entry["inputs"]["PREADY"], index, 1)
assert int(master.PSEL.value) == 0, f"{name} should remain idle on write" _set_value(entry["inputs"]["PSLVERR"], index, 0)
assert int(s_apb.PREADY.value) == 1, "Ready should mirror selected master" slave.PADDR.value = address
assert int(s_apb.PSLVERR.value) == 0, "No error expected on successful write" slave.PWDATA.value = write_data
slave.PSTRB.value = strobe_mask
slave.PPROT.value = 0
slave.PWRITE.value = 1
slave.PSEL.value = 1
slave.PENABLE.value = 1
# Return to idle await Timer(1, units="ns")
s_apb.PSEL.value = 0
s_apb.PENABLE.value = 0
s_apb.PWRITE.value = 0
masters["reg2"].PREADY.value = 0
await Timer(1, units="ns")
# ------------------------------------------------------------------ assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} should assert PSEL for write"
# Read transfer from reg3 assert _get_int(entry["outputs"]["PWRITE"], index) == 1, f"{master_name} should see write intent"
# ------------------------------------------------------------------ assert _get_int(entry["outputs"]["PADDR"], index) == address, f"{master_name} must receive write address"
masters["reg3"].PRDATA.value = READ_DATA assert _get_int(entry["outputs"]["PWDATA"], index) == write_data, f"{master_name} must receive write data"
masters["reg3"].PREADY.value = 1 assert _get_int(entry["outputs"]["PSTRB"], index) == strobe_mask, f"{master_name} must receive full strobes"
masters["reg3"].PSLVERR.value = 0
s_apb.PADDR.value = READ_ADDR for other_name, other_idx in _all_index_pairs(masters):
s_apb.PSEL.value = 1 if other_name == master_name and other_idx == index:
s_apb.PENABLE.value = 1 continue
s_apb.PWRITE.value = 0 other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["PSEL"], other_idx) == 0
), f"{other_name}{other_idx} should remain idle during {txn['label']}"
await Timer(1, units="ns") assert int(slave.PREADY.value) == 1, "Slave ready should reflect selected master"
assert int(slave.PSLVERR.value) == 0, "No error expected during write"
assert int(masters["reg3"].PSEL.value) == 1, "reg3 must be selected for read" # Return to idle for next transaction
assert int(masters["reg3"].PWRITE.value) == 0, "Read should deassert write" slave.PSEL.value = 0
assert int(masters["reg3"].PADDR.value) == READ_ADDR, "Read address should propagate" slave.PENABLE.value = 0
slave.PWRITE.value = 0
_set_value(entry["inputs"]["PREADY"], index, 0)
await Timer(1, units="ns")
for name, master in masters.items(): # ------------------------------------------------------------------
if name != "reg3": # Read phase
assert int(master.PSEL.value) == 0, f"{name} should remain idle on read" # ------------------------------------------------------------------
read_data = _read_pattern(address, config["data_width"])
_set_value(entry["inputs"]["PRDATA"], index, read_data)
_set_value(entry["inputs"]["PREADY"], index, 1)
_set_value(entry["inputs"]["PSLVERR"], index, 0)
assert int(s_apb.PRDATA.value) == READ_DATA, "Read data should return from master" slave.PADDR.value = address
assert int(s_apb.PREADY.value) == 1, "Ready must follow selected master" slave.PWRITE.value = 0
assert int(s_apb.PSLVERR.value) == 0, "No error expected on successful read" slave.PSEL.value = 1
slave.PENABLE.value = 1
# Back to idle await Timer(1, units="ns")
s_apb.PSEL.value = 0
s_apb.PENABLE.value = 0 assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must assert PSEL for read"
masters["reg3"].PREADY.value = 0 assert _get_int(entry["outputs"]["PWRITE"], index) == 0, f"{master_name} should deassert write for reads"
await Timer(1, units="ns") assert _get_int(entry["outputs"]["PADDR"], index) == address, f"{master_name} must receive read address"
for other_name, other_idx in _all_index_pairs(masters):
if other_name == master_name and other_idx == index:
continue
other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["PSEL"], other_idx) == 0
), f"{other_name}{other_idx} must stay idle during read of {txn['label']}"
assert int(slave.PRDATA.value) == read_data, "Slave should observe readback data from master"
assert int(slave.PREADY.value) == 1, "Slave ready should follow responding master"
assert int(slave.PSLVERR.value) == 0, "Read should complete without error"
# Reset to idle before progressing
slave.PSEL.value = 0
slave.PENABLE.value = 0
_set_value(entry["inputs"]["PREADY"], index, 0)
_set_value(entry["inputs"]["PRDATA"], index, 0)
await Timer(1, units="ns")

View File

@@ -1,7 +1,10 @@
"""Pytest wrapper launching the APB4 cocotb smoke test.""" """Pytest wrapper launching the APB4 cocotb smoke tests."""
from __future__ import annotations
import json
from pathlib import Path from pathlib import Path
import logging
import pytest import pytest
from peakrdl_busdecoder.cpuif.apb4.apb4_cpuif_flat import APB4CpuifFlat from peakrdl_busdecoder.cpuif.apb4.apb4_cpuif_flat import APB4CpuifFlat
@@ -11,20 +14,25 @@ try: # pragma: no cover - optional dependency shim
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
from cocotb_tools.runner import get_runner from cocotb_tools.runner import get_runner
from tests.cocotb_lib.utils import compile_rdl_and_export, get_verilog_sources from tests.cocotb_lib import RDL_CASES
from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case
@pytest.mark.simulation @pytest.mark.simulation
@pytest.mark.verilator @pytest.mark.verilator
def test_apb4_smoke(tmp_path: Path) -> None: @pytest.mark.parametrize(("rdl_file", "top_name"), RDL_CASES, ids=[case[1] for case in RDL_CASES])
"""Compile the APB4 design and execute the cocotb smoke test.""" def test_apb4_smoke(tmp_path: Path, rdl_file: str, top_name: str) -> None:
"""Compile each APB4 design variant and execute the cocotb smoke test."""
repo_root = Path(__file__).resolve().parents[4] repo_root = Path(__file__).resolve().parents[4]
rdl_path = repo_root / "tests" / "cocotb_lib" / "rdl" / rdl_file
build_root = tmp_path / top_name
module_path, package_path = compile_rdl_and_export( module_path, package_path, config = prepare_cpuif_case(
str(repo_root / "tests" / "cocotb_lib" / "multiple_reg.rdl"), str(rdl_path),
"multi_reg", top_name,
tmp_path, build_root,
cpuif_cls=APB4CpuifFlat, cpuif_cls=APB4CpuifFlat,
control_signal="PSEL",
) )
sources = get_verilog_sources( sources = get_verilog_sources(
@@ -34,17 +42,39 @@ def test_apb4_smoke(tmp_path: Path) -> None:
) )
runner = get_runner("verilator") runner = get_runner("verilator")
build_dir = tmp_path / "sim_build" sim_build = build_root / "sim_build"
try:
runner.build(
sources=sources,
hdl_toplevel=module_path.stem,
build_dir=sim_build,
log_file=str(build_root / "build.log"),
)
except SystemExit as e:
# Print build log on failure for easier debugging
log_path = build_root / "build.log"
if log_path.exists():
logging.error("\n\n=== Build Log ===\n")
logging.error(log_path.read_text())
logging.error("\n=== End Build Log ===\n")
if e.code != 0:
raise
runner.build( try:
sources=sources, runner.test(
hdl_toplevel=module_path.stem, hdl_toplevel=module_path.stem,
build_dir=build_dir, test_module="tests.cocotb.apb4.smoke.test_register_access",
) build_dir=sim_build,
log_file=str(build_root / "simulation.log"),
runner.test( extra_env={"RDL_TEST_CONFIG": json.dumps(config)},
hdl_toplevel=module_path.stem, )
test_module="tests.cocotb.apb4.smoke.test_register_access", except SystemExit as e:
build_dir=build_dir, # Print simulation log on failure for easier debugging
log_file=str(tmp_path / "sim.log"), log_path = build_root / "simulation.log"
) if log_path.exists():
logging.error("\n\n=== Simulation Log ===\n")
logging.error(log_path.read_text())
logging.error("\n=== End Simulation Log ===\n")
if e.code != 0:
raise

View File

@@ -1,15 +1,19 @@
"""AXI4-Lite smoke test ensuring address decode fanout works.""" """AXI4-Lite smoke test driven from SystemRDL-generated register maps."""
from __future__ import annotations
import json
import os
from typing import Any, Iterable
import cocotb import cocotb
from cocotb.triggers import Timer from cocotb.triggers import Timer
WRITE_ADDR = 0x4 from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle
READ_ADDR = 0x8
WRITE_DATA = 0x1357_9BDF
READ_DATA = 0x2468_ACED
class _AxilSlaveShim: class _AxilSlaveShim:
"""Accessor for AXI4-Lite slave ports on the DUT."""
def __init__(self, dut): def __init__(self, dut):
prefix = "s_axil" prefix = "s_axil"
self.AWREADY = getattr(dut, f"{prefix}_AWREADY") self.AWREADY = getattr(dut, f"{prefix}_AWREADY")
@@ -33,129 +37,177 @@ class _AxilSlaveShim:
self.RRESP = getattr(dut, f"{prefix}_RRESP") self.RRESP = getattr(dut, f"{prefix}_RRESP")
class _AxilMasterShim: def _load_config() -> dict[str, Any]:
def __init__(self, dut, base: str): payload = os.environ.get("RDL_TEST_CONFIG")
self.AWREADY = getattr(dut, f"{base}_AWREADY") if payload is None:
self.AWVALID = getattr(dut, f"{base}_AWVALID") raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided")
self.AWADDR = getattr(dut, f"{base}_AWADDR") return json.loads(payload)
self.AWPROT = getattr(dut, f"{base}_AWPROT")
self.WREADY = getattr(dut, f"{base}_WREADY")
self.WVALID = getattr(dut, f"{base}_WVALID")
self.WDATA = getattr(dut, f"{base}_WDATA")
self.WSTRB = getattr(dut, f"{base}_WSTRB")
self.BREADY = getattr(dut, f"{base}_BREADY")
self.BVALID = getattr(dut, f"{base}_BVALID")
self.BRESP = getattr(dut, f"{base}_BRESP")
self.ARREADY = getattr(dut, f"{base}_ARREADY")
self.ARVALID = getattr(dut, f"{base}_ARVALID")
self.ARADDR = getattr(dut, f"{base}_ARADDR")
self.ARPROT = getattr(dut, f"{base}_ARPROT")
self.RREADY = getattr(dut, f"{base}_RREADY")
self.RVALID = getattr(dut, f"{base}_RVALID")
self.RDATA = getattr(dut, f"{base}_RDATA")
self.RRESP = getattr(dut, f"{base}_RRESP")
def _axil_slave(dut): def _resolve(handle, indices: Iterable[int]):
return getattr(dut, "s_axil", None) or _AxilSlaveShim(dut) return resolve_handle(handle, indices)
def _axil_master(dut, base: str): def _set_value(handle, indices: Iterable[int], value: int) -> None:
return getattr(dut, base, None) or _AxilMasterShim(dut, base) _resolve(handle, indices).value = value
def _get_int(handle, indices: Iterable[int]) -> int:
return int(_resolve(handle, indices).value)
def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
table: dict[str, dict[str, Any]] = {}
for master in masters_cfg:
prefix = master["port_prefix"]
entry = {
"indices": [tuple(idx) for idx in master["indices"]] or [tuple()],
"outputs": {
"AWVALID": SignalHandle(dut, f"{prefix}_AWVALID"),
"AWADDR": SignalHandle(dut, f"{prefix}_AWADDR"),
"AWPROT": SignalHandle(dut, f"{prefix}_AWPROT"),
"WVALID": SignalHandle(dut, f"{prefix}_WVALID"),
"WDATA": SignalHandle(dut, f"{prefix}_WDATA"),
"WSTRB": SignalHandle(dut, f"{prefix}_WSTRB"),
"ARVALID": SignalHandle(dut, f"{prefix}_ARVALID"),
"ARADDR": SignalHandle(dut, f"{prefix}_ARADDR"),
"ARPROT": SignalHandle(dut, f"{prefix}_ARPROT"),
},
"inputs": {
"AWREADY": SignalHandle(dut, f"{prefix}_AWREADY"),
"WREADY": SignalHandle(dut, f"{prefix}_WREADY"),
"BVALID": SignalHandle(dut, f"{prefix}_BVALID"),
"BRESP": SignalHandle(dut, f"{prefix}_BRESP"),
"ARREADY": SignalHandle(dut, f"{prefix}_ARREADY"),
"RVALID": SignalHandle(dut, f"{prefix}_RVALID"),
"RDATA": SignalHandle(dut, f"{prefix}_RDATA"),
"RRESP": SignalHandle(dut, f"{prefix}_RRESP"),
},
}
table[master["inst_name"]] = entry
return table
def _all_index_pairs(table: dict[str, dict[str, Any]]):
for name, entry in table.items():
for idx in entry["indices"]:
yield name, idx
def _write_pattern(address: int, width: int) -> int:
mask = (1 << width) - 1
return ((address * 0x3105) ^ 0x1357_9BDF) & mask
def _read_pattern(address: int, width: int) -> int:
mask = (1 << width) - 1
return ((address ^ 0x2468_ACED) + width) & mask
@cocotb.test() @cocotb.test()
async def test_axi4lite_read_write_paths(dut): async def test_axi4lite_address_decoding(dut) -> None:
"""Drive AXI4-Lite slave channels and validate master side wiring.""" """Stimulate AXI4-Lite slave channels and verify master port selection."""
s_axil = _axil_slave(dut) config = _load_config()
masters = { slave = _AxilSlaveShim(dut)
"reg1": _axil_master(dut, "m_axil_reg1"), masters = _build_master_table(dut, config["masters"])
"reg2": _axil_master(dut, "m_axil_reg2"),
"reg3": _axil_master(dut, "m_axil_reg3"),
}
# Default slave-side inputs slave.AWVALID.value = 0
s_axil.AWVALID.value = 0 slave.AWADDR.value = 0
s_axil.AWADDR.value = 0 slave.AWPROT.value = 0
s_axil.AWPROT.value = 0 slave.WVALID.value = 0
s_axil.WVALID.value = 0 slave.WDATA.value = 0
s_axil.WDATA.value = 0 slave.WSTRB.value = 0
s_axil.WSTRB.value = 0 slave.BREADY.value = 0
s_axil.BREADY.value = 0 slave.ARVALID.value = 0
s_axil.ARVALID.value = 0 slave.ARADDR.value = 0
s_axil.ARADDR.value = 0 slave.ARPROT.value = 0
s_axil.ARPROT.value = 0 slave.RREADY.value = 0
s_axil.RREADY.value = 0
for master in masters.values(): for master_name, idx in _all_index_pairs(masters):
master.AWREADY.value = 0 entry = masters[master_name]
master.WREADY.value = 0 _set_value(entry["inputs"]["AWREADY"], idx, 0)
master.BVALID.value = 0 _set_value(entry["inputs"]["WREADY"], idx, 0)
master.BRESP.value = 0 _set_value(entry["inputs"]["BVALID"], idx, 0)
master.ARREADY.value = 0 _set_value(entry["inputs"]["BRESP"], idx, 0)
master.RVALID.value = 0 _set_value(entry["inputs"]["ARREADY"], idx, 0)
master.RDATA.value = 0 _set_value(entry["inputs"]["RVALID"], idx, 0)
master.RRESP.value = 0 _set_value(entry["inputs"]["RDATA"], idx, 0)
_set_value(entry["inputs"]["RRESP"], idx, 0)
await Timer(1, units="ns") await Timer(1, units="ns")
# -------------------------------------------------------------- addr_mask = (1 << config["address_width"]) - 1
# Write transaction targeting reg2 strobe_mask = (1 << config["byte_width"]) - 1
# --------------------------------------------------------------
s_axil.AWADDR.value = WRITE_ADDR
s_axil.AWPROT.value = 0
s_axil.AWVALID.value = 1
s_axil.WDATA.value = WRITE_DATA
s_axil.WSTRB.value = 0xF
s_axil.WVALID.value = 1
s_axil.BREADY.value = 1
await Timer(1, units="ns") for txn in config["transactions"]:
master_name = txn["master"]
index = tuple(txn["index"])
entry = masters[master_name]
assert int(masters["reg2"].AWVALID.value) == 1, "reg2 AWVALID should follow slave" address = txn["address"] & addr_mask
assert int(masters["reg2"].WVALID.value) == 1, "reg2 WVALID should follow slave" write_data = _write_pattern(address, config["data_width"])
assert int(masters["reg2"].AWADDR.value) == WRITE_ADDR, "AWADDR should fan out"
assert int(masters["reg2"].WDATA.value) == WRITE_DATA, "WDATA should fan out"
assert int(masters["reg2"].WSTRB.value) == 0xF, "WSTRB should propagate"
for name, master in masters.items(): slave.AWADDR.value = address
if name != "reg2": slave.AWPROT.value = 0
assert int(master.AWVALID.value) == 0, f"{name} AWVALID should stay low" slave.AWVALID.value = 1
assert int(master.WVALID.value) == 0, f"{name} WVALID should stay low" slave.WDATA.value = write_data
slave.WSTRB.value = strobe_mask
slave.WVALID.value = 1
slave.BREADY.value = 1
# Release write channel await Timer(1, units="ns")
s_axil.AWVALID.value = 0
s_axil.WVALID.value = 0
s_axil.BREADY.value = 0
await Timer(1, units="ns")
# -------------------------------------------------------------- assert _get_int(entry["outputs"]["AWVALID"], index) == 1, f"{master_name} should see AWVALID asserted"
# Read transaction targeting reg3 assert _get_int(entry["outputs"]["AWADDR"], index) == address, f"{master_name} must receive AWADDR"
# -------------------------------------------------------------- assert _get_int(entry["outputs"]["WVALID"], index) == 1, f"{master_name} should see WVALID asserted"
masters["reg3"].RVALID.value = 1 assert _get_int(entry["outputs"]["WDATA"], index) == write_data, f"{master_name} must receive WDATA"
masters["reg3"].RDATA.value = READ_DATA assert _get_int(entry["outputs"]["WSTRB"], index) == strobe_mask, f"{master_name} must receive WSTRB"
masters["reg3"].RRESP.value = 0
s_axil.ARADDR.value = READ_ADDR for other_name, other_idx in _all_index_pairs(masters):
s_axil.ARPROT.value = 0 if other_name == master_name and other_idx == index:
s_axil.ARVALID.value = 1 continue
s_axil.RREADY.value = 1 other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["AWVALID"], other_idx) == 0
), f"{other_name}{other_idx} AWVALID should remain low during {txn['label']}"
assert (
_get_int(other_entry["outputs"]["WVALID"], other_idx) == 0
), f"{other_name}{other_idx} WVALID should remain low during {txn['label']}"
await Timer(1, units="ns") slave.AWVALID.value = 0
slave.WVALID.value = 0
slave.BREADY.value = 0
await Timer(1, units="ns")
assert int(masters["reg3"].ARVALID.value) == 1, "reg3 ARVALID should follow slave" read_data = _read_pattern(address, config["data_width"])
assert int(masters["reg3"].ARADDR.value) == READ_ADDR, "ARADDR should fan out" _set_value(entry["inputs"]["RVALID"], index, 1)
_set_value(entry["inputs"]["RDATA"], index, read_data)
_set_value(entry["inputs"]["RRESP"], index, 0)
for name, master in masters.items(): slave.ARADDR.value = address
if name != "reg3": slave.ARPROT.value = 0
assert int(master.ARVALID.value) == 0, f"{name} ARVALID should stay low" slave.ARVALID.value = 1
slave.RREADY.value = 1
assert int(s_axil.RVALID.value) == 1, "Slave should raise RVALID when master responds" await Timer(1, units="ns")
assert int(s_axil.RDATA.value) == READ_DATA, "Read data should return to slave"
assert int(s_axil.RRESP.value) == 0, "No error expected for read"
# Return to idle assert _get_int(entry["outputs"]["ARVALID"], index) == 1, f"{master_name} should assert ARVALID"
s_axil.ARVALID.value = 0 assert _get_int(entry["outputs"]["ARADDR"], index) == address, f"{master_name} must receive ARADDR"
s_axil.RREADY.value = 0
masters["reg3"].RVALID.value = 0 for other_name, other_idx in _all_index_pairs(masters):
await Timer(1, units="ns") if other_name == master_name and other_idx == index:
continue
other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["ARVALID"], other_idx) == 0
), f"{other_name}{other_idx} ARVALID should remain low during read of {txn['label']}"
assert int(slave.RVALID.value) == 1, "Slave should observe RVALID when master responds"
assert int(slave.RDATA.value) == read_data, "Read data must fold back to slave"
assert int(slave.RRESP.value) == 0, "Read response should indicate success"
slave.ARVALID.value = 0
slave.RREADY.value = 0
_set_value(entry["inputs"]["RVALID"], index, 0)
_set_value(entry["inputs"]["RDATA"], index, 0)
await Timer(1, units="ns")

View File

@@ -1,5 +1,8 @@
"""Pytest wrapper launching the AXI4-Lite cocotb smoke test.""" """Pytest wrapper launching the AXI4-Lite cocotb smoke tests."""
from __future__ import annotations
import json
from pathlib import Path from pathlib import Path
import pytest import pytest
@@ -11,20 +14,25 @@ try: # pragma: no cover - optional dependency shim
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
from cocotb_tools.runner import get_runner from cocotb_tools.runner import get_runner
from tests.cocotb_lib.utils import compile_rdl_and_export, get_verilog_sources from tests.cocotb_lib import RDL_CASES
from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case
@pytest.mark.simulation @pytest.mark.simulation
@pytest.mark.verilator @pytest.mark.verilator
def test_axi4lite_smoke(tmp_path: Path) -> None: @pytest.mark.parametrize(("rdl_file", "top_name"), RDL_CASES, ids=[case[1] for case in RDL_CASES])
"""Compile the AXI4-Lite design and execute the cocotb smoke test.""" def test_axi4lite_smoke(tmp_path: Path, rdl_file: str, top_name: str) -> None:
"""Compile each AXI4-Lite design variant and execute the cocotb smoke test."""
repo_root = Path(__file__).resolve().parents[4] repo_root = Path(__file__).resolve().parents[4]
rdl_path = repo_root / "tests" / "cocotb_lib" / "rdl" / rdl_file
build_root = tmp_path / top_name
module_path, package_path = compile_rdl_and_export( module_path, package_path, config = prepare_cpuif_case(
str(repo_root / "tests" / "cocotb_lib" / "multiple_reg.rdl"), str(rdl_path),
"multi_reg", top_name,
tmp_path, build_root,
cpuif_cls=AXI4LiteCpuifFlat, cpuif_cls=AXI4LiteCpuifFlat,
control_signal="AWVALID",
) )
sources = get_verilog_sources( sources = get_verilog_sources(
@@ -34,17 +42,18 @@ def test_axi4lite_smoke(tmp_path: Path) -> None:
) )
runner = get_runner("verilator") runner = get_runner("verilator")
build_dir = tmp_path / "sim_build" sim_build = build_root / "sim_build"
runner.build( runner.build(
sources=sources, sources=sources,
hdl_toplevel=module_path.stem, hdl_toplevel=module_path.stem,
build_dir=build_dir, build_dir=sim_build,
) )
runner.test( runner.test(
hdl_toplevel=module_path.stem, hdl_toplevel=module_path.stem,
test_module="tests.cocotb.axi4lite.smoke.test_register_access", test_module="tests.cocotb.axi4lite.smoke.test_register_access",
build_dir=build_dir, build_dir=sim_build,
log_file=str(tmp_path / "sim.log"), log_file=str(build_root / "simulation.log"),
extra_env={"RDL_TEST_CONFIG": json.dumps(config)},
) )

View File

@@ -1,3 +1,10 @@
from pathlib import Path """Manifest of SystemRDL sources used by the cocotb simulations."""
rdls = map(Path, ["simple.rdl", "multiple_reg.rdl"]) RDL_CASES: list[tuple[str, str]] = [
("simple.rdl", "simple_test"),
("multiple_reg.rdl", "multi_reg"),
("deep_hierarchy.rdl", "deep_hierarchy"),
("wide_status.rdl", "wide_status"),
("variable_layout.rdl", "variable_layout"),
("asymmetric_bus.rdl", "asymmetric_bus"),
]

View File

@@ -0,0 +1,69 @@
"""Utilities for resolving cocotb signal handles across simulators."""
from __future__ import annotations
from typing import Any, Iterable
class SignalHandle:
"""
Wrapper that resolves array elements even when the simulator does not expose
unpacked arrays through ``handle[idx]``.
"""
def __init__(self, dut, name: str) -> None:
self._dut = dut
self._name = name
self._base = getattr(dut, name, None)
self._cache: dict[tuple[int, ...], Any] = {}
def resolve(self, indices: tuple[int, ...]):
if not indices:
return self._base if self._base is not None else self._lookup(tuple())
if indices not in self._cache:
self._cache[indices] = self._direct_or_lookup(indices)
return self._cache[indices]
def _direct_or_lookup(self, indices: tuple[int, ...]):
if self._base is not None:
ref = self._base
try:
for idx in indices:
ref = ref[idx]
return ref
except (IndexError, TypeError, AttributeError):
pass
return self._lookup(indices)
def _lookup(self, indices: tuple[int, ...]):
suffix = "".join(f"[{idx}]" for idx in indices)
path = f"{self._name}{suffix}"
try:
return getattr(self._dut, path)
except AttributeError:
pass
errors: list[Exception] = []
for extended in (False, True):
try:
return self._dut._id(path, extended=extended)
except (AttributeError, ValueError) as exc:
errors.append(exc)
raise AttributeError(f"Unable to resolve handle '{path}' via dut._id") from errors[-1]
def resolve_handle(handle, indices: Iterable[int]):
"""Resolve either a regular cocotb handle or a ``SignalHandle`` wrapper."""
index_tuple = tuple(indices)
if isinstance(handle, SignalHandle):
return handle.resolve(index_tuple)
ref = handle
for idx in index_tuple:
ref = ref[idx]
return ref

View File

@@ -0,0 +1,105 @@
regfile port_rf {
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} port_enable[0:0];
field {
sw = rw;
hw = rw;
reset = 0x0;
} port_speed[3:1];
field {
sw = rw;
hw = rw;
reset = 0x0;
} port_width[8:4];
} control @ 0x0;
reg {
field {
sw = r;
hw = w;
reset = 0x0;
} error_count[15:0];
field {
sw = r;
hw = w;
reset = 0x0;
} retry_count[31:16];
} counters @ 0x4;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} qos[7:0];
field {
sw = rw;
hw = rw;
reset = 0x0;
} virtual_channel[9:8];
} qos @ 0x8;
};
addrmap asymmetric_bus {
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} control[3:0];
field {
sw = rw;
hw = rw;
reset = 0x0;
} id[15:4];
} control @ 0x0;
reg {
field {
sw = r;
hw = w;
reset = 0x0;
} status_flags[19:0];
} status @ 0x4;
reg {
regwidth = 64;
field {
sw = rw;
hw = rw;
reset = 0x00abcdef;
} timestamp_low[31:0];
field {
sw = rw;
hw = rw;
reset = 0x00123456;
} timestamp_high[55:32];
} timestamp @ 0x8;
reg {
regwidth = 128;
field {
sw = rw;
hw = rw;
reset = 0x0;
} extended_id[63:0];
field {
sw = rw;
hw = rw;
reset = 0x1;
} parity[64:64];
} extended @ 0x10;
port_rf port[6] @ 0x40 += 0x20;
};

View File

@@ -0,0 +1,115 @@
addrmap deep_hierarchy {
regfile context_rf {
reg {
field {
sw = rw;
hw = r;
reset = 0x1;
} enable[7:0];
field {
sw = r;
hw = w;
onread = rclr;
reset = 0x0;
} status[15:8];
field {
sw = rw;
hw = rw;
reset = 0x55;
} mode[23:16];
} command @ 0x0;
reg {
field {
sw = rw;
hw = rw;
reset = 0x1234;
} threshold[15:0];
} threshold @ 0x4;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} counter[31:0];
} counter @ 0x8;
};
regfile engine_rf {
context_rf context[3] @ 0x0;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} timeout[15:0];
field {
sw = rw;
hw = rw;
reset = 0x1;
} priority[19:16];
} config @ 0x30;
reg {
field {
sw = r;
hw = w;
onread = rclr;
reset = 0x0;
} error[31:0];
} error_log @ 0x34;
};
addrmap fabric_slice {
engine_rf engines[4] @ 0x0;
regfile monitor_rf {
reg {
field {
sw = r;
hw = w;
reset = 0x0;
} perf_count[31:0];
} perf @ 0x0;
reg {
field {
sw = r;
hw = w;
reset = 0x0;
} last_error[31:0];
} last_error @ 0x4;
};
monitor_rf monitor @ 0x400;
reg {
field {
sw = rw;
hw = rw;
reset = 0xdeadbeef;
} fabric_ctrl[31:0];
} fabric_ctrl @ 0x500;
};
fabric_slice slices[2] @ 0x0 += 0x800;
reg {
field {
sw = rw;
hw = rw;
reset = 0x1;
} global_enable[0:0];
field {
sw = rw;
hw = rw;
reset = 0x4;
} debug_level[3:1];
} global_control @ 0x1000;
};

View File

@@ -0,0 +1,156 @@
reg ctrl_reg_t {
desc = "Control register shared across channels.";
field {
sw = rw;
hw = rw;
reset = 0x1;
} enable[0:0];
field {
sw = rw;
hw = rw;
reset = 0x2;
} mode[3:1];
field {
sw = rw;
hw = rw;
reset = 0x0;
} prescale[11:4];
};
regfile channel_rf {
ctrl_reg_t control @ 0x0;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} gain[11:0];
field {
sw = rw;
hw = rw;
reset = 0x200;
} offset[23:12];
} calibrate @ 0x4;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} sample_count[15:0];
field {
sw = rw;
hw = rw;
reset = 0x0;
} error_count[31:16];
} counters @ 0x8;
};
regfile slice_rf {
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} slope[15:0];
field {
sw = rw;
hw = rw;
reset = 0x0;
} intercept[31:16];
} calibration @ 0x0;
reg {
regwidth = 64;
field {
sw = r;
hw = w;
reset = 0x0;
} min_val[31:0];
field {
sw = r;
hw = w;
reset = 0x0;
} max_val[63:32];
} range @ 0x4;
};
regfile tile_rf {
channel_rf channel[3] @ 0x0;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} tile_mode[1:0];
field {
sw = rw;
hw = rw;
reset = 0x0;
} tile_enable[2:2];
} tile_ctrl @ 0x100;
slice_rf slice[2] @ 0x200;
};
regfile summary_rf {
reg {
field {
sw = r;
hw = w;
reset = 0x0;
} total_errors[31:0];
} errors @ 0x0;
reg {
field {
sw = r;
hw = w;
reset = 0x0;
} total_samples[31:0];
} samples @ 0x4;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} interrupt_enable[7:0];
} interrupt_enable @ 0x8;
};
addrmap variable_layout {
tile_rf tiles[2] @ 0x0;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} watchdog_enable[0:0];
field {
sw = rw;
hw = rw;
reset = 0x100;
} watchdog_timeout[16:1];
field {
sw = rw;
hw = rw;
reset = 0x0;
} watchdog_mode[18:17];
} watchdog @ 0x2000;
summary_rf summary @ 0x3000;
};

View File

@@ -0,0 +1,69 @@
reg status_reg_t {
regwidth = 64;
desc = "Status register capturing wide flags and sticky bits.";
field {
sw = r;
hw = w;
onread = rclr;
reset = 0x0;
} flags[62:0];
field {
sw = rw;
hw = r;
reset = 0x1;
} sticky_bit[63:63];
};
reg metrics_reg_t {
regwidth = 64;
desc = "Metrics register pairing counters with thresholds.";
field {
sw = r;
hw = w;
reset = 0x0;
} count[31:0];
field {
sw = rw;
hw = rw;
reset = 0x0;
} threshold[63:32];
};
addrmap wide_status {
status_reg_t status_blocks[16] @ 0x0;
metrics_reg_t metrics[4] @ 0x400;
reg {
regwidth = 128;
field {
sw = rw;
hw = rw;
reset = 0x0;
} configuration[127:0];
} configuration @ 0x800;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} version_major[7:0];
field {
sw = rw;
hw = rw;
reset = 0x1;
} version_minor[15:8];
field {
sw = rw;
hw = rw;
reset = 0x0100;
} build[31:16];
} version @ 0x900;
};

View File

@@ -1,9 +1,13 @@
"""Common utilities for cocotb testbenches.""" """Common utilities for cocotb testbenches."""
from __future__ import annotations
from collections import defaultdict
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from systemrdl import RDLCompiler from systemrdl import RDLCompiler
from systemrdl.node import AddressableNode, AddrmapNode, RegNode
from peakrdl_busdecoder.cpuif.base_cpuif import BaseCpuif from peakrdl_busdecoder.cpuif.base_cpuif import BaseCpuif
from peakrdl_busdecoder.exporter import BusDecoderExporter from peakrdl_busdecoder.exporter import BusDecoderExporter
@@ -65,3 +69,206 @@ def get_verilog_sources(module_path: Path, package_path: Path, intf_files: list[
# Add module file # Add module file
sources.append(str(module_path)) sources.append(str(module_path))
return sources return sources
def prepare_cpuif_case(
rdl_source: str,
top_name: str,
output_dir: Path,
cpuif_cls: type[BaseCpuif],
*,
control_signal: str,
max_samples_per_master: int = 3,
exporter_kwargs: dict[str, Any] | None = None,
) -> tuple[Path, Path, dict[str, Any]]:
"""
Compile SystemRDL, export the CPUIF, and build a configuration payload for cocotb tests.
Parameters
----------
rdl_source:
Path to the SystemRDL source file.
top_name:
Name of the top-level addrmap to elaborate.
output_dir:
Directory where generated HDL will be written.
cpuif_cls:
CPUIF implementation class to use during export.
control_signal:
Name of the control signal used to identify master ports
(``"PSEL"`` for APB, ``"AWVALID"`` for AXI4-Lite, etc.).
max_samples_per_master:
Limit for the number of register addresses sampled per master in the test matrix.
exporter_kwargs:
Optional keyword overrides passed through to :class:`BusDecoderExporter`.
Returns
-------
tuple
``(module_path, package_path, config_dict)``, where the configuration dictionary
is JSON-serializable and describes masters, indices, and sampled transactions.
"""
compiler = RDLCompiler()
compiler.compile_file(rdl_source)
root = compiler.elaborate(top_name)
top_node = root.top # type: ignore[assignment]
export_kwargs: dict[str, Any] = {"cpuif_cls": cpuif_cls}
if exporter_kwargs:
export_kwargs.update(exporter_kwargs)
exporter = BusDecoderExporter()
exporter.export(root, str(output_dir), **export_kwargs)
module_name = export_kwargs.get("module_name", top_name)
package_name = export_kwargs.get("package_name", f"{top_name}_pkg")
module_path = Path(output_dir) / f"{module_name}.sv"
package_path = Path(output_dir) / f"{package_name}.sv"
config = _build_case_config(
top_node,
exporter.cpuif,
control_signal,
max_samples_per_master=max_samples_per_master,
)
config["address_width"] = exporter.cpuif.addr_width
config["data_width"] = exporter.cpuif.data_width
config["byte_width"] = exporter.cpuif.data_width // 8
return module_path, package_path, config
def _build_case_config(
top_node: AddrmapNode,
cpuif: BaseCpuif,
control_signal: str,
*,
max_samples_per_master: int,
) -> dict[str, Any]:
master_entries: dict[str, dict[str, Any]] = {}
for child in cpuif.addressable_children:
signal = cpuif.signal(control_signal, child)
# Example: m_apb_tiles_PSEL[N_TILESS] -> m_apb_tiles
base = signal.split("[", 1)[0]
suffix = f"_{control_signal}"
if not base.endswith(suffix):
raise ValueError(f"Unable to derive port prefix from '{signal}'")
port_prefix = base[: -len(suffix)]
master_entries[child.inst_name] = {
"inst_name": child.inst_name,
"port_prefix": port_prefix,
"is_array": bool(child.is_array),
"dimensions": list(child.array_dimensions or []),
"indices": set(),
}
# Map each register to its top-level master and collect addresses
groups: dict[tuple[str, tuple[int, ...]], list[tuple[int, str]]] = defaultdict(list)
def visit(node: AddressableNode) -> None:
if isinstance(node, RegNode):
master = node # type: AddressableNode
while master.parent is not top_node:
parent = master.parent
if not isinstance(parent, AddressableNode):
raise RuntimeError("Encountered unexpected hierarchy while resolving master node")
master = parent
inst_name = master.inst_name
if inst_name not in master_entries:
# Handles cases where the register itself is the master (direct child of top)
signal = cpuif.signal(control_signal, master)
base = signal.split("[", 1)[0]
suffix = f"_{control_signal}"
if not base.endswith(suffix):
raise ValueError(f"Unable to derive port prefix from '{signal}'")
port_prefix = base[: -len(suffix)]
master_entries[inst_name] = {
"inst_name": inst_name,
"port_prefix": port_prefix,
"is_array": bool(master.is_array),
"dimensions": list(master.array_dimensions or []),
"indices": set(),
}
idx_tuple = tuple(master.current_idx or [])
master_entries[inst_name]["indices"].add(idx_tuple)
relative_addr = int(node.absolute_address) - int(top_node.absolute_address)
full_path = node.get_path()
label = full_path.split(".", 1)[1] if "." in full_path else full_path
groups[(inst_name, idx_tuple)].append((relative_addr, label))
for child in node.children(unroll=True):
if isinstance(child, AddressableNode):
visit(child)
visit(top_node)
masters_list = []
for entry in master_entries.values():
indices = entry["indices"] or {()}
entry["indices"] = [list(idx) for idx in sorted(indices)]
masters_list.append(
{
"inst_name": entry["inst_name"],
"port_prefix": entry["port_prefix"],
"is_array": entry["is_array"],
"dimensions": entry["dimensions"],
"indices": entry["indices"],
}
)
transactions = []
for (inst_name, idx_tuple), items in groups.items():
addresses = sorted({addr for addr, _ in items})
samples = _sample_addresses(addresses, max_samples_per_master)
for addr in samples:
label = next(lbl for candidate, lbl in items if candidate == addr)
transactions.append(
{
"address": addr,
"master": inst_name,
"index": list(idx_tuple),
"label": label,
}
)
transactions.sort(key=lambda item: (item["master"], item["index"], item["address"]))
masters_list.sort(key=lambda item: item["inst_name"])
return {
"masters": masters_list,
"transactions": transactions,
}
def _sample_addresses(addresses: list[int], max_samples: int) -> list[int]:
if len(addresses) <= max_samples:
return addresses
samples: list[int] = []
samples.append(addresses[0])
if len(addresses) > 1:
samples.append(addresses[-1])
if len(addresses) > 2:
mid = addresses[len(addresses) // 2]
if mid not in samples:
samples.append(mid)
idx = 1
while len(samples) < max_samples:
pos = (len(addresses) * idx) // max_samples
candidate = addresses[min(pos, len(addresses) - 1)]
if candidate not in samples:
samples.append(candidate)
idx += 1
samples.sort()
return samples

2
uv.lock generated
View File

@@ -608,7 +608,7 @@ wheels = [
[[package]] [[package]]
name = "peakrdl-busdecoder" name = "peakrdl-busdecoder"
version = "0.4.0" version = "0.5.0"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "jinja2" }, { name = "jinja2" },