From bad845d15eb92f13f91fa49d53661c3f79276d32 Mon Sep 17 00:00:00 2001 From: Arnav Sacheti <36746504+arnavsacheti@users.noreply.github.com> Date: Tue, 3 Feb 2026 00:03:04 -0800 Subject: [PATCH] Fix/better spec enforcing (#41) * revamp * consolidate * version bump --- pyproject.toml | 2 +- .../cpuif/apb3/apb3_tmpl.sv | 2 +- .../cpuif/apb4/apb4_tmpl.sv | 2 +- .../cpuif/axi4lite/axi4_lite_cpuif.py | 16 +- .../cpuif/axi4lite/axi4_lite_tmpl.sv | 24 +- src/peakrdl_busdecoder/cpuif/base_cpuif.py | 4 + src/peakrdl_busdecoder/cpuif/fanin_gen.py | 4 +- .../cpuif/fanin_intermediate_gen.py | 3 + tests/body/test_body.py | 3 - tests/body/test_if_body.py | 3 +- .../cocotb/apb3/smoke/test_register_access.py | 228 +++++++++++----- tests/cocotb/apb3/smoke/test_runner.py | 4 +- .../cocotb/apb3/smoke/test_variable_depth.py | 56 +--- .../cocotb/apb4/smoke/test_register_access.py | 239 ++++++++++++----- tests/cocotb/apb4/smoke/test_runner.py | 3 +- .../cocotb/apb4/smoke/test_variable_depth.py | 62 +---- .../axi4lite/smoke/test_register_access.py | 244 ++++++++++++++---- tests/cocotb/axi4lite/smoke/test_runner.py | 4 +- tests/cocotb_lib/handle_utils.py | 3 +- tests/cocotb_lib/protocol_utils.py | 101 ++++++++ tests/cocotb_lib/utils.py | 3 +- .../generator/test_decode_logic_generator.py | 4 +- tests/generator/test_design_state.py | 13 +- tests/generator/test_questa_compatibility.py | 38 +-- tests/unit/conftest.py | 5 - tests/unit/test_external_nested.py | 56 ++-- tests/unit/test_max_decode_depth.py | 6 +- tests/utils/test_ref_is_internal.py | 4 +- uv.lock | 2 +- 29 files changed, 775 insertions(+), 363 deletions(-) create mode 100644 tests/cocotb_lib/protocol_utils.py diff --git a/pyproject.toml b/pyproject.toml index 9211dcf..c5fe61d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "peakrdl-busdecoder" -version = "0.6.6" +version = "0.6.7" requires-python = ">=3.10" dependencies = [ "jinja2~=3.1", diff --git a/src/peakrdl_busdecoder/cpuif/apb3/apb3_tmpl.sv b/src/peakrdl_busdecoder/cpuif/apb3/apb3_tmpl.sv index 2688072..5a01bbc 100644 --- a/src/peakrdl_busdecoder/cpuif/apb3/apb3_tmpl.sv +++ b/src/peakrdl_busdecoder/cpuif/apb3/apb3_tmpl.sv @@ -37,4 +37,4 @@ assign {{cpuif.signal("PSLVERR")}} = cpuif_rd_err | cpuif_wr_err; //-------------------------------------------------------------------------- // Fanin CPU Bus interface signals //-------------------------------------------------------------------------- -{{fanin|walk(cpuif=cpuif)}} \ No newline at end of file +{{fanin|walk(cpuif=cpuif)}} diff --git a/src/peakrdl_busdecoder/cpuif/apb4/apb4_tmpl.sv b/src/peakrdl_busdecoder/cpuif/apb4/apb4_tmpl.sv index e44ef30..48833a0 100644 --- a/src/peakrdl_busdecoder/cpuif/apb4/apb4_tmpl.sv +++ b/src/peakrdl_busdecoder/cpuif/apb4/apb4_tmpl.sv @@ -38,4 +38,4 @@ assign {{cpuif.signal("PSLVERR")}} = cpuif_rd_err | cpuif_wr_err; //-------------------------------------------------------------------------- // Fanin CPU Bus interface signals //-------------------------------------------------------------------------- -{{fanin|walk(cpuif=cpuif)}} \ No newline at end of file +{{fanin|walk(cpuif=cpuif)}} diff --git a/src/peakrdl_busdecoder/cpuif/axi4lite/axi4_lite_cpuif.py b/src/peakrdl_busdecoder/cpuif/axi4lite/axi4_lite_cpuif.py index bc9aa0f..a0baab9 100644 --- a/src/peakrdl_busdecoder/cpuif/axi4lite/axi4_lite_cpuif.py +++ b/src/peakrdl_busdecoder/cpuif/axi4lite/axi4_lite_cpuif.py @@ -77,8 +77,8 @@ class AXI4LiteCpuif(BaseCpuif): if self.is_interface and node.is_array and node.array_dimensions: # Generate array index string [i0][i1]... for the intermediate signal array_idx = "".join(f"[i{i}]" for i in range(len(node.array_dimensions))) - fanin["cpuif_wr_ack"] = f"{node.inst_name}_fanin_ready{array_idx}" - fanin["cpuif_wr_err"] = f"{node.inst_name}_fanin_err{array_idx}" + fanin["cpuif_wr_ack"] = f"{node.inst_name}_fanin_wr_valid{array_idx}" + fanin["cpuif_wr_err"] = f"{node.inst_name}_fanin_wr_err{array_idx}" else: # Read side: ack comes from RVALID; err if RRESP[1] is set (SLVERR/DECERR) fanin["cpuif_wr_ack"] = self.signal("BVALID", node, "i") @@ -119,4 +119,16 @@ class AXI4LiteCpuif(BaseCpuif): f"assign {inst_name}_fanin_ready{array_idx} = {master_prefix}{indexed_path}.RVALID;", f"assign {inst_name}_fanin_err{array_idx} = {master_prefix}{indexed_path}.RRESP[1];", f"assign {inst_name}_fanin_data{array_idx} = {master_prefix}{indexed_path}.RDATA;", + f"assign {inst_name}_fanin_wr_valid{array_idx} = {master_prefix}{indexed_path}.BVALID;", + f"assign {inst_name}_fanin_wr_err{array_idx} = {master_prefix}{indexed_path}.BRESP[1];", + ] + + def fanin_intermediate_declarations(self, node: AddressableNode) -> list[str]: + if not node.array_dimensions: + return [] + + array_str = "".join(f"[{dim}]" for dim in node.array_dimensions) + return [ + f"logic {node.inst_name}_fanin_wr_valid{array_str};", + f"logic {node.inst_name}_fanin_wr_err{array_str};", ] diff --git a/src/peakrdl_busdecoder/cpuif/axi4lite/axi4_lite_tmpl.sv b/src/peakrdl_busdecoder/cpuif/axi4lite/axi4_lite_tmpl.sv index a7be528..02fcf54 100644 --- a/src/peakrdl_busdecoder/cpuif/axi4lite/axi4_lite_tmpl.sv +++ b/src/peakrdl_busdecoder/cpuif/axi4lite/axi4_lite_tmpl.sv @@ -26,9 +26,21 @@ `endif {% endif -%} +logic axi_wr_valid; +logic axi_wr_invalid; +logic cpuif_wr_ack_int; +logic cpuif_rd_ack_int; + +assign axi_wr_valid = {{cpuif.signal("AWVALID")}} & {{cpuif.signal("WVALID")}}; +assign axi_wr_invalid = {{cpuif.signal("AWVALID")}} ^ {{cpuif.signal("WVALID")}}; + +// Ready/acceptance follows the simplified single-beat requirement +assign {{cpuif.signal("AWREADY")}} = axi_wr_valid; +assign {{cpuif.signal("WREADY")}} = axi_wr_valid; +assign {{cpuif.signal("ARREADY")}} = {{cpuif.signal("ARVALID")}}; assign cpuif_req = {{cpuif.signal("AWVALID")}} | {{cpuif.signal("ARVALID")}}; -assign cpuif_wr_en = {{cpuif.signal("AWVALID")}} & {{cpuif.signal("WVALID")}}; +assign cpuif_wr_en = axi_wr_valid; assign cpuif_rd_en = {{cpuif.signal("ARVALID")}}; assign cpuif_wr_addr = {{cpuif.signal("AWADDR")}}; @@ -42,12 +54,14 @@ assign cpuif_wr_byte_en = {{cpuif.signal("WSTRB")}}; // Read: ack=RVALID, err=RRESP[1] (SLVERR/DECERR), data=RDATA // assign {{cpuif.signal("RDATA")}} = cpuif_rd_data; -assign {{cpuif.signal("RVALID")}} = cpuif_rd_ack; +assign cpuif_rd_ack_int = cpuif_rd_ack | cpuif_rd_sel.cpuif_err; +assign {{cpuif.signal("RVALID")}} = cpuif_rd_ack_int; assign {{cpuif.signal("RRESP")}} = (cpuif_rd_err | cpuif_rd_sel.cpuif_err | cpuif_wr_sel.cpuif_err) ? 2'b10 : 2'b00; // Write: ack=BVALID, err=BRESP[1] -assign {{cpuif.signal("BVALID")}} = cpuif_wr_ack; -assign {{cpuif.signal("BRESP")}} = (cpuif_wr_err | cpuif_wr_sel.cpuif_err | cpuif_rd_sel.cpuif_err) ? 2'b10 : 2'b00; +assign cpuif_wr_ack_int = cpuif_wr_ack | cpuif_wr_sel.cpuif_err | axi_wr_invalid; +assign {{cpuif.signal("BVALID")}} = cpuif_wr_ack_int; +assign {{cpuif.signal("BRESP")}} = (cpuif_wr_err | cpuif_wr_sel.cpuif_err | cpuif_rd_sel.cpuif_err | axi_wr_invalid) ? 2'b10 : 2'b00; //-------------------------------------------------------------------------- // Fanout CPU Bus interface signals @@ -64,4 +78,4 @@ assign {{cpuif.signal("BRESP")}} = (cpuif_wr_err | cpuif_wr_sel.cpuif_err | cpu //-------------------------------------------------------------------------- // Fanin CPU Bus interface signals //-------------------------------------------------------------------------- -{{fanin|walk(cpuif=cpuif)}} \ No newline at end of file +{{fanin|walk(cpuif=cpuif)}} diff --git a/src/peakrdl_busdecoder/cpuif/base_cpuif.py b/src/peakrdl_busdecoder/cpuif/base_cpuif.py index 185b4c9..b3734c2 100644 --- a/src/peakrdl_busdecoder/cpuif/base_cpuif.py +++ b/src/peakrdl_busdecoder/cpuif/base_cpuif.py @@ -136,3 +136,7 @@ class BaseCpuif: List of assignment strings """ return [] # Default: no intermediate assignments needed + + def fanin_intermediate_declarations(self, node: AddressableNode) -> list[str]: + """Optional extra intermediate signal declarations for interface arrays.""" + return [] diff --git a/src/peakrdl_busdecoder/cpuif/fanin_gen.py b/src/peakrdl_busdecoder/cpuif/fanin_gen.py index 282bd2b..5248cb3 100644 --- a/src/peakrdl_busdecoder/cpuif/fanin_gen.py +++ b/src/peakrdl_busdecoder/cpuif/fanin_gen.py @@ -72,12 +72,12 @@ class FaninGenerator(BusDecoderListener): def __str__(self) -> str: wr_ifb = IfBody() with wr_ifb.cm("cpuif_wr_sel.cpuif_err") as b: - self._cpuif.fanin_wr(error=True) + b += self._cpuif.fanin_wr(error=True) self._stack[-1] += wr_ifb rd_ifb = IfBody() with rd_ifb.cm("cpuif_rd_sel.cpuif_err") as b: - self._cpuif.fanin_rd(error=True) + b += self._cpuif.fanin_rd(error=True) self._stack[-1] += rd_ifb return "\n".join(map(str, self._stack)) diff --git a/src/peakrdl_busdecoder/cpuif/fanin_intermediate_gen.py b/src/peakrdl_busdecoder/cpuif/fanin_intermediate_gen.py index eb409d1..9a146b2 100644 --- a/src/peakrdl_busdecoder/cpuif/fanin_intermediate_gen.py +++ b/src/peakrdl_busdecoder/cpuif/fanin_intermediate_gen.py @@ -94,6 +94,9 @@ class FaninIntermediateGenerator(BusDecoderListener): f"logic [{self._cpuif.data_width - 1}:0] {inst_name}_fanin_data{array_str};" ) + # Allow CPU interface to add extra intermediate declarations (e.g., write responses) + self._declarations.extend(self._cpuif.fanin_intermediate_declarations(node)) + def _generate_intermediate_assignments(self, node: AddressableNode) -> str: """Generate assignments from interface array to intermediate signals.""" inst_name = node.inst_name diff --git a/tests/body/test_body.py b/tests/body/test_body.py index 623af29..726eb93 100644 --- a/tests/body/test_body.py +++ b/tests/body/test_body.py @@ -1,4 +1,3 @@ - from peakrdl_busdecoder.body import Body @@ -46,5 +45,3 @@ class TestBody: outer += "outer2" expected = "outer1\ninner1\ninner2\nouter2" assert str(outer) == expected - - diff --git a/tests/body/test_if_body.py b/tests/body/test_if_body.py index d02c9a9..5e6b90b 100644 --- a/tests/body/test_if_body.py +++ b/tests/body/test_if_body.py @@ -1,4 +1,6 @@ from peakrdl_busdecoder.body import IfBody + + class TestIfBody: """Test the IfBody class.""" @@ -82,4 +84,3 @@ class TestIfBody: assert "if (outer_cond)" in result assert "if (inner_cond)" in result assert "nested_statement;" in result - diff --git a/tests/cocotb/apb3/smoke/test_register_access.py b/tests/cocotb/apb3/smoke/test_register_access.py index b14f189..35695ca 100644 --- a/tests/cocotb/apb3/smoke/test_register_access.py +++ b/tests/cocotb/apb3/smoke/test_register_access.py @@ -2,14 +2,20 @@ from __future__ import annotations -import json -import os -from typing import Any, Iterable +from typing import Any import cocotb -from cocotb.triggers import Timer +from cocotb.triggers import RisingEdge, Timer -from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle +from tests.cocotb_lib.handle_utils import SignalHandle +from tests.cocotb_lib.protocol_utils import ( + all_index_pairs, + find_invalid_address, + get_int, + load_config, + set_value, + start_clock, +) class _Apb3SlaveShim: @@ -17,6 +23,8 @@ class _Apb3SlaveShim: def __init__(self, dut): prefix = "s_apb" + self.PCLK = getattr(dut, f"{prefix}_PCLK", None) + self.PRESETn = getattr(dut, f"{prefix}_PRESETn", None) self.PSEL = getattr(dut, f"{prefix}_PSEL") self.PENABLE = getattr(dut, f"{prefix}_PENABLE") self.PWRITE = getattr(dut, f"{prefix}_PWRITE") @@ -27,25 +35,6 @@ class _Apb3SlaveShim: self.PSLVERR = getattr(dut, f"{prefix}_PSLVERR") -def _load_config() -> dict[str, Any]: - payload = os.environ.get("RDL_TEST_CONFIG") - if payload is None: - raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided") - return json.loads(payload) - - -def _resolve(handle, indices: Iterable[int]): - return resolve_handle(handle, indices) - - -def _set_value(handle, indices: Iterable[int], value: int) -> None: - _resolve(handle, indices).value = value - - -def _get_int(handle, indices: Iterable[int]) -> int: - return int(_resolve(handle, indices).value) - - def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: table: dict[str, dict[str, Any]] = {} for master in masters_cfg: @@ -71,12 +60,6 @@ def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dic return table -def _all_index_pairs(table: dict[str, dict[str, Any]]): - for name, entry in table.items(): - for idx in entry["indices"]: - yield name, idx - - def _write_pattern(address: int, width: int) -> int: mask = (1 << width) - 1 return ((address * 0x2041) ^ 0xCAFEBABE) & mask @@ -90,23 +73,30 @@ def _read_pattern(address: int, width: int) -> int: @cocotb.test() async def test_apb3_address_decoding(dut) -> None: """Exercise the APB3 slave interface against sampled register addresses.""" - config = _load_config() + config = load_config() slave = _Apb3SlaveShim(dut) masters = _build_master_table(dut, config["masters"]) + await start_clock(slave.PCLK) + if slave.PRESETn is not None: + slave.PRESETn.value = 1 + slave.PSEL.value = 0 slave.PENABLE.value = 0 slave.PWRITE.value = 0 slave.PADDR.value = 0 slave.PWDATA.value = 0 - for master_name, idx in _all_index_pairs(masters): + for master_name, idx in all_index_pairs(masters): entry = masters[master_name] - _set_value(entry["inputs"]["PRDATA"], idx, 0) - _set_value(entry["inputs"]["PREADY"], idx, 0) - _set_value(entry["inputs"]["PSLVERR"], idx, 0) + set_value(entry["inputs"]["PRDATA"], idx, 0) + set_value(entry["inputs"]["PREADY"], idx, 0) + set_value(entry["inputs"]["PSLVERR"], idx, 0) - await Timer(1, unit="ns") + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: + await Timer(1, unit="ns") addr_mask = (1 << config["address_width"]) - 1 @@ -118,85 +108,203 @@ async def test_apb3_address_decoding(dut) -> None: address = txn["address"] & addr_mask write_data = _write_pattern(address, config["data_width"]) - _set_value(entry["inputs"]["PREADY"], index, 1) - _set_value(entry["inputs"]["PSLVERR"], index, 0) + set_value(entry["inputs"]["PREADY"], index, 0) + set_value(entry["inputs"]["PSLVERR"], index, 0) + # ------------------------------------------------------------------ + # Setup phase + # ------------------------------------------------------------------ slave.PADDR.value = address slave.PWDATA.value = write_data slave.PWRITE.value = 1 slave.PSEL.value = 1 - slave.PENABLE.value = 1 + slave.PENABLE.value = 0 dut._log.info( f"Starting transaction {txn['label']} to {master_name}{index} at address 0x{address:08X}" ) master_address = (address - entry["inst_address"]) % entry["inst_size"] - await Timer(1, unit="ns") + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: + await Timer(1, unit="ns") - assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} should assert PSEL for write" - assert _get_int(entry["outputs"]["PWRITE"], index) == 1, f"{master_name} should see write direction" - assert _get_int(entry["outputs"]["PADDR"], index) == master_address, ( + assert get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} should assert PSEL for write" + assert get_int(entry["outputs"]["PENABLE"], index) == 0, ( + f"{master_name} must hold PENABLE low in setup" + ) + assert get_int(entry["outputs"]["PWRITE"], index) == 1, f"{master_name} should see write direction" + assert get_int(entry["outputs"]["PADDR"], index) == master_address, ( f"{master_name} must receive write address" ) - assert _get_int(entry["outputs"]["PWDATA"], index) == write_data, ( + assert get_int(entry["outputs"]["PWDATA"], index) == write_data, ( f"{master_name} must receive write data" ) - for other_name, other_idx in _all_index_pairs(masters): + for other_name, other_idx in all_index_pairs(masters): if other_name == master_name and other_idx == index: continue other_entry = masters[other_name] - assert _get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, ( + assert get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, ( f"{other_name}{other_idx} should remain idle during {txn['label']}" ) + # ------------------------------------------------------------------ + # Access phase + # ------------------------------------------------------------------ + set_value(entry["inputs"]["PREADY"], index, 1) + slave.PENABLE.value = 1 + + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: + await Timer(1, unit="ns") + + assert get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must keep PSEL asserted" + assert get_int(entry["outputs"]["PENABLE"], index) == 1, ( + f"{master_name} must assert PENABLE in access" + ) + assert get_int(entry["outputs"]["PADDR"], index) == master_address, ( + f"{master_name} must keep write address stable" + ) + assert get_int(entry["outputs"]["PWDATA"], index) == write_data, ( + f"{master_name} must keep write data stable" + ) + assert int(slave.PREADY.value) == 1, "Slave ready should mirror selected master" assert int(slave.PSLVERR.value) == 0, "Write should complete without error" slave.PSEL.value = 0 slave.PENABLE.value = 0 slave.PWRITE.value = 0 - _set_value(entry["inputs"]["PREADY"], index, 0) - await Timer(1, unit="ns") + set_value(entry["inputs"]["PREADY"], index, 0) + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: + await Timer(1, unit="ns") # ------------------------------------------------------------------ # Read phase # ------------------------------------------------------------------ read_data = _read_pattern(address, config["data_width"]) - _set_value(entry["inputs"]["PRDATA"], index, read_data) - _set_value(entry["inputs"]["PREADY"], index, 1) - _set_value(entry["inputs"]["PSLVERR"], index, 0) + set_value(entry["inputs"]["PRDATA"], index, read_data) + set_value(entry["inputs"]["PREADY"], index, 0) + set_value(entry["inputs"]["PSLVERR"], index, 0) + # ------------------------------------------------------------------ + # Setup phase + # ------------------------------------------------------------------ slave.PADDR.value = address slave.PWRITE.value = 0 slave.PSEL.value = 1 - slave.PENABLE.value = 1 + slave.PENABLE.value = 0 - await Timer(1, unit="ns") + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: + await Timer(1, unit="ns") - assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must assert PSEL for read" - assert _get_int(entry["outputs"]["PWRITE"], index) == 0, ( + assert get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must assert PSEL for read" + assert get_int(entry["outputs"]["PENABLE"], index) == 0, ( + f"{master_name} must hold PENABLE low in setup" + ) + assert get_int(entry["outputs"]["PWRITE"], index) == 0, ( f"{master_name} should clear write during read" ) - assert _get_int(entry["outputs"]["PADDR"], index) == master_address, ( + assert get_int(entry["outputs"]["PADDR"], index) == master_address, ( f"{master_name} must receive read address" ) - for other_name, other_idx in _all_index_pairs(masters): + for other_name, other_idx in all_index_pairs(masters): if other_name == master_name and other_idx == index: continue other_entry = masters[other_name] - assert _get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, ( + assert get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, ( f"{other_name}{other_idx} must stay idle during read of {txn['label']}" ) + # ------------------------------------------------------------------ + # Access phase + # ------------------------------------------------------------------ + set_value(entry["inputs"]["PREADY"], index, 1) + slave.PENABLE.value = 1 + + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: + await Timer(1, unit="ns") + + assert get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must keep PSEL asserted" + assert get_int(entry["outputs"]["PENABLE"], index) == 1, ( + f"{master_name} must assert PENABLE in access" + ) + assert int(slave.PRDATA.value) == read_data, "Read data should propagate back to the slave" assert int(slave.PREADY.value) == 1, "Slave ready should acknowledge the read" assert int(slave.PSLVERR.value) == 0, "Read should not signal an error" slave.PSEL.value = 0 slave.PENABLE.value = 0 - _set_value(entry["inputs"]["PREADY"], index, 0) - _set_value(entry["inputs"]["PRDATA"], index, 0) + set_value(entry["inputs"]["PREADY"], index, 0) + set_value(entry["inputs"]["PRDATA"], index, 0) + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: + await Timer(1, unit="ns") + + +@cocotb.test() +async def test_apb3_invalid_address_response(dut) -> None: + """Ensure invalid addresses yield an error response and no master select.""" + config = load_config() + slave = _Apb3SlaveShim(dut) + masters = _build_master_table(dut, config["masters"]) + + await start_clock(slave.PCLK) + if slave.PRESETn is not None: + slave.PRESETn.value = 1 + + slave.PSEL.value = 0 + slave.PENABLE.value = 0 + slave.PWRITE.value = 0 + slave.PADDR.value = 0 + slave.PWDATA.value = 0 + + for master_name, idx in all_index_pairs(masters): + entry = masters[master_name] + set_value(entry["inputs"]["PREADY"], idx, 0) + set_value(entry["inputs"]["PSLVERR"], idx, 0) + set_value(entry["inputs"]["PRDATA"], idx, 0) + + invalid_addr = find_invalid_address(config) + if invalid_addr is None: + dut._log.warning("No unmapped address found; skipping invalid address test") + return + + slave.PADDR.value = invalid_addr + slave.PWRITE.value = 1 + slave.PWDATA.value = _write_pattern(invalid_addr, config["data_width"]) + slave.PSEL.value = 1 + slave.PENABLE.value = 0 + + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: await Timer(1, unit="ns") + + slave.PENABLE.value = 1 + + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: + await Timer(1, unit="ns") + + for master_name, idx in all_index_pairs(masters): + entry = masters[master_name] + assert get_int(entry["outputs"]["PSEL"], idx) == 0, ( + f"{master_name}{idx} must stay idle for invalid address" + ) + + assert int(slave.PREADY.value) == 1, "Invalid address should still complete the transfer" + assert int(slave.PSLVERR.value) == 1, "Invalid address should raise PSLVERR" diff --git a/tests/cocotb/apb3/smoke/test_runner.py b/tests/cocotb/apb3/smoke/test_runner.py index 03c662b..a1cc9d0 100644 --- a/tests/cocotb/apb3/smoke/test_runner.py +++ b/tests/cocotb/apb3/smoke/test_runner.py @@ -3,8 +3,8 @@ from __future__ import annotations import json -from pathlib import Path import logging +from pathlib import Path import pytest @@ -16,7 +16,7 @@ except ImportError: # pragma: no cover from cocotb_tools.runner import get_runner from tests.cocotb_lib import RDL_CASES -from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case, colorize_cocotb_log +from tests.cocotb_lib.utils import colorize_cocotb_log, get_verilog_sources, prepare_cpuif_case @pytest.mark.simulation diff --git a/tests/cocotb/apb3/smoke/test_variable_depth.py b/tests/cocotb/apb3/smoke/test_variable_depth.py index c1b98c7..f502b8e 100644 --- a/tests/cocotb/apb3/smoke/test_variable_depth.py +++ b/tests/cocotb/apb3/smoke/test_variable_depth.py @@ -3,6 +3,8 @@ import cocotb from cocotb.triggers import Timer +from tests.cocotb_lib.protocol_utils import apb_access, apb_setup + class _Apb3SlaveShim: def __init__(self, dut): @@ -60,13 +62,8 @@ async def test_depth_1(dut): # Write to address 0x0 (should select inner1) inner1.PREADY.value = 1 - s_apb.PADDR.value = 0x0 - s_apb.PWDATA.value = 0x12345678 - s_apb.PWRITE.value = 1 - s_apb.PSEL.value = 1 - s_apb.PENABLE.value = 1 - - await Timer(1, unit="ns") + await apb_setup(s_apb, 0x0, True, 0x12345678) + await apb_access(s_apb) assert int(inner1.PSEL.value) == 1, "inner1 must be selected" assert int(inner1.PWRITE.value) == 1, "Write should propagate" @@ -101,13 +98,8 @@ async def test_depth_2(dut): # Write to address 0x0 (should select reg1) reg1.PREADY.value = 1 - s_apb.PADDR.value = 0x0 - s_apb.PWDATA.value = 0xABCDEF01 - s_apb.PWRITE.value = 1 - s_apb.PSEL.value = 1 - s_apb.PENABLE.value = 1 - - await Timer(1, unit="ns") + await apb_setup(s_apb, 0x0, True, 0xABCDEF01) + await apb_access(s_apb) assert int(reg1.PSEL.value) == 1, "reg1 must be selected for address 0x0" assert int(inner2.PSEL.value) == 0, "inner2 should not be selected" @@ -120,13 +112,8 @@ async def test_depth_2(dut): # Write to address 0x10 (should select inner2) inner2.PREADY.value = 1 - s_apb.PADDR.value = 0x10 - s_apb.PWDATA.value = 0x23456789 - s_apb.PWRITE.value = 1 - s_apb.PSEL.value = 1 - s_apb.PENABLE.value = 1 - - await Timer(1, unit="ns") + await apb_setup(s_apb, 0x10, True, 0x23456789) + await apb_access(s_apb) assert int(inner2.PSEL.value) == 1, "inner2 must be selected for address 0x10" assert int(reg1.PSEL.value) == 0, "reg1 should not be selected" @@ -158,13 +145,8 @@ async def test_depth_0(dut): # Write to address 0x0 (should select reg1) reg1.PREADY.value = 1 - s_apb.PADDR.value = 0x0 - s_apb.PWDATA.value = 0x11111111 - s_apb.PWRITE.value = 1 - s_apb.PSEL.value = 1 - s_apb.PENABLE.value = 1 - - await Timer(1, unit="ns") + await apb_setup(s_apb, 0x0, True, 0x11111111) + await apb_access(s_apb) assert int(reg1.PSEL.value) == 1, "reg1 must be selected for address 0x0" assert int(reg2.PSEL.value) == 0, "reg2 should not be selected" @@ -178,13 +160,8 @@ async def test_depth_0(dut): # Write to address 0x10 (should select reg2) reg2.PREADY.value = 1 - s_apb.PADDR.value = 0x10 - s_apb.PWDATA.value = 0x22222222 - s_apb.PWRITE.value = 1 - s_apb.PSEL.value = 1 - s_apb.PENABLE.value = 1 - - await Timer(1, unit="ns") + await apb_setup(s_apb, 0x10, True, 0x22222222) + await apb_access(s_apb) assert int(reg2.PSEL.value) == 1, "reg2 must be selected for address 0x10" assert int(reg1.PSEL.value) == 0, "reg1 should not be selected" @@ -198,13 +175,8 @@ async def test_depth_0(dut): # Write to address 0x14 (should select reg2b) reg2b.PREADY.value = 1 - s_apb.PADDR.value = 0x14 - s_apb.PWDATA.value = 0x33333333 - s_apb.PWRITE.value = 1 - s_apb.PSEL.value = 1 - s_apb.PENABLE.value = 1 - - await Timer(1, unit="ns") + await apb_setup(s_apb, 0x14, True, 0x33333333) + await apb_access(s_apb) assert int(reg2b.PSEL.value) == 1, "reg2b must be selected for address 0x14" assert int(reg1.PSEL.value) == 0, "reg1 should not be selected" diff --git a/tests/cocotb/apb4/smoke/test_register_access.py b/tests/cocotb/apb4/smoke/test_register_access.py index f2d27e4..3aa1919 100644 --- a/tests/cocotb/apb4/smoke/test_register_access.py +++ b/tests/cocotb/apb4/smoke/test_register_access.py @@ -2,15 +2,20 @@ from __future__ import annotations -import json -import logging -import os -from typing import Any, Iterable +from typing import Any import cocotb -from cocotb.triggers import Timer +from cocotb.triggers import RisingEdge, Timer -from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle +from tests.cocotb_lib.handle_utils import SignalHandle +from tests.cocotb_lib.protocol_utils import ( + all_index_pairs, + find_invalid_address, + get_int, + load_config, + set_value, + start_clock, +) class _Apb4SlaveShim: @@ -18,6 +23,8 @@ class _Apb4SlaveShim: def __init__(self, dut): prefix = "s_apb" + self.PCLK = getattr(dut, f"{prefix}_PCLK", None) + self.PRESETn = getattr(dut, f"{prefix}_PRESETn", None) self.PSEL = getattr(dut, f"{prefix}_PSEL") self.PENABLE = getattr(dut, f"{prefix}_PENABLE") self.PWRITE = getattr(dut, f"{prefix}_PWRITE") @@ -30,27 +37,6 @@ class _Apb4SlaveShim: self.PSLVERR = getattr(dut, f"{prefix}_PSLVERR") -def _load_config() -> dict[str, Any]: - """Read the JSON payload describing the generated register topology.""" - payload = os.environ.get("RDL_TEST_CONFIG") - if payload is None: - raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided") - return json.loads(payload) - - -def _resolve(handle, indices: Iterable[int]): - """Index into hierarchical cocotb handles.""" - return resolve_handle(handle, indices) - - -def _set_value(handle, indices: Iterable[int], value: int) -> None: - _resolve(handle, indices).value = value - - -def _get_int(handle, indices: Iterable[int]) -> int: - return int(_resolve(handle, indices).value) - - def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: table: dict[str, dict[str, Any]] = {} for master in masters_cfg: @@ -79,12 +65,6 @@ def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dic return table -def _all_index_pairs(table: dict[str, dict[str, Any]]): - for name, entry in table.items(): - for idx in entry["indices"]: - yield name, idx - - def _write_pattern(address: int, width: int) -> int: mask = (1 << width) - 1 return ((address * 0x1021) ^ 0x1357_9BDF) & mask @@ -98,10 +78,14 @@ def _read_pattern(address: int, width: int) -> int: @cocotb.test() async def test_apb4_address_decoding(dut) -> None: """Drive the APB4 slave interface and verify master fanout across all sampled registers.""" - config = _load_config() + config = load_config() slave = _Apb4SlaveShim(dut) masters = _build_master_table(dut, config["masters"]) + await start_clock(slave.PCLK) + if slave.PRESETn is not None: + slave.PRESETn.value = 1 + slave.PSEL.value = 0 slave.PENABLE.value = 0 slave.PWRITE.value = 0 @@ -110,13 +94,16 @@ async def test_apb4_address_decoding(dut) -> None: slave.PWDATA.value = 0 slave.PSTRB.value = 0 - for master_name, idx in _all_index_pairs(masters): + for master_name, idx in all_index_pairs(masters): entry = masters[master_name] - _set_value(entry["inputs"]["PRDATA"], idx, 0) - _set_value(entry["inputs"]["PREADY"], idx, 0) - _set_value(entry["inputs"]["PSLVERR"], idx, 0) + set_value(entry["inputs"]["PRDATA"], idx, 0) + set_value(entry["inputs"]["PREADY"], idx, 0) + set_value(entry["inputs"]["PSLVERR"], idx, 0) - await Timer(1, unit="ns") + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: + await Timer(1, unit="ns") addr_mask = (1 << config["address_width"]) - 1 strobe_mask = (1 << config["byte_width"]) - 1 @@ -130,44 +117,78 @@ async def test_apb4_address_decoding(dut) -> None: write_data = _write_pattern(address, config["data_width"]) # Prime master-side inputs for the write phase - _set_value(entry["inputs"]["PREADY"], index, 1) - _set_value(entry["inputs"]["PSLVERR"], index, 0) + set_value(entry["inputs"]["PREADY"], index, 0) + set_value(entry["inputs"]["PSLVERR"], index, 0) + # ------------------------------------------------------------------ + # Setup phase + # ------------------------------------------------------------------ slave.PADDR.value = address slave.PWDATA.value = write_data slave.PSTRB.value = strobe_mask slave.PPROT.value = 0 slave.PWRITE.value = 1 slave.PSEL.value = 1 - slave.PENABLE.value = 1 + slave.PENABLE.value = 0 dut._log.info( f"Starting transaction {txn['label']} to {master_name}{index} at address 0x{address:08X}" ) master_address = (address - entry["inst_address"]) % entry["inst_size"] - await Timer(1, unit="ns") + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: + await Timer(1, unit="ns") - assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} should assert PSEL for write" - assert _get_int(entry["outputs"]["PWRITE"], index) == 1, f"{master_name} should see write intent" - assert _get_int(entry["outputs"]["PADDR"], index) == master_address, ( + assert get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} should assert PSEL for write" + assert get_int(entry["outputs"]["PENABLE"], index) == 0, ( + f"{master_name} must hold PENABLE low in setup" + ) + assert get_int(entry["outputs"]["PWRITE"], index) == 1, f"{master_name} should see write intent" + assert get_int(entry["outputs"]["PADDR"], index) == master_address, ( f"{master_name} must receive write address" ) - assert _get_int(entry["outputs"]["PWDATA"], index) == write_data, ( + assert get_int(entry["outputs"]["PWDATA"], index) == write_data, ( f"{master_name} must receive write data" ) - assert _get_int(entry["outputs"]["PSTRB"], index) == strobe_mask, ( + assert get_int(entry["outputs"]["PSTRB"], index) == strobe_mask, ( f"{master_name} must receive full strobes" ) - for other_name, other_idx in _all_index_pairs(masters): + for other_name, other_idx in all_index_pairs(masters): if other_name == master_name and other_idx == index: continue other_entry = masters[other_name] - assert _get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, ( + assert get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, ( f"{other_name}{other_idx} should remain idle during {txn['label']}" ) + # ------------------------------------------------------------------ + # Access phase + # ------------------------------------------------------------------ + set_value(entry["inputs"]["PREADY"], index, 1) + slave.PENABLE.value = 1 + + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: + await Timer(1, unit="ns") + + assert get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must keep PSEL asserted" + assert get_int(entry["outputs"]["PENABLE"], index) == 1, ( + f"{master_name} must assert PENABLE in access" + ) + assert get_int(entry["outputs"]["PADDR"], index) == master_address, ( + f"{master_name} must keep write address stable" + ) + assert get_int(entry["outputs"]["PWDATA"], index) == write_data, ( + f"{master_name} must keep write data stable" + ) + assert get_int(entry["outputs"]["PSTRB"], index) == strobe_mask, ( + f"{master_name} must keep write strobes stable" + ) + assert int(slave.PREADY.value) == 1, "Slave ready should reflect selected master" assert int(slave.PSLVERR.value) == 0, "No error expected during write" @@ -175,40 +196,68 @@ async def test_apb4_address_decoding(dut) -> None: slave.PSEL.value = 0 slave.PENABLE.value = 0 slave.PWRITE.value = 0 - _set_value(entry["inputs"]["PREADY"], index, 0) - await Timer(1, unit="ns") + set_value(entry["inputs"]["PREADY"], index, 0) + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: + await Timer(1, unit="ns") # ------------------------------------------------------------------ # Read phase # ------------------------------------------------------------------ read_data = _read_pattern(address, config["data_width"]) - _set_value(entry["inputs"]["PRDATA"], index, read_data) - _set_value(entry["inputs"]["PREADY"], index, 1) - _set_value(entry["inputs"]["PSLVERR"], index, 0) + set_value(entry["inputs"]["PRDATA"], index, read_data) + set_value(entry["inputs"]["PREADY"], index, 0) + set_value(entry["inputs"]["PSLVERR"], index, 0) + # ------------------------------------------------------------------ + # Setup phase + # ------------------------------------------------------------------ slave.PADDR.value = address slave.PWRITE.value = 0 slave.PSEL.value = 1 - slave.PENABLE.value = 1 + slave.PENABLE.value = 0 - await Timer(1, unit="ns") + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: + await Timer(1, unit="ns") - assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must assert PSEL for read" - assert _get_int(entry["outputs"]["PWRITE"], index) == 0, ( + assert get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must assert PSEL for read" + assert get_int(entry["outputs"]["PENABLE"], index) == 0, ( + f"{master_name} must hold PENABLE low in setup" + ) + assert get_int(entry["outputs"]["PWRITE"], index) == 0, ( f"{master_name} should deassert write for reads" ) - assert _get_int(entry["outputs"]["PADDR"], index) == master_address, ( + assert get_int(entry["outputs"]["PADDR"], index) == master_address, ( f"{master_name} must receive read address" ) - for other_name, other_idx in _all_index_pairs(masters): + for other_name, other_idx in all_index_pairs(masters): if other_name == master_name and other_idx == index: continue other_entry = masters[other_name] - assert _get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, ( + assert get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, ( f"{other_name}{other_idx} must stay idle during read of {txn['label']}" ) + # ------------------------------------------------------------------ + # Access phase + # ------------------------------------------------------------------ + set_value(entry["inputs"]["PREADY"], index, 1) + slave.PENABLE.value = 1 + + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: + await Timer(1, unit="ns") + + assert get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must keep PSEL asserted" + assert get_int(entry["outputs"]["PENABLE"], index) == 1, ( + f"{master_name} must assert PENABLE in access" + ) + assert int(slave.PRDATA.value) == read_data, "Slave should observe readback data from master" assert int(slave.PREADY.value) == 1, "Slave ready should follow responding master" assert int(slave.PSLVERR.value) == 0, "Read should complete without error" @@ -216,6 +265,68 @@ async def test_apb4_address_decoding(dut) -> None: # Reset to idle before progressing slave.PSEL.value = 0 slave.PENABLE.value = 0 - _set_value(entry["inputs"]["PREADY"], index, 0) - _set_value(entry["inputs"]["PRDATA"], index, 0) + set_value(entry["inputs"]["PREADY"], index, 0) + set_value(entry["inputs"]["PRDATA"], index, 0) + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: + await Timer(1, unit="ns") + + +@cocotb.test() +async def test_apb4_invalid_address_response(dut) -> None: + """Ensure invalid addresses yield an error response and no master select.""" + config = load_config() + slave = _Apb4SlaveShim(dut) + masters = _build_master_table(dut, config["masters"]) + + await start_clock(slave.PCLK) + if slave.PRESETn is not None: + slave.PRESETn.value = 1 + + slave.PSEL.value = 0 + slave.PENABLE.value = 0 + slave.PWRITE.value = 0 + slave.PADDR.value = 0 + slave.PPROT.value = 0 + slave.PWDATA.value = 0 + slave.PSTRB.value = 0 + + for master_name, idx in all_index_pairs(masters): + entry = masters[master_name] + set_value(entry["inputs"]["PREADY"], idx, 0) + set_value(entry["inputs"]["PSLVERR"], idx, 0) + set_value(entry["inputs"]["PRDATA"], idx, 0) + + invalid_addr = find_invalid_address(config) + if invalid_addr is None: + dut._log.warning("No unmapped address found; skipping invalid address test") + return + + slave.PADDR.value = invalid_addr + slave.PWRITE.value = 1 + slave.PWDATA.value = _write_pattern(invalid_addr, config["data_width"]) + slave.PSTRB.value = (1 << config["byte_width"]) - 1 + slave.PSEL.value = 1 + slave.PENABLE.value = 0 + + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: await Timer(1, unit="ns") + + slave.PENABLE.value = 1 + + if slave.PCLK is not None: + await RisingEdge(slave.PCLK) + else: + await Timer(1, unit="ns") + + for master_name, idx in all_index_pairs(masters): + entry = masters[master_name] + assert get_int(entry["outputs"]["PSEL"], idx) == 0, ( + f"{master_name}{idx} must stay idle for invalid address" + ) + + assert int(slave.PREADY.value) == 1, "Invalid address should still complete the transfer" + assert int(slave.PSLVERR.value) == 1, "Invalid address should raise PSLVERR" diff --git a/tests/cocotb/apb4/smoke/test_runner.py b/tests/cocotb/apb4/smoke/test_runner.py index 1428ac5..630f63b 100644 --- a/tests/cocotb/apb4/smoke/test_runner.py +++ b/tests/cocotb/apb4/smoke/test_runner.py @@ -5,7 +5,6 @@ from __future__ import annotations import json import logging from pathlib import Path -import logging import pytest @@ -17,7 +16,7 @@ except ImportError: # pragma: no cover from cocotb_tools.runner import get_runner from tests.cocotb_lib import RDL_CASES -from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case, colorize_cocotb_log +from tests.cocotb_lib.utils import colorize_cocotb_log, get_verilog_sources, prepare_cpuif_case @pytest.mark.simulation diff --git a/tests/cocotb/apb4/smoke/test_variable_depth.py b/tests/cocotb/apb4/smoke/test_variable_depth.py index b928cb3..81ea716 100644 --- a/tests/cocotb/apb4/smoke/test_variable_depth.py +++ b/tests/cocotb/apb4/smoke/test_variable_depth.py @@ -3,6 +3,8 @@ import cocotb from cocotb.triggers import Timer +from tests.cocotb_lib.protocol_utils import apb_access, apb_setup + class _Apb4SlaveShim: def __init__(self, dut): @@ -66,14 +68,8 @@ async def test_depth_1(dut): # Write to address 0x0 (should select inner1) inner1.PREADY.value = 1 - s_apb.PADDR.value = 0x0 - s_apb.PWDATA.value = 0x12345678 - s_apb.PSTRB.value = 0xF - s_apb.PWRITE.value = 1 - s_apb.PSEL.value = 1 - s_apb.PENABLE.value = 1 - - await Timer(1, unit="ns") + await apb_setup(s_apb, 0x0, True, 0x12345678) + await apb_access(s_apb) assert int(inner1.PSEL.value) == 1, "inner1 must be selected" assert int(inner1.PWRITE.value) == 1, "Write should propagate" @@ -110,14 +106,8 @@ async def test_depth_2(dut): # Write to address 0x0 (should select reg1) reg1.PREADY.value = 1 - s_apb.PADDR.value = 0x0 - s_apb.PWDATA.value = 0xABCDEF01 - s_apb.PSTRB.value = 0xF - s_apb.PWRITE.value = 1 - s_apb.PSEL.value = 1 - s_apb.PENABLE.value = 1 - - await Timer(1, unit="ns") + await apb_setup(s_apb, 0x0, True, 0xABCDEF01) + await apb_access(s_apb) assert int(reg1.PSEL.value) == 1, "reg1 must be selected for address 0x0" assert int(inner2.PSEL.value) == 0, "inner2 should not be selected" @@ -130,14 +120,8 @@ async def test_depth_2(dut): # Write to address 0x10 (should select inner2) inner2.PREADY.value = 1 - s_apb.PADDR.value = 0x10 - s_apb.PWDATA.value = 0x23456789 - s_apb.PSTRB.value = 0xF - s_apb.PWRITE.value = 1 - s_apb.PSEL.value = 1 - s_apb.PENABLE.value = 1 - - await Timer(1, unit="ns") + await apb_setup(s_apb, 0x10, True, 0x23456789) + await apb_access(s_apb) assert int(inner2.PSEL.value) == 1, "inner2 must be selected for address 0x10" assert int(reg1.PSEL.value) == 0, "reg1 should not be selected" @@ -171,14 +155,8 @@ async def test_depth_0(dut): # Write to address 0x0 (should select reg1) reg1.PREADY.value = 1 - s_apb.PADDR.value = 0x0 - s_apb.PWDATA.value = 0x11111111 - s_apb.PSTRB.value = 0xF - s_apb.PWRITE.value = 1 - s_apb.PSEL.value = 1 - s_apb.PENABLE.value = 1 - - await Timer(1, unit="ns") + await apb_setup(s_apb, 0x0, True, 0x11111111) + await apb_access(s_apb) assert int(reg1.PSEL.value) == 1, "reg1 must be selected for address 0x0" assert int(reg2.PSEL.value) == 0, "reg2 should not be selected" @@ -192,14 +170,8 @@ async def test_depth_0(dut): # Write to address 0x10 (should select reg2) reg2.PREADY.value = 1 - s_apb.PADDR.value = 0x10 - s_apb.PWDATA.value = 0x22222222 - s_apb.PSTRB.value = 0xF - s_apb.PWRITE.value = 1 - s_apb.PSEL.value = 1 - s_apb.PENABLE.value = 1 - - await Timer(1, unit="ns") + await apb_setup(s_apb, 0x10, True, 0x22222222) + await apb_access(s_apb) assert int(reg2.PSEL.value) == 1, "reg2 must be selected for address 0x10" assert int(reg1.PSEL.value) == 0, "reg1 should not be selected" @@ -213,14 +185,8 @@ async def test_depth_0(dut): # Write to address 0x14 (should select reg2b) reg2b.PREADY.value = 1 - s_apb.PADDR.value = 0x14 - s_apb.PWDATA.value = 0x33333333 - s_apb.PSTRB.value = 0xF - s_apb.PWRITE.value = 1 - s_apb.PSEL.value = 1 - s_apb.PENABLE.value = 1 - - await Timer(1, unit="ns") + await apb_setup(s_apb, 0x14, True, 0x33333333) + await apb_access(s_apb) assert int(reg2b.PSEL.value) == 1, "reg2b must be selected for address 0x14" assert int(reg1.PSEL.value) == 0, "reg1 should not be selected" diff --git a/tests/cocotb/axi4lite/smoke/test_register_access.py b/tests/cocotb/axi4lite/smoke/test_register_access.py index a6be8a3..8b5a924 100644 --- a/tests/cocotb/axi4lite/smoke/test_register_access.py +++ b/tests/cocotb/axi4lite/smoke/test_register_access.py @@ -2,14 +2,19 @@ from __future__ import annotations -import json -import os -from typing import Any, Iterable +from typing import Any import cocotb from cocotb.triggers import Timer -from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle +from tests.cocotb_lib.handle_utils import SignalHandle +from tests.cocotb_lib.protocol_utils import ( + all_index_pairs, + find_invalid_address, + get_int, + load_config, + set_value, +) class _AxilSlaveShim: @@ -38,25 +43,6 @@ class _AxilSlaveShim: self.RRESP = getattr(dut, f"{prefix}_RRESP") -def _load_config() -> dict[str, Any]: - payload = os.environ.get("RDL_TEST_CONFIG") - if payload is None: - raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided") - return json.loads(payload) - - -def _resolve(handle, indices: Iterable[int]): - return resolve_handle(handle, indices) - - -def _set_value(handle, indices: Iterable[int], value: int) -> None: - _resolve(handle, indices).value = value - - -def _get_int(handle, indices: Iterable[int]) -> int: - return int(_resolve(handle, indices).value) - - def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: table: dict[str, dict[str, Any]] = {} for master in masters_cfg: @@ -91,12 +77,6 @@ def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dic return table -def _all_index_pairs(table: dict[str, dict[str, Any]]): - for name, entry in table.items(): - for idx in entry["indices"]: - yield name, idx - - def _write_pattern(address: int, width: int) -> int: mask = (1 << width) - 1 return ((address * 0x3105) ^ 0x1357_9BDF) & mask @@ -110,7 +90,7 @@ def _read_pattern(address: int, width: int) -> int: @cocotb.test() async def test_axi4lite_address_decoding(dut) -> None: """Stimulate AXI4-Lite slave channels and verify master port selection.""" - config = _load_config() + config = load_config() slave = _AxilSlaveShim(dut) masters = _build_master_table(dut, config["masters"]) @@ -126,16 +106,16 @@ async def test_axi4lite_address_decoding(dut) -> None: slave.ARPROT.value = 0 slave.RREADY.value = 0 - for master_name, idx in _all_index_pairs(masters): + for master_name, idx in all_index_pairs(masters): entry = masters[master_name] - _set_value(entry["inputs"]["AWREADY"], idx, 0) - _set_value(entry["inputs"]["WREADY"], idx, 0) - _set_value(entry["inputs"]["BVALID"], idx, 0) - _set_value(entry["inputs"]["BRESP"], idx, 0) - _set_value(entry["inputs"]["ARREADY"], idx, 0) - _set_value(entry["inputs"]["RVALID"], idx, 0) - _set_value(entry["inputs"]["RDATA"], idx, 0) - _set_value(entry["inputs"]["RRESP"], idx, 0) + set_value(entry["inputs"]["AWREADY"], idx, 0) + set_value(entry["inputs"]["WREADY"], idx, 0) + set_value(entry["inputs"]["BVALID"], idx, 0) + set_value(entry["inputs"]["BRESP"], idx, 0) + set_value(entry["inputs"]["ARREADY"], idx, 0) + set_value(entry["inputs"]["RVALID"], idx, 0) + set_value(entry["inputs"]["RDATA"], idx, 0) + set_value(entry["inputs"]["RRESP"], idx, 0) await Timer(1, unit="ns") @@ -150,6 +130,9 @@ async def test_axi4lite_address_decoding(dut) -> None: address = txn["address"] & addr_mask write_data = _write_pattern(address, config["data_width"]) + set_value(entry["inputs"]["BVALID"], index, 1) + set_value(entry["inputs"]["BRESP"], index, 0) + slave.AWADDR.value = address slave.AWPROT.value = 0 slave.AWVALID.value = 1 @@ -165,34 +148,40 @@ async def test_axi4lite_address_decoding(dut) -> None: await Timer(1, unit="ns") - assert _get_int(entry["outputs"]["AWVALID"], index) == 1, f"{master_name} should see AWVALID asserted" - assert _get_int(entry["outputs"]["AWADDR"], index) == master_address, ( + assert get_int(entry["outputs"]["AWVALID"], index) == 1, f"{master_name} should see AWVALID asserted" + assert get_int(entry["outputs"]["AWADDR"], index) == master_address, ( f"{master_name} must receive AWADDR" ) - assert _get_int(entry["outputs"]["WVALID"], index) == 1, f"{master_name} should see WVALID asserted" - assert _get_int(entry["outputs"]["WDATA"], index) == write_data, f"{master_name} must receive WDATA" - assert _get_int(entry["outputs"]["WSTRB"], index) == strobe_mask, f"{master_name} must receive WSTRB" + assert get_int(entry["outputs"]["WVALID"], index) == 1, f"{master_name} should see WVALID asserted" + assert get_int(entry["outputs"]["WDATA"], index) == write_data, f"{master_name} must receive WDATA" + assert get_int(entry["outputs"]["WSTRB"], index) == strobe_mask, f"{master_name} must receive WSTRB" + assert int(slave.AWREADY.value) == 1, "AWREADY should assert when write address/data are valid" + assert int(slave.WREADY.value) == 1, "WREADY should assert when write address/data are valid" - for other_name, other_idx in _all_index_pairs(masters): + for other_name, other_idx in all_index_pairs(masters): if other_name == master_name and other_idx == index: continue other_entry = masters[other_name] - assert _get_int(other_entry["outputs"]["AWVALID"], other_idx) == 0, ( + assert get_int(other_entry["outputs"]["AWVALID"], other_idx) == 0, ( f"{other_name}{other_idx} AWVALID should remain low during {txn['label']}" ) - assert _get_int(other_entry["outputs"]["WVALID"], other_idx) == 0, ( + assert get_int(other_entry["outputs"]["WVALID"], other_idx) == 0, ( f"{other_name}{other_idx} WVALID should remain low during {txn['label']}" ) + assert int(slave.BVALID.value) == 1, "Slave should observe BVALID from selected master" + assert int(slave.BRESP.value) == 0, "BRESP should indicate OKAY on write" + slave.AWVALID.value = 0 slave.WVALID.value = 0 slave.BREADY.value = 0 + set_value(entry["inputs"]["BVALID"], index, 0) await Timer(1, unit="ns") read_data = _read_pattern(address, config["data_width"]) - _set_value(entry["inputs"]["RVALID"], index, 1) - _set_value(entry["inputs"]["RDATA"], index, read_data) - _set_value(entry["inputs"]["RRESP"], index, 0) + set_value(entry["inputs"]["RVALID"], index, 1) + set_value(entry["inputs"]["RDATA"], index, read_data) + set_value(entry["inputs"]["RRESP"], index, 0) slave.ARADDR.value = address slave.ARPROT.value = 0 @@ -201,16 +190,17 @@ async def test_axi4lite_address_decoding(dut) -> None: await Timer(1, unit="ns") - assert _get_int(entry["outputs"]["ARVALID"], index) == 1, f"{master_name} should assert ARVALID" - assert _get_int(entry["outputs"]["ARADDR"], index) == master_address, ( + assert get_int(entry["outputs"]["ARVALID"], index) == 1, f"{master_name} should assert ARVALID" + assert get_int(entry["outputs"]["ARADDR"], index) == master_address, ( f"{master_name} must receive ARADDR" ) + assert int(slave.ARREADY.value) == 1, "ARREADY should assert when ARVALID is high" - for other_name, other_idx in _all_index_pairs(masters): + for other_name, other_idx in all_index_pairs(masters): if other_name == master_name and other_idx == index: continue other_entry = masters[other_name] - assert _get_int(other_entry["outputs"]["ARVALID"], other_idx) == 0, ( + assert get_int(other_entry["outputs"]["ARVALID"], other_idx) == 0, ( f"{other_name}{other_idx} ARVALID should remain low during read of {txn['label']}" ) @@ -220,6 +210,148 @@ async def test_axi4lite_address_decoding(dut) -> None: slave.ARVALID.value = 0 slave.RREADY.value = 0 - _set_value(entry["inputs"]["RVALID"], index, 0) - _set_value(entry["inputs"]["RDATA"], index, 0) + set_value(entry["inputs"]["RVALID"], index, 0) + set_value(entry["inputs"]["RDATA"], index, 0) await Timer(1, unit="ns") + + +@cocotb.test() +async def test_axi4lite_invalid_write_handshake(dut) -> None: + """Ensure mismatched AW/W valid signals raise an error and are ignored.""" + config = load_config() + slave = _AxilSlaveShim(dut) + masters = _build_master_table(dut, config["masters"]) + + slave.AWVALID.value = 0 + slave.AWADDR.value = 0 + slave.AWPROT.value = 0 + slave.WVALID.value = 0 + slave.WDATA.value = 0 + slave.WSTRB.value = 0 + slave.BREADY.value = 0 + slave.ARVALID.value = 0 + slave.ARADDR.value = 0 + slave.ARPROT.value = 0 + slave.RREADY.value = 0 + + for master_name, idx in all_index_pairs(masters): + entry = masters[master_name] + set_value(entry["inputs"]["AWREADY"], idx, 0) + set_value(entry["inputs"]["WREADY"], idx, 0) + set_value(entry["inputs"]["BVALID"], idx, 0) + set_value(entry["inputs"]["BRESP"], idx, 0) + set_value(entry["inputs"]["ARREADY"], idx, 0) + set_value(entry["inputs"]["RVALID"], idx, 0) + set_value(entry["inputs"]["RDATA"], idx, 0) + set_value(entry["inputs"]["RRESP"], idx, 0) + + await Timer(1, unit="ns") + + if not config["transactions"]: + dut._log.warning("No transactions available; skipping invalid handshake test") + return + + bad_addr = config["transactions"][0]["address"] & ((1 << config["address_width"]) - 1) + slave.AWADDR.value = bad_addr + slave.AWPROT.value = 0 + slave.AWVALID.value = 1 + slave.WVALID.value = 0 + slave.BREADY.value = 1 + + await Timer(1, unit="ns") + + for master_name, idx in all_index_pairs(masters): + entry = masters[master_name] + assert get_int(entry["outputs"]["AWVALID"], idx) == 0, ( + f"{master_name}{idx} must not see AWVALID on invalid handshake" + ) + assert get_int(entry["outputs"]["WVALID"], idx) == 0, ( + f"{master_name}{idx} must not see WVALID on invalid handshake" + ) + + assert int(slave.AWREADY.value) == 0, "AWREADY must remain low on invalid write handshake" + assert int(slave.WREADY.value) == 0, "WREADY must remain low on invalid write handshake" + assert int(slave.BVALID.value) == 1, "Invalid write handshake should return BVALID" + assert int(slave.BRESP.value) == 2, "Invalid write handshake should return SLVERR" + + +@cocotb.test() +async def test_axi4lite_invalid_address_response(dut) -> None: + """Ensure unmapped addresses return error responses and do not select a master.""" + config = load_config() + slave = _AxilSlaveShim(dut) + masters = _build_master_table(dut, config["masters"]) + + slave.AWVALID.value = 0 + slave.AWADDR.value = 0 + slave.AWPROT.value = 0 + slave.WVALID.value = 0 + slave.WDATA.value = 0 + slave.WSTRB.value = 0 + slave.BREADY.value = 0 + slave.ARVALID.value = 0 + slave.ARADDR.value = 0 + slave.ARPROT.value = 0 + slave.RREADY.value = 0 + + for master_name, idx in all_index_pairs(masters): + entry = masters[master_name] + set_value(entry["inputs"]["AWREADY"], idx, 0) + set_value(entry["inputs"]["WREADY"], idx, 0) + set_value(entry["inputs"]["BVALID"], idx, 0) + set_value(entry["inputs"]["BRESP"], idx, 0) + set_value(entry["inputs"]["ARREADY"], idx, 0) + set_value(entry["inputs"]["RVALID"], idx, 0) + set_value(entry["inputs"]["RDATA"], idx, 0) + set_value(entry["inputs"]["RRESP"], idx, 0) + + await Timer(1, unit="ns") + + invalid_addr = find_invalid_address(config) + if invalid_addr is None: + dut._log.warning("No unmapped address found; skipping invalid address test") + return + + # Invalid read + slave.ARADDR.value = invalid_addr + slave.ARPROT.value = 0 + slave.ARVALID.value = 1 + slave.RREADY.value = 1 + + await Timer(1, unit="ns") + + for master_name, idx in all_index_pairs(masters): + entry = masters[master_name] + assert get_int(entry["outputs"]["ARVALID"], idx) == 0, ( + f"{master_name}{idx} must stay idle for invalid read address" + ) + + assert int(slave.RVALID.value) == 1, "Invalid read should return RVALID" + assert int(slave.RRESP.value) == 2, "Invalid read should return SLVERR" + + slave.ARVALID.value = 0 + slave.RREADY.value = 0 + await Timer(1, unit="ns") + + # Invalid write + slave.AWADDR.value = invalid_addr + slave.AWPROT.value = 0 + slave.AWVALID.value = 1 + slave.WDATA.value = 0xA5A5_5A5A + slave.WSTRB.value = (1 << config["byte_width"]) - 1 + slave.WVALID.value = 1 + slave.BREADY.value = 1 + + await Timer(1, unit="ns") + + for master_name, idx in all_index_pairs(masters): + entry = masters[master_name] + assert get_int(entry["outputs"]["AWVALID"], idx) == 0, ( + f"{master_name}{idx} must stay idle for invalid write address" + ) + assert get_int(entry["outputs"]["WVALID"], idx) == 0, ( + f"{master_name}{idx} must stay idle for invalid write address" + ) + + assert int(slave.BVALID.value) == 1, "Invalid write should return BVALID" + assert int(slave.BRESP.value) == 2, "Invalid write should return SLVERR" diff --git a/tests/cocotb/axi4lite/smoke/test_runner.py b/tests/cocotb/axi4lite/smoke/test_runner.py index 6f70f38..9d21992 100644 --- a/tests/cocotb/axi4lite/smoke/test_runner.py +++ b/tests/cocotb/axi4lite/smoke/test_runner.py @@ -3,8 +3,8 @@ from __future__ import annotations import json -from pathlib import Path import logging +from pathlib import Path import pytest @@ -16,7 +16,7 @@ except ImportError: # pragma: no cover from cocotb_tools.runner import get_runner from tests.cocotb_lib import RDL_CASES -from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case, colorize_cocotb_log +from tests.cocotb_lib.utils import colorize_cocotb_log, get_verilog_sources, prepare_cpuif_case @pytest.mark.simulation diff --git a/tests/cocotb_lib/handle_utils.py b/tests/cocotb_lib/handle_utils.py index e29f79e..d242e08 100644 --- a/tests/cocotb_lib/handle_utils.py +++ b/tests/cocotb_lib/handle_utils.py @@ -2,7 +2,8 @@ from __future__ import annotations -from typing import Any, Iterable +from collections.abc import Iterable +from typing import Any class SignalHandle: diff --git a/tests/cocotb_lib/protocol_utils.py b/tests/cocotb_lib/protocol_utils.py new file mode 100644 index 0000000..2094a92 --- /dev/null +++ b/tests/cocotb_lib/protocol_utils.py @@ -0,0 +1,101 @@ +"""Shared helpers for cocotb smoke tests.""" + +from __future__ import annotations + +import json +import os +from collections.abc import Iterable +from typing import Any + +import cocotb +from cocotb.clock import Clock +from cocotb.triggers import RisingEdge, Timer + +from tests.cocotb_lib.handle_utils import resolve_handle + + +def load_config() -> dict[str, Any]: + """Read the JSON payload describing the generated register topology.""" + payload = os.environ.get("RDL_TEST_CONFIG") + if payload is None: + raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided") + return json.loads(payload) + + +def resolve(handle, indices: Iterable[int]): + """Index into hierarchical cocotb handles.""" + return resolve_handle(handle, indices) + + +def set_value(handle, indices: Iterable[int], value: int) -> None: + resolve(handle, indices).value = value + + +def get_int(handle, indices: Iterable[int]) -> int: + return int(resolve(handle, indices).value) + + +def all_index_pairs(table: dict[str, dict[str, Any]]): + for name, entry in table.items(): + for idx in entry["indices"]: + yield name, idx + + +def find_invalid_address(config: dict[str, Any]) -> int | None: + """Return an address outside any master/array span, or None if fully covered.""" + addr_width = config["address_width"] + max_addr = 1 << addr_width + ranges: list[tuple[int, int]] = [] + + for master in config["masters"]: + inst_address = master["inst_address"] + inst_size = master["inst_size"] + n_elems = 1 + if master.get("is_array"): + for dim in master.get("dimensions", []): + n_elems *= dim + span = inst_size * n_elems + ranges.append((inst_address, inst_address + span)) + + ranges.sort() + + cursor = 0 + for start, end in ranges: + if cursor < start: + return cursor + cursor = max(cursor, end) + + if cursor < max_addr: + return cursor + return None + + +async def start_clock(clk_handle, period_ns: int = 2) -> None: + """Start a simple clock if handle is present.""" + if clk_handle is None: + return + clk_handle.value = 0 + cocotb.start_soon(Clock(clk_handle, period_ns, unit="ns").start()) + await RisingEdge(clk_handle) + + +async def apb_setup(slave, addr: int, write: bool, data: int, *, strobe_mask: int | None = None) -> None: + """APB setup phase helper.""" + if hasattr(slave, "PPROT"): + slave.PPROT.value = 0 + if hasattr(slave, "PSTRB"): + if strobe_mask is None: + strobe_mask = (1 << len(slave.PSTRB)) - 1 + slave.PSTRB.value = strobe_mask + slave.PADDR.value = addr + slave.PWDATA.value = data + slave.PWRITE.value = 1 if write else 0 + slave.PSEL.value = 1 + slave.PENABLE.value = 0 + await Timer(1, unit="ns") + + +async def apb_access(slave) -> None: + """APB access phase helper.""" + slave.PENABLE.value = 1 + await Timer(1, unit="ns") diff --git a/tests/cocotb_lib/utils.py b/tests/cocotb_lib/utils.py index 31d7c75..85e0778 100644 --- a/tests/cocotb_lib/utils.py +++ b/tests/cocotb_lib/utils.py @@ -2,9 +2,9 @@ from __future__ import annotations +import re from collections import defaultdict from pathlib import Path -import re from typing import Any from systemrdl import RDLCompiler @@ -49,6 +49,7 @@ def colorize_cocotb_log(text: str) -> str: Returns: A string with colorized log lines. """ + def _color_line(match: re.Match) -> str: prefix = match.group("prefix") time = match.group("time") diff --git a/tests/generator/test_decode_logic_generator.py b/tests/generator/test_decode_logic_generator.py index 98325d3..59727c4 100644 --- a/tests/generator/test_decode_logic_generator.py +++ b/tests/generator/test_decode_logic_generator.py @@ -28,7 +28,7 @@ class TestDecodeLogicGenerator: # Basic sanity check - it should initialize assert gen is not None - assert gen._flavor == DecodeLogicFlavor.READ + assert gen._flavor == DecodeLogicFlavor.READ def test_decode_logic_write(self, compile_rdl: Callable[..., AddrmapNode]) -> None: """Test decode logic generation for write operations.""" @@ -48,7 +48,7 @@ class TestDecodeLogicGenerator: gen = DecodeLogicGenerator(ds, DecodeLogicFlavor.WRITE) assert gen is not None - assert gen._flavor == DecodeLogicFlavor.WRITE + assert gen._flavor == DecodeLogicFlavor.WRITE def test_cpuif_addr_predicate(self, compile_rdl: Callable[..., AddrmapNode]) -> None: """Test address predicate generation.""" diff --git a/tests/generator/test_design_state.py b/tests/generator/test_design_state.py index 636a085..9c13134 100644 --- a/tests/generator/test_design_state.py +++ b/tests/generator/test_design_state.py @@ -1,4 +1,3 @@ - from collections.abc import Callable from systemrdl.node import AddrmapNode @@ -9,7 +8,7 @@ from peakrdl_busdecoder.design_state import DesignState class TestDesignState: """Test the DesignState class.""" - def test_design_state_basic(self, compile_rdl:Callable[..., AddrmapNode])->None: + def test_design_state_basic(self, compile_rdl: Callable[..., AddrmapNode]) -> None: """Test basic DesignState initialization.""" rdl_source = """ addrmap test { @@ -31,7 +30,7 @@ class TestDesignState: assert ds.cpuif_data_width == 32 # Should infer from 32-bit field assert ds.addr_width > 0 - def test_design_state_custom_module_name(self, compile_rdl:Callable[..., AddrmapNode])->None: + def test_design_state_custom_module_name(self, compile_rdl: Callable[..., AddrmapNode]) -> None: """Test DesignState with custom module name.""" rdl_source = """ addrmap test { @@ -50,7 +49,7 @@ class TestDesignState: assert ds.module_name == "custom_module" assert ds.package_name == "custom_module_pkg" - def test_design_state_custom_package_name(self, compile_rdl:Callable[..., AddrmapNode])->None: + def test_design_state_custom_package_name(self, compile_rdl: Callable[..., AddrmapNode]) -> None: """Test DesignState with custom package name.""" rdl_source = """ addrmap test { @@ -68,7 +67,7 @@ class TestDesignState: assert ds.package_name == "custom_pkg" - def test_design_state_custom_address_width(self, compile_rdl:Callable[..., AddrmapNode])->None: + def test_design_state_custom_address_width(self, compile_rdl: Callable[..., AddrmapNode]) -> None: """Test DesignState with custom address width.""" rdl_source = """ addrmap test { @@ -86,7 +85,7 @@ class TestDesignState: assert ds.addr_width == 16 - def test_design_state_unroll_arrays(self, compile_rdl:Callable[..., AddrmapNode])->None: + def test_design_state_unroll_arrays(self, compile_rdl: Callable[..., AddrmapNode]) -> None: """Test DesignState with cpuif_unroll option.""" rdl_source = """ addrmap test { @@ -104,7 +103,7 @@ class TestDesignState: assert ds.cpuif_unroll is True - def test_design_state_64bit_registers(self, compile_rdl:Callable[..., AddrmapNode])->None: + def test_design_state_64bit_registers(self, compile_rdl: Callable[..., AddrmapNode]) -> None: """Test DesignState with wider data width.""" rdl_source = """ addrmap test { diff --git a/tests/generator/test_questa_compatibility.py b/tests/generator/test_questa_compatibility.py index c46bb9c..6a24f33 100644 --- a/tests/generator/test_questa_compatibility.py +++ b/tests/generator/test_questa_compatibility.py @@ -12,13 +12,13 @@ from peakrdl_busdecoder.cpuif.apb4 import APB4Cpuif def test_instance_array_questa_compatibility(compile_rdl: Callable[..., AddrmapNode]) -> None: """Test that instance arrays generate Questa-compatible code. - + This test ensures that: - Struct members for arrays use unpacked array syntax (name[dim]) - NOT packed bit-vector syntax ([dim-1:0]name) - Struct is unpacked (not packed) - Array indexing with loop variables works correctly - + This fixes the error: "Nonconstant index into instance array" """ rdl_source = """ @@ -32,29 +32,29 @@ def test_instance_array_questa_compatibility(compile_rdl: Callable[..., AddrmapN }; """ top = compile_rdl(rdl_source, top="test_map") - + with TemporaryDirectory() as tmpdir: exporter = BusDecoderExporter() exporter.export(top, tmpdir, cpuif_cls=APB4Cpuif) - + # Read the generated module module_file = Path(tmpdir) / "test_map.sv" content = module_file.read_text() - + # Should use unpacked struct assert "typedef struct {" in content assert "typedef struct packed" not in content - + # Should use unpacked array syntax for array members assert "logic my_reg[4];" in content - + # Should NOT use packed bit-vector syntax assert "[3:0]my_reg" not in content - + # Should have proper array indexing in decode logic assert "cpuif_wr_sel.my_reg[i0] = 1'b1;" in content assert "cpuif_rd_sel.my_reg[i0] = 1'b1;" in content - + # Should have proper array indexing in fanout/fanin logic assert "cpuif_wr_sel.my_reg[gi0]" in content or "cpuif_rd_sel.my_reg[gi0]" in content assert "cpuif_wr_sel.my_reg[i0]" in content or "cpuif_rd_sel.my_reg[i0]" in content @@ -73,21 +73,21 @@ def test_multidimensional_array_questa_compatibility(compile_rdl: Callable[..., }; """ top = compile_rdl(rdl_source, top="test_map") - + with TemporaryDirectory() as tmpdir: exporter = BusDecoderExporter() exporter.export(top, tmpdir, cpuif_cls=APB4Cpuif) - + # Read the generated module module_file = Path(tmpdir) / "test_map.sv" content = module_file.read_text() - + # Should use unpacked struct with multidimensional array assert "typedef struct {" in content - + # Should use unpacked array syntax for multidimensional arrays assert "logic my_reg[2][3];" in content - + # Should NOT use packed bit-vector syntax assert "[1:0][2:0]my_reg" not in content assert "[5:0]my_reg" not in content @@ -110,22 +110,22 @@ def test_nested_instance_array_questa_compatibility(compile_rdl: Callable[..., A }; """ top = compile_rdl(rdl_source, top="outer_map") - + with TemporaryDirectory() as tmpdir: exporter = BusDecoderExporter() exporter.export(top, tmpdir, cpuif_cls=APB4Cpuif) - + # Read the generated module module_file = Path(tmpdir) / "outer_map.sv" content = module_file.read_text() - + # Should use unpacked struct assert "typedef struct {" in content - + # Inner should be an array # The exact syntax may vary, but it should be unpacked # Look for the pattern of unpacked arrays, not packed bit-vectors assert "inner[3]" in content or "logic inner" in content - + # Should NOT use packed bit-vector syntax like [2:0]inner assert "[2:0]inner" not in content diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 047a0f7..50e5a65 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -1,10 +1,5 @@ """Pytest fixtures for unit tests.""" - - -from pathlib import Path -from tempfile import NamedTemporaryFile -from typing import Iterable, Mapping, Optional from collections.abc import Callable import pytest diff --git a/tests/unit/test_external_nested.py b/tests/unit/test_external_nested.py index 425200e..3566ae4 100644 --- a/tests/unit/test_external_nested.py +++ b/tests/unit/test_external_nested.py @@ -12,7 +12,7 @@ from peakrdl_busdecoder.cpuif.apb4 import APB4Cpuif def test_external_nested_components_generate_correct_decoder(external_nested_rdl: AddrmapNode) -> None: """Test that external nested components generate correct decoder logic. - + The decoder should: - Generate select signals for multicast and port[16] - NOT generate select signals for multicast.common[] or multicast.response @@ -25,21 +25,21 @@ def test_external_nested_components_generate_correct_decoder(external_nested_rdl tmpdir, cpuif_cls=APB4Cpuif, ) - + # Read the generated module module_file = Path(tmpdir) / "buffer_t.sv" content = module_file.read_text() - + # Should have correct select signals assert "cpuif_wr_sel.multicast = 1'b1;" in content assert "cpuif_wr_sel.port[i0] = 1'b1;" in content - + # Should NOT have invalid nested paths assert "cpuif_wr_sel.multicast.common" not in content assert "cpuif_wr_sel.multicast.response" not in content assert "cpuif_rd_sel.multicast.common" not in content assert "cpuif_rd_sel.multicast.response" not in content - + # Verify struct is flat (no nested structs for external children) assert "typedef struct" in content assert "logic multicast;" in content @@ -48,7 +48,7 @@ def test_external_nested_components_generate_correct_decoder(external_nested_rdl def test_external_nested_components_generate_correct_interfaces(external_nested_rdl: AddrmapNode) -> None: """Test that external nested components generate correct interface ports. - + The module should have: - One master interface for multicast - Array of 16 master interfaces for port[] @@ -61,15 +61,15 @@ def test_external_nested_components_generate_correct_interfaces(external_nested_ tmpdir, cpuif_cls=APB4Cpuif, ) - + # Read the generated module module_file = Path(tmpdir) / "buffer_t.sv" content = module_file.read_text() - + # Should have master interfaces for top-level external children assert "m_apb_multicast" in content assert "m_apb_port [16]" in content or "m_apb_port[16]" in content - + # Should NOT have interfaces for nested external children assert "m_apb_multicast_common" not in content assert "m_apb_multicast_response" not in content @@ -79,7 +79,7 @@ def test_external_nested_components_generate_correct_interfaces(external_nested_ def test_non_external_nested_components_are_descended(compile_rdl: Callable[..., AddrmapNode]) -> None: """Test that non-external nested components are still descended into. - + This is a regression test to ensure we didn't break normal nested component handling. """ @@ -98,16 +98,16 @@ def test_non_external_nested_components_are_descended(compile_rdl: Callable[..., }; """ top = compile_rdl(rdl_source, top="outer_block") - + with TemporaryDirectory() as tmpdir: exporter = BusDecoderExporter() # Use depth=0 to descend all the way down to registers exporter.export(top, tmpdir, cpuif_cls=APB4Cpuif, max_decode_depth=0) - + # Read the generated module module_file = Path(tmpdir) / "outer_block.sv" content = module_file.read_text() - + # Should descend into inner and reference inner_reg assert "inner" in content assert "inner_reg" in content @@ -123,7 +123,7 @@ def test_max_decode_depth_parameter_exists(compile_rdl: Callable[..., AddrmapNod }; """ top = compile_rdl(rdl_source, top="simple") - + with TemporaryDirectory() as tmpdir: exporter = BusDecoderExporter() # Should not raise an exception @@ -133,7 +133,7 @@ def test_max_decode_depth_parameter_exists(compile_rdl: Callable[..., AddrmapNod cpuif_cls=APB4Cpuif, max_decode_depth=2, ) - + # Verify output was generated module_file = Path(tmpdir) / "simple.sv" assert module_file.exists() @@ -141,7 +141,7 @@ def test_max_decode_depth_parameter_exists(compile_rdl: Callable[..., AddrmapNod def test_unaligned_external_component_supported(compile_rdl: Callable[..., AddrmapNode]) -> None: """Test that external components can be at unaligned addresses. - + This test verifies that external components don't need to be aligned to a power-of-2 multiple of their size, as the busdecoder supports unaligned access. @@ -161,16 +161,16 @@ def test_unaligned_external_component_supported(compile_rdl: Callable[..., Addrm }; """ top = compile_rdl(rdl_source, top="buffer_t") - + with TemporaryDirectory() as tmpdir: exporter = BusDecoderExporter() # Should not raise an alignment error exporter.export(top, tmpdir, cpuif_cls=APB4Cpuif) - + # Verify output was generated module_file = Path(tmpdir) / "buffer_t.sv" assert module_file.exists() - + content = module_file.read_text() # Verify the external component is in the generated code assert "multicast" in content @@ -178,7 +178,7 @@ def test_unaligned_external_component_supported(compile_rdl: Callable[..., Addrm def test_unaligned_external_component_array_supported(compile_rdl: Callable[..., AddrmapNode]) -> None: """Test that external component arrays with non-power-of-2 strides are supported. - + This test verifies that external component arrays can have arbitrary strides, not just power-of-2 strides. """ @@ -197,16 +197,16 @@ def test_unaligned_external_component_array_supported(compile_rdl: Callable[..., }; """ top = compile_rdl(rdl_source, top="buffer_t") - + with TemporaryDirectory() as tmpdir: exporter = BusDecoderExporter() # Should not raise an alignment error exporter.export(top, tmpdir, cpuif_cls=APB4Cpuif) - + # Verify output was generated module_file = Path(tmpdir) / "buffer_t.sv" assert module_file.exists() - + content = module_file.read_text() # Verify the external component array is in the generated code assert "port" in content @@ -214,7 +214,7 @@ def test_unaligned_external_component_array_supported(compile_rdl: Callable[..., def test_unaligned_external_nested_in_addrmap(compile_rdl: Callable[..., AddrmapNode]) -> None: """Test that addrmaps containing external components can be at unaligned addresses. - + This verifies that not just external components themselves, but also non-external addrmaps/regfiles that contain external components can be at unaligned addresses. @@ -235,18 +235,16 @@ def test_unaligned_external_nested_in_addrmap(compile_rdl: Callable[..., Addrmap }; """ top = compile_rdl(rdl_source, top="outer_block") - + with TemporaryDirectory() as tmpdir: exporter = BusDecoderExporter() # Should not raise an alignment error exporter.export(top, tmpdir, cpuif_cls=APB4Cpuif) - + # Verify output was generated module_file = Path(tmpdir) / "outer_block.sv" assert module_file.exists() - + content = module_file.read_text() # Verify the nested components are in the generated code assert "inner" in content - - diff --git a/tests/unit/test_max_decode_depth.py b/tests/unit/test_max_decode_depth.py index d6c479c..a37b915 100644 --- a/tests/unit/test_max_decode_depth.py +++ b/tests/unit/test_max_decode_depth.py @@ -36,7 +36,7 @@ def test_depth_1_generates_top_level_interface_only(compile_rdl: Callable[..., A assert "m_apb_inner1" in content # Should NOT have interface for reg1 assert "m_apb_reg1" not in content - + # Struct should have inner1 but not nested structure assert "logic inner1;" in content @@ -77,7 +77,7 @@ def test_depth_2_generates_second_level_interfaces(compile_rdl: Callable[..., Ad # Should NOT have interface for inner1 or reg2 assert "m_apb_inner1" not in content assert "m_apb_reg2" not in content - + # Struct should be hierarchical with inner1.reg1 and inner1.inner2 assert "cpuif_sel_inner1_t" in content assert "logic reg1;" in content @@ -125,7 +125,7 @@ def test_depth_0_decodes_all_levels(compile_rdl: Callable[..., AddrmapNode]) -> # Should NOT have interfaces for addrmaps assert "m_apb_inner1" not in content assert "m_apb_inner2" not in content - + # Struct should be fully hierarchical assert "cpuif_sel_inner1_t" in content assert "cpuif_sel_inner2_t" in content diff --git a/tests/utils/test_ref_is_internal.py b/tests/utils/test_ref_is_internal.py index a838535..16ad4e9 100644 --- a/tests/utils/test_ref_is_internal.py +++ b/tests/utils/test_ref_is_internal.py @@ -16,9 +16,7 @@ def _find_child_by_name(node: AddrmapNode, inst_name: str): class TestRefIsInternal: """Tests for ref_is_internal utility.""" - def test_external_components_flagged( - self, compile_rdl: Callable[..., AddrmapNode] - ) -> None: + def test_external_components_flagged(self, compile_rdl: Callable[..., AddrmapNode]) -> None: """External components should be treated as non-internal.""" rdl_source = """ reg reg_t { diff --git a/uv.lock b/uv.lock index d35ba2b..7924cd6 100644 --- a/uv.lock +++ b/uv.lock @@ -658,7 +658,7 @@ wheels = [ [[package]] name = "peakrdl-busdecoder" -version = "0.6.6" +version = "0.6.7" source = { editable = "." } dependencies = [ { name = "jinja2" },