5 Commits

Author SHA1 Message Date
Byron Lathi
b0db1f60e4 Gate assertions behind "PEAKRDL_ASSERTIONS define" 2025-11-22 16:34:52 -08:00
Byron Lathi
b16eb33ee1 Add taxi apb interface 2025-11-22 15:56:18 -08:00
Byron Lathi
3c87510516 Downsize paddr bits 2025-11-22 12:26:32 -08:00
Arnav Sacheti
f0f25a6d92 update devcontainer extensions 2025-11-12 07:20:11 +00:00
Arnav Sacheti
a9653c8497 Tests/cocotb (#19)
* wip

* reorg

* update sv int

* apb4 working

* apb3 working

* version bump + ignore runner warning

* remove redundant check

* adding log on failure

* cleaning up verilator version issue

* devcontainer

* Fix missing libpython in GitHub Actions CI environment (#21)

* Initial plan

* Install libpython in GitHub Actions for cocotb tests

Co-authored-by: arnavsacheti <36746504+arnavsacheti@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: arnavsacheti <36746504+arnavsacheti@users.noreply.github.com>

---------

Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
2025-11-10 23:00:28 -08:00
19 changed files with 392 additions and 74 deletions

22
.devcontainer/Dockerfile Normal file
View File

@@ -0,0 +1,22 @@
FROM verilator/verilator:latest
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && \
apt-get install -y --no-install-recommends \
python3 \
python3-venv \
python3-pip \
python3-dev \
build-essential \
pkg-config \
git \
curl \
ca-certificates && \
rm -rf /var/lib/apt/lists/*
ENV UV_INSTALL_DIR=/usr/local/bin
ENV UV_LINK_MODE=copy
# Install uv globally so both VS Code and terminals can use it
RUN curl -LsSf https://astral.sh/uv/install.sh | sh
RUN uv --version

View File

@@ -0,0 +1,36 @@
{
"name": "PeakRDL BusDecoder",
"build": {
"dockerfile": "Dockerfile"
},
"runArgs": [
"--init"
],
"features": {
"ghcr.io/devcontainers/features/common-utils:2": {
"username": "vscode",
"uid": "1000",
"gid": "1000",
"installZsh": "false",
"installOhMyZsh": "false"
}
},
"remoteUser": "vscode",
"postCreateCommand": "uv sync --frozen --all-extras --group tools --group test",
"customizations": {
"vscode": {
"settings": {
"python.defaultInterpreterPath": ".venv/bin/python",
"terminal.integrated.shell.linux": "/bin/bash"
},
"extensions": [
"ms-python.python",
"ms-python.vscode-pylance",
"ms-vscode.cpptools",
"charliermarsh.ruff",
"astral-sh.ty",
"meta.pyrefly"
]
}
}
}

View File

@@ -14,11 +14,13 @@ on:
jobs:
test:
runs-on: ubuntu-latest
container:
image: verilator/verilator:latest
permissions:
contents: read
strategy:
matrix:
python-version: ['3.10', '3.11', '3.12', '3.13']
python-version: ['3.10', '3.11', '3.12', '3.13', '3.14']
steps:
- uses: actions/checkout@v4
@@ -27,19 +29,21 @@ jobs:
uses: astral-sh/setup-uv@v3
with:
python-version: ${{ matrix.python-version }}
enable-cache: true
- name: Install Verilator
- name: Check Verilator version
run: verilator --version
- name: Install Python development packages
run: |
sudo apt-get update
sudo apt-get install -y verilator
verilator --version
apt-get update && apt-get install -y python3-dev libpython3-dev
- name: Install dependencies
run: |
uv sync --all-extras --group test
- name: Run tests
run: uv run pytest tests/ -v --cov=peakrdl_busdecoder --cov-report=xml --cov-report=term
run: uv run pytest tests/ --cov=peakrdl_busdecoder --cov-report=xml --cov-report=term
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4

View File

@@ -8,7 +8,7 @@ version = "0.5.0"
requires-python = ">=3.10"
dependencies = ["jinja2>=3.1.6", "systemrdl-compiler~=1.30.1"]
authors = [{ name = "Alex Mykyta" }]
authors = [{ name = "Arnav Sacheti" }]
description = "Generate a SystemVerilog bus decoder from SystemRDL for splitting CPU interfaces to multiple sub-address spaces"
readme = "README.md"
license = { text = "LGPLv3" }

View File

@@ -5,7 +5,7 @@ from peakrdl.config import schema
from peakrdl.plugins.entry_points import get_entry_points
from peakrdl.plugins.exporter import ExporterSubcommandPlugin
from .cpuif import BaseCpuif, apb3, apb4, axi4lite
from .cpuif import BaseCpuif, apb3, apb4, axi4lite, taxi_apb
from .exporter import BusDecoderExporter
from .udps import ALL_UDPS
@@ -24,6 +24,7 @@ def get_cpuifs(config: list[tuple[str, Any]]) -> dict[str, type[BaseCpuif]]:
"apb3-flat": apb3.APB3CpuifFlat,
"apb4": apb4.APB4Cpuif,
"apb4-flat": apb4.APB4CpuifFlat,
"taxi-apb": taxi_apb.TaxiAPBCpuif,
"axi4-lite": axi4lite.AXI4LiteCpuif,
"axi4-lite-flat": axi4lite.AXI4LiteCpuifFlat,
}

View File

@@ -2,7 +2,7 @@ from typing import TYPE_CHECKING
from systemrdl.node import AddressableNode
from ...utils import get_indexed_path
from ...utils import clog2, get_indexed_path
from ..base_cpuif import BaseCpuif
from .apb4_interface import APB4FlatInterface
@@ -42,7 +42,7 @@ class APB4CpuifFlat(BaseCpuif):
fanout[self.signal("PWRITE", node, "gi")] = (
f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'gi')}"
)
fanout[self.signal("PADDR", node, "gi")] = self.signal("PADDR")
fanout[self.signal("PADDR", node, "gi")] = f"{self.signal('PADDR')}[{clog2(node.size) - 1}:0]"
fanout[self.signal("PPROT", node, "gi")] = self.signal("PPROT")
fanout[self.signal("PWDATA", node, "gi")] = "cpuif_wr_data"
fanout[self.signal("PSTRB", node, "gi")] = "cpuif_wr_byte_en"

View File

@@ -2,6 +2,7 @@
from systemrdl.node import AddressableNode
from ...utils import clog2
from ..interface import FlatInterface, SVInterface
@@ -50,7 +51,7 @@ class APB4FlatInterface(FlatInterface):
f"output logic {self.signal('PSEL', child)}",
f"output logic {self.signal('PENABLE', child)}",
f"output logic {self.signal('PWRITE', child)}",
f"output logic [{self.cpuif.addr_width - 1}:0] {self.signal('PADDR', child)}",
f"output logic [{clog2(child.size) - 1}:0] {self.signal('PADDR', child)}",
f"output logic [2:0] {self.signal('PPROT', child)}",
f"output logic [{self.cpuif.data_width - 1}:0] {self.signal('PWDATA', child)}",
f"output logic [{self.cpuif.data_width // 8 - 1}:0] {self.signal('PSTRB', child)}",

View File

@@ -6,8 +6,10 @@
assert_bad_data_width: assert($bits({{cpuif.signal("PWDATA")}}) == {{ds.package_name}}::{{ds.module_name|upper}}_DATA_WIDTH)
else $error("Interface data width of %0d is incorrect. Shall be %0d bits", $bits({{cpuif.signal("PWDATA")}}), {{ds.package_name}}::{{ds.module_name|upper}}_DATA_WIDTH);
end
`ifdef PEAKRDL_ASSERTIONS
assert_wr_sel: assert property (@(posedge {{cpuif.signal("PCLK")}}) {{cpuif.signal("PSEL")}} && {{cpuif.signal("PWRITE")}} |-> ##1 ({{cpuif.signal("PREADY")}} || {{cpuif.signal("PSLVERR")}}))
else $error("APB4 Slave port SEL implies that cpuif_wr_sel must be one-hot encoded");
`endif
`endif
{%- endif %}

View File

@@ -15,6 +15,7 @@
$bits({{cpuif.signal("WDATA")}}), {{ds.package_name}}::{{ds.module_name|upper}}_DATA_WIDTH);
end
`ifdef PEAKRDL_ASSERTIONS
// Simple handshake sanity (one-cycle implication; relax/adjust as needed)
assert_rd_resp_enc: assert property (@(posedge {{cpuif.signal("ACLK")}})
{{cpuif.signal("RVALID")}} |-> (^{{cpuif.signal("RRESP")}} !== 1'bx))
@@ -23,6 +24,7 @@
assert_wr_resp_enc: assert property (@(posedge {{cpuif.signal("ACLK")}})
{{cpuif.signal("BVALID")}} |-> (^{{cpuif.signal("BRESP")}} !== 1'bx))
else $error("BRESP must be a legal AXI response when BVALID is high");
`endif
`endif
{% endif -%}

View File

@@ -64,17 +64,20 @@ class Interface(ABC):
class SVInterface(Interface):
"""SystemVerilog interface-based signal handling."""
slave_modport_name = "slave"
master_modport_name = "master"
@property
def is_interface(self) -> bool:
return True
def get_port_declaration(self, slave_name: str, master_prefix: str) -> str:
"""Generate SystemVerilog interface port declarations."""
slave_ports: list[str] = [f"{self.get_interface_type()}.slave {slave_name}"]
slave_ports: list[str] = [f"{self.get_interface_type()}.{self.slave_modport_name} {slave_name}"]
master_ports: list[str] = []
for child in self.cpuif.addressable_children:
base = f"{self.get_interface_type()}.master {master_prefix}{child.inst_name}"
base = f"{self.get_interface_type()}.{self.master_modport_name} {master_prefix}{child.inst_name}"
# When unrolled, current_idx is set - append it to the name
if child.current_idx is not None:

View File

@@ -0,0 +1,3 @@
from .taxi_apb_cpuif import TaxiAPBCpuif
__all__ = ["TaxiAPBCpuif"]

View File

@@ -0,0 +1,99 @@
from typing import TYPE_CHECKING, overload
from systemrdl.node import AddressableNode
from ...utils import get_indexed_path
from ..base_cpuif import BaseCpuif
from .taxi_apb_interface import TaxiAPBSVInterface
if TYPE_CHECKING:
from ...exporter import BusDecoderExporter
class TaxiAPBCpuif(BaseCpuif):
template_path = "taxi_apb_tmpl.sv"
def __init__(self, exp: "BusDecoderExporter") -> None:
super().__init__(exp)
self._interface = TaxiAPBSVInterface(self)
self._interface.master_modport_name = "mst"
self._interface.slave_modport_name = "slv"
@property
def is_interface(self) -> bool:
return self._interface.is_interface
@property
def port_declaration(self) -> str:
"""Returns the port declaration for the APB4 interface."""
return self._interface.get_port_declaration("s_apb", "m_apb_")
@overload
def signal(self, signal: str, node: None = None, indexer: None = None) -> str: ...
@overload
def signal(self, signal: str, node: AddressableNode, indexer: str) -> str: ...
def signal(self, signal: str, node: AddressableNode | None = None, indexer: str | None = None) -> str:
return self._interface.signal(signal, node, indexer)
def fanout(self, node: AddressableNode) -> str:
fanout: dict[str, str] = {}
fanout[self.signal("psel", node, "gi")] = (
f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'gi')}|cpuif_rd_sel.{get_indexed_path(self.exp.ds.top_node, node, 'gi')}"
)
fanout[self.signal("penable", node, "gi")] = self.signal("penable")
fanout[self.signal("pwrite", node, "gi")] = (
f"cpuif_wr_sel.{get_indexed_path(self.exp.ds.top_node, node, 'gi')}"
)
fanout[self.signal("paddr", node, "gi")] = self.signal("paddr")
fanout[self.signal("pprot", node, "gi")] = self.signal("pprot")
fanout[self.signal("pwdata", node, "gi")] = "cpuif_wr_data"
fanout[self.signal("pstrb", node, "gi")] = "cpuif_wr_byte_en"
# no user?
return "\n".join(map(lambda kv: f"assign {kv[0]} = {kv[1]};", fanout.items()))
def fanin(self, node: AddressableNode | None = None) -> str:
fanin: dict[str, str] = {}
if node is None:
fanin["cpuif_rd_ack"] = "'0"
fanin["cpuif_rd_err"] = "'0"
else:
# Use intermediate signals for interface arrays to avoid
# non-constant indexing of interface arrays in procedural blocks
if self.is_interface and node.is_array and node.array_dimensions:
# Generate array index string [i0][i1]... for the intermediate signal
array_idx = "".join(f"[i{i}]" for i in range(len(node.array_dimensions)))
fanin["cpuif_rd_ack"] = f"{node.inst_name}_fanin_ready{array_idx}"
fanin["cpuif_rd_err"] = f"{node.inst_name}_fanin_err{array_idx}"
else:
fanin["cpuif_rd_ack"] = self.signal("pready", node, "i")
fanin["cpuif_rd_err"] = self.signal("pslverr", node, "i")
# no user?
return "\n".join(map(lambda kv: f"{kv[0]} = {kv[1]};", fanin.items()))
def readback(self, node: AddressableNode | None = None) -> str:
fanin: dict[str, str] = {}
if node is None:
fanin["cpuif_rd_data"] = "'0"
else:
# Use intermediate signals for interface arrays to avoid
# non-constant indexing of interface arrays in procedural blocks
if self.is_interface and node.is_array and node.array_dimensions:
# Generate array index string [i0][i1]... for the intermediate signal
array_idx = "".join(f"[i{i}]" for i in range(len(node.array_dimensions)))
fanin["cpuif_rd_data"] = f"{node.inst_name}_fanin_data{array_idx}"
else:
fanin["cpuif_rd_data"] = self.signal("prdata", node, "i")
return "\n".join(map(lambda kv: f"{kv[0]} = {kv[1]};", fanin.items()))
def fanin_intermediate_assignments(
self, node: AddressableNode, inst_name: str, array_idx: str, master_prefix: str, indexed_path: str
) -> list[str]:
"""Generate intermediate signal assignments for APB4 interface arrays."""
return [
f"assign {inst_name}_fanin_ready{array_idx} = {master_prefix}{indexed_path}.pready;",
f"assign {inst_name}_fanin_err{array_idx} = {master_prefix}{indexed_path}.pslverr;",
f"assign {inst_name}_fanin_data{array_idx} = {master_prefix}{indexed_path}.prdata;",
]

View File

@@ -0,0 +1,16 @@
"""Taxi APB-specific interface implementations."""
from ..interface import SVInterface
class TaxiAPBSVInterface(SVInterface):
"""Taxi APB SystemVerilog interface."""
def get_interface_type(self) -> str:
return "taxi_apb_if"
def get_slave_name(self) -> str:
return "s_apb"
def get_master_prefix(self) -> str:
return "m_apb_"

View File

@@ -0,0 +1,45 @@
{%- if cpuif.is_interface %}
`ifndef SYNTHESIS
initial begin
assert_bad_addr_width: assert($bits({{cpuif.signal("paddr")}}) >= {{ds.package_name}}::{{ds.module_name|upper}}_MIN_ADDR_WIDTH)
else $error("Interface address width of %0d is too small. Shall be at least %0d bits", $bits({{cpuif.signal("paddr")}}), {{ds.package_name}}::{{ds.module_name|upper}}_MIN_ADDR_WIDTH);
assert_bad_data_width: assert($bits({{cpuif.signal("pwdata")}}) == {{ds.package_name}}::{{ds.module_name|upper}}_DATA_WIDTH)
else $error("Interface data width of %0d is incorrect. Shall be %0d bits", $bits({{cpuif.signal("pwdata")}}), {{ds.package_name}}::{{ds.module_name|upper}}_DATA_WIDTH);
end
`ifdef PEAKRDL_ASSERTIONS
assert_wr_sel: assert property (@(posedge {{cpuif.signal("PCLK")}}) {{cpuif.signal("psel")}} && {{cpuif.signal("pwrite")}} |-> ##1 ({{cpuif.signal("pready")}} || {{cpuif.signal("pslverr")}}))
else $error("APB4 Slave port SEL implies that cpuif_wr_sel must be one-hot encoded");
`endif
`endif
{%- endif %}
assign cpuif_req = {{cpuif.signal("psel")}};
assign cpuif_wr_en = {{cpuif.signal("pwrite")}};
assign cpuif_rd_en = !{{cpuif.signal("pwrite")}};
assign cpuif_wr_addr = {{cpuif.signal("paddr")}};
assign cpuif_rd_addr = {{cpuif.signal("paddr")}};
assign cpuif_wr_data = {{cpuif.signal("pwdata")}};
assign cpuif_wr_byte_en = {{cpuif.signal("pstrb")}};
assign {{cpuif.signal("prdata")}} = cpuif_rd_data;
assign {{cpuif.signal("pready")}} = cpuif_rd_ack;
assign {{cpuif.signal("pslverr")}} = cpuif_rd_err | cpuif_rd_sel.cpuif_err | cpuif_wr_sel.cpuif_err;
//--------------------------------------------------------------------------
// Fanout CPU Bus interface signals
//--------------------------------------------------------------------------
{{fanout|walk(cpuif=cpuif)}}
{%- if cpuif.is_interface %}
//--------------------------------------------------------------------------
// Intermediate signals for interface array fanin
//--------------------------------------------------------------------------
{{fanin_intermediate|walk(cpuif=cpuif)}}
{%- endif %}
//--------------------------------------------------------------------------
// Fanin CPU Bus interface signals
//--------------------------------------------------------------------------
{{fanin|walk(cpuif=cpuif)}}

View File

@@ -9,6 +9,7 @@ from typing import Any, Iterable
import cocotb
from cocotb.triggers import Timer
from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle
class _Apb3SlaveShim:
"""Accessor for the APB3 slave signals on the DUT."""
@@ -33,10 +34,7 @@ def _load_config() -> dict[str, Any]:
def _resolve(handle, indices: Iterable[int]):
ref = handle
for idx in indices:
ref = ref[idx]
return ref
return resolve_handle(handle, indices)
def _set_value(handle, indices: Iterable[int], value: int) -> None:
@@ -54,16 +52,16 @@ def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dic
entry = {
"indices": [tuple(idx) for idx in master["indices"]] or [tuple()],
"outputs": {
"PSEL": getattr(dut, f"{prefix}_PSEL"),
"PENABLE": getattr(dut, f"{prefix}_PENABLE"),
"PWRITE": getattr(dut, f"{prefix}_PWRITE"),
"PADDR": getattr(dut, f"{prefix}_PADDR"),
"PWDATA": getattr(dut, f"{prefix}_PWDATA"),
"PSEL": SignalHandle(dut, f"{prefix}_PSEL"),
"PENABLE": SignalHandle(dut, f"{prefix}_PENABLE"),
"PWRITE": SignalHandle(dut, f"{prefix}_PWRITE"),
"PADDR": SignalHandle(dut, f"{prefix}_PADDR"),
"PWDATA": SignalHandle(dut, f"{prefix}_PWDATA"),
},
"inputs": {
"PRDATA": getattr(dut, f"{prefix}_PRDATA"),
"PREADY": getattr(dut, f"{prefix}_PREADY"),
"PSLVERR": getattr(dut, f"{prefix}_PSLVERR"),
"PRDATA": SignalHandle(dut, f"{prefix}_PRDATA"),
"PREADY": SignalHandle(dut, f"{prefix}_PREADY"),
"PSLVERR": SignalHandle(dut, f"{prefix}_PSLVERR"),
},
}
table[master["inst_name"]] = entry

View File

@@ -9,6 +9,7 @@ from typing import Any, Iterable
import cocotb
from cocotb.triggers import Timer
from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle
class _Apb4SlaveShim:
"""Lightweight accessor for the APB4 slave side of the DUT."""
@@ -37,10 +38,7 @@ def _load_config() -> dict[str, Any]:
def _resolve(handle, indices: Iterable[int]):
"""Index into hierarchical cocotb handles."""
ref = handle
for idx in indices:
ref = ref[idx]
return ref
return resolve_handle(handle, indices)
def _set_value(handle, indices: Iterable[int], value: int) -> None:
@@ -59,18 +57,18 @@ def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dic
"port_prefix": port_prefix,
"indices": [tuple(idx) for idx in master["indices"]] or [tuple()],
"outputs": {
"PSEL": getattr(dut, f"{port_prefix}_PSEL"),
"PENABLE": getattr(dut, f"{port_prefix}_PENABLE"),
"PWRITE": getattr(dut, f"{port_prefix}_PWRITE"),
"PADDR": getattr(dut, f"{port_prefix}_PADDR"),
"PPROT": getattr(dut, f"{port_prefix}_PPROT"),
"PWDATA": getattr(dut, f"{port_prefix}_PWDATA"),
"PSTRB": getattr(dut, f"{port_prefix}_PSTRB"),
"PSEL": SignalHandle(dut, f"{port_prefix}_PSEL"),
"PENABLE": SignalHandle(dut, f"{port_prefix}_PENABLE"),
"PWRITE": SignalHandle(dut, f"{port_prefix}_PWRITE"),
"PADDR": SignalHandle(dut, f"{port_prefix}_PADDR"),
"PPROT": SignalHandle(dut, f"{port_prefix}_PPROT"),
"PWDATA": SignalHandle(dut, f"{port_prefix}_PWDATA"),
"PSTRB": SignalHandle(dut, f"{port_prefix}_PSTRB"),
},
"inputs": {
"PRDATA": getattr(dut, f"{port_prefix}_PRDATA"),
"PREADY": getattr(dut, f"{port_prefix}_PREADY"),
"PSLVERR": getattr(dut, f"{port_prefix}_PSLVERR"),
"PRDATA": SignalHandle(dut, f"{port_prefix}_PRDATA"),
"PREADY": SignalHandle(dut, f"{port_prefix}_PREADY"),
"PSLVERR": SignalHandle(dut, f"{port_prefix}_PSLVERR"),
},
}
table[master["inst_name"]] = entry

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
import json
from pathlib import Path
import logging
import pytest
from peakrdl_busdecoder.cpuif.apb4.apb4_cpuif_flat import APB4CpuifFlat
@@ -43,17 +43,38 @@ def test_apb4_smoke(tmp_path: Path, rdl_file: str, top_name: str) -> None:
runner = get_runner("verilator")
sim_build = build_root / "sim_build"
try:
runner.build(
sources=sources,
hdl_toplevel=module_path.stem,
build_dir=sim_build,
log_file=str(build_root / "build.log"),
)
except SystemExit as e:
# Print build log on failure for easier debugging
log_path = build_root / "build.log"
if log_path.exists():
logging.error("\n\n=== Build Log ===\n")
logging.error(log_path.read_text())
logging.error("\n=== End Build Log ===\n")
if e.code != 0:
raise
runner.build(
sources=sources,
hdl_toplevel=module_path.stem,
build_dir=sim_build,
)
runner.test(
hdl_toplevel=module_path.stem,
test_module="tests.cocotb.apb4.smoke.test_register_access",
build_dir=sim_build,
log_file=str(build_root / "simulation.log"),
extra_env={"RDL_TEST_CONFIG": json.dumps(config)},
)
try:
runner.test(
hdl_toplevel=module_path.stem,
test_module="tests.cocotb.apb4.smoke.test_register_access",
build_dir=sim_build,
log_file=str(build_root / "simulation.log"),
extra_env={"RDL_TEST_CONFIG": json.dumps(config)},
)
except SystemExit as e:
# Print simulation log on failure for easier debugging
log_path = build_root / "simulation.log"
if log_path.exists():
logging.error("\n\n=== Simulation Log ===\n")
logging.error(log_path.read_text())
logging.error("\n=== End Simulation Log ===\n")
if e.code != 0:
raise

View File

@@ -9,6 +9,7 @@ from typing import Any, Iterable
import cocotb
from cocotb.triggers import Timer
from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle
class _AxilSlaveShim:
"""Accessor for AXI4-Lite slave ports on the DUT."""
@@ -44,10 +45,7 @@ def _load_config() -> dict[str, Any]:
def _resolve(handle, indices: Iterable[int]):
ref = handle
for idx in indices:
ref = ref[idx]
return ref
return resolve_handle(handle, indices)
def _set_value(handle, indices: Iterable[int], value: int) -> None:
@@ -65,25 +63,25 @@ def _build_master_table(dut, masters_cfg: list[dict[str, Any]]) -> dict[str, dic
entry = {
"indices": [tuple(idx) for idx in master["indices"]] or [tuple()],
"outputs": {
"AWVALID": getattr(dut, f"{prefix}_AWVALID"),
"AWADDR": getattr(dut, f"{prefix}_AWADDR"),
"AWPROT": getattr(dut, f"{prefix}_AWPROT"),
"WVALID": getattr(dut, f"{prefix}_WVALID"),
"WDATA": getattr(dut, f"{prefix}_WDATA"),
"WSTRB": getattr(dut, f"{prefix}_WSTRB"),
"ARVALID": getattr(dut, f"{prefix}_ARVALID"),
"ARADDR": getattr(dut, f"{prefix}_ARADDR"),
"ARPROT": getattr(dut, f"{prefix}_ARPROT"),
"AWVALID": SignalHandle(dut, f"{prefix}_AWVALID"),
"AWADDR": SignalHandle(dut, f"{prefix}_AWADDR"),
"AWPROT": SignalHandle(dut, f"{prefix}_AWPROT"),
"WVALID": SignalHandle(dut, f"{prefix}_WVALID"),
"WDATA": SignalHandle(dut, f"{prefix}_WDATA"),
"WSTRB": SignalHandle(dut, f"{prefix}_WSTRB"),
"ARVALID": SignalHandle(dut, f"{prefix}_ARVALID"),
"ARADDR": SignalHandle(dut, f"{prefix}_ARADDR"),
"ARPROT": SignalHandle(dut, f"{prefix}_ARPROT"),
},
"inputs": {
"AWREADY": getattr(dut, f"{prefix}_AWREADY"),
"WREADY": getattr(dut, f"{prefix}_WREADY"),
"BVALID": getattr(dut, f"{prefix}_BVALID"),
"BRESP": getattr(dut, f"{prefix}_BRESP"),
"ARREADY": getattr(dut, f"{prefix}_ARREADY"),
"RVALID": getattr(dut, f"{prefix}_RVALID"),
"RDATA": getattr(dut, f"{prefix}_RDATA"),
"RRESP": getattr(dut, f"{prefix}_RRESP"),
"AWREADY": SignalHandle(dut, f"{prefix}_AWREADY"),
"WREADY": SignalHandle(dut, f"{prefix}_WREADY"),
"BVALID": SignalHandle(dut, f"{prefix}_BVALID"),
"BRESP": SignalHandle(dut, f"{prefix}_BRESP"),
"ARREADY": SignalHandle(dut, f"{prefix}_ARREADY"),
"RVALID": SignalHandle(dut, f"{prefix}_RVALID"),
"RDATA": SignalHandle(dut, f"{prefix}_RDATA"),
"RRESP": SignalHandle(dut, f"{prefix}_RRESP"),
},
}
table[master["inst_name"]] = entry

View File

@@ -0,0 +1,69 @@
"""Utilities for resolving cocotb signal handles across simulators."""
from __future__ import annotations
from typing import Any, Iterable
class SignalHandle:
"""
Wrapper that resolves array elements even when the simulator does not expose
unpacked arrays through ``handle[idx]``.
"""
def __init__(self, dut, name: str) -> None:
self._dut = dut
self._name = name
self._base = getattr(dut, name, None)
self._cache: dict[tuple[int, ...], Any] = {}
def resolve(self, indices: tuple[int, ...]):
if not indices:
return self._base if self._base is not None else self._lookup(tuple())
if indices not in self._cache:
self._cache[indices] = self._direct_or_lookup(indices)
return self._cache[indices]
def _direct_or_lookup(self, indices: tuple[int, ...]):
if self._base is not None:
ref = self._base
try:
for idx in indices:
ref = ref[idx]
return ref
except (IndexError, TypeError, AttributeError):
pass
return self._lookup(indices)
def _lookup(self, indices: tuple[int, ...]):
suffix = "".join(f"[{idx}]" for idx in indices)
path = f"{self._name}{suffix}"
try:
return getattr(self._dut, path)
except AttributeError:
pass
errors: list[Exception] = []
for extended in (False, True):
try:
return self._dut._id(path, extended=extended)
except (AttributeError, ValueError) as exc:
errors.append(exc)
raise AttributeError(f"Unable to resolve handle '{path}' via dut._id") from errors[-1]
def resolve_handle(handle, indices: Iterable[int]):
"""Resolve either a regular cocotb handle or a ``SignalHandle`` wrapper."""
index_tuple = tuple(indices)
if isinstance(handle, SignalHandle):
return handle.resolve(index_tuple)
ref = handle
for idx in index_tuple:
ref = ref[idx]
return ref