This commit is contained in:
Arnav Sacheti
2025-10-27 22:53:38 -07:00
parent d7481e71ba
commit bbbeab85c5
12 changed files with 1193 additions and 322 deletions

View File

@@ -1,15 +1,18 @@
"""APB3 smoke tests for generated multi-register design.""" """APB3 smoke tests generated from SystemRDL sources."""
from __future__ import annotations
import json
import os
from typing import Any, Iterable
import cocotb import cocotb
from cocotb.triggers import Timer from cocotb.triggers import Timer
WRITE_ADDR = 0x0
READ_ADDR = 0x8
WRITE_DATA = 0xCAFEBABE
READ_DATA = 0x0BAD_F00D
class _Apb3SlaveShim: class _Apb3SlaveShim:
"""Accessor for the APB3 slave signals on the DUT."""
def __init__(self, dut): def __init__(self, dut):
prefix = "s_apb" prefix = "s_apb"
self.PSEL = getattr(dut, f"{prefix}_PSEL") self.PSEL = getattr(dut, f"{prefix}_PSEL")
@@ -22,102 +25,164 @@ class _Apb3SlaveShim:
self.PSLVERR = getattr(dut, f"{prefix}_PSLVERR") self.PSLVERR = getattr(dut, f"{prefix}_PSLVERR")
class _Apb3MasterShim: def _load_config() -> dict[str, Any]:
def __init__(self, dut, base: str): payload = os.environ.get("RDL_TEST_CONFIG")
self.PSEL = getattr(dut, f"{base}_PSEL") if payload is None:
self.PENABLE = getattr(dut, f"{base}_PENABLE") raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided")
self.PWRITE = getattr(dut, f"{base}_PWRITE") return json.loads(payload)
self.PADDR = getattr(dut, f"{base}_PADDR")
self.PWDATA = getattr(dut, f"{base}_PWDATA")
self.PRDATA = getattr(dut, f"{base}_PRDATA")
self.PREADY = getattr(dut, f"{base}_PREADY")
self.PSLVERR = getattr(dut, f"{base}_PSLVERR")
def _apb3_slave(dut): def _resolve(handle, indices: Iterable[int]):
return getattr(dut, "s_apb", None) or _Apb3SlaveShim(dut) ref = handle
for idx in indices:
ref = ref[idx]
return ref
def _apb3_master(dut, base: str): def _set_value(handle, indices: Iterable[int], value: int) -> None:
return getattr(dut, base, None) or _Apb3MasterShim(dut, base) _resolve(handle, indices).value = value
def _get_int(handle, indices: Iterable[int]) -> int:
return int(_resolve(handle, indices).value)
def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
table: dict[str, dict[str, Any]] = {}
for master in masters_cfg:
prefix = master["port_prefix"]
entry = {
"indices": [tuple(idx) for idx in master["indices"]] or [tuple()],
"outputs": {
"PSEL": getattr(dut, f"{prefix}_PSEL"),
"PENABLE": getattr(dut, f"{prefix}_PENABLE"),
"PWRITE": getattr(dut, f"{prefix}_PWRITE"),
"PADDR": getattr(dut, f"{prefix}_PADDR"),
"PWDATA": getattr(dut, f"{prefix}_PWDATA"),
},
"inputs": {
"PRDATA": getattr(dut, f"{prefix}_PRDATA"),
"PREADY": getattr(dut, f"{prefix}_PREADY"),
"PSLVERR": getattr(dut, f"{prefix}_PSLVERR"),
},
}
table[master["inst_name"]] = entry
return table
def _all_index_pairs(table: dict[str, dict[str, Any]]):
for name, entry in table.items():
for idx in entry["indices"]:
yield name, idx
def _write_pattern(address: int, width: int) -> int:
mask = (1 << width) - 1
return ((address * 0x2041) ^ 0xCAFEBABE) & mask
def _read_pattern(address: int, width: int) -> int:
mask = (1 << width) - 1
return ((address ^ 0x0BAD_F00D) + width) & mask
@cocotb.test() @cocotb.test()
async def test_apb3_read_write_paths(dut): async def test_apb3_address_decoding(dut) -> None:
"""Exercise APB3 slave interface and observe master fanout.""" """Exercise the APB3 slave interface against sampled register addresses."""
s_apb = _apb3_slave(dut) config = _load_config()
masters = { slave = _Apb3SlaveShim(dut)
"reg1": _apb3_master(dut, "m_apb_reg1"), masters = _build_master_table(dut, config["masters"])
"reg2": _apb3_master(dut, "m_apb_reg2"),
"reg3": _apb3_master(dut, "m_apb_reg3"),
}
s_apb.PSEL.value = 0 slave.PSEL.value = 0
s_apb.PENABLE.value = 0 slave.PENABLE.value = 0
s_apb.PWRITE.value = 0 slave.PWRITE.value = 0
s_apb.PADDR.value = 0 slave.PADDR.value = 0
s_apb.PWDATA.value = 0 slave.PWDATA.value = 0
for master in masters.values(): for master_name, idx in _all_index_pairs(masters):
master.PRDATA.value = 0 entry = masters[master_name]
master.PREADY.value = 0 _set_value(entry["inputs"]["PRDATA"], idx, 0)
master.PSLVERR.value = 0 _set_value(entry["inputs"]["PREADY"], idx, 0)
_set_value(entry["inputs"]["PSLVERR"], idx, 0)
await Timer(1, units="ns") await Timer(1, units="ns")
# Write to reg1 addr_mask = (1 << config["address_width"]) - 1
masters["reg1"].PREADY.value = 1
s_apb.PADDR.value = WRITE_ADDR
s_apb.PWDATA.value = WRITE_DATA
s_apb.PWRITE.value = 1
s_apb.PSEL.value = 1
s_apb.PENABLE.value = 1
await Timer(1, units="ns") for txn in config["transactions"]:
master_name = txn["master"]
index = tuple(txn["index"])
entry = masters[master_name]
assert int(masters["reg1"].PSEL.value) == 1, "reg1 should be selected for write" address = txn["address"] & addr_mask
assert int(masters["reg1"].PWRITE.value) == 1, "Write should propagate to master" write_data = _write_pattern(address, config["data_width"])
assert int(masters["reg1"].PADDR.value) == WRITE_ADDR, "Address should reach selected master"
assert int(masters["reg1"].PWDATA.value) == WRITE_DATA, "Write data should fan out"
for name, master in masters.items(): _set_value(entry["inputs"]["PREADY"], index, 1)
if name != "reg1": _set_value(entry["inputs"]["PSLVERR"], index, 0)
assert int(master.PSEL.value) == 0, f"{name} must idle during reg1 write"
assert int(s_apb.PREADY.value) == 1, "Ready must reflect selected master" slave.PADDR.value = address
assert int(s_apb.PSLVERR.value) == 0, "Write should not signal error" slave.PWDATA.value = write_data
slave.PWRITE.value = 1
slave.PSEL.value = 1
slave.PENABLE.value = 1
s_apb.PSEL.value = 0 await Timer(1, units="ns")
s_apb.PENABLE.value = 0
s_apb.PWRITE.value = 0
masters["reg1"].PREADY.value = 0
await Timer(1, units="ns")
# Read from reg3 assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} should assert PSEL for write"
masters["reg3"].PRDATA.value = READ_DATA assert _get_int(entry["outputs"]["PWRITE"], index) == 1, f"{master_name} should see write direction"
masters["reg3"].PREADY.value = 1 assert _get_int(entry["outputs"]["PADDR"], index) == address, f"{master_name} must receive write address"
masters["reg3"].PSLVERR.value = 0 assert _get_int(entry["outputs"]["PWDATA"], index) == write_data, f"{master_name} must receive write data"
s_apb.PADDR.value = READ_ADDR for other_name, other_idx in _all_index_pairs(masters):
s_apb.PSEL.value = 1 if other_name == master_name and other_idx == index:
s_apb.PENABLE.value = 1 continue
s_apb.PWRITE.value = 0 other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["PSEL"], other_idx) == 0
), f"{other_name}{other_idx} should remain idle during {txn['label']}"
await Timer(1, units="ns") assert int(slave.PREADY.value) == 1, "Slave ready should mirror selected master"
assert int(slave.PSLVERR.value) == 0, "Write should complete without error"
assert int(masters["reg3"].PSEL.value) == 1, "reg3 should be selected for read" slave.PSEL.value = 0
assert int(masters["reg3"].PWRITE.value) == 0, "Read should clear write" slave.PENABLE.value = 0
assert int(masters["reg3"].PADDR.value) == READ_ADDR, "Address should reach read target" slave.PWRITE.value = 0
_set_value(entry["inputs"]["PREADY"], index, 0)
await Timer(1, units="ns")
for name, master in masters.items(): # ------------------------------------------------------------------
if name != "reg3": # Read phase
assert int(master.PSEL.value) == 0, f"{name} must idle during reg3 read" # ------------------------------------------------------------------
read_data = _read_pattern(address, config["data_width"])
_set_value(entry["inputs"]["PRDATA"], index, read_data)
_set_value(entry["inputs"]["PREADY"], index, 1)
_set_value(entry["inputs"]["PSLVERR"], index, 0)
assert int(s_apb.PRDATA.value) == READ_DATA, "Read data should return to slave" slave.PADDR.value = address
assert int(s_apb.PREADY.value) == 1, "Read should acknowledge" slave.PWRITE.value = 0
assert int(s_apb.PSLVERR.value) == 0, "Read should not signal error" slave.PSEL.value = 1
slave.PENABLE.value = 1
s_apb.PSEL.value = 0 await Timer(1, units="ns")
s_apb.PENABLE.value = 0
masters["reg3"].PREADY.value = 0 assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must assert PSEL for read"
await Timer(1, units="ns") assert _get_int(entry["outputs"]["PWRITE"], index) == 0, f"{master_name} should clear write during read"
assert _get_int(entry["outputs"]["PADDR"], index) == address, f"{master_name} must receive read address"
for other_name, other_idx in _all_index_pairs(masters):
if other_name == master_name and other_idx == index:
continue
other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["PSEL"], other_idx) == 0
), f"{other_name}{other_idx} must stay idle during read of {txn['label']}"
assert int(slave.PRDATA.value) == read_data, "Read data should propagate back to the slave"
assert int(slave.PREADY.value) == 1, "Slave ready should acknowledge the read"
assert int(slave.PSLVERR.value) == 0, "Read should not signal an error"
slave.PSEL.value = 0
slave.PENABLE.value = 0
_set_value(entry["inputs"]["PREADY"], index, 0)
_set_value(entry["inputs"]["PRDATA"], index, 0)
await Timer(1, units="ns")

View File

@@ -1,5 +1,8 @@
"""Pytest wrapper launching the APB3 cocotb smoke test.""" """Pytest wrapper launching the APB3 cocotb smoke tests."""
from __future__ import annotations
import json
from pathlib import Path from pathlib import Path
import pytest import pytest
@@ -11,20 +14,25 @@ try: # pragma: no cover - optional dependency shim
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
from cocotb_tools.runner import get_runner from cocotb_tools.runner import get_runner
from tests.cocotb_lib.utils import compile_rdl_and_export, get_verilog_sources from tests.cocotb_lib import RDL_CASES
from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case
@pytest.mark.simulation @pytest.mark.simulation
@pytest.mark.verilator @pytest.mark.verilator
def test_apb3_smoke(tmp_path: Path) -> None: @pytest.mark.parametrize(("rdl_file", "top_name"), RDL_CASES, ids=[case[1] for case in RDL_CASES])
"""Compile the APB3 design and execute the cocotb smoke test.""" def test_apb3_smoke(tmp_path: Path, rdl_file: str, top_name: str) -> None:
"""Compile each APB3 design variant and execute the cocotb smoke test."""
repo_root = Path(__file__).resolve().parents[4] repo_root = Path(__file__).resolve().parents[4]
rdl_path = repo_root / "tests" / "cocotb_lib" / rdl_file
build_root = tmp_path / top_name
module_path, package_path = compile_rdl_and_export( module_path, package_path, config = prepare_cpuif_case(
str(repo_root / "tests" / "cocotb_lib" / "multiple_reg.rdl"), str(rdl_path),
"multi_reg", top_name,
tmp_path, build_root,
cpuif_cls=APB3CpuifFlat, cpuif_cls=APB3CpuifFlat,
control_signal="PSEL",
) )
sources = get_verilog_sources( sources = get_verilog_sources(
@@ -34,17 +42,18 @@ def test_apb3_smoke(tmp_path: Path) -> None:
) )
runner = get_runner("verilator") runner = get_runner("verilator")
build_dir = tmp_path / "sim_build" sim_build = build_root / "sim_build"
runner.build( runner.build(
sources=sources, sources=sources,
hdl_toplevel=module_path.stem, hdl_toplevel=module_path.stem,
build_dir=build_dir, build_dir=sim_build,
) )
runner.test( runner.test(
hdl_toplevel=module_path.stem, hdl_toplevel=module_path.stem,
test_module="tests.cocotb.apb3.smoke.test_register_access", test_module="tests.cocotb.apb3.smoke.test_register_access",
build_dir=build_dir, build_dir=sim_build,
log_file=str(tmp_path / "sim.log"), log_file=str(build_root / "simulation.log"),
extra_env={"RDL_TEST_CONFIG": json.dumps(config)},
) )

View File

@@ -1,15 +1,18 @@
"""APB4 smoke tests using generated multi-register design.""" """APB4 smoke tests generated from SystemRDL sources."""
from __future__ import annotations
import json
import os
from typing import Any, Iterable
import cocotb import cocotb
from cocotb.triggers import Timer from cocotb.triggers import Timer
WRITE_ADDR = 0x4
READ_ADDR = 0x8
WRITE_DATA = 0x1234_5678
READ_DATA = 0x89AB_CDEF
class _Apb4SlaveShim: class _Apb4SlaveShim:
"""Lightweight accessor for the APB4 slave side of the DUT."""
def __init__(self, dut): def __init__(self, dut):
prefix = "s_apb" prefix = "s_apb"
self.PSEL = getattr(dut, f"{prefix}_PSEL") self.PSEL = getattr(dut, f"{prefix}_PSEL")
@@ -24,115 +27,178 @@ class _Apb4SlaveShim:
self.PSLVERR = getattr(dut, f"{prefix}_PSLVERR") self.PSLVERR = getattr(dut, f"{prefix}_PSLVERR")
class _Apb4MasterShim: def _load_config() -> dict[str, Any]:
def __init__(self, dut, base: str): """Read the JSON payload describing the generated register topology."""
self.PSEL = getattr(dut, f"{base}_PSEL") payload = os.environ.get("RDL_TEST_CONFIG")
self.PENABLE = getattr(dut, f"{base}_PENABLE") if payload is None:
self.PWRITE = getattr(dut, f"{base}_PWRITE") raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided")
self.PADDR = getattr(dut, f"{base}_PADDR") return json.loads(payload)
self.PPROT = getattr(dut, f"{base}_PPROT")
self.PWDATA = getattr(dut, f"{base}_PWDATA")
self.PSTRB = getattr(dut, f"{base}_PSTRB")
self.PRDATA = getattr(dut, f"{base}_PRDATA")
self.PREADY = getattr(dut, f"{base}_PREADY")
self.PSLVERR = getattr(dut, f"{base}_PSLVERR")
def _apb4_slave(dut): def _resolve(handle, indices: Iterable[int]):
return getattr(dut, "s_apb", None) or _Apb4SlaveShim(dut) """Index into hierarchical cocotb handles."""
ref = handle
for idx in indices:
ref = ref[idx]
return ref
def _apb4_master(dut, base: str): def _set_value(handle, indices: Iterable[int], value: int) -> None:
return getattr(dut, base, None) or _Apb4MasterShim(dut, base) _resolve(handle, indices).value = value
def _get_int(handle, indices: Iterable[int]) -> int:
return int(_resolve(handle, indices).value)
def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
table: dict[str, dict[str, Any]] = {}
for master in masters_cfg:
port_prefix = master["port_prefix"]
entry = {
"port_prefix": port_prefix,
"indices": [tuple(idx) for idx in master["indices"]] or [tuple()],
"outputs": {
"PSEL": getattr(dut, f"{port_prefix}_PSEL"),
"PENABLE": getattr(dut, f"{port_prefix}_PENABLE"),
"PWRITE": getattr(dut, f"{port_prefix}_PWRITE"),
"PADDR": getattr(dut, f"{port_prefix}_PADDR"),
"PPROT": getattr(dut, f"{port_prefix}_PPROT"),
"PWDATA": getattr(dut, f"{port_prefix}_PWDATA"),
"PSTRB": getattr(dut, f"{port_prefix}_PSTRB"),
},
"inputs": {
"PRDATA": getattr(dut, f"{port_prefix}_PRDATA"),
"PREADY": getattr(dut, f"{port_prefix}_PREADY"),
"PSLVERR": getattr(dut, f"{port_prefix}_PSLVERR"),
},
}
table[master["inst_name"]] = entry
return table
def _all_index_pairs(table: dict[str, dict[str, Any]]):
for name, entry in table.items():
for idx in entry["indices"]:
yield name, idx
def _write_pattern(address: int, width: int) -> int:
mask = (1 << width) - 1
return ((address * 0x1021) ^ 0x1357_9BDF) & mask
def _read_pattern(address: int, width: int) -> int:
mask = (1 << width) - 1
return ((address ^ 0xDEAD_BEE5) + width) & mask
@cocotb.test() @cocotb.test()
async def test_apb4_read_write_paths(dut): async def test_apb4_address_decoding(dut) -> None:
"""Drive APB4 slave signals and observe master activity.""" """Drive the APB4 slave interface and verify master fanout across all sampled registers."""
s_apb = _apb4_slave(dut) config = _load_config()
masters = { slave = _Apb4SlaveShim(dut)
"reg1": _apb4_master(dut, "m_apb_reg1"), masters = _build_master_table(dut, config["masters"])
"reg2": _apb4_master(dut, "m_apb_reg2"),
"reg3": _apb4_master(dut, "m_apb_reg3"),
}
# Default slave side inputs slave.PSEL.value = 0
s_apb.PSEL.value = 0 slave.PENABLE.value = 0
s_apb.PENABLE.value = 0 slave.PWRITE.value = 0
s_apb.PWRITE.value = 0 slave.PADDR.value = 0
s_apb.PADDR.value = 0 slave.PPROT.value = 0
s_apb.PWDATA.value = 0 slave.PWDATA.value = 0
s_apb.PPROT.value = 0 slave.PSTRB.value = 0
s_apb.PSTRB.value = 0
for master in masters.values(): for master_name, idx in _all_index_pairs(masters):
master.PRDATA.value = 0 entry = masters[master_name]
master.PREADY.value = 0 _set_value(entry["inputs"]["PRDATA"], idx, 0)
master.PSLVERR.value = 0 _set_value(entry["inputs"]["PREADY"], idx, 0)
_set_value(entry["inputs"]["PSLVERR"], idx, 0)
await Timer(1, units="ns") await Timer(1, units="ns")
# ------------------------------------------------------------------ addr_mask = (1 << config["address_width"]) - 1
# Write transfer to reg2 strobe_mask = (1 << config["byte_width"]) - 1
# ------------------------------------------------------------------
masters["reg2"].PREADY.value = 1
s_apb.PADDR.value = WRITE_ADDR
s_apb.PWDATA.value = WRITE_DATA
s_apb.PSTRB.value = 0xF
s_apb.PPROT.value = 0
s_apb.PWRITE.value = 1
s_apb.PSEL.value = 1
s_apb.PENABLE.value = 1
await Timer(1, units="ns") for txn in config["transactions"]:
master_name = txn["master"]
index = tuple(txn["index"])
entry = masters[master_name]
assert int(masters["reg2"].PSEL.value) == 1, "reg2 must be selected for write" address = txn["address"] & addr_mask
assert int(masters["reg2"].PWRITE.value) == 1, "Write strobes should propagate" write_data = _write_pattern(address, config["data_width"])
assert int(masters["reg2"].PADDR.value) == WRITE_ADDR, "Address should fan out"
assert int(masters["reg2"].PWDATA.value) == WRITE_DATA, "Write data should fan out"
for name, master in masters.items(): # Prime master-side inputs for the write phase
if name != "reg2": _set_value(entry["inputs"]["PREADY"], index, 1)
assert int(master.PSEL.value) == 0, f"{name} should remain idle on write" _set_value(entry["inputs"]["PSLVERR"], index, 0)
assert int(s_apb.PREADY.value) == 1, "Ready should mirror selected master" slave.PADDR.value = address
assert int(s_apb.PSLVERR.value) == 0, "No error expected on successful write" slave.PWDATA.value = write_data
slave.PSTRB.value = strobe_mask
slave.PPROT.value = 0
slave.PWRITE.value = 1
slave.PSEL.value = 1
slave.PENABLE.value = 1
# Return to idle await Timer(1, units="ns")
s_apb.PSEL.value = 0
s_apb.PENABLE.value = 0
s_apb.PWRITE.value = 0
masters["reg2"].PREADY.value = 0
await Timer(1, units="ns")
# ------------------------------------------------------------------ assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} should assert PSEL for write"
# Read transfer from reg3 assert _get_int(entry["outputs"]["PWRITE"], index) == 1, f"{master_name} should see write intent"
# ------------------------------------------------------------------ assert _get_int(entry["outputs"]["PADDR"], index) == address, f"{master_name} must receive write address"
masters["reg3"].PRDATA.value = READ_DATA assert _get_int(entry["outputs"]["PWDATA"], index) == write_data, f"{master_name} must receive write data"
masters["reg3"].PREADY.value = 1 assert _get_int(entry["outputs"]["PSTRB"], index) == strobe_mask, f"{master_name} must receive full strobes"
masters["reg3"].PSLVERR.value = 0
s_apb.PADDR.value = READ_ADDR for other_name, other_idx in _all_index_pairs(masters):
s_apb.PSEL.value = 1 if other_name == master_name and other_idx == index:
s_apb.PENABLE.value = 1 continue
s_apb.PWRITE.value = 0 other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["PSEL"], other_idx) == 0
), f"{other_name}{other_idx} should remain idle during {txn['label']}"
await Timer(1, units="ns") assert int(slave.PREADY.value) == 1, "Slave ready should reflect selected master"
assert int(slave.PSLVERR.value) == 0, "No error expected during write"
assert int(masters["reg3"].PSEL.value) == 1, "reg3 must be selected for read" # Return to idle for next transaction
assert int(masters["reg3"].PWRITE.value) == 0, "Read should deassert write" slave.PSEL.value = 0
assert int(masters["reg3"].PADDR.value) == READ_ADDR, "Read address should propagate" slave.PENABLE.value = 0
slave.PWRITE.value = 0
_set_value(entry["inputs"]["PREADY"], index, 0)
await Timer(1, units="ns")
for name, master in masters.items(): # ------------------------------------------------------------------
if name != "reg3": # Read phase
assert int(master.PSEL.value) == 0, f"{name} should remain idle on read" # ------------------------------------------------------------------
read_data = _read_pattern(address, config["data_width"])
_set_value(entry["inputs"]["PRDATA"], index, read_data)
_set_value(entry["inputs"]["PREADY"], index, 1)
_set_value(entry["inputs"]["PSLVERR"], index, 0)
assert int(s_apb.PRDATA.value) == READ_DATA, "Read data should return from master" slave.PADDR.value = address
assert int(s_apb.PREADY.value) == 1, "Ready must follow selected master" slave.PWRITE.value = 0
assert int(s_apb.PSLVERR.value) == 0, "No error expected on successful read" slave.PSEL.value = 1
slave.PENABLE.value = 1
# Back to idle await Timer(1, units="ns")
s_apb.PSEL.value = 0
s_apb.PENABLE.value = 0 assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must assert PSEL for read"
masters["reg3"].PREADY.value = 0 assert _get_int(entry["outputs"]["PWRITE"], index) == 0, f"{master_name} should deassert write for reads"
await Timer(1, units="ns") assert _get_int(entry["outputs"]["PADDR"], index) == address, f"{master_name} must receive read address"
for other_name, other_idx in _all_index_pairs(masters):
if other_name == master_name and other_idx == index:
continue
other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["PSEL"], other_idx) == 0
), f"{other_name}{other_idx} must stay idle during read of {txn['label']}"
assert int(slave.PRDATA.value) == read_data, "Slave should observe readback data from master"
assert int(slave.PREADY.value) == 1, "Slave ready should follow responding master"
assert int(slave.PSLVERR.value) == 0, "Read should complete without error"
# Reset to idle before progressing
slave.PSEL.value = 0
slave.PENABLE.value = 0
_set_value(entry["inputs"]["PREADY"], index, 0)
_set_value(entry["inputs"]["PRDATA"], index, 0)
await Timer(1, units="ns")

View File

@@ -1,5 +1,8 @@
"""Pytest wrapper launching the APB4 cocotb smoke test.""" """Pytest wrapper launching the APB4 cocotb smoke tests."""
from __future__ import annotations
import json
from pathlib import Path from pathlib import Path
import pytest import pytest
@@ -11,20 +14,25 @@ try: # pragma: no cover - optional dependency shim
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
from cocotb_tools.runner import get_runner from cocotb_tools.runner import get_runner
from tests.cocotb_lib.utils import compile_rdl_and_export, get_verilog_sources from tests.cocotb_lib import RDL_CASES
from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case
@pytest.mark.simulation @pytest.mark.simulation
@pytest.mark.verilator @pytest.mark.verilator
def test_apb4_smoke(tmp_path: Path) -> None: @pytest.mark.parametrize(("rdl_file", "top_name"), RDL_CASES, ids=[case[1] for case in RDL_CASES])
"""Compile the APB4 design and execute the cocotb smoke test.""" def test_apb4_smoke(tmp_path: Path, rdl_file: str, top_name: str) -> None:
"""Compile each APB4 design variant and execute the cocotb smoke test."""
repo_root = Path(__file__).resolve().parents[4] repo_root = Path(__file__).resolve().parents[4]
rdl_path = repo_root / "tests" / "cocotb_lib" / rdl_file
build_root = tmp_path / top_name
module_path, package_path = compile_rdl_and_export( module_path, package_path, config = prepare_cpuif_case(
str(repo_root / "tests" / "cocotb_lib" / "multiple_reg.rdl"), str(rdl_path),
"multi_reg", top_name,
tmp_path, build_root,
cpuif_cls=APB4CpuifFlat, cpuif_cls=APB4CpuifFlat,
control_signal="PSEL",
) )
sources = get_verilog_sources( sources = get_verilog_sources(
@@ -34,17 +42,18 @@ def test_apb4_smoke(tmp_path: Path) -> None:
) )
runner = get_runner("verilator") runner = get_runner("verilator")
build_dir = tmp_path / "sim_build" sim_build = build_root / "sim_build"
runner.build( runner.build(
sources=sources, sources=sources,
hdl_toplevel=module_path.stem, hdl_toplevel=module_path.stem,
build_dir=build_dir, build_dir=sim_build,
) )
runner.test( runner.test(
hdl_toplevel=module_path.stem, hdl_toplevel=module_path.stem,
test_module="tests.cocotb.apb4.smoke.test_register_access", test_module="tests.cocotb.apb4.smoke.test_register_access",
build_dir=build_dir, build_dir=sim_build,
log_file=str(tmp_path / "sim.log"), log_file=str(build_root / "simulation.log"),
extra_env={"RDL_TEST_CONFIG": json.dumps(config)},
) )

View File

@@ -1,15 +1,18 @@
"""AXI4-Lite smoke test ensuring address decode fanout works.""" """AXI4-Lite smoke test driven from SystemRDL-generated register maps."""
from __future__ import annotations
import json
import os
from typing import Any, Iterable
import cocotb import cocotb
from cocotb.triggers import Timer from cocotb.triggers import Timer
WRITE_ADDR = 0x4
READ_ADDR = 0x8
WRITE_DATA = 0x1357_9BDF
READ_DATA = 0x2468_ACED
class _AxilSlaveShim: class _AxilSlaveShim:
"""Accessor for AXI4-Lite slave ports on the DUT."""
def __init__(self, dut): def __init__(self, dut):
prefix = "s_axil" prefix = "s_axil"
self.AWREADY = getattr(dut, f"{prefix}_AWREADY") self.AWREADY = getattr(dut, f"{prefix}_AWREADY")
@@ -33,129 +36,180 @@ class _AxilSlaveShim:
self.RRESP = getattr(dut, f"{prefix}_RRESP") self.RRESP = getattr(dut, f"{prefix}_RRESP")
class _AxilMasterShim: def _load_config() -> dict[str, Any]:
def __init__(self, dut, base: str): payload = os.environ.get("RDL_TEST_CONFIG")
self.AWREADY = getattr(dut, f"{base}_AWREADY") if payload is None:
self.AWVALID = getattr(dut, f"{base}_AWVALID") raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided")
self.AWADDR = getattr(dut, f"{base}_AWADDR") return json.loads(payload)
self.AWPROT = getattr(dut, f"{base}_AWPROT")
self.WREADY = getattr(dut, f"{base}_WREADY")
self.WVALID = getattr(dut, f"{base}_WVALID")
self.WDATA = getattr(dut, f"{base}_WDATA")
self.WSTRB = getattr(dut, f"{base}_WSTRB")
self.BREADY = getattr(dut, f"{base}_BREADY")
self.BVALID = getattr(dut, f"{base}_BVALID")
self.BRESP = getattr(dut, f"{base}_BRESP")
self.ARREADY = getattr(dut, f"{base}_ARREADY")
self.ARVALID = getattr(dut, f"{base}_ARVALID")
self.ARADDR = getattr(dut, f"{base}_ARADDR")
self.ARPROT = getattr(dut, f"{base}_ARPROT")
self.RREADY = getattr(dut, f"{base}_RREADY")
self.RVALID = getattr(dut, f"{base}_RVALID")
self.RDATA = getattr(dut, f"{base}_RDATA")
self.RRESP = getattr(dut, f"{base}_RRESP")
def _axil_slave(dut): def _resolve(handle, indices: Iterable[int]):
return getattr(dut, "s_axil", None) or _AxilSlaveShim(dut) ref = handle
for idx in indices:
ref = ref[idx]
return ref
def _axil_master(dut, base: str): def _set_value(handle, indices: Iterable[int], value: int) -> None:
return getattr(dut, base, None) or _AxilMasterShim(dut, base) _resolve(handle, indices).value = value
def _get_int(handle, indices: Iterable[int]) -> int:
return int(_resolve(handle, indices).value)
def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
table: dict[str, dict[str, Any]] = {}
for master in masters_cfg:
prefix = master["port_prefix"]
entry = {
"indices": [tuple(idx) for idx in master["indices"]] or [tuple()],
"outputs": {
"AWVALID": getattr(dut, f"{prefix}_AWVALID"),
"AWADDR": getattr(dut, f"{prefix}_AWADDR"),
"AWPROT": getattr(dut, f"{prefix}_AWPROT"),
"WVALID": getattr(dut, f"{prefix}_WVALID"),
"WDATA": getattr(dut, f"{prefix}_WDATA"),
"WSTRB": getattr(dut, f"{prefix}_WSTRB"),
"ARVALID": getattr(dut, f"{prefix}_ARVALID"),
"ARADDR": getattr(dut, f"{prefix}_ARADDR"),
"ARPROT": getattr(dut, f"{prefix}_ARPROT"),
},
"inputs": {
"AWREADY": getattr(dut, f"{prefix}_AWREADY"),
"WREADY": getattr(dut, f"{prefix}_WREADY"),
"BVALID": getattr(dut, f"{prefix}_BVALID"),
"BRESP": getattr(dut, f"{prefix}_BRESP"),
"ARREADY": getattr(dut, f"{prefix}_ARREADY"),
"RVALID": getattr(dut, f"{prefix}_RVALID"),
"RDATA": getattr(dut, f"{prefix}_RDATA"),
"RRESP": getattr(dut, f"{prefix}_RRESP"),
},
}
table[master["inst_name"]] = entry
return table
def _all_index_pairs(table: dict[str, dict[str, Any]]):
for name, entry in table.items():
for idx in entry["indices"]:
yield name, idx
def _write_pattern(address: int, width: int) -> int:
mask = (1 << width) - 1
return ((address * 0x3105) ^ 0x1357_9BDF) & mask
def _read_pattern(address: int, width: int) -> int:
mask = (1 << width) - 1
return ((address ^ 0x2468_ACED) + width) & mask
@cocotb.test() @cocotb.test()
async def test_axi4lite_read_write_paths(dut): async def test_axi4lite_address_decoding(dut) -> None:
"""Drive AXI4-Lite slave channels and validate master side wiring.""" """Stimulate AXI4-Lite slave channels and verify master port selection."""
s_axil = _axil_slave(dut) config = _load_config()
masters = { slave = _AxilSlaveShim(dut)
"reg1": _axil_master(dut, "m_axil_reg1"), masters = _build_master_table(dut, config["masters"])
"reg2": _axil_master(dut, "m_axil_reg2"),
"reg3": _axil_master(dut, "m_axil_reg3"),
}
# Default slave-side inputs slave.AWVALID.value = 0
s_axil.AWVALID.value = 0 slave.AWADDR.value = 0
s_axil.AWADDR.value = 0 slave.AWPROT.value = 0
s_axil.AWPROT.value = 0 slave.WVALID.value = 0
s_axil.WVALID.value = 0 slave.WDATA.value = 0
s_axil.WDATA.value = 0 slave.WSTRB.value = 0
s_axil.WSTRB.value = 0 slave.BREADY.value = 0
s_axil.BREADY.value = 0 slave.ARVALID.value = 0
s_axil.ARVALID.value = 0 slave.ARADDR.value = 0
s_axil.ARADDR.value = 0 slave.ARPROT.value = 0
s_axil.ARPROT.value = 0 slave.RREADY.value = 0
s_axil.RREADY.value = 0
for master in masters.values(): for master_name, idx in _all_index_pairs(masters):
master.AWREADY.value = 0 entry = masters[master_name]
master.WREADY.value = 0 _set_value(entry["inputs"]["AWREADY"], idx, 0)
master.BVALID.value = 0 _set_value(entry["inputs"]["WREADY"], idx, 0)
master.BRESP.value = 0 _set_value(entry["inputs"]["BVALID"], idx, 0)
master.ARREADY.value = 0 _set_value(entry["inputs"]["BRESP"], idx, 0)
master.RVALID.value = 0 _set_value(entry["inputs"]["ARREADY"], idx, 0)
master.RDATA.value = 0 _set_value(entry["inputs"]["RVALID"], idx, 0)
master.RRESP.value = 0 _set_value(entry["inputs"]["RDATA"], idx, 0)
_set_value(entry["inputs"]["RRESP"], idx, 0)
await Timer(1, units="ns") await Timer(1, units="ns")
# -------------------------------------------------------------- addr_mask = (1 << config["address_width"]) - 1
# Write transaction targeting reg2 strobe_mask = (1 << config["byte_width"]) - 1
# --------------------------------------------------------------
s_axil.AWADDR.value = WRITE_ADDR
s_axil.AWPROT.value = 0
s_axil.AWVALID.value = 1
s_axil.WDATA.value = WRITE_DATA
s_axil.WSTRB.value = 0xF
s_axil.WVALID.value = 1
s_axil.BREADY.value = 1
await Timer(1, units="ns") for txn in config["transactions"]:
master_name = txn["master"]
index = tuple(txn["index"])
entry = masters[master_name]
assert int(masters["reg2"].AWVALID.value) == 1, "reg2 AWVALID should follow slave" address = txn["address"] & addr_mask
assert int(masters["reg2"].WVALID.value) == 1, "reg2 WVALID should follow slave" write_data = _write_pattern(address, config["data_width"])
assert int(masters["reg2"].AWADDR.value) == WRITE_ADDR, "AWADDR should fan out"
assert int(masters["reg2"].WDATA.value) == WRITE_DATA, "WDATA should fan out"
assert int(masters["reg2"].WSTRB.value) == 0xF, "WSTRB should propagate"
for name, master in masters.items(): slave.AWADDR.value = address
if name != "reg2": slave.AWPROT.value = 0
assert int(master.AWVALID.value) == 0, f"{name} AWVALID should stay low" slave.AWVALID.value = 1
assert int(master.WVALID.value) == 0, f"{name} WVALID should stay low" slave.WDATA.value = write_data
slave.WSTRB.value = strobe_mask
slave.WVALID.value = 1
slave.BREADY.value = 1
# Release write channel await Timer(1, units="ns")
s_axil.AWVALID.value = 0
s_axil.WVALID.value = 0
s_axil.BREADY.value = 0
await Timer(1, units="ns")
# -------------------------------------------------------------- assert _get_int(entry["outputs"]["AWVALID"], index) == 1, f"{master_name} should see AWVALID asserted"
# Read transaction targeting reg3 assert _get_int(entry["outputs"]["AWADDR"], index) == address, f"{master_name} must receive AWADDR"
# -------------------------------------------------------------- assert _get_int(entry["outputs"]["WVALID"], index) == 1, f"{master_name} should see WVALID asserted"
masters["reg3"].RVALID.value = 1 assert _get_int(entry["outputs"]["WDATA"], index) == write_data, f"{master_name} must receive WDATA"
masters["reg3"].RDATA.value = READ_DATA assert _get_int(entry["outputs"]["WSTRB"], index) == strobe_mask, f"{master_name} must receive WSTRB"
masters["reg3"].RRESP.value = 0
s_axil.ARADDR.value = READ_ADDR for other_name, other_idx in _all_index_pairs(masters):
s_axil.ARPROT.value = 0 if other_name == master_name and other_idx == index:
s_axil.ARVALID.value = 1 continue
s_axil.RREADY.value = 1 other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["AWVALID"], other_idx) == 0
), f"{other_name}{other_idx} AWVALID should remain low during {txn['label']}"
assert (
_get_int(other_entry["outputs"]["WVALID"], other_idx) == 0
), f"{other_name}{other_idx} WVALID should remain low during {txn['label']}"
await Timer(1, units="ns") slave.AWVALID.value = 0
slave.WVALID.value = 0
slave.BREADY.value = 0
await Timer(1, units="ns")
assert int(masters["reg3"].ARVALID.value) == 1, "reg3 ARVALID should follow slave" read_data = _read_pattern(address, config["data_width"])
assert int(masters["reg3"].ARADDR.value) == READ_ADDR, "ARADDR should fan out" _set_value(entry["inputs"]["RVALID"], index, 1)
_set_value(entry["inputs"]["RDATA"], index, read_data)
_set_value(entry["inputs"]["RRESP"], index, 0)
for name, master in masters.items(): slave.ARADDR.value = address
if name != "reg3": slave.ARPROT.value = 0
assert int(master.ARVALID.value) == 0, f"{name} ARVALID should stay low" slave.ARVALID.value = 1
slave.RREADY.value = 1
assert int(s_axil.RVALID.value) == 1, "Slave should raise RVALID when master responds" await Timer(1, units="ns")
assert int(s_axil.RDATA.value) == READ_DATA, "Read data should return to slave"
assert int(s_axil.RRESP.value) == 0, "No error expected for read"
# Return to idle assert _get_int(entry["outputs"]["ARVALID"], index) == 1, f"{master_name} should assert ARVALID"
s_axil.ARVALID.value = 0 assert _get_int(entry["outputs"]["ARADDR"], index) == address, f"{master_name} must receive ARADDR"
s_axil.RREADY.value = 0
masters["reg3"].RVALID.value = 0 for other_name, other_idx in _all_index_pairs(masters):
await Timer(1, units="ns") if other_name == master_name and other_idx == index:
continue
other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["ARVALID"], other_idx) == 0
), f"{other_name}{other_idx} ARVALID should remain low during read of {txn['label']}"
assert int(slave.RVALID.value) == 1, "Slave should observe RVALID when master responds"
assert int(slave.RDATA.value) == read_data, "Read data must fold back to slave"
assert int(slave.RRESP.value) == 0, "Read response should indicate success"
slave.ARVALID.value = 0
slave.RREADY.value = 0
_set_value(entry["inputs"]["RVALID"], index, 0)
_set_value(entry["inputs"]["RDATA"], index, 0)
await Timer(1, units="ns")

View File

@@ -1,5 +1,8 @@
"""Pytest wrapper launching the AXI4-Lite cocotb smoke test.""" """Pytest wrapper launching the AXI4-Lite cocotb smoke tests."""
from __future__ import annotations
import json
from pathlib import Path from pathlib import Path
import pytest import pytest
@@ -11,20 +14,25 @@ try: # pragma: no cover - optional dependency shim
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
from cocotb_tools.runner import get_runner from cocotb_tools.runner import get_runner
from tests.cocotb_lib.utils import compile_rdl_and_export, get_verilog_sources from tests.cocotb_lib import RDL_CASES
from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case
@pytest.mark.simulation @pytest.mark.simulation
@pytest.mark.verilator @pytest.mark.verilator
def test_axi4lite_smoke(tmp_path: Path) -> None: @pytest.mark.parametrize(("rdl_file", "top_name"), RDL_CASES, ids=[case[1] for case in RDL_CASES])
"""Compile the AXI4-Lite design and execute the cocotb smoke test.""" def test_axi4lite_smoke(tmp_path: Path, rdl_file: str, top_name: str) -> None:
"""Compile each AXI4-Lite design variant and execute the cocotb smoke test."""
repo_root = Path(__file__).resolve().parents[4] repo_root = Path(__file__).resolve().parents[4]
rdl_path = repo_root / "tests" / "cocotb_lib" / rdl_file
build_root = tmp_path / top_name
module_path, package_path = compile_rdl_and_export( module_path, package_path, config = prepare_cpuif_case(
str(repo_root / "tests" / "cocotb_lib" / "multiple_reg.rdl"), str(rdl_path),
"multi_reg", top_name,
tmp_path, build_root,
cpuif_cls=AXI4LiteCpuifFlat, cpuif_cls=AXI4LiteCpuifFlat,
control_signal="AWVALID",
) )
sources = get_verilog_sources( sources = get_verilog_sources(
@@ -34,17 +42,18 @@ def test_axi4lite_smoke(tmp_path: Path) -> None:
) )
runner = get_runner("verilator") runner = get_runner("verilator")
build_dir = tmp_path / "sim_build" sim_build = build_root / "sim_build"
runner.build( runner.build(
sources=sources, sources=sources,
hdl_toplevel=module_path.stem, hdl_toplevel=module_path.stem,
build_dir=build_dir, build_dir=sim_build,
) )
runner.test( runner.test(
hdl_toplevel=module_path.stem, hdl_toplevel=module_path.stem,
test_module="tests.cocotb.axi4lite.smoke.test_register_access", test_module="tests.cocotb.axi4lite.smoke.test_register_access",
build_dir=build_dir, build_dir=sim_build,
log_file=str(tmp_path / "sim.log"), log_file=str(build_root / "simulation.log"),
extra_env={"RDL_TEST_CONFIG": json.dumps(config)},
) )

View File

@@ -1,3 +1,10 @@
from pathlib import Path """Manifest of SystemRDL sources used by the cocotb simulations."""
rdls = map(Path, ["simple.rdl", "multiple_reg.rdl"]) RDL_CASES: list[tuple[str, str]] = [
("simple.rdl", "simple_test"),
("multiple_reg.rdl", "multi_reg"),
("deep_hierarchy.rdl", "deep_hierarchy"),
("wide_status.rdl", "wide_status"),
("variable_layout.rdl", "variable_layout"),
("asymmetric_bus.rdl", "asymmetric_bus"),
]

View File

@@ -0,0 +1,105 @@
regfile port_rf {
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} port_enable[0:0];
field {
sw = rw;
hw = rw;
reset = 0x0;
} port_speed[3:1];
field {
sw = rw;
hw = rw;
reset = 0x0;
} port_width[8:4];
} control @ 0x0;
reg {
field {
sw = r;
hw = w;
reset = 0x0;
} error_count[15:0];
field {
sw = r;
hw = w;
reset = 0x0;
} retry_count[31:16];
} counters @ 0x4;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} qos[7:0];
field {
sw = rw;
hw = rw;
reset = 0x0;
} virtual_channel[9:8];
} qos @ 0x8;
};
addrmap asymmetric_bus {
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} control[3:0];
field {
sw = rw;
hw = rw;
reset = 0x0;
} id[15:4];
} control @ 0x0;
reg {
field {
sw = r;
hw = w;
reset = 0x0;
} status_flags[19:0];
} status @ 0x4;
reg {
regwidth = 64;
field {
sw = rw;
hw = rw;
reset = 0x00abcdef;
} timestamp_low[31:0];
field {
sw = rw;
hw = rw;
reset = 0x00123456;
} timestamp_high[55:32];
} timestamp @ 0x8;
reg {
regwidth = 128;
field {
sw = rw;
hw = rw;
reset = 0x0;
} extended_id[63:0];
field {
sw = rw;
hw = rw;
reset = 0x1;
} parity[64:64];
} extended @ 0x10;
port_rf port[6] @ 0x40 += 0x20;
};

View File

@@ -0,0 +1,115 @@
addrmap deep_hierarchy {
regfile context_rf {
reg {
field {
sw = rw;
hw = r;
reset = 0x1;
} enable[7:0];
field {
sw = r;
hw = w;
onread = rclr;
reset = 0x0;
} status[15:8];
field {
sw = rw;
hw = rw;
reset = 0x55;
} mode[23:16];
} command @ 0x0;
reg {
field {
sw = rw;
hw = rw;
reset = 0x1234;
} threshold[15:0];
} threshold @ 0x4;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} counter[31:0];
} counter @ 0x8;
};
regfile engine_rf {
context_rf context[3] @ 0x0;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} timeout[15:0];
field {
sw = rw;
hw = rw;
reset = 0x1;
} priority[19:16];
} config @ 0x30;
reg {
field {
sw = r;
hw = w;
onread = rclr;
reset = 0x0;
} error[31:0];
} error_log @ 0x34;
};
addrmap fabric_slice {
engine_rf engines[4] @ 0x0;
regfile monitor_rf {
reg {
field {
sw = r;
hw = w;
reset = 0x0;
} perf_count[31:0];
} perf @ 0x0;
reg {
field {
sw = r;
hw = w;
reset = 0x0;
} last_error[31:0];
} last_error @ 0x4;
};
monitor_rf monitor @ 0x400;
reg {
field {
sw = rw;
hw = rw;
reset = 0xdeadbeef;
} fabric_ctrl[31:0];
} fabric_ctrl @ 0x500;
};
fabric_slice slices[2] @ 0x0 += 0x800;
reg {
field {
sw = rw;
hw = rw;
reset = 0x1;
} global_enable[0:0];
field {
sw = rw;
hw = rw;
reset = 0x4;
} debug_level[3:1];
} global_control @ 0x1000;
};

View File

@@ -1,9 +1,13 @@
"""Common utilities for cocotb testbenches.""" """Common utilities for cocotb testbenches."""
from __future__ import annotations
from collections import defaultdict
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from systemrdl import RDLCompiler from systemrdl import RDLCompiler
from systemrdl.node import AddressableNode, AddrmapNode, RegNode
from peakrdl_busdecoder.cpuif.base_cpuif import BaseCpuif from peakrdl_busdecoder.cpuif.base_cpuif import BaseCpuif
from peakrdl_busdecoder.exporter import BusDecoderExporter from peakrdl_busdecoder.exporter import BusDecoderExporter
@@ -65,3 +69,206 @@ def get_verilog_sources(module_path: Path, package_path: Path, intf_files: list[
# Add module file # Add module file
sources.append(str(module_path)) sources.append(str(module_path))
return sources return sources
def prepare_cpuif_case(
rdl_source: str,
top_name: str,
output_dir: Path,
cpuif_cls: type[BaseCpuif],
*,
control_signal: str,
max_samples_per_master: int = 3,
exporter_kwargs: dict[str, Any] | None = None,
) -> tuple[Path, Path, dict[str, Any]]:
"""
Compile SystemRDL, export the CPUIF, and build a configuration payload for cocotb tests.
Parameters
----------
rdl_source:
Path to the SystemRDL source file.
top_name:
Name of the top-level addrmap to elaborate.
output_dir:
Directory where generated HDL will be written.
cpuif_cls:
CPUIF implementation class to use during export.
control_signal:
Name of the control signal used to identify master ports
(``"PSEL"`` for APB, ``"AWVALID"`` for AXI4-Lite, etc.).
max_samples_per_master:
Limit for the number of register addresses sampled per master in the test matrix.
exporter_kwargs:
Optional keyword overrides passed through to :class:`BusDecoderExporter`.
Returns
-------
tuple
``(module_path, package_path, config_dict)``, where the configuration dictionary
is JSON-serializable and describes masters, indices, and sampled transactions.
"""
compiler = RDLCompiler()
compiler.compile_file(rdl_source)
root = compiler.elaborate(top_name)
top_node = root.top # type: ignore[assignment]
export_kwargs: dict[str, Any] = {"cpuif_cls": cpuif_cls}
if exporter_kwargs:
export_kwargs.update(exporter_kwargs)
exporter = BusDecoderExporter()
exporter.export(root, str(output_dir), **export_kwargs)
module_name = export_kwargs.get("module_name", top_name)
package_name = export_kwargs.get("package_name", f"{top_name}_pkg")
module_path = Path(output_dir) / f"{module_name}.sv"
package_path = Path(output_dir) / f"{package_name}.sv"
config = _build_case_config(
top_node,
exporter.cpuif,
control_signal,
max_samples_per_master=max_samples_per_master,
)
config["address_width"] = exporter.cpuif.addr_width
config["data_width"] = exporter.cpuif.data_width
config["byte_width"] = exporter.cpuif.data_width // 8
return module_path, package_path, config
def _build_case_config(
top_node: AddrmapNode,
cpuif: BaseCpuif,
control_signal: str,
*,
max_samples_per_master: int,
) -> dict[str, Any]:
master_entries: dict[str, dict[str, Any]] = {}
for child in cpuif.addressable_children:
signal = cpuif.signal(control_signal, child)
# Example: m_apb_tiles_PSEL[N_TILESS] -> m_apb_tiles
base = signal.split("[", 1)[0]
suffix = f"_{control_signal}"
if not base.endswith(suffix):
raise ValueError(f"Unable to derive port prefix from '{signal}'")
port_prefix = base[: -len(suffix)]
master_entries[child.inst_name] = {
"inst_name": child.inst_name,
"port_prefix": port_prefix,
"is_array": bool(child.is_array),
"dimensions": list(child.array_dimensions or []),
"indices": set(),
}
# Map each register to its top-level master and collect addresses
groups: dict[tuple[str, tuple[int, ...]], list[tuple[int, str]]] = defaultdict(list)
def visit(node: AddressableNode) -> None:
if isinstance(node, RegNode):
master = node # type: AddressableNode
while master.parent is not top_node:
parent = master.parent
if not isinstance(parent, AddressableNode):
raise RuntimeError("Encountered unexpected hierarchy while resolving master node")
master = parent
inst_name = master.inst_name
if inst_name not in master_entries:
# Handles cases where the register itself is the master (direct child of top)
signal = cpuif.signal(control_signal, master)
base = signal.split("[", 1)[0]
suffix = f"_{control_signal}"
if not base.endswith(suffix):
raise ValueError(f"Unable to derive port prefix from '{signal}'")
port_prefix = base[: -len(suffix)]
master_entries[inst_name] = {
"inst_name": inst_name,
"port_prefix": port_prefix,
"is_array": bool(master.is_array),
"dimensions": list(master.array_dimensions or []),
"indices": set(),
}
idx_tuple = tuple(master.current_idx or [])
master_entries[inst_name]["indices"].add(idx_tuple)
relative_addr = int(node.absolute_address) - int(top_node.absolute_address)
full_path = node.get_path()
label = full_path.split(".", 1)[1] if "." in full_path else full_path
groups[(inst_name, idx_tuple)].append((relative_addr, label))
for child in node.children(unroll=True):
if isinstance(child, AddressableNode):
visit(child)
visit(top_node)
masters_list = []
for entry in master_entries.values():
indices = entry["indices"] or {()}
entry["indices"] = [list(idx) for idx in sorted(indices)]
masters_list.append(
{
"inst_name": entry["inst_name"],
"port_prefix": entry["port_prefix"],
"is_array": entry["is_array"],
"dimensions": entry["dimensions"],
"indices": entry["indices"],
}
)
transactions = []
for (inst_name, idx_tuple), items in groups.items():
addresses = sorted({addr for addr, _ in items})
samples = _sample_addresses(addresses, max_samples_per_master)
for addr in samples:
label = next(lbl for candidate, lbl in items if candidate == addr)
transactions.append(
{
"address": addr,
"master": inst_name,
"index": list(idx_tuple),
"label": label,
}
)
transactions.sort(key=lambda item: (item["master"], item["index"], item["address"]))
masters_list.sort(key=lambda item: item["inst_name"])
return {
"masters": masters_list,
"transactions": transactions,
}
def _sample_addresses(addresses: list[int], max_samples: int) -> list[int]:
if len(addresses) <= max_samples:
return addresses
samples: list[int] = []
samples.append(addresses[0])
if len(addresses) > 1:
samples.append(addresses[-1])
if len(addresses) > 2:
mid = addresses[len(addresses) // 2]
if mid not in samples:
samples.append(mid)
idx = 1
while len(samples) < max_samples:
pos = (len(addresses) * idx) // max_samples
candidate = addresses[min(pos, len(addresses) - 1)]
if candidate not in samples:
samples.append(candidate)
idx += 1
samples.sort()
return samples

View File

@@ -0,0 +1,156 @@
reg ctrl_reg_t {
desc = "Control register shared across channels.";
field {
sw = rw;
hw = rw;
reset = 0x1;
} enable[0:0];
field {
sw = rw;
hw = rw;
reset = 0x2;
} mode[3:1];
field {
sw = rw;
hw = rw;
reset = 0x0;
} prescale[11:4];
};
regfile channel_rf {
ctrl_reg_t control @ 0x0;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} gain[11:0];
field {
sw = rw;
hw = rw;
reset = 0x200;
} offset[23:12];
} calibrate @ 0x4;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} sample_count[15:0];
field {
sw = rw;
hw = rw;
reset = 0x0;
} error_count[31:16];
} counters @ 0x8;
};
regfile slice_rf {
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} slope[15:0];
field {
sw = rw;
hw = rw;
reset = 0x0;
} intercept[31:16];
} calibration @ 0x0;
reg {
regwidth = 64;
field {
sw = r;
hw = w;
reset = 0x0;
} min_val[31:0];
field {
sw = r;
hw = w;
reset = 0x0;
} max_val[63:32];
} range @ 0x4;
};
regfile tile_rf {
channel_rf channel[3] @ 0x0;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} tile_mode[1:0];
field {
sw = rw;
hw = rw;
reset = 0x0;
} tile_enable[2:2];
} tile_ctrl @ 0x100;
slice_rf slice[2] @ 0x200;
};
regfile summary_rf {
reg {
field {
sw = r;
hw = w;
reset = 0x0;
} total_errors[31:0];
} errors @ 0x0;
reg {
field {
sw = r;
hw = w;
reset = 0x0;
} total_samples[31:0];
} samples @ 0x4;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} interrupt_enable[7:0];
} interrupt_enable @ 0x8;
};
addrmap variable_layout {
tile_rf tiles[2] @ 0x0;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} watchdog_enable[0:0];
field {
sw = rw;
hw = rw;
reset = 0x100;
} watchdog_timeout[16:1];
field {
sw = rw;
hw = rw;
reset = 0x0;
} watchdog_mode[18:17];
} watchdog @ 0x2000;
summary_rf summary @ 0x3000;
};

View File

@@ -0,0 +1,69 @@
reg status_reg_t {
regwidth = 64;
desc = "Status register capturing wide flags and sticky bits.";
field {
sw = r;
hw = w;
onread = rclr;
reset = 0x0;
} flags[62:0];
field {
sw = rw;
hw = r;
reset = 0x1;
} sticky_bit[63:63];
};
reg metrics_reg_t {
regwidth = 64;
desc = "Metrics register pairing counters with thresholds.";
field {
sw = r;
hw = w;
reset = 0x0;
} count[31:0];
field {
sw = rw;
hw = rw;
reset = 0x0;
} threshold[63:32];
};
addrmap wide_status {
status_reg_t status_blocks[16] @ 0x0;
metrics_reg_t metrics[4] @ 0x400;
reg {
regwidth = 128;
field {
sw = rw;
hw = rw;
reset = 0x0;
} configuration[127:0];
} configuration @ 0x800;
reg {
field {
sw = rw;
hw = rw;
reset = 0x0;
} version_major[7:0];
field {
sw = rw;
hw = rw;
reset = 0x1;
} version_minor[15:8];
field {
sw = rw;
hw = rw;
reset = 0x0100;
} build[31:16];
} version @ 0x900;
};