refactor to use base listener for busdecoder

This commit is contained in:
Arnav Sacheti
2025-10-19 21:35:17 -07:00
parent be116def83
commit bafebf8595
10 changed files with 136 additions and 65 deletions

View File

@@ -2,11 +2,13 @@ from .body import Body, SupportsStr
from .combinational_body import CombinationalBody from .combinational_body import CombinationalBody
from .for_loop_body import ForLoopBody from .for_loop_body import ForLoopBody
from .if_body import IfBody from .if_body import IfBody
from .struct_body import StructBody
__all__ = [ __all__ = [
"Body", "Body",
"CombinationalBody", "CombinationalBody",
"ForLoopBody", "ForLoopBody",
"IfBody", "IfBody",
"StructBody",
"SupportsStr", "SupportsStr",
] ]

View File

@@ -6,5 +6,5 @@ from .body import Body
class CombinationalBody(Body): class CombinationalBody(Body):
def __str__(self) -> str: def __str__(self) -> str:
return f"""always_comb begin return f"""always_comb begin
{indent(super().__str__(), " ")} {indent(super().__str__(), " ")}
end""" end"""

View File

@@ -4,12 +4,22 @@ from .body import Body
class StructBody(Body): class StructBody(Body):
def __init__(self, name: str, packed: bool = True) -> None: def __init__(self, name: str, typedef: bool = False, packed: bool = False) -> None:
super().__init__() super().__init__()
self._name = name self._name = name
self._typedef = typedef
self._packed = packed self._packed = packed
@property
def name(self) -> str:
return self._name
def __str__(self) -> str: def __str__(self) -> str:
return f"""typedef struct {"packed " if self._packed else ""} {{ if self._typedef:
return f"""typedef struct {"packed " if self._packed else ""}{{
{indent(super().__str__(), " ")}
}} {self._name};"""
return f"""struct {{
{indent(super().__str__(), " ")} {indent(super().__str__(), " ")}
}} {self._name};""" }} {self._name};"""

View File

@@ -97,9 +97,4 @@ class BaseCpuif:
addr = node.raw_absolute_address - self.exp.ds.top_node.raw_absolute_address addr = node.raw_absolute_address - self.exp.ds.top_node.raw_absolute_address
size = node.size size = node.size
addr_msb = clog2(addr + size) - 1 return f"({cpuif_addr} - 'h{addr:x})[{clog2(size) - 1}:0]"
addr_lsb = clog2(addr)
if addr_msb == addr_lsb:
return f"{cpuif_addr}[{addr_lsb}]"
return f"{cpuif_addr}[{addr_msb}:{addr_lsb}]"

View File

@@ -2,10 +2,11 @@ from collections import deque
from enum import Enum from enum import Enum
from systemrdl.node import AddressableNode from systemrdl.node import AddressableNode
from systemrdl.walker import RDLListener, RDLSimpleWalker, WalkerAction from systemrdl.walker import WalkerAction
from .body import Body, ForLoopBody, IfBody from .body import Body, ForLoopBody, IfBody
from .design_state import DesignState from .design_state import DesignState
from .listener import BusDecoderListener
from .sv_int import SVInt from .sv_int import SVInt
from .utils import get_indexed_path from .utils import get_indexed_path
@@ -23,30 +24,17 @@ class DecodeLogicFlavor(Enum):
return f"cpuif_{self.value}_sel" return f"cpuif_{self.value}_sel"
class AddressDecode: class DecodeLogicGenerator(BusDecoderListener):
def __init__(self, flavor: DecodeLogicFlavor, ds: DesignState) -> None:
self._flavor = flavor
self._ds = ds
def walk(self) -> str:
walker = RDLSimpleWalker()
dlg = DecodeLogicGenerator(self._flavor, self._ds)
walker.walk(self._ds.top_node, dlg, skip_top=True)
return str(dlg)
class DecodeLogicGenerator(RDLListener):
def __init__( def __init__(
self, self,
flavor: DecodeLogicFlavor,
ds: DesignState, ds: DesignState,
flavor: DecodeLogicFlavor,
) -> None: ) -> None:
self._ds = ds super().__init__(ds)
self._flavor = flavor self._flavor = flavor
self._decode_stack: deque[Body] = deque() # Tracks decoder body self._decode_stack: deque[Body] = deque() # Tracks decoder body
self._cond_stack: deque[str] = deque() # Tracks conditions nested for loops self._cond_stack: deque[str] = deque() # Tracks conditions nested for loops
self._array_stride_stack: deque[int] = deque() # Tracks nested array strids
# Initial Stack Conditions # Initial Stack Conditions
self._decode_stack.append(IfBody()) self._decode_stack.append(IfBody())
@@ -55,15 +43,19 @@ class DecodeLogicGenerator(RDLListener):
# Generate address bounds # Generate address bounds
addr_width = self._ds.addr_width addr_width = self._ds.addr_width
l_bound = SVInt( l_bound = SVInt(
node.raw_address_offset, node.raw_absolute_address,
addr_width, addr_width,
) )
u_bound = l_bound + SVInt(node.total_size, addr_width) u_bound = l_bound + SVInt(node.total_size, addr_width)
array_stack = list(self._array_stride_stack)
if node.array_dimensions:
array_stack = array_stack[: -len(node.array_dimensions)]
# Handle arrayed components # Handle arrayed components
l_bound_comp = [str(l_bound)] l_bound_comp = [str(l_bound)]
u_bound_comp = [str(u_bound)] u_bound_comp = [str(u_bound)]
for i, stride in enumerate(self._array_stride_stack): for i, stride in enumerate(array_stack):
l_bound_comp.append(f"({addr_width}'(i{i})*{SVInt(stride, addr_width)})") l_bound_comp.append(f"({addr_width}'(i{i})*{SVInt(stride, addr_width)})")
u_bound_comp.append(f"({addr_width}'(i{i})*{SVInt(stride, addr_width)})") u_bound_comp.append(f"({addr_width}'(i{i})*{SVInt(stride, addr_width)})")
@@ -82,30 +74,20 @@ class DecodeLogicGenerator(RDLListener):
return [] return []
def enter_AddressableComponent(self, node: AddressableNode) -> WalkerAction | None: def enter_AddressableComponent(self, node: AddressableNode) -> WalkerAction | None:
action = super().enter_AddressableComponent(node)
conditions: list[str] = [] conditions: list[str] = []
conditions.extend(self.cpuif_addr_predicate(node)) conditions.extend(self.cpuif_addr_predicate(node))
conditions.extend(self.cpuif_prot_predicate(node)) conditions.extend(self.cpuif_prot_predicate(node))
condition = " && ".join(f"({c})" for c in conditions) condition = " && ".join(f"({c})" for c in conditions)
if node.array_dimensions:
assert node.array_stride is not None, "Array stride should be defined for arrayed components"
# Collect strides for each array dimension
current_stride = node.array_stride
strides: list[int] = []
for dim in reversed(node.array_dimensions):
strides.append(current_stride)
current_stride *= dim
strides.reverse()
self._array_stride_stack.extend(strides)
# Generate condition string and manage stack # Generate condition string and manage stack
if isinstance(self._decode_stack[-1], IfBody) and node.array_dimensions: if isinstance(self._decode_stack[-1], IfBody) and node.array_dimensions:
# arrayed component with new if-body # arrayed component with new if-body
self._cond_stack.append(condition) self._cond_stack.append(condition)
for i, dim in enumerate( for i, dim in enumerate(
node.array_dimensions, node.array_dimensions,
start=len(self._array_stride_stack) - len(node.array_dimensions),
): ):
fb = ForLoopBody( fb = ForLoopBody(
"int", "int",
@@ -120,10 +102,7 @@ class DecodeLogicGenerator(RDLListener):
with self._decode_stack[-1].cm(condition) as b: with self._decode_stack[-1].cm(condition) as b:
b += f"{self._flavor.cpuif_select}.{get_indexed_path(self._ds.top_node, node)} = 1'b1;" b += f"{self._flavor.cpuif_select}.{get_indexed_path(self._ds.top_node, node)} = 1'b1;"
# if node.external: return action
# return WalkerAction.SkipDescendants
return WalkerAction.Continue
def exit_AddressableComponent(self, node: AddressableNode) -> None: def exit_AddressableComponent(self, node: AddressableNode) -> None:
if not node.array_dimensions: if not node.array_dimensions:
@@ -148,7 +127,7 @@ class DecodeLogicGenerator(RDLListener):
else: else:
self._decode_stack[-1] += b self._decode_stack[-1] += b
self._array_stride_stack.pop() super().exit_AddressableComponent(node)
def __str__(self) -> str: def __str__(self) -> str:
body = self._decode_stack[-1] body = self._decode_stack[-1]

View File

@@ -2,17 +2,20 @@ import os
from datetime import datetime from datetime import datetime
from importlib.metadata import version from importlib.metadata import version
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, TypedDict from typing import TYPE_CHECKING, Any, TypedDict
import jinja2 as jj import jinja2 as jj
from systemrdl.node import AddrmapNode, RootNode from systemrdl.node import AddrmapNode, RootNode
from systemrdl.walker import RDLSteerableWalker
from typing_extensions import Unpack from typing_extensions import Unpack
from .cpuif import BaseCpuif from .cpuif import BaseCpuif
from .cpuif.apb4 import APB4Cpuif from .cpuif.apb4 import APB4Cpuif
from .decoder import AddressDecode, DecodeLogicFlavor from .decoder import DecodeLogicFlavor, DecodeLogicGenerator
from .design_state import DesignState from .design_state import DesignState
from .identifier_filter import kw_filter as kwf from .identifier_filter import kw_filter as kwf
from .listener import BusDecoderListener
from .struct_generator import StructGenerator
from .sv_int import SVInt from .sv_int import SVInt
from .validate_design import DesignValidator from .validate_design import DesignValidator
@@ -32,7 +35,6 @@ if TYPE_CHECKING:
class BusDecoderExporter: class BusDecoderExporter:
cpuif: BaseCpuif cpuif: BaseCpuif
address_decode: type[AddressDecode]
ds: DesignState ds: DesignState
def __init__(self, **kwargs: Unpack[ExporterKwargs]) -> None: def __init__(self, **kwargs: Unpack[ExporterKwargs]) -> None:
@@ -56,6 +58,7 @@ class BusDecoderExporter:
undefined=jj.StrictUndefined, undefined=jj.StrictUndefined,
) )
self.jj_env.filters["kwf"] = kwf self.jj_env.filters["kwf"] = kwf
self.jj_env.filters["walk"] = self.walk
def export(self, node: RootNode | AddrmapNode, output_dir: str, **kwargs: Unpack[ExporterKwargs]) -> None: def export(self, node: RootNode | AddrmapNode, output_dir: str, **kwargs: Unpack[ExporterKwargs]) -> None:
""" """
@@ -98,7 +101,6 @@ class BusDecoderExporter:
# Construct exporter components # Construct exporter components
self.cpuif = cpuif_cls(self) self.cpuif = cpuif_cls(self)
self.address_decode = AddressDecode
# Validate that there are no unsupported constructs # Validate that there are no unsupported constructs
DesignValidator(self).do_validate() DesignValidator(self).do_validate()
@@ -108,8 +110,9 @@ class BusDecoderExporter:
"current_date": datetime.now().strftime("%Y-%m-%d"), "current_date": datetime.now().strftime("%Y-%m-%d"),
"version": version("peakrdl-busdecoder"), "version": version("peakrdl-busdecoder"),
"cpuif": self.cpuif, "cpuif": self.cpuif,
"address_decode": self.address_decode, "cpuif_decode": DecodeLogicGenerator,
"DecodeLogicFlavor": DecodeLogicFlavor, "cpuif_select": StructGenerator,
"cpuif_decode_flavor": DecodeLogicFlavor,
"ds": self.ds, "ds": self.ds,
"SVInt": SVInt, "SVInt": SVInt,
} }
@@ -125,3 +128,9 @@ class BusDecoderExporter:
template = self.jj_env.get_template("module_tmpl.sv") template = self.jj_env.get_template("module_tmpl.sv")
stream = template.stream(context) stream = template.stream(context)
stream.dump(module_file_path) stream.dump(module_file_path)
def walk(self, listener_cls: type[BusDecoderListener], **kwargs: Any) -> BusDecoderListener:
walker = RDLSteerableWalker()
listener = listener_cls(self.ds, **kwargs)
walker.walk(self.ds.top_node, listener, skip_top=True)
return str(listener)

View File

@@ -0,0 +1,29 @@
from collections import deque
from systemrdl.node import AddressableNode, RegNode
from systemrdl.walker import RDLListener, WalkerAction
from .design_state import DesignState
class BusDecoderListener(RDLListener):
def __init__(self, ds: DesignState) -> None:
self._array_stride_stack: deque[int] = deque() # Tracks nested array strides
self._ds = ds
def enter_AddressableComponent(self, node: AddressableNode) -> WalkerAction | None:
if node.array_dimensions:
assert node.array_stride is not None, "Array stride should be defined for arrayed components"
self._array_stride_stack.extend(node.array_dimensions)
if isinstance(node, RegNode):
return WalkerAction.SkipDescendants
return WalkerAction.Continue
def exit_AddressableComponent(self, node: AddressableNode) -> None:
if node.array_dimensions:
for _ in node.array_dimensions:
self._array_stride_stack.pop()
def __str__(self) -> str:
return ""

View File

@@ -32,23 +32,21 @@ module {{ds.module_name}}
logic [{{cpuif.data_width-1}}:0] cpuif_wr_data; logic [{{cpuif.data_width-1}}:0] cpuif_wr_data;
logic [{{cpuif.data_width//8-1}}:0] cpuif_wr_byte_en; logic [{{cpuif.data_width//8-1}}:0] cpuif_wr_byte_en;
logic cpuif_rd_ack [{{cpuif.addressable_children|length}}]; logic cpuif_rd_ack;
logic cpuif_rd_err [{{cpuif.addressable_children|length}}]; logic cpuif_rd_err;
logic [{{cpuif.data_width-1}}:0] cpuif_rd_data [{{cpuif.addressable_children|length}}]; logic [{{cpuif.data_width-1}}:0] cpuif_rd_data;
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
// Child instance signals // Child instance signals
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
typedef struct packed { {{cpuif_select|walk|indent(4)}}
} cpuif_sel_t;
cpuif_sel_t cpuif_wr_sel; cpuif_sel_t cpuif_wr_sel;
cpuif_sel_t cpuif_rd_sel; cpuif_sel_t cpuif_rd_sel;
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
// Slave <-> Internal CPUIF <-> Master // Slave <-> Internal CPUIF <-> Master
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
{{-cpuif.get_implementation()|indent}} {{-cpuif.get_implementation()|indent(4)}}
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
// Write Address Decoder // Write Address Decoder
@@ -59,7 +57,7 @@ module {{ds.module_name}}
if (cpuif_req && cpuif_wr_en) begin if (cpuif_req && cpuif_wr_en) begin
// A write request is pending // A write request is pending
{{address_decode(DecodeLogicFlavor.WRITE, ds).walk()|indent(12)}} {{cpuif_decode|walk(flavor=cpuif_decode_flavor.WRITE)|indent(12)}}
end else begin end else begin
// No write request, all select signals remain 0 // No write request, all select signals remain 0
end end
@@ -74,7 +72,7 @@ module {{ds.module_name}}
if (cpuif_req && cpuif_rd_en) begin if (cpuif_req && cpuif_rd_en) begin
// A read request is pending // A read request is pending
{{address_decode(DecodeLogicFlavor.READ, ds).walk()|indent(12)}} {{cpuif_decode|walk(flavor=cpuif_decode_flavor.READ)|indent(12)}}
end else begin end else begin
// No read request, all select signals remain 0 // No read request, all select signals remain 0
end end

View File

@@ -0,0 +1,52 @@
from collections import deque
from systemrdl.node import AddressableNode
from systemrdl.walker import WalkerAction
from .body import Body, StructBody
from .design_state import DesignState
from .identifier_filter import kw_filter as kwf
from .listener import BusDecoderListener
class StructGenerator(BusDecoderListener):
def __init__(
self,
ds: DesignState,
) -> None:
super().__init__(ds)
self._stack: deque[Body] = deque()
self._stack.append(StructBody("cpuif_sel_t", True, True))
def enter_AddressableComponent(self, node: AddressableNode) -> WalkerAction | None:
action = super().enter_AddressableComponent(node)
if node.children():
# Push new body onto stack
body = StructBody(f"cpuif_sel_{node.inst_name}_t", True, True)
self._stack.append(body)
return action
def exit_AddressableComponent(self, node: AddressableNode) -> None:
type = "logic"
if node.children():
body = self._stack.pop()
if body and isinstance(body, StructBody):
self._stack.appendleft(body)
type = body.name
name = kwf(node.inst_name)
if node.array_dimensions:
for dim in node.array_dimensions:
name += f"[{dim}]"
self._stack[-1] += f"{type} {name};"
super().exit_AddressableComponent(node)
def __str__(self) -> str:
return "\n".join(map(str, self._stack))

View File

@@ -1,3 +0,0 @@
pytest
systemrdl-compiler
jinja2