@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "peakrdl-busdecoder"
|
||||
version = "0.6.6"
|
||||
version = "0.6.7"
|
||||
requires-python = ">=3.10"
|
||||
dependencies = [
|
||||
"jinja2~=3.1",
|
||||
|
||||
@@ -77,8 +77,8 @@ class AXI4LiteCpuif(BaseCpuif):
|
||||
if self.is_interface and node.is_array and node.array_dimensions:
|
||||
# Generate array index string [i0][i1]... for the intermediate signal
|
||||
array_idx = "".join(f"[i{i}]" for i in range(len(node.array_dimensions)))
|
||||
fanin["cpuif_wr_ack"] = f"{node.inst_name}_fanin_ready{array_idx}"
|
||||
fanin["cpuif_wr_err"] = f"{node.inst_name}_fanin_err{array_idx}"
|
||||
fanin["cpuif_wr_ack"] = f"{node.inst_name}_fanin_wr_valid{array_idx}"
|
||||
fanin["cpuif_wr_err"] = f"{node.inst_name}_fanin_wr_err{array_idx}"
|
||||
else:
|
||||
# Read side: ack comes from RVALID; err if RRESP[1] is set (SLVERR/DECERR)
|
||||
fanin["cpuif_wr_ack"] = self.signal("BVALID", node, "i")
|
||||
@@ -119,4 +119,16 @@ class AXI4LiteCpuif(BaseCpuif):
|
||||
f"assign {inst_name}_fanin_ready{array_idx} = {master_prefix}{indexed_path}.RVALID;",
|
||||
f"assign {inst_name}_fanin_err{array_idx} = {master_prefix}{indexed_path}.RRESP[1];",
|
||||
f"assign {inst_name}_fanin_data{array_idx} = {master_prefix}{indexed_path}.RDATA;",
|
||||
f"assign {inst_name}_fanin_wr_valid{array_idx} = {master_prefix}{indexed_path}.BVALID;",
|
||||
f"assign {inst_name}_fanin_wr_err{array_idx} = {master_prefix}{indexed_path}.BRESP[1];",
|
||||
]
|
||||
|
||||
def fanin_intermediate_declarations(self, node: AddressableNode) -> list[str]:
|
||||
if not node.array_dimensions:
|
||||
return []
|
||||
|
||||
array_str = "".join(f"[{dim}]" for dim in node.array_dimensions)
|
||||
return [
|
||||
f"logic {node.inst_name}_fanin_wr_valid{array_str};",
|
||||
f"logic {node.inst_name}_fanin_wr_err{array_str};",
|
||||
]
|
||||
|
||||
@@ -26,9 +26,21 @@
|
||||
`endif
|
||||
{% endif -%}
|
||||
|
||||
logic axi_wr_valid;
|
||||
logic axi_wr_invalid;
|
||||
logic cpuif_wr_ack_int;
|
||||
logic cpuif_rd_ack_int;
|
||||
|
||||
assign axi_wr_valid = {{cpuif.signal("AWVALID")}} & {{cpuif.signal("WVALID")}};
|
||||
assign axi_wr_invalid = {{cpuif.signal("AWVALID")}} ^ {{cpuif.signal("WVALID")}};
|
||||
|
||||
// Ready/acceptance follows the simplified single-beat requirement
|
||||
assign {{cpuif.signal("AWREADY")}} = axi_wr_valid;
|
||||
assign {{cpuif.signal("WREADY")}} = axi_wr_valid;
|
||||
assign {{cpuif.signal("ARREADY")}} = {{cpuif.signal("ARVALID")}};
|
||||
|
||||
assign cpuif_req = {{cpuif.signal("AWVALID")}} | {{cpuif.signal("ARVALID")}};
|
||||
assign cpuif_wr_en = {{cpuif.signal("AWVALID")}} & {{cpuif.signal("WVALID")}};
|
||||
assign cpuif_wr_en = axi_wr_valid;
|
||||
assign cpuif_rd_en = {{cpuif.signal("ARVALID")}};
|
||||
|
||||
assign cpuif_wr_addr = {{cpuif.signal("AWADDR")}};
|
||||
@@ -42,12 +54,14 @@ assign cpuif_wr_byte_en = {{cpuif.signal("WSTRB")}};
|
||||
// Read: ack=RVALID, err=RRESP[1] (SLVERR/DECERR), data=RDATA
|
||||
//
|
||||
assign {{cpuif.signal("RDATA")}} = cpuif_rd_data;
|
||||
assign {{cpuif.signal("RVALID")}} = cpuif_rd_ack;
|
||||
assign cpuif_rd_ack_int = cpuif_rd_ack | cpuif_rd_sel.cpuif_err;
|
||||
assign {{cpuif.signal("RVALID")}} = cpuif_rd_ack_int;
|
||||
assign {{cpuif.signal("RRESP")}} = (cpuif_rd_err | cpuif_rd_sel.cpuif_err | cpuif_wr_sel.cpuif_err) ? 2'b10 : 2'b00;
|
||||
|
||||
// Write: ack=BVALID, err=BRESP[1]
|
||||
assign {{cpuif.signal("BVALID")}} = cpuif_wr_ack;
|
||||
assign {{cpuif.signal("BRESP")}} = (cpuif_wr_err | cpuif_wr_sel.cpuif_err | cpuif_rd_sel.cpuif_err) ? 2'b10 : 2'b00;
|
||||
assign cpuif_wr_ack_int = cpuif_wr_ack | cpuif_wr_sel.cpuif_err | axi_wr_invalid;
|
||||
assign {{cpuif.signal("BVALID")}} = cpuif_wr_ack_int;
|
||||
assign {{cpuif.signal("BRESP")}} = (cpuif_wr_err | cpuif_wr_sel.cpuif_err | cpuif_rd_sel.cpuif_err | axi_wr_invalid) ? 2'b10 : 2'b00;
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
// Fanout CPU Bus interface signals
|
||||
|
||||
@@ -136,3 +136,7 @@ class BaseCpuif:
|
||||
List of assignment strings
|
||||
"""
|
||||
return [] # Default: no intermediate assignments needed
|
||||
|
||||
def fanin_intermediate_declarations(self, node: AddressableNode) -> list[str]:
|
||||
"""Optional extra intermediate signal declarations for interface arrays."""
|
||||
return []
|
||||
|
||||
@@ -72,12 +72,12 @@ class FaninGenerator(BusDecoderListener):
|
||||
def __str__(self) -> str:
|
||||
wr_ifb = IfBody()
|
||||
with wr_ifb.cm("cpuif_wr_sel.cpuif_err") as b:
|
||||
self._cpuif.fanin_wr(error=True)
|
||||
b += self._cpuif.fanin_wr(error=True)
|
||||
self._stack[-1] += wr_ifb
|
||||
|
||||
rd_ifb = IfBody()
|
||||
with rd_ifb.cm("cpuif_rd_sel.cpuif_err") as b:
|
||||
self._cpuif.fanin_rd(error=True)
|
||||
b += self._cpuif.fanin_rd(error=True)
|
||||
self._stack[-1] += rd_ifb
|
||||
|
||||
return "\n".join(map(str, self._stack))
|
||||
|
||||
@@ -94,6 +94,9 @@ class FaninIntermediateGenerator(BusDecoderListener):
|
||||
f"logic [{self._cpuif.data_width - 1}:0] {inst_name}_fanin_data{array_str};"
|
||||
)
|
||||
|
||||
# Allow CPU interface to add extra intermediate declarations (e.g., write responses)
|
||||
self._declarations.extend(self._cpuif.fanin_intermediate_declarations(node))
|
||||
|
||||
def _generate_intermediate_assignments(self, node: AddressableNode) -> str:
|
||||
"""Generate assignments from interface array to intermediate signals."""
|
||||
inst_name = node.inst_name
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
|
||||
from peakrdl_busdecoder.body import Body
|
||||
|
||||
|
||||
@@ -46,5 +45,3 @@ class TestBody:
|
||||
outer += "outer2"
|
||||
expected = "outer1\ninner1\ninner2\nouter2"
|
||||
assert str(outer) == expected
|
||||
|
||||
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
from peakrdl_busdecoder.body import IfBody
|
||||
|
||||
|
||||
class TestIfBody:
|
||||
"""Test the IfBody class."""
|
||||
|
||||
@@ -82,4 +84,3 @@ class TestIfBody:
|
||||
assert "if (outer_cond)" in result
|
||||
assert "if (inner_cond)" in result
|
||||
assert "nested_statement;" in result
|
||||
|
||||
|
||||
@@ -2,14 +2,20 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from typing import Any, Iterable
|
||||
from typing import Any
|
||||
|
||||
import cocotb
|
||||
from cocotb.triggers import Timer
|
||||
from cocotb.triggers import RisingEdge, Timer
|
||||
|
||||
from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle
|
||||
from tests.cocotb_lib.handle_utils import SignalHandle
|
||||
from tests.cocotb_lib.protocol_utils import (
|
||||
all_index_pairs,
|
||||
find_invalid_address,
|
||||
get_int,
|
||||
load_config,
|
||||
set_value,
|
||||
start_clock,
|
||||
)
|
||||
|
||||
|
||||
class _Apb3SlaveShim:
|
||||
@@ -17,6 +23,8 @@ class _Apb3SlaveShim:
|
||||
|
||||
def __init__(self, dut):
|
||||
prefix = "s_apb"
|
||||
self.PCLK = getattr(dut, f"{prefix}_PCLK", None)
|
||||
self.PRESETn = getattr(dut, f"{prefix}_PRESETn", None)
|
||||
self.PSEL = getattr(dut, f"{prefix}_PSEL")
|
||||
self.PENABLE = getattr(dut, f"{prefix}_PENABLE")
|
||||
self.PWRITE = getattr(dut, f"{prefix}_PWRITE")
|
||||
@@ -27,25 +35,6 @@ class _Apb3SlaveShim:
|
||||
self.PSLVERR = getattr(dut, f"{prefix}_PSLVERR")
|
||||
|
||||
|
||||
def _load_config() -> dict[str, Any]:
|
||||
payload = os.environ.get("RDL_TEST_CONFIG")
|
||||
if payload is None:
|
||||
raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided")
|
||||
return json.loads(payload)
|
||||
|
||||
|
||||
def _resolve(handle, indices: Iterable[int]):
|
||||
return resolve_handle(handle, indices)
|
||||
|
||||
|
||||
def _set_value(handle, indices: Iterable[int], value: int) -> None:
|
||||
_resolve(handle, indices).value = value
|
||||
|
||||
|
||||
def _get_int(handle, indices: Iterable[int]) -> int:
|
||||
return int(_resolve(handle, indices).value)
|
||||
|
||||
|
||||
def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
|
||||
table: dict[str, dict[str, Any]] = {}
|
||||
for master in masters_cfg:
|
||||
@@ -71,12 +60,6 @@ def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dic
|
||||
return table
|
||||
|
||||
|
||||
def _all_index_pairs(table: dict[str, dict[str, Any]]):
|
||||
for name, entry in table.items():
|
||||
for idx in entry["indices"]:
|
||||
yield name, idx
|
||||
|
||||
|
||||
def _write_pattern(address: int, width: int) -> int:
|
||||
mask = (1 << width) - 1
|
||||
return ((address * 0x2041) ^ 0xCAFEBABE) & mask
|
||||
@@ -90,23 +73,30 @@ def _read_pattern(address: int, width: int) -> int:
|
||||
@cocotb.test()
|
||||
async def test_apb3_address_decoding(dut) -> None:
|
||||
"""Exercise the APB3 slave interface against sampled register addresses."""
|
||||
config = _load_config()
|
||||
config = load_config()
|
||||
slave = _Apb3SlaveShim(dut)
|
||||
masters = _build_master_table(dut, config["masters"])
|
||||
|
||||
await start_clock(slave.PCLK)
|
||||
if slave.PRESETn is not None:
|
||||
slave.PRESETn.value = 1
|
||||
|
||||
slave.PSEL.value = 0
|
||||
slave.PENABLE.value = 0
|
||||
slave.PWRITE.value = 0
|
||||
slave.PADDR.value = 0
|
||||
slave.PWDATA.value = 0
|
||||
|
||||
for master_name, idx in _all_index_pairs(masters):
|
||||
for master_name, idx in all_index_pairs(masters):
|
||||
entry = masters[master_name]
|
||||
_set_value(entry["inputs"]["PRDATA"], idx, 0)
|
||||
_set_value(entry["inputs"]["PREADY"], idx, 0)
|
||||
_set_value(entry["inputs"]["PSLVERR"], idx, 0)
|
||||
set_value(entry["inputs"]["PRDATA"], idx, 0)
|
||||
set_value(entry["inputs"]["PREADY"], idx, 0)
|
||||
set_value(entry["inputs"]["PSLVERR"], idx, 0)
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
addr_mask = (1 << config["address_width"]) - 1
|
||||
|
||||
@@ -118,85 +108,203 @@ async def test_apb3_address_decoding(dut) -> None:
|
||||
address = txn["address"] & addr_mask
|
||||
write_data = _write_pattern(address, config["data_width"])
|
||||
|
||||
_set_value(entry["inputs"]["PREADY"], index, 1)
|
||||
_set_value(entry["inputs"]["PSLVERR"], index, 0)
|
||||
set_value(entry["inputs"]["PREADY"], index, 0)
|
||||
set_value(entry["inputs"]["PSLVERR"], index, 0)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Setup phase
|
||||
# ------------------------------------------------------------------
|
||||
slave.PADDR.value = address
|
||||
slave.PWDATA.value = write_data
|
||||
slave.PWRITE.value = 1
|
||||
slave.PSEL.value = 1
|
||||
slave.PENABLE.value = 1
|
||||
slave.PENABLE.value = 0
|
||||
|
||||
dut._log.info(
|
||||
f"Starting transaction {txn['label']} to {master_name}{index} at address 0x{address:08X}"
|
||||
)
|
||||
master_address = (address - entry["inst_address"]) % entry["inst_size"]
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} should assert PSEL for write"
|
||||
assert _get_int(entry["outputs"]["PWRITE"], index) == 1, f"{master_name} should see write direction"
|
||||
assert _get_int(entry["outputs"]["PADDR"], index) == master_address, (
|
||||
assert get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} should assert PSEL for write"
|
||||
assert get_int(entry["outputs"]["PENABLE"], index) == 0, (
|
||||
f"{master_name} must hold PENABLE low in setup"
|
||||
)
|
||||
assert get_int(entry["outputs"]["PWRITE"], index) == 1, f"{master_name} should see write direction"
|
||||
assert get_int(entry["outputs"]["PADDR"], index) == master_address, (
|
||||
f"{master_name} must receive write address"
|
||||
)
|
||||
assert _get_int(entry["outputs"]["PWDATA"], index) == write_data, (
|
||||
assert get_int(entry["outputs"]["PWDATA"], index) == write_data, (
|
||||
f"{master_name} must receive write data"
|
||||
)
|
||||
|
||||
for other_name, other_idx in _all_index_pairs(masters):
|
||||
for other_name, other_idx in all_index_pairs(masters):
|
||||
if other_name == master_name and other_idx == index:
|
||||
continue
|
||||
other_entry = masters[other_name]
|
||||
assert _get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, (
|
||||
assert get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, (
|
||||
f"{other_name}{other_idx} should remain idle during {txn['label']}"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Access phase
|
||||
# ------------------------------------------------------------------
|
||||
set_value(entry["inputs"]["PREADY"], index, 1)
|
||||
slave.PENABLE.value = 1
|
||||
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
assert get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must keep PSEL asserted"
|
||||
assert get_int(entry["outputs"]["PENABLE"], index) == 1, (
|
||||
f"{master_name} must assert PENABLE in access"
|
||||
)
|
||||
assert get_int(entry["outputs"]["PADDR"], index) == master_address, (
|
||||
f"{master_name} must keep write address stable"
|
||||
)
|
||||
assert get_int(entry["outputs"]["PWDATA"], index) == write_data, (
|
||||
f"{master_name} must keep write data stable"
|
||||
)
|
||||
|
||||
assert int(slave.PREADY.value) == 1, "Slave ready should mirror selected master"
|
||||
assert int(slave.PSLVERR.value) == 0, "Write should complete without error"
|
||||
|
||||
slave.PSEL.value = 0
|
||||
slave.PENABLE.value = 0
|
||||
slave.PWRITE.value = 0
|
||||
_set_value(entry["inputs"]["PREADY"], index, 0)
|
||||
await Timer(1, unit="ns")
|
||||
set_value(entry["inputs"]["PREADY"], index, 0)
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Read phase
|
||||
# ------------------------------------------------------------------
|
||||
read_data = _read_pattern(address, config["data_width"])
|
||||
_set_value(entry["inputs"]["PRDATA"], index, read_data)
|
||||
_set_value(entry["inputs"]["PREADY"], index, 1)
|
||||
_set_value(entry["inputs"]["PSLVERR"], index, 0)
|
||||
set_value(entry["inputs"]["PRDATA"], index, read_data)
|
||||
set_value(entry["inputs"]["PREADY"], index, 0)
|
||||
set_value(entry["inputs"]["PSLVERR"], index, 0)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Setup phase
|
||||
# ------------------------------------------------------------------
|
||||
slave.PADDR.value = address
|
||||
slave.PWRITE.value = 0
|
||||
slave.PSEL.value = 1
|
||||
slave.PENABLE.value = 1
|
||||
slave.PENABLE.value = 0
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must assert PSEL for read"
|
||||
assert _get_int(entry["outputs"]["PWRITE"], index) == 0, (
|
||||
assert get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must assert PSEL for read"
|
||||
assert get_int(entry["outputs"]["PENABLE"], index) == 0, (
|
||||
f"{master_name} must hold PENABLE low in setup"
|
||||
)
|
||||
assert get_int(entry["outputs"]["PWRITE"], index) == 0, (
|
||||
f"{master_name} should clear write during read"
|
||||
)
|
||||
assert _get_int(entry["outputs"]["PADDR"], index) == master_address, (
|
||||
assert get_int(entry["outputs"]["PADDR"], index) == master_address, (
|
||||
f"{master_name} must receive read address"
|
||||
)
|
||||
|
||||
for other_name, other_idx in _all_index_pairs(masters):
|
||||
for other_name, other_idx in all_index_pairs(masters):
|
||||
if other_name == master_name and other_idx == index:
|
||||
continue
|
||||
other_entry = masters[other_name]
|
||||
assert _get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, (
|
||||
assert get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, (
|
||||
f"{other_name}{other_idx} must stay idle during read of {txn['label']}"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Access phase
|
||||
# ------------------------------------------------------------------
|
||||
set_value(entry["inputs"]["PREADY"], index, 1)
|
||||
slave.PENABLE.value = 1
|
||||
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
assert get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must keep PSEL asserted"
|
||||
assert get_int(entry["outputs"]["PENABLE"], index) == 1, (
|
||||
f"{master_name} must assert PENABLE in access"
|
||||
)
|
||||
|
||||
assert int(slave.PRDATA.value) == read_data, "Read data should propagate back to the slave"
|
||||
assert int(slave.PREADY.value) == 1, "Slave ready should acknowledge the read"
|
||||
assert int(slave.PSLVERR.value) == 0, "Read should not signal an error"
|
||||
|
||||
slave.PSEL.value = 0
|
||||
slave.PENABLE.value = 0
|
||||
_set_value(entry["inputs"]["PREADY"], index, 0)
|
||||
_set_value(entry["inputs"]["PRDATA"], index, 0)
|
||||
set_value(entry["inputs"]["PREADY"], index, 0)
|
||||
set_value(entry["inputs"]["PRDATA"], index, 0)
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
|
||||
@cocotb.test()
|
||||
async def test_apb3_invalid_address_response(dut) -> None:
|
||||
"""Ensure invalid addresses yield an error response and no master select."""
|
||||
config = load_config()
|
||||
slave = _Apb3SlaveShim(dut)
|
||||
masters = _build_master_table(dut, config["masters"])
|
||||
|
||||
await start_clock(slave.PCLK)
|
||||
if slave.PRESETn is not None:
|
||||
slave.PRESETn.value = 1
|
||||
|
||||
slave.PSEL.value = 0
|
||||
slave.PENABLE.value = 0
|
||||
slave.PWRITE.value = 0
|
||||
slave.PADDR.value = 0
|
||||
slave.PWDATA.value = 0
|
||||
|
||||
for master_name, idx in all_index_pairs(masters):
|
||||
entry = masters[master_name]
|
||||
set_value(entry["inputs"]["PREADY"], idx, 0)
|
||||
set_value(entry["inputs"]["PSLVERR"], idx, 0)
|
||||
set_value(entry["inputs"]["PRDATA"], idx, 0)
|
||||
|
||||
invalid_addr = find_invalid_address(config)
|
||||
if invalid_addr is None:
|
||||
dut._log.warning("No unmapped address found; skipping invalid address test")
|
||||
return
|
||||
|
||||
slave.PADDR.value = invalid_addr
|
||||
slave.PWRITE.value = 1
|
||||
slave.PWDATA.value = _write_pattern(invalid_addr, config["data_width"])
|
||||
slave.PSEL.value = 1
|
||||
slave.PENABLE.value = 0
|
||||
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
slave.PENABLE.value = 1
|
||||
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
for master_name, idx in all_index_pairs(masters):
|
||||
entry = masters[master_name]
|
||||
assert get_int(entry["outputs"]["PSEL"], idx) == 0, (
|
||||
f"{master_name}{idx} must stay idle for invalid address"
|
||||
)
|
||||
|
||||
assert int(slave.PREADY.value) == 1, "Invalid address should still complete the transfer"
|
||||
assert int(slave.PSLVERR.value) == 1, "Invalid address should raise PSLVERR"
|
||||
|
||||
@@ -3,8 +3,8 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -16,7 +16,7 @@ except ImportError: # pragma: no cover
|
||||
from cocotb_tools.runner import get_runner
|
||||
|
||||
from tests.cocotb_lib import RDL_CASES
|
||||
from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case, colorize_cocotb_log
|
||||
from tests.cocotb_lib.utils import colorize_cocotb_log, get_verilog_sources, prepare_cpuif_case
|
||||
|
||||
|
||||
@pytest.mark.simulation
|
||||
|
||||
@@ -3,6 +3,8 @@
|
||||
import cocotb
|
||||
from cocotb.triggers import Timer
|
||||
|
||||
from tests.cocotb_lib.protocol_utils import apb_access, apb_setup
|
||||
|
||||
|
||||
class _Apb3SlaveShim:
|
||||
def __init__(self, dut):
|
||||
@@ -60,13 +62,8 @@ async def test_depth_1(dut):
|
||||
|
||||
# Write to address 0x0 (should select inner1)
|
||||
inner1.PREADY.value = 1
|
||||
s_apb.PADDR.value = 0x0
|
||||
s_apb.PWDATA.value = 0x12345678
|
||||
s_apb.PWRITE.value = 1
|
||||
s_apb.PSEL.value = 1
|
||||
s_apb.PENABLE.value = 1
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
await apb_setup(s_apb, 0x0, True, 0x12345678)
|
||||
await apb_access(s_apb)
|
||||
|
||||
assert int(inner1.PSEL.value) == 1, "inner1 must be selected"
|
||||
assert int(inner1.PWRITE.value) == 1, "Write should propagate"
|
||||
@@ -101,13 +98,8 @@ async def test_depth_2(dut):
|
||||
|
||||
# Write to address 0x0 (should select reg1)
|
||||
reg1.PREADY.value = 1
|
||||
s_apb.PADDR.value = 0x0
|
||||
s_apb.PWDATA.value = 0xABCDEF01
|
||||
s_apb.PWRITE.value = 1
|
||||
s_apb.PSEL.value = 1
|
||||
s_apb.PENABLE.value = 1
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
await apb_setup(s_apb, 0x0, True, 0xABCDEF01)
|
||||
await apb_access(s_apb)
|
||||
|
||||
assert int(reg1.PSEL.value) == 1, "reg1 must be selected for address 0x0"
|
||||
assert int(inner2.PSEL.value) == 0, "inner2 should not be selected"
|
||||
@@ -120,13 +112,8 @@ async def test_depth_2(dut):
|
||||
|
||||
# Write to address 0x10 (should select inner2)
|
||||
inner2.PREADY.value = 1
|
||||
s_apb.PADDR.value = 0x10
|
||||
s_apb.PWDATA.value = 0x23456789
|
||||
s_apb.PWRITE.value = 1
|
||||
s_apb.PSEL.value = 1
|
||||
s_apb.PENABLE.value = 1
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
await apb_setup(s_apb, 0x10, True, 0x23456789)
|
||||
await apb_access(s_apb)
|
||||
|
||||
assert int(inner2.PSEL.value) == 1, "inner2 must be selected for address 0x10"
|
||||
assert int(reg1.PSEL.value) == 0, "reg1 should not be selected"
|
||||
@@ -158,13 +145,8 @@ async def test_depth_0(dut):
|
||||
|
||||
# Write to address 0x0 (should select reg1)
|
||||
reg1.PREADY.value = 1
|
||||
s_apb.PADDR.value = 0x0
|
||||
s_apb.PWDATA.value = 0x11111111
|
||||
s_apb.PWRITE.value = 1
|
||||
s_apb.PSEL.value = 1
|
||||
s_apb.PENABLE.value = 1
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
await apb_setup(s_apb, 0x0, True, 0x11111111)
|
||||
await apb_access(s_apb)
|
||||
|
||||
assert int(reg1.PSEL.value) == 1, "reg1 must be selected for address 0x0"
|
||||
assert int(reg2.PSEL.value) == 0, "reg2 should not be selected"
|
||||
@@ -178,13 +160,8 @@ async def test_depth_0(dut):
|
||||
|
||||
# Write to address 0x10 (should select reg2)
|
||||
reg2.PREADY.value = 1
|
||||
s_apb.PADDR.value = 0x10
|
||||
s_apb.PWDATA.value = 0x22222222
|
||||
s_apb.PWRITE.value = 1
|
||||
s_apb.PSEL.value = 1
|
||||
s_apb.PENABLE.value = 1
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
await apb_setup(s_apb, 0x10, True, 0x22222222)
|
||||
await apb_access(s_apb)
|
||||
|
||||
assert int(reg2.PSEL.value) == 1, "reg2 must be selected for address 0x10"
|
||||
assert int(reg1.PSEL.value) == 0, "reg1 should not be selected"
|
||||
@@ -198,13 +175,8 @@ async def test_depth_0(dut):
|
||||
|
||||
# Write to address 0x14 (should select reg2b)
|
||||
reg2b.PREADY.value = 1
|
||||
s_apb.PADDR.value = 0x14
|
||||
s_apb.PWDATA.value = 0x33333333
|
||||
s_apb.PWRITE.value = 1
|
||||
s_apb.PSEL.value = 1
|
||||
s_apb.PENABLE.value = 1
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
await apb_setup(s_apb, 0x14, True, 0x33333333)
|
||||
await apb_access(s_apb)
|
||||
|
||||
assert int(reg2b.PSEL.value) == 1, "reg2b must be selected for address 0x14"
|
||||
assert int(reg1.PSEL.value) == 0, "reg1 should not be selected"
|
||||
|
||||
@@ -2,15 +2,20 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Iterable
|
||||
from typing import Any
|
||||
|
||||
import cocotb
|
||||
from cocotb.triggers import Timer
|
||||
from cocotb.triggers import RisingEdge, Timer
|
||||
|
||||
from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle
|
||||
from tests.cocotb_lib.handle_utils import SignalHandle
|
||||
from tests.cocotb_lib.protocol_utils import (
|
||||
all_index_pairs,
|
||||
find_invalid_address,
|
||||
get_int,
|
||||
load_config,
|
||||
set_value,
|
||||
start_clock,
|
||||
)
|
||||
|
||||
|
||||
class _Apb4SlaveShim:
|
||||
@@ -18,6 +23,8 @@ class _Apb4SlaveShim:
|
||||
|
||||
def __init__(self, dut):
|
||||
prefix = "s_apb"
|
||||
self.PCLK = getattr(dut, f"{prefix}_PCLK", None)
|
||||
self.PRESETn = getattr(dut, f"{prefix}_PRESETn", None)
|
||||
self.PSEL = getattr(dut, f"{prefix}_PSEL")
|
||||
self.PENABLE = getattr(dut, f"{prefix}_PENABLE")
|
||||
self.PWRITE = getattr(dut, f"{prefix}_PWRITE")
|
||||
@@ -30,27 +37,6 @@ class _Apb4SlaveShim:
|
||||
self.PSLVERR = getattr(dut, f"{prefix}_PSLVERR")
|
||||
|
||||
|
||||
def _load_config() -> dict[str, Any]:
|
||||
"""Read the JSON payload describing the generated register topology."""
|
||||
payload = os.environ.get("RDL_TEST_CONFIG")
|
||||
if payload is None:
|
||||
raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided")
|
||||
return json.loads(payload)
|
||||
|
||||
|
||||
def _resolve(handle, indices: Iterable[int]):
|
||||
"""Index into hierarchical cocotb handles."""
|
||||
return resolve_handle(handle, indices)
|
||||
|
||||
|
||||
def _set_value(handle, indices: Iterable[int], value: int) -> None:
|
||||
_resolve(handle, indices).value = value
|
||||
|
||||
|
||||
def _get_int(handle, indices: Iterable[int]) -> int:
|
||||
return int(_resolve(handle, indices).value)
|
||||
|
||||
|
||||
def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
|
||||
table: dict[str, dict[str, Any]] = {}
|
||||
for master in masters_cfg:
|
||||
@@ -79,12 +65,6 @@ def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dic
|
||||
return table
|
||||
|
||||
|
||||
def _all_index_pairs(table: dict[str, dict[str, Any]]):
|
||||
for name, entry in table.items():
|
||||
for idx in entry["indices"]:
|
||||
yield name, idx
|
||||
|
||||
|
||||
def _write_pattern(address: int, width: int) -> int:
|
||||
mask = (1 << width) - 1
|
||||
return ((address * 0x1021) ^ 0x1357_9BDF) & mask
|
||||
@@ -98,10 +78,14 @@ def _read_pattern(address: int, width: int) -> int:
|
||||
@cocotb.test()
|
||||
async def test_apb4_address_decoding(dut) -> None:
|
||||
"""Drive the APB4 slave interface and verify master fanout across all sampled registers."""
|
||||
config = _load_config()
|
||||
config = load_config()
|
||||
slave = _Apb4SlaveShim(dut)
|
||||
masters = _build_master_table(dut, config["masters"])
|
||||
|
||||
await start_clock(slave.PCLK)
|
||||
if slave.PRESETn is not None:
|
||||
slave.PRESETn.value = 1
|
||||
|
||||
slave.PSEL.value = 0
|
||||
slave.PENABLE.value = 0
|
||||
slave.PWRITE.value = 0
|
||||
@@ -110,13 +94,16 @@ async def test_apb4_address_decoding(dut) -> None:
|
||||
slave.PWDATA.value = 0
|
||||
slave.PSTRB.value = 0
|
||||
|
||||
for master_name, idx in _all_index_pairs(masters):
|
||||
for master_name, idx in all_index_pairs(masters):
|
||||
entry = masters[master_name]
|
||||
_set_value(entry["inputs"]["PRDATA"], idx, 0)
|
||||
_set_value(entry["inputs"]["PREADY"], idx, 0)
|
||||
_set_value(entry["inputs"]["PSLVERR"], idx, 0)
|
||||
set_value(entry["inputs"]["PRDATA"], idx, 0)
|
||||
set_value(entry["inputs"]["PREADY"], idx, 0)
|
||||
set_value(entry["inputs"]["PSLVERR"], idx, 0)
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
addr_mask = (1 << config["address_width"]) - 1
|
||||
strobe_mask = (1 << config["byte_width"]) - 1
|
||||
@@ -130,44 +117,78 @@ async def test_apb4_address_decoding(dut) -> None:
|
||||
write_data = _write_pattern(address, config["data_width"])
|
||||
|
||||
# Prime master-side inputs for the write phase
|
||||
_set_value(entry["inputs"]["PREADY"], index, 1)
|
||||
_set_value(entry["inputs"]["PSLVERR"], index, 0)
|
||||
set_value(entry["inputs"]["PREADY"], index, 0)
|
||||
set_value(entry["inputs"]["PSLVERR"], index, 0)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Setup phase
|
||||
# ------------------------------------------------------------------
|
||||
slave.PADDR.value = address
|
||||
slave.PWDATA.value = write_data
|
||||
slave.PSTRB.value = strobe_mask
|
||||
slave.PPROT.value = 0
|
||||
slave.PWRITE.value = 1
|
||||
slave.PSEL.value = 1
|
||||
slave.PENABLE.value = 1
|
||||
slave.PENABLE.value = 0
|
||||
|
||||
dut._log.info(
|
||||
f"Starting transaction {txn['label']} to {master_name}{index} at address 0x{address:08X}"
|
||||
)
|
||||
master_address = (address - entry["inst_address"]) % entry["inst_size"]
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} should assert PSEL for write"
|
||||
assert _get_int(entry["outputs"]["PWRITE"], index) == 1, f"{master_name} should see write intent"
|
||||
assert _get_int(entry["outputs"]["PADDR"], index) == master_address, (
|
||||
assert get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} should assert PSEL for write"
|
||||
assert get_int(entry["outputs"]["PENABLE"], index) == 0, (
|
||||
f"{master_name} must hold PENABLE low in setup"
|
||||
)
|
||||
assert get_int(entry["outputs"]["PWRITE"], index) == 1, f"{master_name} should see write intent"
|
||||
assert get_int(entry["outputs"]["PADDR"], index) == master_address, (
|
||||
f"{master_name} must receive write address"
|
||||
)
|
||||
assert _get_int(entry["outputs"]["PWDATA"], index) == write_data, (
|
||||
assert get_int(entry["outputs"]["PWDATA"], index) == write_data, (
|
||||
f"{master_name} must receive write data"
|
||||
)
|
||||
assert _get_int(entry["outputs"]["PSTRB"], index) == strobe_mask, (
|
||||
assert get_int(entry["outputs"]["PSTRB"], index) == strobe_mask, (
|
||||
f"{master_name} must receive full strobes"
|
||||
)
|
||||
|
||||
for other_name, other_idx in _all_index_pairs(masters):
|
||||
for other_name, other_idx in all_index_pairs(masters):
|
||||
if other_name == master_name and other_idx == index:
|
||||
continue
|
||||
other_entry = masters[other_name]
|
||||
assert _get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, (
|
||||
assert get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, (
|
||||
f"{other_name}{other_idx} should remain idle during {txn['label']}"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Access phase
|
||||
# ------------------------------------------------------------------
|
||||
set_value(entry["inputs"]["PREADY"], index, 1)
|
||||
slave.PENABLE.value = 1
|
||||
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
assert get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must keep PSEL asserted"
|
||||
assert get_int(entry["outputs"]["PENABLE"], index) == 1, (
|
||||
f"{master_name} must assert PENABLE in access"
|
||||
)
|
||||
assert get_int(entry["outputs"]["PADDR"], index) == master_address, (
|
||||
f"{master_name} must keep write address stable"
|
||||
)
|
||||
assert get_int(entry["outputs"]["PWDATA"], index) == write_data, (
|
||||
f"{master_name} must keep write data stable"
|
||||
)
|
||||
assert get_int(entry["outputs"]["PSTRB"], index) == strobe_mask, (
|
||||
f"{master_name} must keep write strobes stable"
|
||||
)
|
||||
|
||||
assert int(slave.PREADY.value) == 1, "Slave ready should reflect selected master"
|
||||
assert int(slave.PSLVERR.value) == 0, "No error expected during write"
|
||||
|
||||
@@ -175,40 +196,68 @@ async def test_apb4_address_decoding(dut) -> None:
|
||||
slave.PSEL.value = 0
|
||||
slave.PENABLE.value = 0
|
||||
slave.PWRITE.value = 0
|
||||
_set_value(entry["inputs"]["PREADY"], index, 0)
|
||||
await Timer(1, unit="ns")
|
||||
set_value(entry["inputs"]["PREADY"], index, 0)
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Read phase
|
||||
# ------------------------------------------------------------------
|
||||
read_data = _read_pattern(address, config["data_width"])
|
||||
_set_value(entry["inputs"]["PRDATA"], index, read_data)
|
||||
_set_value(entry["inputs"]["PREADY"], index, 1)
|
||||
_set_value(entry["inputs"]["PSLVERR"], index, 0)
|
||||
set_value(entry["inputs"]["PRDATA"], index, read_data)
|
||||
set_value(entry["inputs"]["PREADY"], index, 0)
|
||||
set_value(entry["inputs"]["PSLVERR"], index, 0)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Setup phase
|
||||
# ------------------------------------------------------------------
|
||||
slave.PADDR.value = address
|
||||
slave.PWRITE.value = 0
|
||||
slave.PSEL.value = 1
|
||||
slave.PENABLE.value = 1
|
||||
slave.PENABLE.value = 0
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must assert PSEL for read"
|
||||
assert _get_int(entry["outputs"]["PWRITE"], index) == 0, (
|
||||
assert get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must assert PSEL for read"
|
||||
assert get_int(entry["outputs"]["PENABLE"], index) == 0, (
|
||||
f"{master_name} must hold PENABLE low in setup"
|
||||
)
|
||||
assert get_int(entry["outputs"]["PWRITE"], index) == 0, (
|
||||
f"{master_name} should deassert write for reads"
|
||||
)
|
||||
assert _get_int(entry["outputs"]["PADDR"], index) == master_address, (
|
||||
assert get_int(entry["outputs"]["PADDR"], index) == master_address, (
|
||||
f"{master_name} must receive read address"
|
||||
)
|
||||
|
||||
for other_name, other_idx in _all_index_pairs(masters):
|
||||
for other_name, other_idx in all_index_pairs(masters):
|
||||
if other_name == master_name and other_idx == index:
|
||||
continue
|
||||
other_entry = masters[other_name]
|
||||
assert _get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, (
|
||||
assert get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, (
|
||||
f"{other_name}{other_idx} must stay idle during read of {txn['label']}"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Access phase
|
||||
# ------------------------------------------------------------------
|
||||
set_value(entry["inputs"]["PREADY"], index, 1)
|
||||
slave.PENABLE.value = 1
|
||||
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
assert get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must keep PSEL asserted"
|
||||
assert get_int(entry["outputs"]["PENABLE"], index) == 1, (
|
||||
f"{master_name} must assert PENABLE in access"
|
||||
)
|
||||
|
||||
assert int(slave.PRDATA.value) == read_data, "Slave should observe readback data from master"
|
||||
assert int(slave.PREADY.value) == 1, "Slave ready should follow responding master"
|
||||
assert int(slave.PSLVERR.value) == 0, "Read should complete without error"
|
||||
@@ -216,6 +265,68 @@ async def test_apb4_address_decoding(dut) -> None:
|
||||
# Reset to idle before progressing
|
||||
slave.PSEL.value = 0
|
||||
slave.PENABLE.value = 0
|
||||
_set_value(entry["inputs"]["PREADY"], index, 0)
|
||||
_set_value(entry["inputs"]["PRDATA"], index, 0)
|
||||
set_value(entry["inputs"]["PREADY"], index, 0)
|
||||
set_value(entry["inputs"]["PRDATA"], index, 0)
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
|
||||
@cocotb.test()
|
||||
async def test_apb4_invalid_address_response(dut) -> None:
|
||||
"""Ensure invalid addresses yield an error response and no master select."""
|
||||
config = load_config()
|
||||
slave = _Apb4SlaveShim(dut)
|
||||
masters = _build_master_table(dut, config["masters"])
|
||||
|
||||
await start_clock(slave.PCLK)
|
||||
if slave.PRESETn is not None:
|
||||
slave.PRESETn.value = 1
|
||||
|
||||
slave.PSEL.value = 0
|
||||
slave.PENABLE.value = 0
|
||||
slave.PWRITE.value = 0
|
||||
slave.PADDR.value = 0
|
||||
slave.PPROT.value = 0
|
||||
slave.PWDATA.value = 0
|
||||
slave.PSTRB.value = 0
|
||||
|
||||
for master_name, idx in all_index_pairs(masters):
|
||||
entry = masters[master_name]
|
||||
set_value(entry["inputs"]["PREADY"], idx, 0)
|
||||
set_value(entry["inputs"]["PSLVERR"], idx, 0)
|
||||
set_value(entry["inputs"]["PRDATA"], idx, 0)
|
||||
|
||||
invalid_addr = find_invalid_address(config)
|
||||
if invalid_addr is None:
|
||||
dut._log.warning("No unmapped address found; skipping invalid address test")
|
||||
return
|
||||
|
||||
slave.PADDR.value = invalid_addr
|
||||
slave.PWRITE.value = 1
|
||||
slave.PWDATA.value = _write_pattern(invalid_addr, config["data_width"])
|
||||
slave.PSTRB.value = (1 << config["byte_width"]) - 1
|
||||
slave.PSEL.value = 1
|
||||
slave.PENABLE.value = 0
|
||||
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
slave.PENABLE.value = 1
|
||||
|
||||
if slave.PCLK is not None:
|
||||
await RisingEdge(slave.PCLK)
|
||||
else:
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
for master_name, idx in all_index_pairs(masters):
|
||||
entry = masters[master_name]
|
||||
assert get_int(entry["outputs"]["PSEL"], idx) == 0, (
|
||||
f"{master_name}{idx} must stay idle for invalid address"
|
||||
)
|
||||
|
||||
assert int(slave.PREADY.value) == 1, "Invalid address should still complete the transfer"
|
||||
assert int(slave.PSLVERR.value) == 1, "Invalid address should raise PSLVERR"
|
||||
|
||||
@@ -5,7 +5,6 @@ from __future__ import annotations
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -17,7 +16,7 @@ except ImportError: # pragma: no cover
|
||||
from cocotb_tools.runner import get_runner
|
||||
|
||||
from tests.cocotb_lib import RDL_CASES
|
||||
from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case, colorize_cocotb_log
|
||||
from tests.cocotb_lib.utils import colorize_cocotb_log, get_verilog_sources, prepare_cpuif_case
|
||||
|
||||
|
||||
@pytest.mark.simulation
|
||||
|
||||
@@ -3,6 +3,8 @@
|
||||
import cocotb
|
||||
from cocotb.triggers import Timer
|
||||
|
||||
from tests.cocotb_lib.protocol_utils import apb_access, apb_setup
|
||||
|
||||
|
||||
class _Apb4SlaveShim:
|
||||
def __init__(self, dut):
|
||||
@@ -66,14 +68,8 @@ async def test_depth_1(dut):
|
||||
|
||||
# Write to address 0x0 (should select inner1)
|
||||
inner1.PREADY.value = 1
|
||||
s_apb.PADDR.value = 0x0
|
||||
s_apb.PWDATA.value = 0x12345678
|
||||
s_apb.PSTRB.value = 0xF
|
||||
s_apb.PWRITE.value = 1
|
||||
s_apb.PSEL.value = 1
|
||||
s_apb.PENABLE.value = 1
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
await apb_setup(s_apb, 0x0, True, 0x12345678)
|
||||
await apb_access(s_apb)
|
||||
|
||||
assert int(inner1.PSEL.value) == 1, "inner1 must be selected"
|
||||
assert int(inner1.PWRITE.value) == 1, "Write should propagate"
|
||||
@@ -110,14 +106,8 @@ async def test_depth_2(dut):
|
||||
|
||||
# Write to address 0x0 (should select reg1)
|
||||
reg1.PREADY.value = 1
|
||||
s_apb.PADDR.value = 0x0
|
||||
s_apb.PWDATA.value = 0xABCDEF01
|
||||
s_apb.PSTRB.value = 0xF
|
||||
s_apb.PWRITE.value = 1
|
||||
s_apb.PSEL.value = 1
|
||||
s_apb.PENABLE.value = 1
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
await apb_setup(s_apb, 0x0, True, 0xABCDEF01)
|
||||
await apb_access(s_apb)
|
||||
|
||||
assert int(reg1.PSEL.value) == 1, "reg1 must be selected for address 0x0"
|
||||
assert int(inner2.PSEL.value) == 0, "inner2 should not be selected"
|
||||
@@ -130,14 +120,8 @@ async def test_depth_2(dut):
|
||||
|
||||
# Write to address 0x10 (should select inner2)
|
||||
inner2.PREADY.value = 1
|
||||
s_apb.PADDR.value = 0x10
|
||||
s_apb.PWDATA.value = 0x23456789
|
||||
s_apb.PSTRB.value = 0xF
|
||||
s_apb.PWRITE.value = 1
|
||||
s_apb.PSEL.value = 1
|
||||
s_apb.PENABLE.value = 1
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
await apb_setup(s_apb, 0x10, True, 0x23456789)
|
||||
await apb_access(s_apb)
|
||||
|
||||
assert int(inner2.PSEL.value) == 1, "inner2 must be selected for address 0x10"
|
||||
assert int(reg1.PSEL.value) == 0, "reg1 should not be selected"
|
||||
@@ -171,14 +155,8 @@ async def test_depth_0(dut):
|
||||
|
||||
# Write to address 0x0 (should select reg1)
|
||||
reg1.PREADY.value = 1
|
||||
s_apb.PADDR.value = 0x0
|
||||
s_apb.PWDATA.value = 0x11111111
|
||||
s_apb.PSTRB.value = 0xF
|
||||
s_apb.PWRITE.value = 1
|
||||
s_apb.PSEL.value = 1
|
||||
s_apb.PENABLE.value = 1
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
await apb_setup(s_apb, 0x0, True, 0x11111111)
|
||||
await apb_access(s_apb)
|
||||
|
||||
assert int(reg1.PSEL.value) == 1, "reg1 must be selected for address 0x0"
|
||||
assert int(reg2.PSEL.value) == 0, "reg2 should not be selected"
|
||||
@@ -192,14 +170,8 @@ async def test_depth_0(dut):
|
||||
|
||||
# Write to address 0x10 (should select reg2)
|
||||
reg2.PREADY.value = 1
|
||||
s_apb.PADDR.value = 0x10
|
||||
s_apb.PWDATA.value = 0x22222222
|
||||
s_apb.PSTRB.value = 0xF
|
||||
s_apb.PWRITE.value = 1
|
||||
s_apb.PSEL.value = 1
|
||||
s_apb.PENABLE.value = 1
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
await apb_setup(s_apb, 0x10, True, 0x22222222)
|
||||
await apb_access(s_apb)
|
||||
|
||||
assert int(reg2.PSEL.value) == 1, "reg2 must be selected for address 0x10"
|
||||
assert int(reg1.PSEL.value) == 0, "reg1 should not be selected"
|
||||
@@ -213,14 +185,8 @@ async def test_depth_0(dut):
|
||||
|
||||
# Write to address 0x14 (should select reg2b)
|
||||
reg2b.PREADY.value = 1
|
||||
s_apb.PADDR.value = 0x14
|
||||
s_apb.PWDATA.value = 0x33333333
|
||||
s_apb.PSTRB.value = 0xF
|
||||
s_apb.PWRITE.value = 1
|
||||
s_apb.PSEL.value = 1
|
||||
s_apb.PENABLE.value = 1
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
await apb_setup(s_apb, 0x14, True, 0x33333333)
|
||||
await apb_access(s_apb)
|
||||
|
||||
assert int(reg2b.PSEL.value) == 1, "reg2b must be selected for address 0x14"
|
||||
assert int(reg1.PSEL.value) == 0, "reg1 should not be selected"
|
||||
|
||||
@@ -2,14 +2,19 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from typing import Any, Iterable
|
||||
from typing import Any
|
||||
|
||||
import cocotb
|
||||
from cocotb.triggers import Timer
|
||||
|
||||
from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle
|
||||
from tests.cocotb_lib.handle_utils import SignalHandle
|
||||
from tests.cocotb_lib.protocol_utils import (
|
||||
all_index_pairs,
|
||||
find_invalid_address,
|
||||
get_int,
|
||||
load_config,
|
||||
set_value,
|
||||
)
|
||||
|
||||
|
||||
class _AxilSlaveShim:
|
||||
@@ -38,25 +43,6 @@ class _AxilSlaveShim:
|
||||
self.RRESP = getattr(dut, f"{prefix}_RRESP")
|
||||
|
||||
|
||||
def _load_config() -> dict[str, Any]:
|
||||
payload = os.environ.get("RDL_TEST_CONFIG")
|
||||
if payload is None:
|
||||
raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided")
|
||||
return json.loads(payload)
|
||||
|
||||
|
||||
def _resolve(handle, indices: Iterable[int]):
|
||||
return resolve_handle(handle, indices)
|
||||
|
||||
|
||||
def _set_value(handle, indices: Iterable[int], value: int) -> None:
|
||||
_resolve(handle, indices).value = value
|
||||
|
||||
|
||||
def _get_int(handle, indices: Iterable[int]) -> int:
|
||||
return int(_resolve(handle, indices).value)
|
||||
|
||||
|
||||
def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
|
||||
table: dict[str, dict[str, Any]] = {}
|
||||
for master in masters_cfg:
|
||||
@@ -91,12 +77,6 @@ def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dic
|
||||
return table
|
||||
|
||||
|
||||
def _all_index_pairs(table: dict[str, dict[str, Any]]):
|
||||
for name, entry in table.items():
|
||||
for idx in entry["indices"]:
|
||||
yield name, idx
|
||||
|
||||
|
||||
def _write_pattern(address: int, width: int) -> int:
|
||||
mask = (1 << width) - 1
|
||||
return ((address * 0x3105) ^ 0x1357_9BDF) & mask
|
||||
@@ -110,7 +90,7 @@ def _read_pattern(address: int, width: int) -> int:
|
||||
@cocotb.test()
|
||||
async def test_axi4lite_address_decoding(dut) -> None:
|
||||
"""Stimulate AXI4-Lite slave channels and verify master port selection."""
|
||||
config = _load_config()
|
||||
config = load_config()
|
||||
slave = _AxilSlaveShim(dut)
|
||||
masters = _build_master_table(dut, config["masters"])
|
||||
|
||||
@@ -126,16 +106,16 @@ async def test_axi4lite_address_decoding(dut) -> None:
|
||||
slave.ARPROT.value = 0
|
||||
slave.RREADY.value = 0
|
||||
|
||||
for master_name, idx in _all_index_pairs(masters):
|
||||
for master_name, idx in all_index_pairs(masters):
|
||||
entry = masters[master_name]
|
||||
_set_value(entry["inputs"]["AWREADY"], idx, 0)
|
||||
_set_value(entry["inputs"]["WREADY"], idx, 0)
|
||||
_set_value(entry["inputs"]["BVALID"], idx, 0)
|
||||
_set_value(entry["inputs"]["BRESP"], idx, 0)
|
||||
_set_value(entry["inputs"]["ARREADY"], idx, 0)
|
||||
_set_value(entry["inputs"]["RVALID"], idx, 0)
|
||||
_set_value(entry["inputs"]["RDATA"], idx, 0)
|
||||
_set_value(entry["inputs"]["RRESP"], idx, 0)
|
||||
set_value(entry["inputs"]["AWREADY"], idx, 0)
|
||||
set_value(entry["inputs"]["WREADY"], idx, 0)
|
||||
set_value(entry["inputs"]["BVALID"], idx, 0)
|
||||
set_value(entry["inputs"]["BRESP"], idx, 0)
|
||||
set_value(entry["inputs"]["ARREADY"], idx, 0)
|
||||
set_value(entry["inputs"]["RVALID"], idx, 0)
|
||||
set_value(entry["inputs"]["RDATA"], idx, 0)
|
||||
set_value(entry["inputs"]["RRESP"], idx, 0)
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
@@ -150,6 +130,9 @@ async def test_axi4lite_address_decoding(dut) -> None:
|
||||
address = txn["address"] & addr_mask
|
||||
write_data = _write_pattern(address, config["data_width"])
|
||||
|
||||
set_value(entry["inputs"]["BVALID"], index, 1)
|
||||
set_value(entry["inputs"]["BRESP"], index, 0)
|
||||
|
||||
slave.AWADDR.value = address
|
||||
slave.AWPROT.value = 0
|
||||
slave.AWVALID.value = 1
|
||||
@@ -165,34 +148,40 @@ async def test_axi4lite_address_decoding(dut) -> None:
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
assert _get_int(entry["outputs"]["AWVALID"], index) == 1, f"{master_name} should see AWVALID asserted"
|
||||
assert _get_int(entry["outputs"]["AWADDR"], index) == master_address, (
|
||||
assert get_int(entry["outputs"]["AWVALID"], index) == 1, f"{master_name} should see AWVALID asserted"
|
||||
assert get_int(entry["outputs"]["AWADDR"], index) == master_address, (
|
||||
f"{master_name} must receive AWADDR"
|
||||
)
|
||||
assert _get_int(entry["outputs"]["WVALID"], index) == 1, f"{master_name} should see WVALID asserted"
|
||||
assert _get_int(entry["outputs"]["WDATA"], index) == write_data, f"{master_name} must receive WDATA"
|
||||
assert _get_int(entry["outputs"]["WSTRB"], index) == strobe_mask, f"{master_name} must receive WSTRB"
|
||||
assert get_int(entry["outputs"]["WVALID"], index) == 1, f"{master_name} should see WVALID asserted"
|
||||
assert get_int(entry["outputs"]["WDATA"], index) == write_data, f"{master_name} must receive WDATA"
|
||||
assert get_int(entry["outputs"]["WSTRB"], index) == strobe_mask, f"{master_name} must receive WSTRB"
|
||||
assert int(slave.AWREADY.value) == 1, "AWREADY should assert when write address/data are valid"
|
||||
assert int(slave.WREADY.value) == 1, "WREADY should assert when write address/data are valid"
|
||||
|
||||
for other_name, other_idx in _all_index_pairs(masters):
|
||||
for other_name, other_idx in all_index_pairs(masters):
|
||||
if other_name == master_name and other_idx == index:
|
||||
continue
|
||||
other_entry = masters[other_name]
|
||||
assert _get_int(other_entry["outputs"]["AWVALID"], other_idx) == 0, (
|
||||
assert get_int(other_entry["outputs"]["AWVALID"], other_idx) == 0, (
|
||||
f"{other_name}{other_idx} AWVALID should remain low during {txn['label']}"
|
||||
)
|
||||
assert _get_int(other_entry["outputs"]["WVALID"], other_idx) == 0, (
|
||||
assert get_int(other_entry["outputs"]["WVALID"], other_idx) == 0, (
|
||||
f"{other_name}{other_idx} WVALID should remain low during {txn['label']}"
|
||||
)
|
||||
|
||||
assert int(slave.BVALID.value) == 1, "Slave should observe BVALID from selected master"
|
||||
assert int(slave.BRESP.value) == 0, "BRESP should indicate OKAY on write"
|
||||
|
||||
slave.AWVALID.value = 0
|
||||
slave.WVALID.value = 0
|
||||
slave.BREADY.value = 0
|
||||
set_value(entry["inputs"]["BVALID"], index, 0)
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
read_data = _read_pattern(address, config["data_width"])
|
||||
_set_value(entry["inputs"]["RVALID"], index, 1)
|
||||
_set_value(entry["inputs"]["RDATA"], index, read_data)
|
||||
_set_value(entry["inputs"]["RRESP"], index, 0)
|
||||
set_value(entry["inputs"]["RVALID"], index, 1)
|
||||
set_value(entry["inputs"]["RDATA"], index, read_data)
|
||||
set_value(entry["inputs"]["RRESP"], index, 0)
|
||||
|
||||
slave.ARADDR.value = address
|
||||
slave.ARPROT.value = 0
|
||||
@@ -201,16 +190,17 @@ async def test_axi4lite_address_decoding(dut) -> None:
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
assert _get_int(entry["outputs"]["ARVALID"], index) == 1, f"{master_name} should assert ARVALID"
|
||||
assert _get_int(entry["outputs"]["ARADDR"], index) == master_address, (
|
||||
assert get_int(entry["outputs"]["ARVALID"], index) == 1, f"{master_name} should assert ARVALID"
|
||||
assert get_int(entry["outputs"]["ARADDR"], index) == master_address, (
|
||||
f"{master_name} must receive ARADDR"
|
||||
)
|
||||
assert int(slave.ARREADY.value) == 1, "ARREADY should assert when ARVALID is high"
|
||||
|
||||
for other_name, other_idx in _all_index_pairs(masters):
|
||||
for other_name, other_idx in all_index_pairs(masters):
|
||||
if other_name == master_name and other_idx == index:
|
||||
continue
|
||||
other_entry = masters[other_name]
|
||||
assert _get_int(other_entry["outputs"]["ARVALID"], other_idx) == 0, (
|
||||
assert get_int(other_entry["outputs"]["ARVALID"], other_idx) == 0, (
|
||||
f"{other_name}{other_idx} ARVALID should remain low during read of {txn['label']}"
|
||||
)
|
||||
|
||||
@@ -220,6 +210,148 @@ async def test_axi4lite_address_decoding(dut) -> None:
|
||||
|
||||
slave.ARVALID.value = 0
|
||||
slave.RREADY.value = 0
|
||||
_set_value(entry["inputs"]["RVALID"], index, 0)
|
||||
_set_value(entry["inputs"]["RDATA"], index, 0)
|
||||
set_value(entry["inputs"]["RVALID"], index, 0)
|
||||
set_value(entry["inputs"]["RDATA"], index, 0)
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
|
||||
@cocotb.test()
|
||||
async def test_axi4lite_invalid_write_handshake(dut) -> None:
|
||||
"""Ensure mismatched AW/W valid signals raise an error and are ignored."""
|
||||
config = load_config()
|
||||
slave = _AxilSlaveShim(dut)
|
||||
masters = _build_master_table(dut, config["masters"])
|
||||
|
||||
slave.AWVALID.value = 0
|
||||
slave.AWADDR.value = 0
|
||||
slave.AWPROT.value = 0
|
||||
slave.WVALID.value = 0
|
||||
slave.WDATA.value = 0
|
||||
slave.WSTRB.value = 0
|
||||
slave.BREADY.value = 0
|
||||
slave.ARVALID.value = 0
|
||||
slave.ARADDR.value = 0
|
||||
slave.ARPROT.value = 0
|
||||
slave.RREADY.value = 0
|
||||
|
||||
for master_name, idx in all_index_pairs(masters):
|
||||
entry = masters[master_name]
|
||||
set_value(entry["inputs"]["AWREADY"], idx, 0)
|
||||
set_value(entry["inputs"]["WREADY"], idx, 0)
|
||||
set_value(entry["inputs"]["BVALID"], idx, 0)
|
||||
set_value(entry["inputs"]["BRESP"], idx, 0)
|
||||
set_value(entry["inputs"]["ARREADY"], idx, 0)
|
||||
set_value(entry["inputs"]["RVALID"], idx, 0)
|
||||
set_value(entry["inputs"]["RDATA"], idx, 0)
|
||||
set_value(entry["inputs"]["RRESP"], idx, 0)
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
if not config["transactions"]:
|
||||
dut._log.warning("No transactions available; skipping invalid handshake test")
|
||||
return
|
||||
|
||||
bad_addr = config["transactions"][0]["address"] & ((1 << config["address_width"]) - 1)
|
||||
slave.AWADDR.value = bad_addr
|
||||
slave.AWPROT.value = 0
|
||||
slave.AWVALID.value = 1
|
||||
slave.WVALID.value = 0
|
||||
slave.BREADY.value = 1
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
for master_name, idx in all_index_pairs(masters):
|
||||
entry = masters[master_name]
|
||||
assert get_int(entry["outputs"]["AWVALID"], idx) == 0, (
|
||||
f"{master_name}{idx} must not see AWVALID on invalid handshake"
|
||||
)
|
||||
assert get_int(entry["outputs"]["WVALID"], idx) == 0, (
|
||||
f"{master_name}{idx} must not see WVALID on invalid handshake"
|
||||
)
|
||||
|
||||
assert int(slave.AWREADY.value) == 0, "AWREADY must remain low on invalid write handshake"
|
||||
assert int(slave.WREADY.value) == 0, "WREADY must remain low on invalid write handshake"
|
||||
assert int(slave.BVALID.value) == 1, "Invalid write handshake should return BVALID"
|
||||
assert int(slave.BRESP.value) == 2, "Invalid write handshake should return SLVERR"
|
||||
|
||||
|
||||
@cocotb.test()
|
||||
async def test_axi4lite_invalid_address_response(dut) -> None:
|
||||
"""Ensure unmapped addresses return error responses and do not select a master."""
|
||||
config = load_config()
|
||||
slave = _AxilSlaveShim(dut)
|
||||
masters = _build_master_table(dut, config["masters"])
|
||||
|
||||
slave.AWVALID.value = 0
|
||||
slave.AWADDR.value = 0
|
||||
slave.AWPROT.value = 0
|
||||
slave.WVALID.value = 0
|
||||
slave.WDATA.value = 0
|
||||
slave.WSTRB.value = 0
|
||||
slave.BREADY.value = 0
|
||||
slave.ARVALID.value = 0
|
||||
slave.ARADDR.value = 0
|
||||
slave.ARPROT.value = 0
|
||||
slave.RREADY.value = 0
|
||||
|
||||
for master_name, idx in all_index_pairs(masters):
|
||||
entry = masters[master_name]
|
||||
set_value(entry["inputs"]["AWREADY"], idx, 0)
|
||||
set_value(entry["inputs"]["WREADY"], idx, 0)
|
||||
set_value(entry["inputs"]["BVALID"], idx, 0)
|
||||
set_value(entry["inputs"]["BRESP"], idx, 0)
|
||||
set_value(entry["inputs"]["ARREADY"], idx, 0)
|
||||
set_value(entry["inputs"]["RVALID"], idx, 0)
|
||||
set_value(entry["inputs"]["RDATA"], idx, 0)
|
||||
set_value(entry["inputs"]["RRESP"], idx, 0)
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
invalid_addr = find_invalid_address(config)
|
||||
if invalid_addr is None:
|
||||
dut._log.warning("No unmapped address found; skipping invalid address test")
|
||||
return
|
||||
|
||||
# Invalid read
|
||||
slave.ARADDR.value = invalid_addr
|
||||
slave.ARPROT.value = 0
|
||||
slave.ARVALID.value = 1
|
||||
slave.RREADY.value = 1
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
for master_name, idx in all_index_pairs(masters):
|
||||
entry = masters[master_name]
|
||||
assert get_int(entry["outputs"]["ARVALID"], idx) == 0, (
|
||||
f"{master_name}{idx} must stay idle for invalid read address"
|
||||
)
|
||||
|
||||
assert int(slave.RVALID.value) == 1, "Invalid read should return RVALID"
|
||||
assert int(slave.RRESP.value) == 2, "Invalid read should return SLVERR"
|
||||
|
||||
slave.ARVALID.value = 0
|
||||
slave.RREADY.value = 0
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
# Invalid write
|
||||
slave.AWADDR.value = invalid_addr
|
||||
slave.AWPROT.value = 0
|
||||
slave.AWVALID.value = 1
|
||||
slave.WDATA.value = 0xA5A5_5A5A
|
||||
slave.WSTRB.value = (1 << config["byte_width"]) - 1
|
||||
slave.WVALID.value = 1
|
||||
slave.BREADY.value = 1
|
||||
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
for master_name, idx in all_index_pairs(masters):
|
||||
entry = masters[master_name]
|
||||
assert get_int(entry["outputs"]["AWVALID"], idx) == 0, (
|
||||
f"{master_name}{idx} must stay idle for invalid write address"
|
||||
)
|
||||
assert get_int(entry["outputs"]["WVALID"], idx) == 0, (
|
||||
f"{master_name}{idx} must stay idle for invalid write address"
|
||||
)
|
||||
|
||||
assert int(slave.BVALID.value) == 1, "Invalid write should return BVALID"
|
||||
assert int(slave.BRESP.value) == 2, "Invalid write should return SLVERR"
|
||||
|
||||
@@ -3,8 +3,8 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -16,7 +16,7 @@ except ImportError: # pragma: no cover
|
||||
from cocotb_tools.runner import get_runner
|
||||
|
||||
from tests.cocotb_lib import RDL_CASES
|
||||
from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case, colorize_cocotb_log
|
||||
from tests.cocotb_lib.utils import colorize_cocotb_log, get_verilog_sources, prepare_cpuif_case
|
||||
|
||||
|
||||
@pytest.mark.simulation
|
||||
|
||||
@@ -2,7 +2,8 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Iterable
|
||||
from collections.abc import Iterable
|
||||
from typing import Any
|
||||
|
||||
|
||||
class SignalHandle:
|
||||
|
||||
101
tests/cocotb_lib/protocol_utils.py
Normal file
101
tests/cocotb_lib/protocol_utils.py
Normal file
@@ -0,0 +1,101 @@
|
||||
"""Shared helpers for cocotb smoke tests."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from collections.abc import Iterable
|
||||
from typing import Any
|
||||
|
||||
import cocotb
|
||||
from cocotb.clock import Clock
|
||||
from cocotb.triggers import RisingEdge, Timer
|
||||
|
||||
from tests.cocotb_lib.handle_utils import resolve_handle
|
||||
|
||||
|
||||
def load_config() -> dict[str, Any]:
|
||||
"""Read the JSON payload describing the generated register topology."""
|
||||
payload = os.environ.get("RDL_TEST_CONFIG")
|
||||
if payload is None:
|
||||
raise RuntimeError("RDL_TEST_CONFIG environment variable was not provided")
|
||||
return json.loads(payload)
|
||||
|
||||
|
||||
def resolve(handle, indices: Iterable[int]):
|
||||
"""Index into hierarchical cocotb handles."""
|
||||
return resolve_handle(handle, indices)
|
||||
|
||||
|
||||
def set_value(handle, indices: Iterable[int], value: int) -> None:
|
||||
resolve(handle, indices).value = value
|
||||
|
||||
|
||||
def get_int(handle, indices: Iterable[int]) -> int:
|
||||
return int(resolve(handle, indices).value)
|
||||
|
||||
|
||||
def all_index_pairs(table: dict[str, dict[str, Any]]):
|
||||
for name, entry in table.items():
|
||||
for idx in entry["indices"]:
|
||||
yield name, idx
|
||||
|
||||
|
||||
def find_invalid_address(config: dict[str, Any]) -> int | None:
|
||||
"""Return an address outside any master/array span, or None if fully covered."""
|
||||
addr_width = config["address_width"]
|
||||
max_addr = 1 << addr_width
|
||||
ranges: list[tuple[int, int]] = []
|
||||
|
||||
for master in config["masters"]:
|
||||
inst_address = master["inst_address"]
|
||||
inst_size = master["inst_size"]
|
||||
n_elems = 1
|
||||
if master.get("is_array"):
|
||||
for dim in master.get("dimensions", []):
|
||||
n_elems *= dim
|
||||
span = inst_size * n_elems
|
||||
ranges.append((inst_address, inst_address + span))
|
||||
|
||||
ranges.sort()
|
||||
|
||||
cursor = 0
|
||||
for start, end in ranges:
|
||||
if cursor < start:
|
||||
return cursor
|
||||
cursor = max(cursor, end)
|
||||
|
||||
if cursor < max_addr:
|
||||
return cursor
|
||||
return None
|
||||
|
||||
|
||||
async def start_clock(clk_handle, period_ns: int = 2) -> None:
|
||||
"""Start a simple clock if handle is present."""
|
||||
if clk_handle is None:
|
||||
return
|
||||
clk_handle.value = 0
|
||||
cocotb.start_soon(Clock(clk_handle, period_ns, unit="ns").start())
|
||||
await RisingEdge(clk_handle)
|
||||
|
||||
|
||||
async def apb_setup(slave, addr: int, write: bool, data: int, *, strobe_mask: int | None = None) -> None:
|
||||
"""APB setup phase helper."""
|
||||
if hasattr(slave, "PPROT"):
|
||||
slave.PPROT.value = 0
|
||||
if hasattr(slave, "PSTRB"):
|
||||
if strobe_mask is None:
|
||||
strobe_mask = (1 << len(slave.PSTRB)) - 1
|
||||
slave.PSTRB.value = strobe_mask
|
||||
slave.PADDR.value = addr
|
||||
slave.PWDATA.value = data
|
||||
slave.PWRITE.value = 1 if write else 0
|
||||
slave.PSEL.value = 1
|
||||
slave.PENABLE.value = 0
|
||||
await Timer(1, unit="ns")
|
||||
|
||||
|
||||
async def apb_access(slave) -> None:
|
||||
"""APB access phase helper."""
|
||||
slave.PENABLE.value = 1
|
||||
await Timer(1, unit="ns")
|
||||
@@ -2,9 +2,9 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
from systemrdl import RDLCompiler
|
||||
@@ -49,6 +49,7 @@ def colorize_cocotb_log(text: str) -> str:
|
||||
Returns:
|
||||
A string with colorized log lines.
|
||||
"""
|
||||
|
||||
def _color_line(match: re.Match) -> str:
|
||||
prefix = match.group("prefix")
|
||||
time = match.group("time")
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
|
||||
from collections.abc import Callable
|
||||
|
||||
from systemrdl.node import AddrmapNode
|
||||
@@ -9,7 +8,7 @@ from peakrdl_busdecoder.design_state import DesignState
|
||||
class TestDesignState:
|
||||
"""Test the DesignState class."""
|
||||
|
||||
def test_design_state_basic(self, compile_rdl:Callable[..., AddrmapNode])->None:
|
||||
def test_design_state_basic(self, compile_rdl: Callable[..., AddrmapNode]) -> None:
|
||||
"""Test basic DesignState initialization."""
|
||||
rdl_source = """
|
||||
addrmap test {
|
||||
@@ -31,7 +30,7 @@ class TestDesignState:
|
||||
assert ds.cpuif_data_width == 32 # Should infer from 32-bit field
|
||||
assert ds.addr_width > 0
|
||||
|
||||
def test_design_state_custom_module_name(self, compile_rdl:Callable[..., AddrmapNode])->None:
|
||||
def test_design_state_custom_module_name(self, compile_rdl: Callable[..., AddrmapNode]) -> None:
|
||||
"""Test DesignState with custom module name."""
|
||||
rdl_source = """
|
||||
addrmap test {
|
||||
@@ -50,7 +49,7 @@ class TestDesignState:
|
||||
assert ds.module_name == "custom_module"
|
||||
assert ds.package_name == "custom_module_pkg"
|
||||
|
||||
def test_design_state_custom_package_name(self, compile_rdl:Callable[..., AddrmapNode])->None:
|
||||
def test_design_state_custom_package_name(self, compile_rdl: Callable[..., AddrmapNode]) -> None:
|
||||
"""Test DesignState with custom package name."""
|
||||
rdl_source = """
|
||||
addrmap test {
|
||||
@@ -68,7 +67,7 @@ class TestDesignState:
|
||||
|
||||
assert ds.package_name == "custom_pkg"
|
||||
|
||||
def test_design_state_custom_address_width(self, compile_rdl:Callable[..., AddrmapNode])->None:
|
||||
def test_design_state_custom_address_width(self, compile_rdl: Callable[..., AddrmapNode]) -> None:
|
||||
"""Test DesignState with custom address width."""
|
||||
rdl_source = """
|
||||
addrmap test {
|
||||
@@ -86,7 +85,7 @@ class TestDesignState:
|
||||
|
||||
assert ds.addr_width == 16
|
||||
|
||||
def test_design_state_unroll_arrays(self, compile_rdl:Callable[..., AddrmapNode])->None:
|
||||
def test_design_state_unroll_arrays(self, compile_rdl: Callable[..., AddrmapNode]) -> None:
|
||||
"""Test DesignState with cpuif_unroll option."""
|
||||
rdl_source = """
|
||||
addrmap test {
|
||||
@@ -104,7 +103,7 @@ class TestDesignState:
|
||||
|
||||
assert ds.cpuif_unroll is True
|
||||
|
||||
def test_design_state_64bit_registers(self, compile_rdl:Callable[..., AddrmapNode])->None:
|
||||
def test_design_state_64bit_registers(self, compile_rdl: Callable[..., AddrmapNode]) -> None:
|
||||
"""Test DesignState with wider data width."""
|
||||
rdl_source = """
|
||||
addrmap test {
|
||||
|
||||
@@ -1,10 +1,5 @@
|
||||
"""Pytest fixtures for unit tests."""
|
||||
|
||||
|
||||
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import Iterable, Mapping, Optional
|
||||
from collections.abc import Callable
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -248,5 +248,3 @@ def test_unaligned_external_nested_in_addrmap(compile_rdl: Callable[..., Addrmap
|
||||
content = module_file.read_text()
|
||||
# Verify the nested components are in the generated code
|
||||
assert "inner" in content
|
||||
|
||||
|
||||
|
||||
@@ -16,9 +16,7 @@ def _find_child_by_name(node: AddrmapNode, inst_name: str):
|
||||
class TestRefIsInternal:
|
||||
"""Tests for ref_is_internal utility."""
|
||||
|
||||
def test_external_components_flagged(
|
||||
self, compile_rdl: Callable[..., AddrmapNode]
|
||||
) -> None:
|
||||
def test_external_components_flagged(self, compile_rdl: Callable[..., AddrmapNode]) -> None:
|
||||
"""External components should be treated as non-internal."""
|
||||
rdl_source = """
|
||||
reg reg_t {
|
||||
|
||||
Reference in New Issue
Block a user