add colorized build/sim log propgate on error to all runners (#26)

* add colorized build/sim log propgate on error to all runners

* add doctoring

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Arnav Sacheti
2025-11-26 08:01:41 -08:00
committed by GitHub
parent f0f25a6d92
commit 88827c65b5
10 changed files with 311 additions and 171 deletions

View File

@@ -11,6 +11,7 @@ from cocotb.triggers import Timer
from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle
class _Apb3SlaveShim:
"""Accessor for the APB3 slave signals on the DUT."""
@@ -128,16 +129,20 @@ async def test_apb3_address_decoding(dut) -> None:
assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} should assert PSEL for write"
assert _get_int(entry["outputs"]["PWRITE"], index) == 1, f"{master_name} should see write direction"
assert _get_int(entry["outputs"]["PADDR"], index) == address, f"{master_name} must receive write address"
assert _get_int(entry["outputs"]["PWDATA"], index) == write_data, f"{master_name} must receive write data"
assert _get_int(entry["outputs"]["PADDR"], index) == address, (
f"{master_name} must receive write address"
)
assert _get_int(entry["outputs"]["PWDATA"], index) == write_data, (
f"{master_name} must receive write data"
)
for other_name, other_idx in _all_index_pairs(masters):
if other_name == master_name and other_idx == index:
continue
other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["PSEL"], other_idx) == 0
), f"{other_name}{other_idx} should remain idle during {txn['label']}"
assert _get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, (
f"{other_name}{other_idx} should remain idle during {txn['label']}"
)
assert int(slave.PREADY.value) == 1, "Slave ready should mirror selected master"
assert int(slave.PSLVERR.value) == 0, "Write should complete without error"
@@ -164,16 +169,20 @@ async def test_apb3_address_decoding(dut) -> None:
await Timer(1, units="ns")
assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must assert PSEL for read"
assert _get_int(entry["outputs"]["PWRITE"], index) == 0, f"{master_name} should clear write during read"
assert _get_int(entry["outputs"]["PADDR"], index) == address, f"{master_name} must receive read address"
assert _get_int(entry["outputs"]["PWRITE"], index) == 0, (
f"{master_name} should clear write during read"
)
assert _get_int(entry["outputs"]["PADDR"], index) == address, (
f"{master_name} must receive read address"
)
for other_name, other_idx in _all_index_pairs(masters):
if other_name == master_name and other_idx == index:
continue
other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["PSEL"], other_idx) == 0
), f"{other_name}{other_idx} must stay idle during read of {txn['label']}"
assert _get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, (
f"{other_name}{other_idx} must stay idle during read of {txn['label']}"
)
assert int(slave.PRDATA.value) == read_data, "Read data should propagate back to the slave"
assert int(slave.PREADY.value) == 1, "Slave ready should acknowledge the read"

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import json
from pathlib import Path
import logging
import pytest
@@ -15,7 +16,7 @@ except ImportError: # pragma: no cover
from cocotb_tools.runner import get_runner
from tests.cocotb_lib import RDL_CASES
from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case
from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case, colorize_cocotb_log
@pytest.mark.simulation
@@ -44,16 +45,42 @@ def test_apb3_smoke(tmp_path: Path, rdl_file: str, top_name: str) -> None:
runner = get_runner("verilator")
sim_build = build_root / "sim_build"
runner.build(
sources=sources,
hdl_toplevel=module_path.stem,
build_dir=sim_build,
)
build_log_file = build_root / "build.log"
sim_log_file = build_root / "simulation.log"
runner.test(
hdl_toplevel=module_path.stem,
test_module="tests.cocotb.apb3.smoke.test_register_access",
build_dir=sim_build,
log_file=str(build_root / "simulation.log"),
extra_env={"RDL_TEST_CONFIG": json.dumps(config)},
)
try:
runner.build(
sources=sources,
hdl_toplevel=module_path.stem,
build_dir=sim_build,
log_file=str(build_log_file),
)
except SystemExit as e:
# Print build log on failure for easier debugging
if build_log_file.exists():
logging.error(f"""
=== Build Log ===
{colorize_cocotb_log(build_log_file.read_text())}
=== End Build Log ===
""")
if e.code != 0:
raise
try:
runner.test(
hdl_toplevel=module_path.stem,
test_module="tests.cocotb.apb3.smoke.test_register_access",
build_dir=sim_build,
log_file=str(sim_log_file),
extra_env={"RDL_TEST_CONFIG": json.dumps(config)},
)
except SystemExit as e:
# Print simulation log on failure for easier debugging
if sim_log_file.exists():
logging.error(f"""
=== Simulation Log ===
{colorize_cocotb_log(sim_log_file.read_text())}
=== End Simulation Log ===
""")
if e.code != 0:
raise

View File

@@ -41,23 +41,23 @@ def _apb3_master(dut, base: str):
async def test_depth_1(dut):
"""Test max_decode_depth=1 - should have interface for inner1 only."""
s_apb = _apb3_slave(dut)
# At depth 1, we should have m_apb_inner1 but not deeper interfaces
inner1 = _apb3_master(dut, "m_apb_inner1")
# Default slave side inputs
s_apb.PSEL.value = 0
s_apb.PENABLE.value = 0
s_apb.PWRITE.value = 0
s_apb.PADDR.value = 0
s_apb.PWDATA.value = 0
inner1.PRDATA.value = 0
inner1.PREADY.value = 0
inner1.PSLVERR.value = 0
await Timer(1, units="ns")
# Write to address 0x0 (should select inner1)
inner1.PREADY.value = 1
s_apb.PADDR.value = 0x0
@@ -65,9 +65,9 @@ async def test_depth_1(dut):
s_apb.PWRITE.value = 1
s_apb.PSEL.value = 1
s_apb.PENABLE.value = 1
await Timer(1, units="ns")
assert int(inner1.PSEL.value) == 1, "inner1 must be selected"
assert int(inner1.PWRITE.value) == 1, "Write should propagate"
assert int(s_apb.PREADY.value) == 1, "Ready should mirror master"
@@ -77,28 +77,28 @@ async def test_depth_1(dut):
async def test_depth_2(dut):
"""Test max_decode_depth=2 - should have interfaces for reg1 and inner2."""
s_apb = _apb3_slave(dut)
# At depth 2, we should have m_apb_reg1 and m_apb_inner2
reg1 = _apb3_master(dut, "m_apb_reg1")
inner2 = _apb3_master(dut, "m_apb_inner2")
# Default slave side inputs
s_apb.PSEL.value = 0
s_apb.PENABLE.value = 0
s_apb.PWRITE.value = 0
s_apb.PADDR.value = 0
s_apb.PWDATA.value = 0
reg1.PRDATA.value = 0
reg1.PREADY.value = 0
reg1.PSLVERR.value = 0
inner2.PRDATA.value = 0
inner2.PREADY.value = 0
inner2.PSLVERR.value = 0
await Timer(1, units="ns")
# Write to address 0x0 (should select reg1)
reg1.PREADY.value = 1
s_apb.PADDR.value = 0x0
@@ -106,18 +106,18 @@ async def test_depth_2(dut):
s_apb.PWRITE.value = 1
s_apb.PSEL.value = 1
s_apb.PENABLE.value = 1
await Timer(1, units="ns")
assert int(reg1.PSEL.value) == 1, "reg1 must be selected for address 0x0"
assert int(inner2.PSEL.value) == 0, "inner2 should not be selected"
# Reset
s_apb.PSEL.value = 0
s_apb.PENABLE.value = 0
reg1.PREADY.value = 0
await Timer(1, units="ns")
# Write to address 0x10 (should select inner2)
inner2.PREADY.value = 1
s_apb.PADDR.value = 0x10
@@ -125,9 +125,9 @@ async def test_depth_2(dut):
s_apb.PWRITE.value = 1
s_apb.PSEL.value = 1
s_apb.PENABLE.value = 1
await Timer(1, units="ns")
assert int(inner2.PSEL.value) == 1, "inner2 must be selected for address 0x10"
assert int(reg1.PSEL.value) == 0, "reg1 should not be selected"
@@ -136,26 +136,26 @@ async def test_depth_2(dut):
async def test_depth_0(dut):
"""Test max_decode_depth=0 - should have interfaces for all leaf registers."""
s_apb = _apb3_slave(dut)
# At depth 0, we should have all leaf registers: reg1, reg2, reg2b
reg1 = _apb3_master(dut, "m_apb_reg1")
reg2 = _apb3_master(dut, "m_apb_reg2")
reg2b = _apb3_master(dut, "m_apb_reg2b")
# Default slave side inputs
s_apb.PSEL.value = 0
s_apb.PENABLE.value = 0
s_apb.PWRITE.value = 0
s_apb.PADDR.value = 0
s_apb.PWDATA.value = 0
for master in [reg1, reg2, reg2b]:
master.PRDATA.value = 0
master.PREADY.value = 0
master.PSLVERR.value = 0
await Timer(1, units="ns")
# Write to address 0x0 (should select reg1)
reg1.PREADY.value = 1
s_apb.PADDR.value = 0x0
@@ -163,19 +163,19 @@ async def test_depth_0(dut):
s_apb.PWRITE.value = 1
s_apb.PSEL.value = 1
s_apb.PENABLE.value = 1
await Timer(1, units="ns")
assert int(reg1.PSEL.value) == 1, "reg1 must be selected for address 0x0"
assert int(reg2.PSEL.value) == 0, "reg2 should not be selected"
assert int(reg2b.PSEL.value) == 0, "reg2b should not be selected"
# Reset
s_apb.PSEL.value = 0
s_apb.PENABLE.value = 0
reg1.PREADY.value = 0
await Timer(1, units="ns")
# Write to address 0x10 (should select reg2)
reg2.PREADY.value = 1
s_apb.PADDR.value = 0x10
@@ -183,19 +183,19 @@ async def test_depth_0(dut):
s_apb.PWRITE.value = 1
s_apb.PSEL.value = 1
s_apb.PENABLE.value = 1
await Timer(1, units="ns")
assert int(reg2.PSEL.value) == 1, "reg2 must be selected for address 0x10"
assert int(reg1.PSEL.value) == 0, "reg1 should not be selected"
assert int(reg2b.PSEL.value) == 0, "reg2b should not be selected"
# Reset
s_apb.PSEL.value = 0
s_apb.PENABLE.value = 0
reg2.PREADY.value = 0
await Timer(1, units="ns")
# Write to address 0x14 (should select reg2b)
reg2b.PREADY.value = 1
s_apb.PADDR.value = 0x14
@@ -203,9 +203,9 @@ async def test_depth_0(dut):
s_apb.PWRITE.value = 1
s_apb.PSEL.value = 1
s_apb.PENABLE.value = 1
await Timer(1, units="ns")
assert int(reg2b.PSEL.value) == 1, "reg2b must be selected for address 0x14"
assert int(reg1.PSEL.value) == 0, "reg1 should not be selected"
assert int(reg2.PSEL.value) == 0, "reg2 should not be selected"

View File

@@ -11,6 +11,7 @@ from cocotb.triggers import Timer
from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle
class _Apb4SlaveShim:
"""Lightweight accessor for the APB4 slave side of the DUT."""
@@ -141,17 +142,23 @@ async def test_apb4_address_decoding(dut) -> None:
assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} should assert PSEL for write"
assert _get_int(entry["outputs"]["PWRITE"], index) == 1, f"{master_name} should see write intent"
assert _get_int(entry["outputs"]["PADDR"], index) == address, f"{master_name} must receive write address"
assert _get_int(entry["outputs"]["PWDATA"], index) == write_data, f"{master_name} must receive write data"
assert _get_int(entry["outputs"]["PSTRB"], index) == strobe_mask, f"{master_name} must receive full strobes"
assert _get_int(entry["outputs"]["PADDR"], index) == address, (
f"{master_name} must receive write address"
)
assert _get_int(entry["outputs"]["PWDATA"], index) == write_data, (
f"{master_name} must receive write data"
)
assert _get_int(entry["outputs"]["PSTRB"], index) == strobe_mask, (
f"{master_name} must receive full strobes"
)
for other_name, other_idx in _all_index_pairs(masters):
if other_name == master_name and other_idx == index:
continue
other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["PSEL"], other_idx) == 0
), f"{other_name}{other_idx} should remain idle during {txn['label']}"
assert _get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, (
f"{other_name}{other_idx} should remain idle during {txn['label']}"
)
assert int(slave.PREADY.value) == 1, "Slave ready should reflect selected master"
assert int(slave.PSLVERR.value) == 0, "No error expected during write"
@@ -179,16 +186,20 @@ async def test_apb4_address_decoding(dut) -> None:
await Timer(1, units="ns")
assert _get_int(entry["outputs"]["PSEL"], index) == 1, f"{master_name} must assert PSEL for read"
assert _get_int(entry["outputs"]["PWRITE"], index) == 0, f"{master_name} should deassert write for reads"
assert _get_int(entry["outputs"]["PADDR"], index) == address, f"{master_name} must receive read address"
assert _get_int(entry["outputs"]["PWRITE"], index) == 0, (
f"{master_name} should deassert write for reads"
)
assert _get_int(entry["outputs"]["PADDR"], index) == address, (
f"{master_name} must receive read address"
)
for other_name, other_idx in _all_index_pairs(masters):
if other_name == master_name and other_idx == index:
continue
other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["PSEL"], other_idx) == 0
), f"{other_name}{other_idx} must stay idle during read of {txn['label']}"
assert _get_int(other_entry["outputs"]["PSEL"], other_idx) == 0, (
f"{other_name}{other_idx} must stay idle during read of {txn['label']}"
)
assert int(slave.PRDATA.value) == read_data, "Slave should observe readback data from master"
assert int(slave.PREADY.value) == 1, "Slave ready should follow responding master"

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
import json
from pathlib import Path
import logging
import pytest
from peakrdl_busdecoder.cpuif.apb4.apb4_cpuif_flat import APB4CpuifFlat
@@ -15,7 +16,7 @@ except ImportError: # pragma: no cover
from cocotb_tools.runner import get_runner
from tests.cocotb_lib import RDL_CASES
from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case
from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case, colorize_cocotb_log
@pytest.mark.simulation
@@ -43,21 +44,25 @@ def test_apb4_smoke(tmp_path: Path, rdl_file: str, top_name: str) -> None:
runner = get_runner("verilator")
sim_build = build_root / "sim_build"
build_log_file = build_root / "build.log"
sim_log_file = build_root / "simulation.log"
try:
runner.build(
sources=sources,
hdl_toplevel=module_path.stem,
build_dir=sim_build,
log_file=str(build_root / "build.log"),
log_file=str(build_log_file),
)
except SystemExit as e:
# Print build log on failure for easier debugging
log_path = build_root / "build.log"
if log_path.exists():
logging.error("\n\n=== Build Log ===\n")
logging.error(log_path.read_text())
logging.error("\n=== End Build Log ===\n")
if build_log_file.exists():
logging.error(f"""
=== Build Log ===
{colorize_cocotb_log(build_log_file.read_text())}
=== End Build Log ===
""")
if e.code != 0:
raise
@@ -66,15 +71,16 @@ def test_apb4_smoke(tmp_path: Path, rdl_file: str, top_name: str) -> None:
hdl_toplevel=module_path.stem,
test_module="tests.cocotb.apb4.smoke.test_register_access",
build_dir=sim_build,
log_file=str(build_root / "simulation.log"),
log_file=str(sim_log_file),
extra_env={"RDL_TEST_CONFIG": json.dumps(config)},
)
except SystemExit as e:
# Print simulation log on failure for easier debugging
log_path = build_root / "simulation.log"
if log_path.exists():
logging.error("\n\n=== Simulation Log ===\n")
logging.error(log_path.read_text())
logging.error("\n=== End Simulation Log ===\n")
if sim_log_file.exists():
logging.error(f"""
=== Simulation Log ===
{colorize_cocotb_log(sim_log_file.read_text())}
=== End Simulation Log ===
""")
if e.code != 0:
raise

View File

@@ -45,10 +45,10 @@ def _apb4_master(dut, base: str):
async def test_depth_1(dut):
"""Test max_decode_depth=1 - should have interface for inner1 only."""
s_apb = _apb4_slave(dut)
# At depth 1, we should have m_apb_inner1 but not deeper interfaces
inner1 = _apb4_master(dut, "m_apb_inner1")
# Default slave side inputs
s_apb.PSEL.value = 0
s_apb.PENABLE.value = 0
@@ -57,13 +57,13 @@ async def test_depth_1(dut):
s_apb.PWDATA.value = 0
s_apb.PPROT.value = 0
s_apb.PSTRB.value = 0
inner1.PRDATA.value = 0
inner1.PREADY.value = 0
inner1.PSLVERR.value = 0
await Timer(1, units="ns")
# Write to address 0x0 (should select inner1)
inner1.PREADY.value = 1
s_apb.PADDR.value = 0x0
@@ -72,9 +72,9 @@ async def test_depth_1(dut):
s_apb.PWRITE.value = 1
s_apb.PSEL.value = 1
s_apb.PENABLE.value = 1
await Timer(1, units="ns")
assert int(inner1.PSEL.value) == 1, "inner1 must be selected"
assert int(inner1.PWRITE.value) == 1, "Write should propagate"
assert int(s_apb.PREADY.value) == 1, "Ready should mirror master"
@@ -84,11 +84,11 @@ async def test_depth_1(dut):
async def test_depth_2(dut):
"""Test max_decode_depth=2 - should have interfaces for reg1 and inner2."""
s_apb = _apb4_slave(dut)
# At depth 2, we should have m_apb_reg1 and m_apb_inner2
reg1 = _apb4_master(dut, "m_apb_reg1")
inner2 = _apb4_master(dut, "m_apb_inner2")
# Default slave side inputs
s_apb.PSEL.value = 0
s_apb.PENABLE.value = 0
@@ -97,17 +97,17 @@ async def test_depth_2(dut):
s_apb.PWDATA.value = 0
s_apb.PPROT.value = 0
s_apb.PSTRB.value = 0
reg1.PRDATA.value = 0
reg1.PREADY.value = 0
reg1.PSLVERR.value = 0
inner2.PRDATA.value = 0
inner2.PREADY.value = 0
inner2.PSLVERR.value = 0
await Timer(1, units="ns")
# Write to address 0x0 (should select reg1)
reg1.PREADY.value = 1
s_apb.PADDR.value = 0x0
@@ -116,18 +116,18 @@ async def test_depth_2(dut):
s_apb.PWRITE.value = 1
s_apb.PSEL.value = 1
s_apb.PENABLE.value = 1
await Timer(1, units="ns")
assert int(reg1.PSEL.value) == 1, "reg1 must be selected for address 0x0"
assert int(inner2.PSEL.value) == 0, "inner2 should not be selected"
# Reset
s_apb.PSEL.value = 0
s_apb.PENABLE.value = 0
reg1.PREADY.value = 0
await Timer(1, units="ns")
# Write to address 0x10 (should select inner2)
inner2.PREADY.value = 1
s_apb.PADDR.value = 0x10
@@ -136,9 +136,9 @@ async def test_depth_2(dut):
s_apb.PWRITE.value = 1
s_apb.PSEL.value = 1
s_apb.PENABLE.value = 1
await Timer(1, units="ns")
assert int(inner2.PSEL.value) == 1, "inner2 must be selected for address 0x10"
assert int(reg1.PSEL.value) == 0, "reg1 should not be selected"
@@ -147,12 +147,12 @@ async def test_depth_2(dut):
async def test_depth_0(dut):
"""Test max_decode_depth=0 - should have interfaces for all leaf registers."""
s_apb = _apb4_slave(dut)
# At depth 0, we should have all leaf registers: reg1, reg2, reg2b
reg1 = _apb4_master(dut, "m_apb_reg1")
reg2 = _apb4_master(dut, "m_apb_reg2")
reg2b = _apb4_master(dut, "m_apb_reg2b")
# Default slave side inputs
s_apb.PSEL.value = 0
s_apb.PENABLE.value = 0
@@ -161,14 +161,14 @@ async def test_depth_0(dut):
s_apb.PWDATA.value = 0
s_apb.PPROT.value = 0
s_apb.PSTRB.value = 0
for master in [reg1, reg2, reg2b]:
master.PRDATA.value = 0
master.PREADY.value = 0
master.PSLVERR.value = 0
await Timer(1, units="ns")
# Write to address 0x0 (should select reg1)
reg1.PREADY.value = 1
s_apb.PADDR.value = 0x0
@@ -177,19 +177,19 @@ async def test_depth_0(dut):
s_apb.PWRITE.value = 1
s_apb.PSEL.value = 1
s_apb.PENABLE.value = 1
await Timer(1, units="ns")
assert int(reg1.PSEL.value) == 1, "reg1 must be selected for address 0x0"
assert int(reg2.PSEL.value) == 0, "reg2 should not be selected"
assert int(reg2b.PSEL.value) == 0, "reg2b should not be selected"
# Reset
s_apb.PSEL.value = 0
s_apb.PENABLE.value = 0
reg1.PREADY.value = 0
await Timer(1, units="ns")
# Write to address 0x10 (should select reg2)
reg2.PREADY.value = 1
s_apb.PADDR.value = 0x10
@@ -198,19 +198,19 @@ async def test_depth_0(dut):
s_apb.PWRITE.value = 1
s_apb.PSEL.value = 1
s_apb.PENABLE.value = 1
await Timer(1, units="ns")
assert int(reg2.PSEL.value) == 1, "reg2 must be selected for address 0x10"
assert int(reg1.PSEL.value) == 0, "reg1 should not be selected"
assert int(reg2b.PSEL.value) == 0, "reg2b should not be selected"
# Reset
s_apb.PSEL.value = 0
s_apb.PENABLE.value = 0
reg2.PREADY.value = 0
await Timer(1, units="ns")
# Write to address 0x14 (should select reg2b)
reg2b.PREADY.value = 1
s_apb.PADDR.value = 0x14
@@ -219,9 +219,9 @@ async def test_depth_0(dut):
s_apb.PWRITE.value = 1
s_apb.PSEL.value = 1
s_apb.PENABLE.value = 1
await Timer(1, units="ns")
assert int(reg2b.PSEL.value) == 1, "reg2b must be selected for address 0x14"
assert int(reg1.PSEL.value) == 0, "reg1 should not be selected"
assert int(reg2.PSEL.value) == 0, "reg2 should not be selected"

View File

@@ -11,6 +11,7 @@ from cocotb.triggers import Timer
from tests.cocotb_lib.handle_utils import SignalHandle, resolve_handle
class _AxilSlaveShim:
"""Accessor for AXI4-Lite slave ports on the DUT."""
@@ -167,12 +168,12 @@ async def test_axi4lite_address_decoding(dut) -> None:
if other_name == master_name and other_idx == index:
continue
other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["AWVALID"], other_idx) == 0
), f"{other_name}{other_idx} AWVALID should remain low during {txn['label']}"
assert (
_get_int(other_entry["outputs"]["WVALID"], other_idx) == 0
), f"{other_name}{other_idx} WVALID should remain low during {txn['label']}"
assert _get_int(other_entry["outputs"]["AWVALID"], other_idx) == 0, (
f"{other_name}{other_idx} AWVALID should remain low during {txn['label']}"
)
assert _get_int(other_entry["outputs"]["WVALID"], other_idx) == 0, (
f"{other_name}{other_idx} WVALID should remain low during {txn['label']}"
)
slave.AWVALID.value = 0
slave.WVALID.value = 0
@@ -198,9 +199,9 @@ async def test_axi4lite_address_decoding(dut) -> None:
if other_name == master_name and other_idx == index:
continue
other_entry = masters[other_name]
assert (
_get_int(other_entry["outputs"]["ARVALID"], other_idx) == 0
), f"{other_name}{other_idx} ARVALID should remain low during read of {txn['label']}"
assert _get_int(other_entry["outputs"]["ARVALID"], other_idx) == 0, (
f"{other_name}{other_idx} ARVALID should remain low during read of {txn['label']}"
)
assert int(slave.RVALID.value) == 1, "Slave should observe RVALID when master responds"
assert int(slave.RDATA.value) == read_data, "Read data must fold back to slave"

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import json
from pathlib import Path
import logging
import pytest
@@ -15,7 +16,7 @@ except ImportError: # pragma: no cover
from cocotb_tools.runner import get_runner
from tests.cocotb_lib import RDL_CASES
from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case
from tests.cocotb_lib.utils import get_verilog_sources, prepare_cpuif_case, colorize_cocotb_log
@pytest.mark.simulation
@@ -44,16 +45,42 @@ def test_axi4lite_smoke(tmp_path: Path, rdl_file: str, top_name: str) -> None:
runner = get_runner("verilator")
sim_build = build_root / "sim_build"
runner.build(
sources=sources,
hdl_toplevel=module_path.stem,
build_dir=sim_build,
)
build_log_file = build_root / "build.log"
sim_log_file = build_root / "simulation.log"
runner.test(
hdl_toplevel=module_path.stem,
test_module="tests.cocotb.axi4lite.smoke.test_register_access",
build_dir=sim_build,
log_file=str(build_root / "simulation.log"),
extra_env={"RDL_TEST_CONFIG": json.dumps(config)},
)
try:
runner.build(
sources=sources,
hdl_toplevel=module_path.stem,
build_dir=sim_build,
log_file=str(build_log_file),
)
except SystemExit as e:
# Print build log on failure for easier debugging
if build_log_file.exists():
logging.error(f"""
=== Build Log ===
{colorize_cocotb_log(build_log_file.read_text())}
=== End Build Log ===
""")
if e.code != 0:
raise
try:
runner.test(
hdl_toplevel=module_path.stem,
test_module="tests.cocotb.axi4lite.smoke.test_register_access",
build_dir=sim_build,
log_file=str(sim_log_file),
extra_env={"RDL_TEST_CONFIG": json.dumps(config)},
)
except SystemExit as e:
# Print simulation log on failure for easier debugging
if sim_log_file.exists():
logging.error(f"""
=== Simulation Log ===
{colorize_cocotb_log(sim_log_file.read_text())}
=== End Simulation Log ===
""")
if e.code != 0:
raise

View File

@@ -63,10 +63,10 @@ def _axil_master(dut, base: str):
async def test_depth_1(dut):
"""Test max_decode_depth=1 - should have interface for inner1 only."""
s_axil = _axil_slave(dut)
# At depth 1, we should have m_axil_inner1 but not deeper interfaces
inner1 = _axil_master(dut, "m_axil_inner1")
# Default slave side inputs
s_axil.AWVALID.value = 0
s_axil.AWADDR.value = 0
@@ -79,7 +79,7 @@ async def test_depth_1(dut):
s_axil.ARADDR.value = 0
s_axil.ARPROT.value = 0
s_axil.RREADY.value = 0
inner1.AWREADY.value = 0
inner1.WREADY.value = 0
inner1.BVALID.value = 0
@@ -88,9 +88,9 @@ async def test_depth_1(dut):
inner1.RVALID.value = 0
inner1.RDATA.value = 0
inner1.RRESP.value = 0
await Timer(1, units="ns")
# Write to address 0x0 (should select inner1)
inner1.AWREADY.value = 1
inner1.WREADY.value = 1
@@ -99,9 +99,9 @@ async def test_depth_1(dut):
s_axil.WVALID.value = 1
s_axil.WDATA.value = 0x12345678
s_axil.WSTRB.value = 0xF
await Timer(1, units="ns")
assert int(inner1.AWVALID.value) == 1, "inner1 write address valid must be set"
assert int(inner1.WVALID.value) == 1, "inner1 write data valid must be set"
@@ -110,11 +110,11 @@ async def test_depth_1(dut):
async def test_depth_2(dut):
"""Test max_decode_depth=2 - should have interfaces for reg1 and inner2."""
s_axil = _axil_slave(dut)
# At depth 2, we should have m_axil_reg1 and m_axil_inner2
reg1 = _axil_master(dut, "m_axil_reg1")
inner2 = _axil_master(dut, "m_axil_inner2")
# Default slave side inputs
s_axil.AWVALID.value = 0
s_axil.AWADDR.value = 0
@@ -127,7 +127,7 @@ async def test_depth_2(dut):
s_axil.ARADDR.value = 0
s_axil.ARPROT.value = 0
s_axil.RREADY.value = 0
for master in [reg1, inner2]:
master.AWREADY.value = 0
master.WREADY.value = 0
@@ -137,9 +137,9 @@ async def test_depth_2(dut):
master.RVALID.value = 0
master.RDATA.value = 0
master.RRESP.value = 0
await Timer(1, units="ns")
# Write to address 0x0 (should select reg1)
reg1.AWREADY.value = 1
reg1.WREADY.value = 1
@@ -148,19 +148,19 @@ async def test_depth_2(dut):
s_axil.WVALID.value = 1
s_axil.WDATA.value = 0xABCDEF01
s_axil.WSTRB.value = 0xF
await Timer(1, units="ns")
assert int(reg1.AWVALID.value) == 1, "reg1 must be selected for address 0x0"
assert int(inner2.AWVALID.value) == 0, "inner2 should not be selected"
# Reset
s_axil.AWVALID.value = 0
s_axil.WVALID.value = 0
reg1.AWREADY.value = 0
reg1.WREADY.value = 0
await Timer(1, units="ns")
# Write to address 0x10 (should select inner2)
inner2.AWREADY.value = 1
inner2.WREADY.value = 1
@@ -169,9 +169,9 @@ async def test_depth_2(dut):
s_axil.WVALID.value = 1
s_axil.WDATA.value = 0x23456789
s_axil.WSTRB.value = 0xF
await Timer(1, units="ns")
assert int(inner2.AWVALID.value) == 1, "inner2 must be selected for address 0x10"
assert int(reg1.AWVALID.value) == 0, "reg1 should not be selected"
@@ -180,12 +180,12 @@ async def test_depth_2(dut):
async def test_depth_0(dut):
"""Test max_decode_depth=0 - should have interfaces for all leaf registers."""
s_axil = _axil_slave(dut)
# At depth 0, we should have all leaf registers: reg1, reg2, reg2b
reg1 = _axil_master(dut, "m_axil_reg1")
reg2 = _axil_master(dut, "m_axil_reg2")
reg2b = _axil_master(dut, "m_axil_reg2b")
# Default slave side inputs
s_axil.AWVALID.value = 0
s_axil.AWADDR.value = 0
@@ -198,7 +198,7 @@ async def test_depth_0(dut):
s_axil.ARADDR.value = 0
s_axil.ARPROT.value = 0
s_axil.RREADY.value = 0
for master in [reg1, reg2, reg2b]:
master.AWREADY.value = 0
master.WREADY.value = 0
@@ -208,9 +208,9 @@ async def test_depth_0(dut):
master.RVALID.value = 0
master.RDATA.value = 0
master.RRESP.value = 0
await Timer(1, units="ns")
# Write to address 0x0 (should select reg1)
reg1.AWREADY.value = 1
reg1.WREADY.value = 1
@@ -219,20 +219,20 @@ async def test_depth_0(dut):
s_axil.WVALID.value = 1
s_axil.WDATA.value = 0x11111111
s_axil.WSTRB.value = 0xF
await Timer(1, units="ns")
assert int(reg1.AWVALID.value) == 1, "reg1 must be selected for address 0x0"
assert int(reg2.AWVALID.value) == 0, "reg2 should not be selected"
assert int(reg2b.AWVALID.value) == 0, "reg2b should not be selected"
# Reset
s_axil.AWVALID.value = 0
s_axil.WVALID.value = 0
reg1.AWREADY.value = 0
reg1.WREADY.value = 0
await Timer(1, units="ns")
# Write to address 0x10 (should select reg2)
reg2.AWREADY.value = 1
reg2.WREADY.value = 1
@@ -241,20 +241,20 @@ async def test_depth_0(dut):
s_axil.WVALID.value = 1
s_axil.WDATA.value = 0x22222222
s_axil.WSTRB.value = 0xF
await Timer(1, units="ns")
assert int(reg2.AWVALID.value) == 1, "reg2 must be selected for address 0x10"
assert int(reg1.AWVALID.value) == 0, "reg1 should not be selected"
assert int(reg2b.AWVALID.value) == 0, "reg2b should not be selected"
# Reset
s_axil.AWVALID.value = 0
s_axil.WVALID.value = 0
reg2.AWREADY.value = 0
reg2.WREADY.value = 0
await Timer(1, units="ns")
# Write to address 0x14 (should select reg2b)
reg2b.AWREADY.value = 1
reg2b.WREADY.value = 1
@@ -263,9 +263,9 @@ async def test_depth_0(dut):
s_axil.WVALID.value = 1
s_axil.WDATA.value = 0x33333333
s_axil.WSTRB.value = 0xF
await Timer(1, units="ns")
assert int(reg2b.AWVALID.value) == 1, "reg2b must be selected for address 0x14"
assert int(reg1.AWVALID.value) == 0, "reg1 should not be selected"
assert int(reg2.AWVALID.value) == 0, "reg2 should not be selected"

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
from collections import defaultdict
from pathlib import Path
import re
from typing import Any
from systemrdl import RDLCompiler
@@ -12,6 +13,64 @@ from systemrdl.node import AddressableNode, AddrmapNode, RegNode
from peakrdl_busdecoder.cpuif.base_cpuif import BaseCpuif
from peakrdl_busdecoder.exporter import BusDecoderExporter
RESET = "\x1b[0m"
DIM = "\x1b[2m"
LEVEL_COLORS = {
"DEBUG": "\x1b[35m", # magenta
"INFO": "\x1b[36m", # cyan
"WARNING": "\x1b[33m", # yellow
"ERROR": "\x1b[31m", # red
"CRITICAL": "\x1b[1;31m", # bold red
}
# Matches lines like:
# " 0.00ns INFO cocotb ..." or "-.--ns INFO gpi ..."
LINE_RE = re.compile(
r"^(?P<prefix>\s*)" # leading spaces
r"(?P<time>[-0-9\.]+[a-zA-Z]+)" # timestamp (e.g. 0.00ns, -.--ns)
r"\s+"
r"(?P<level>[A-Z]+)" # log level
r"(?P<rest>.*)$" # the rest of the line
)
def colorize_cocotb_log(text: str) -> str:
"""
Colorizes cocotb log lines for improved readability in terminal output.
Each log line is parsed to identify the timestamp and log level, which are then
colorized using ANSI escape codes. The timestamp is dimmed, and the log level
is colored according to its severity (e.g., INFO, WARNING, ERROR).
Args:
text: The input string containing cocotb log lines.
Returns:
A string with colorized log lines.
"""
def _color_line(match: re.Match) -> str:
prefix = match.group("prefix")
time = match.group("time")
level = match.group("level")
rest = match.group("rest")
level_color = LEVEL_COLORS.get(level, "")
# dim timestamp, color level
time_colored = f"{DIM}{time}{RESET}"
level_colored = f"{level_color}{level}{RESET}" if level_color else level
return f"{prefix}{time_colored} {level_colored}{rest}"
lines = []
for line in text.splitlines():
m = LINE_RE.match(line)
if m:
lines.append(_color_line(m))
else:
lines.append(line)
return "\n".join(lines)
def compile_rdl_and_export(
rdl_source: str, top_name: str, output_dir: Path, cpuif_cls: type[BaseCpuif], **kwargs: Any