diff --git a/README.md b/README.md index e6ccd51..cbfa152 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ GitHub repository: https://github.com/alexforencich/cocotbext-axi ## Introduction -AXI, AXI lite, and AXI stream simulation models for [cocotb](https://github.com/cocotb/cocotb). +AXI, AXI lite, AXI stream, and APB simulation models for [cocotb](https://github.com/cocotb/cocotb). ## Installation @@ -28,13 +28,13 @@ Installation for active development: ## Documentation and usage examples -See the `tests` directory, [verilog-axi](https://github.com/alexforencich/verilog-axi), and [verilog-axis](https://github.com/alexforencich/verilog-axis) for complete testbenches using these modules. +See the `tests` directory, [taxi](https://github.com/fpganinja/taxi), [verilog-axi](https://github.com/alexforencich/verilog-axi), and [verilog-axis](https://github.com/alexforencich/verilog-axis) for complete testbenches using these modules. -### AXI and AXI lite master +### AXI, AXI lite, and APB master -The `AxiMaster` and `AxiLiteMaster` classes implement AXI masters and are capable of generating read and write operations against AXI slaves. Requested operations will be split and aligned according to the AXI specification. The `AxiMaster` module is capable of generating narrow bursts, handling multiple in-flight operations, and handling reordering and interleaving in responses across different transaction IDs. `AxiMaster` and `AxiLiteMaster` and related objects all extend `Region`, so they can be attached to `AddressSpace` objects to handle memory operations in the specified region. +The `AxiMaster`, `AxiLiteMaster`, and `ApbMaster` classes implement AXI, AXI-lite, and APB masters and are capable of generating read and write operations against the corresponding slaves. Requested operations will be split and aligned according to the AXI specification. The `AxiMaster` module is capable of generating narrow bursts, handling multiple in-flight operations, and handling reordering and interleaving in responses across different transaction IDs. `AxiMaster` and `AxiLiteMaster` and related objects all extend `Region`, so they can be attached to `AddressSpace` objects to handle memory operations in the specified region. -The `AxiMaster` is a wrapper around `AxiMasterWrite` and `AxiMasterRead`. Similarly, `AxiLiteMaster` is a wrapper around `AxiLiteMasterWrite` and `AxiLiteMasterRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same. +The `AxiMaster` is a wrapper around `AxiMasterWrite` and `AxiMasterRead`. Similarly, `AxiLiteMaster` is a wrapper around `AxiLiteMasterWrite` and `AxiLiteMasterRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same. APB is not channelized, so only `ApbSlave` is available. To use these modules, import the one you need and connect it to the DUT: @@ -64,9 +64,9 @@ Alternatively, operations can be initiated with non-blocking `init_read()` and ` With this method, it is possible to start multiple concurrent operations from the same coroutine. It is also possible to use the events with `Combine`, `First`, and `with_timeout`. -#### `AxiMaster` and `AxiLiteMaster` constructor parameters +#### `AxiMaster`, `AxiLiteMaster`, and `ApbMaster` constructor parameters -* _bus_: `AxiBus` or `AxiLiteBus` object containing AXI interface signals +* _bus_: `AxiBus`, `AxiLiteBus`, or `ApbBus` object containing interface signals * _clock_: clock signal * _reset_: reset signal (optional) * _reset_active_level_: reset active level (optional, default `True`) @@ -114,20 +114,20 @@ With this method, it is possible to start multiple concurrent operations from th * _wuser_: AXI wuser signal, default `0` (write-related methods only) * _event_: `Event` object used to wait on and retrieve result for specific operation, default `None`. The event will be triggered when the operation completes and the result returned via `Event.data`. (`init_read()` and `init_write()` only) -#### Additional optional arguments for `AxiLiteMaster` +#### Additional optional arguments for `AxiLiteMaster` and `ApbMaster` * _prot_: AXI protection flags, default `AxiProt.NONSECURE` * _event_: `Event` object used to wait on and retrieve result for specific operation, default `None`. The event will be triggered when the operation completes and the result returned via `Event.data`. (`init_read()` and `init_write()` only) -#### `AxiBus` and `AxiLiteBus` objects +#### `AxiBus`, `AxiLiteBus`, and `ApbBus` objects -The `AxiBus`, `AxiLiteBus`, and related objects are containers for the interface signals. These hold instances of bus objects for the individual channels, which are currently extensions of `cocotb_bus.bus.Bus`. Class methods `from_entity` and `from_prefix` are provided to facilitate signal name matching. For AXI interfaces use `AxiBus`, `AxiReadBus`, or `AxiWriteBus`, as appropriate. For AXI lite interfaces, use `AxiLiteBus`, `AxiLiteReadBus`, or `AxiLiteWriteBus`, as appropriate. +The `AxiBus`, `AxiLiteBus`, `ApbBus`, and related objects are containers for the interface signals. These hold instances of bus objects for the individual channels, which are currently extensions of `cocotb_bus.bus.Bus`. Class methods `from_entity` and `from_prefix` are provided to facilitate signal name matching. For AXI interfaces use `AxiBus`, `AxiReadBus`, or `AxiWriteBus`, as appropriate. For AXI lite interfaces, use `AxiLiteBus`, `AxiLiteReadBus`, or `AxiLiteWriteBus`, as appropriate. For APB interfaces, use `ApbBus`. -### AXI and AXI lite slave +### AXI, AXI lite, and APB slave -The `AxiSlave` and `AxiLiteSlave` classes implement AXI slaves and are capable of completing read and write operations from upstream AXI masters. The `AxiSlave` module is capable of handling narrow bursts. These modules can either be used to perform memory reads and writes on a `MemoryInterface` on behalf of the DUT, or they can be extended to implement customized functionality. +The `AxiSlave`, `AxiLiteSlave`, and `ApbSlave` classes implement AXI, AXI-lite, and APB slaves and are capable of completing read and write operations from upstream AXI masters. The `AxiSlave` module is capable of handling narrow bursts. These modules can either be used to perform memory reads and writes on a `MemoryInterface` on behalf of the DUT, or they can be extended to implement customized functionality. -The `AxiSlave` is a wrapper around `AxiSlaveWrite` and `AxiSlaveRead`. Similarly, `AxiLiteSlave` is a wrapper around `AxiLiteSlaveWrite` and `AxiLiteSlaveRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same. +The `AxiSlave` is a wrapper around `AxiSlaveWrite` and `AxiSlaveRead`. Similarly, `AxiLiteSlave` is a wrapper around `AxiLiteSlaveWrite` and `AxiLiteSlaveRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same. APB is not channelized, so only `ApbSlave` is available. To use these modules, import the one you need and connect it to the DUT: @@ -137,13 +137,13 @@ To use these modules, import the one you need and connect it to the DUT: region = MemoryRegion(2**axi_slave.read_if.address_width) axi_slave.target = region -The first argument to the constructor accepts an `AxiBus` or `AxiLiteBus` object. These objects are containers for the interface signals and include class methods to automate connections. +The first argument to the constructor accepts an `AxiBus`, `AxiLiteBus`, or `ApbBus` object. These objects are containers for the interface signals and include class methods to automate connections. It is also possible to extend these modules; operation can be customized by overriding the internal `_read()` and `_write()` methods. See `AxiRam` and `AxiLiteRam` for an example. -#### `AxiSlave` and `AxiLiteSlave` constructor parameters +#### `AxiSlave`, `AxiLiteSlave`, and `ApbSlave` constructor parameters -* _bus_: `AxiBus` or `AxiLiteBus` object containing AXI interface signals +* _bus_: `AxiBus`, `AxiLiteBus`, or `ApbBus` object containing interface signals * _clock_: clock signal * _reset_: reset signal (optional) * _reset_active_level_: reset active level (optional, default `True`) @@ -153,11 +153,11 @@ It is also possible to extend these modules; operation can be customized by over * _target_: target region -### AXI and AXI lite RAM +### AXI, AXI lite, and APB RAM -The `AxiRam` and `AxiLiteRam` classes implement AXI RAMs and are capable of completing read and write operations from upstream AXI masters. The `AxiRam` module is capable of handling narrow bursts. These modules are extensions of the corresponding `AxiSlave` and `AxiLiteSlave` modules. Internally, `SparseMemory` is used to support emulating very large memories. +The `AxiRam`, `AxiLiteRam`, and `ApbRam` classes implement AXI, AXI-lite, and APB RAMs and are capable of completing read and write operations from upstream AXI masters. The `AxiRam` module is capable of handling narrow bursts. These modules are extensions of the corresponding `AxiSlave`, `AxiLiteSlave`, and `ApbSlave` modules. Internally, `SparseMemory` is used to support emulating very large memories. -The `AxiRam` is a wrapper around `AxiRamWrite` and `AxiRamRead`. Similarly, `AxiLiteRam` is a wrapper around `AxiLiteRamWrite` and `AxiLiteRamRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same. +The `AxiRam` is a wrapper around `AxiRamWrite` and `AxiRamRead`. Similarly, `AxiLiteRam` is a wrapper around `AxiLiteRamWrite` and `AxiLiteRamRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same. APB is not channelized, so only `ApbRam` is available. To use these modules, import the one you need and connect it to the DUT: @@ -165,9 +165,9 @@ To use these modules, import the one you need and connect it to the DUT: axi_ram = AxiRam(AxiBus.from_prefix(dut, "m_axi"), dut.clk, dut.rst, size=2**32) -The first argument to the constructor accepts an `AxiBus` or `AxiLiteBus` object. These objects are containers for the interface signals and include class methods to automate connections. +The first argument to the constructor accepts an `AxiBus`, `AxiLiteBus`, or `ApbBus` object. These objects are containers for the interface signals and include class methods to automate connections. -Once the module is instantiated, the memory contents can be accessed in a couple of different ways. First, the `mmap` object can be accessed directly via the `mem` attribute. Second, `read()`, `write()`, and various word-access wrappers are available. Hex dump helper methods are also provided for debugging. For example: +Once the module is instantiated, the memory contents can be accessed in a couple of different ways. First, the `mmap`/`SparseMemory` object can be accessed directly via the `mem` attribute. Second, `read()`, `write()`, and various word-access wrappers are available. Hex dump helper methods are also provided for debugging. For example: axi_ram.write(0x0000, b'test') data = axi_ram.read(0x0000, 4) @@ -180,9 +180,9 @@ Multi-port memories can be constructed by passing the `mem` object of the first axi_ram_p3 = AxiRam(AxiBus.from_prefix(dut, "m02_axi"), dut.clk, dut.rst, mem=axi_ram_p1.mem) axi_ram_p4 = AxiRam(AxiBus.from_prefix(dut, "m03_axi"), dut.clk, dut.rst, mem=axi_ram_p1.mem) -#### `AxiRam` and `AxiLiteRam` constructor parameters +#### `AxiRam`, `AxiLiteRam`, and `ApbRam` constructor parameters -* _bus_: `AxiBus` or `AxiLiteBus` object containing AXI interface signals +* _bus_: `AxiBus`, `AxiLiteBus`, or `ApbBus` object containing interface signals * _clock_: clock signal * _reset_: reset signal (optional) * _reset_active_level_: reset active level (optional, default `True`) @@ -471,3 +471,16 @@ This is a simple example that shows how the address space abstraction components * `tid`: ID signal, can be used for routing * `tdest`: destination signal, can be used for routing * `tuser`: additional sideband data + +### APB signals + +* `paddr`: address +* `pprot`: protection bits +* `psel`: select signal, for selecting a target device +* `penable`: enable signal, for performing an operation +* `pwrite`: read/write control signal +* `pwdata`: write data +* `pstrb`: write strobe +* `pready`: ready signal to stall bus +* `prdata`: read data +* `pslverr`: read/write response, indicating SLVERR diff --git a/cocotbext/axi/__init__.py b/cocotbext/axi/__init__.py index 5c9f63f..935f2ae 100644 --- a/cocotbext/axi/__init__.py +++ b/cocotbext/axi/__init__.py @@ -43,3 +43,5 @@ from .axi_channels import AxiWriteBus, AxiReadBus, AxiBus from .axi_master import AxiMasterWrite, AxiMasterRead, AxiMaster from .axi_slave import AxiSlaveWrite, AxiSlaveRead, AxiSlave from .axi_ram import AxiRamWrite, AxiRamRead, AxiRam + +from .apb import ApbBus, ApbMaster, ApbSlave, ApbRam diff --git a/cocotbext/axi/apb.py b/cocotbext/axi/apb.py new file mode 100644 index 0000000..44301a5 --- /dev/null +++ b/cocotbext/axi/apb.py @@ -0,0 +1,624 @@ +""" + +Copyright (c) 2025 Alex Forencich + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +""" + +import logging +from typing import NamedTuple + +import cocotb +from cocotb.queue import Queue +from cocotb.triggers import RisingEdge, Event +from cocotb_bus.bus import Bus + +from .version import __version__ +from .constants import AxiResp, AxiProt +from .address_space import Region +from .reset import Reset +from .memory import Memory + + +# APB master write helper objects +class ApbWriteCmd(NamedTuple): + address: int + data: bytes + prot: AxiProt + event: Event + + +class ApbWriteResp(NamedTuple): + address: int + length: int + resp: AxiResp + + +# APB master read helper objects +class ApbReadCmd(NamedTuple): + address: int + length: int + prot: AxiProt + event: Event + + +class ApbReadResp(NamedTuple): + address: int + data: bytes + resp: AxiResp + + def __bytes__(self): + return self.data + + +class ApbBus(Bus): + + _signals = ["paddr", "psel", "penable", "pwrite", "pwdata", "pstrb", "pready", "prdata"] + _optional_signals = ["pprot", "pslverr"] + + def __init__(self, entity=None, prefix=None, **kwargs): + super().__init__(entity, prefix, self._signals, optional_signals=self._optional_signals, **kwargs) + + @classmethod + def from_entity(cls, entity, **kwargs): + return cls(entity, **kwargs) + + @classmethod + def from_prefix(cls, entity, prefix, **kwargs): + return cls(entity, prefix, **kwargs) + + +class ApbPause: + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self._pause = False + self._pause_generator = None + self._pause_cr = None + + def _pause_update(self, val): + pass + + @property + def pause(self): + return self._pause + + @pause.setter + def pause(self, val): + if self._pause != val: + self._pause_update(val) + self._pause = val + + def set_pause_generator(self, generator=None): + if self._pause_cr is not None: + self._pause_cr.kill() + self._pause_cr = None + + self._pause_generator = generator + + if self._pause_generator is not None: + self._pause_cr = cocotb.start_soon(self._run_pause()) + + def clear_pause_generator(self): + self.set_pause_generator(None) + + async def _run_pause(self): + clock_edge_event = RisingEdge(self.clock) + + for val in self._pause_generator: + self.pause = val + await clock_edge_event + + +class ApbMaster(ApbPause, Region, Reset): + def __init__(self, bus, clock, reset=None, reset_active_level=True, **kwargs): + self.bus = bus + self.clock = clock + self.reset = reset + if bus._name: + self.log = logging.getLogger(f"cocotb.{bus._entity._name}.{bus._name}") + else: + self.log = logging.getLogger(f"cocotb.{bus._entity._name}") + + self.log.info("APB master") + self.log.info("cocotbext-axi version %s", __version__) + self.log.info("Copyright (c) 2025 Alex Forencich") + self.log.info("https://github.com/alexforencich/cocotbext-axi") + + self.command_queue = Queue() + self.command_queue.queue_occupancy_limit = 2 + self.current_command = None + + self.in_flight_operations = 0 + self._idle = Event() + self._idle.set() + + self.address_width = len(self.bus.paddr) + self.width = len(self.bus.pwdata) + self.byte_size = 8 + self.byte_lanes = self.width // self.byte_size + self.strb_mask = 2**self.byte_lanes-1 + + self.pprot_present = hasattr(self.bus, "pprot") + self.pstrb_present = hasattr(self.bus, "pstrb") + self.pslverr_present = hasattr(self.bus, "pslverr") + + super().__init__(2**self.address_width, **kwargs) + + self.log.info("APB master configuration:") + self.log.info(" Address width: %d bits", self.address_width) + self.log.info(" Byte size: %d bits", self.byte_size) + self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes) + + self.log.info("APB master signals:") + for sig in sorted(list(set().union(self.bus._signals, self.bus._optional_signals))): + if hasattr(bus, sig): + self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig))) + else: + self.log.info(" %s: not present", sig) + + if self.pstrb_present: + assert self.byte_lanes == len(self.bus.pstrb) + assert self.byte_lanes * self.byte_size == self.width + + self.bus.paddr.setimmediatevalue(0) + if self.pprot_present: + self.bus.pprot.setimmediatevalue(0) + self.bus.psel.setimmediatevalue(False) + self.bus.penable.setimmediatevalue(False) + self.bus.pwrite.setimmediatevalue(False) + self.bus.pwdata.setimmediatevalue(0) + if self.pstrb_present: + self.bus.pstrb.setimmediatevalue(0) + + self._run_cr = None + + self._init_reset(reset, reset_active_level) + + def init_write(self, address, data, prot=AxiProt.NONSECURE, event=None): + if event is None: + event = Event() + + if not isinstance(event, Event): + raise ValueError("Expected event object") + + if address < 0 or address >= 2**self.address_width: + raise ValueError("Address out of range") + + if isinstance(data, int): + raise ValueError("Expected bytes or bytearray for data") + + if address+len(data) > 2**self.address_width: + raise ValueError("Requested transfer overruns end of address space") + + if not self.pprot_present and prot != AxiProt.NONSECURE: + raise ValueError("pprot sideband signal value specified, but signal is not connected") + + data = bytes(data) + + cocotb.start_soon(self._write_wrapper(address, bytes(data), prot, event)) + + return event + + def init_read(self, address, length, prot=AxiProt.NONSECURE, event=None): + if event is None: + event = Event() + + if not isinstance(event, Event): + raise ValueError("Expected event object") + + if address < 0 or address >= 2**self.address_width: + raise ValueError("Address out of range") + + if length < 0: + raise ValueError("Read length must be positive") + + if address+length > 2**self.address_width: + raise ValueError("Requested transfer overruns end of address space") + + if not self.pprot_present and prot != AxiProt.NONSECURE: + raise ValueError("arprot sideband signal value specified, but signal is not connected") + + cocotb.start_soon(self._read_wrapper(address, length, prot, event)) + + return event + + def idle(self): + return not self.in_flight_operations + + async def wait(self): + while not self.idle(): + await self._idle.wait() + + async def write(self, address, data, prot=AxiProt.NONSECURE): + if address < 0 or address >= 2**self.address_width: + raise ValueError("Address out of range") + + if isinstance(data, int): + raise ValueError("Expected bytes or bytearray for data") + + if address+len(data) > 2**self.address_width: + raise ValueError("Requested transfer overruns end of address space") + + if not self.pprot_present and prot != AxiProt.NONSECURE: + raise ValueError("pprot sideband signal value specified, but signal is not connected") + + event = Event() + data = bytes(data) + + self.in_flight_operations += 1 + self._idle.clear() + + await self.command_queue.put(ApbWriteCmd(address, data, prot, event)) + await event.wait() + return event.data + + async def _write_wrapper(self, address, data, prot, event): + event.set(await self.write(address, data, prot)) + + async def read(self, address, length, prot=AxiProt.NONSECURE): + if address < 0 or address >= 2**self.address_width: + raise ValueError("Address out of range") + + if length < 0: + raise ValueError("Read length must be positive") + + if address+length > 2**self.address_width: + raise ValueError("Requested transfer overruns end of address space") + + if not self.pprot_present and prot != AxiProt.NONSECURE: + raise ValueError("arprot sideband signal value specified, but signal is not connected") + + event = Event() + + self.in_flight_operations += 1 + self._idle.clear() + + await self.command_queue.put(ApbReadCmd(address, length, prot, event)) + + await event.wait() + return event.data + + async def _read_wrapper(self, address, length, prot, event): + event.set(await self.read(address, length, prot)) + + def _handle_reset(self, state): + if state: + self.log.info("Reset asserted") + + self.bus.psel.value = False + self.bus.penable.value = False + + if self._run_cr is not None: + self._run_cr.kill() + self._run_cr = None + + def flush_cmd(cmd): + self.log.warning("Flushed write operation during reset: %s", cmd) + if cmd.event: + cmd.event.set(None) + + while not self.command_queue.empty(): + cmd = self.command_queue.get_nowait() + flush_cmd(cmd) + + if self.current_command: + cmd = self.current_command + self.current_command = None + flush_cmd(cmd) + + self.in_flight_operations = 0 + self._idle.set() + else: + self.log.info("Reset de-asserted") + if self._run_cr is None: + self._run_cr = cocotb.start_soon(self._run()) + + async def _run(self): + clock_edge_event = RisingEdge(self.clock) + + while True: + cmd = await self.command_queue.get() + self.current_command = cmd + + length = 0 + pwrite = False + + if isinstance(cmd, ApbWriteCmd): + length = len(cmd.data) + pwrite = True + else: + length = cmd.length + pwrite = False + + word_addr = (cmd.address // self.byte_lanes) * self.byte_lanes + + start_offset = cmd.address % self.byte_lanes + end_offset = ((cmd.address + length - 1) % self.byte_lanes) + 1 + + strb_start = (self.strb_mask << start_offset) & self.strb_mask + strb_end = self.strb_mask >> (self.byte_lanes - end_offset) + + cycles = (length + (cmd.address % self.byte_lanes) + self.byte_lanes-1) // self.byte_lanes + + offset = 0 + read_data = bytearray() + resp = AxiResp.OKAY + + if self.log.isEnabledFor(logging.INFO): + if pwrite: + self.log.info("Write start addr: 0x%08x prot: %s data: %s", + cmd.address, cmd.prot, ' '.join((f'{c:02x}' for c in cmd.data))) + else: + self.log.info("Read start addr: 0x%08x prot: %s length: %d", + cmd.address, cmd.prot, cmd.length) + + await clock_edge_event + self.bus.psel.value = True + + for k in range(cycles): + start = 0 + stop = self.byte_lanes + strb = self.strb_mask + + if k == 0: + start = start_offset + strb &= strb_start + if k == cycles-1: + stop = end_offset + strb &= strb_end + + val = 0 + if pwrite: + for j in range(start, stop): + val |= cmd.data[offset] << j*8 + offset += 1 + + if not self.pstrb_present and strb != self.strb_mask: + self.log.warning("Partial operation requested with pstrb not connected, write will be zero-padded (0x%x != 0x%x)", strb, self.strb_mask) + else: + strb = 0 + + while self.pause: + await clock_edge_event + + await clock_edge_event + + if k == 0: + self.bus.paddr.value = cmd.address + else: + self.bus.paddr.value = word_addr + k*self.byte_lanes + self.bus.pprot.value = cmd.prot + self.bus.penable.value = True + self.bus.pwrite.value = pwrite + self.bus.pwdata.value = val + self.bus.pstrb.value = strb + + await clock_edge_event + + while not int(self.bus.pready.value): + await clock_edge_event + + self.bus.penable.value = False + + cycle_data = int(self.bus.prdata.value) + if self.pslverr_present and int(self.bus.pslverr.value): + resp = AxiResp.SLVERR + + start = 0 + stop = self.byte_lanes + + if k == 0: + start = start_offset + if k == cycles-1: + stop = end_offset + + for j in range(start, stop): + read_data.append((cycle_data >> j*8) & 0xff) + + self.bus.psel.value = False + + if pwrite: + self.log.info("Write complete addr: 0x%08x prot: %s resp: %s length: %d", + cmd.address, cmd.prot, resp, length) + write_resp = ApbWriteResp(cmd.address, length, resp) + cmd.event.set(write_resp) + else: + if self.log.isEnabledFor(logging.INFO): + self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s", + cmd.address, cmd.prot, resp, ' '.join((f'{c:02x}' for c in read_data))) + read_resp = ApbReadResp(cmd.address, bytes(read_data), resp) + cmd.event.set(read_resp) + + self.current_write_command = None + + self.in_flight_operations -= 1 + + if self.in_flight_operations == 0: + self._idle.set() + + +class ApbSlave(ApbPause, Reset): + def __init__(self, bus, clock, reset=None, target=None, reset_active_level=True, **kwargs): + self.bus = bus + self.clock = clock + self.reset = reset + self.target = target + if bus._name: + self.log = logging.getLogger(f"cocotb.{bus._entity._name}.{bus._name}") + else: + self.log = logging.getLogger(f"cocotb.{bus._entity._name}") + + self.log.info("APB slave model") + self.log.info("cocotbext-axi version %s", __version__) + self.log.info("Copyright (c) 2025 Alex Forencich") + self.log.info("https://github.com/alexforencich/cocotbext-axi") + + super().__init__(**kwargs) + + self.address_width = len(self.bus.paddr) + self.width = len(self.bus.pwdata) + self.byte_size = 8 + self.byte_lanes = self.width // self.byte_size + self.strb_mask = 2**self.byte_lanes-1 + + self.pprot_present = hasattr(self.bus, "pprot") + self.pstrb_present = hasattr(self.bus, "pstrb") + self.pslverr_present = hasattr(self.bus, "pslverr") + + self.log.info("APB slave model configuration:") + self.log.info(" Address width: %d bits", self.address_width) + self.log.info(" Byte size: %d bits", self.byte_size) + self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes) + + self.log.info("APB slave model signals:") + for sig in sorted(list(set().union(self.bus._signals, self.bus._optional_signals))): + if hasattr(bus, sig): + self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig))) + else: + self.log.info(" %s: not present", sig) + + if self.pstrb_present: + assert self.byte_lanes == len(self.bus.pstrb) + assert self.byte_lanes * self.byte_size == self.width + + self.bus.pready.setimmediatevalue(False) + self.bus.prdata.setimmediatevalue(0) + if self.pslverr_present: + self.bus.pslverr.setimmediatevalue(0) + + self._run_cr = None + + self._init_reset(reset, reset_active_level) + + async def _write(self, address, data): + await self.target.write(address, data) + + def _handle_reset(self, state): + if state: + self.log.info("Reset asserted") + + self.bus.pready.value = False + + if self._run_cr is not None: + self._run_cr.kill() + self._run_cr = None + else: + self.log.info("Reset de-asserted") + if self._run_cr is None: + self._run_cr = cocotb.start_soon(self._run()) + + async def _run(self): + clock_edge_event = RisingEdge(self.clock) + + self.bus.pready.value = False + + while True: + await clock_edge_event + + if self.pause: + continue + + if not int(self.bus.psel.value) or not int(self.bus.penable.value): + continue + + addr = (int(self.bus.paddr.value) // self.byte_lanes) * self.byte_lanes + if self.pprot_present: + prot = AxiProt(int(self.bus.pprot.value)) + else: + prot = AxiProt.NONSECURE + + pslverr = False + + if (int(self.bus.pwrite.value)): + data = int(self.bus.pwdata.value) + + if self.pstrb_present: + strb = int(self.bus.pstrb.value) + else: + strb = self.strb_mask + + # generate operation list + offset = 0 + start_offset = None + write_ops = [] + + data = data.to_bytes(self.byte_lanes, 'little') + + if self.log.isEnabledFor(logging.INFO): + self.log.info("Write data paddr: 0x%08x pprot: %s pstrb: 0x%02x data: %s", + addr, prot, strb, ' '.join((f'{c:02x}' for c in data))) + + for i in range(self.byte_lanes): + if strb & (1 << i): + if start_offset is None: + start_offset = offset + else: + if start_offset is not None and offset != start_offset: + write_ops.append((addr+start_offset, data[start_offset:offset])) + start_offset = None + + offset += 1 + + if start_offset is not None and offset != start_offset: + write_ops.append((addr+start_offset, data[start_offset:offset])) + + print(write_ops) + + # perform writes + try: + for addr, data in write_ops: + await self._write(addr, data) + except Exception: + self.log.warning("Write operation failed") + pslverr = True + else: + try: + data = await self._read(addr, self.byte_lanes) + except Exception: + self.log.warning("Read operation failed") + data = bytes(self.byte_lanes) + pslverr = True + + if self.log.isEnabledFor(logging.INFO): + self.log.info("Read data paddr: 0x%08x pprot: %s data: %s", + addr, prot, ' '.join((f'{c:02x}' for c in data))) + + self.bus.prdata.value = int.from_bytes(data, 'little') + + await clock_edge_event + if self.pslverr_present: + self.bus.pslverr.value = pslverr + self.bus.pready.value = True + await clock_edge_event + self.bus.pready.value = False + if self.pslverr_present: + self.bus.pslverr.value = False + + +class ApbRam(ApbSlave, Memory): + def __init__(self, bus, clock, reset=None, reset_active_level=True, size=2**64, mem=None, **kwargs): + super().__init__(bus, clock, reset, reset_active_level=reset_active_level, size=size, mem=mem, **kwargs) + + async def _write(self, address, data): + self.write(address % self.size, data) + + async def _read(self, address, length): + return self.read(address % self.size, length) diff --git a/tests/apb/Makefile b/tests/apb/Makefile new file mode 100644 index 0000000..25a807e --- /dev/null +++ b/tests/apb/Makefile @@ -0,0 +1,70 @@ +# Copyright (c) 2025 Alex Forencich +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +TOPLEVEL_LANG = verilog + +SIM ?= icarus +WAVES ?= 0 + +COCOTB_HDL_TIMEUNIT = 1ns +COCOTB_HDL_TIMEPRECISION = 1ns + +DUT = test_apb +TOPLEVEL = $(DUT) +MODULE = $(DUT) +VERILOG_SOURCES += $(DUT).v + +# module parameters +export PARAM_DATA_W := 32 +export PARAM_ADDR_W := 32 +export PARAM_STRB_W := $(shell expr $(PARAM_DATA_W) / 8 ) + +ifeq ($(SIM), icarus) + PLUSARGS += -fst + + COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-P $(TOPLEVEL).$(subst PARAM_,,$(v))=$($(v))) + + ifeq ($(WAVES), 1) + VERILOG_SOURCES += iverilog_dump.v + COMPILE_ARGS += -s iverilog_dump + endif +else ifeq ($(SIM), verilator) + COMPILE_ARGS += -Wno-SELRANGE -Wno-WIDTH + + COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-G$(subst PARAM_,,$(v))=$($(v))) + + ifeq ($(WAVES), 1) + COMPILE_ARGS += --trace-fst + endif +endif + +include $(shell cocotb-config --makefiles)/Makefile.sim + +iverilog_dump.v: + echo 'module iverilog_dump();' > $@ + echo 'initial begin' >> $@ + echo ' $$dumpfile("$(TOPLEVEL).fst");' >> $@ + echo ' $$dumpvars(0, $(TOPLEVEL));' >> $@ + echo 'end' >> $@ + echo 'endmodule' >> $@ + +clean:: + @rm -rf iverilog_dump.v + @rm -rf dump.fst $(TOPLEVEL).fst diff --git a/tests/apb/test_apb.py b/tests/apb/test_apb.py new file mode 100644 index 0000000..1d766e2 --- /dev/null +++ b/tests/apb/test_apb.py @@ -0,0 +1,332 @@ +""" + +Copyright (c) 2025 Alex Forencich + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +""" + +import itertools +import logging +import os +import random + +import cocotb_test.simulator +import pytest + +import cocotb +from cocotb.clock import Clock +from cocotb.triggers import RisingEdge, Timer +from cocotb.regression import TestFactory + +from cocotbext.axi import ApbBus, ApbMaster, ApbRam + + +class TB: + def __init__(self, dut): + self.dut = dut + + self.log = logging.getLogger("cocotb.tb") + self.log.setLevel(logging.DEBUG) + + cocotb.start_soon(Clock(dut.clk, 2, units="ns").start()) + + self.apb_master = ApbMaster(ApbBus.from_prefix(dut, "apb"), dut.clk, dut.rst) + self.apb_ram = ApbRam(ApbBus.from_prefix(dut, "apb"), dut.clk, dut.rst, size=2**16) + + def set_idle_generator(self, generator=None): + if generator: + self.apb_master.set_pause_generator(generator()) + + def set_backpressure_generator(self, generator=None): + if generator: + self.apb_ram.set_pause_generator(generator()) + + async def cycle_reset(self): + self.dut.rst.setimmediatevalue(0) + await RisingEdge(self.dut.clk) + await RisingEdge(self.dut.clk) + self.dut.rst.value = 1 + await RisingEdge(self.dut.clk) + await RisingEdge(self.dut.clk) + self.dut.rst.value = 0 + await RisingEdge(self.dut.clk) + await RisingEdge(self.dut.clk) + + +async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_inserter=None): + + tb = TB(dut) + + byte_lanes = tb.apb_master.byte_lanes + + await tb.cycle_reset() + + tb.set_idle_generator(idle_inserter) + tb.set_backpressure_generator(backpressure_inserter) + + for length in range(1, byte_lanes*2): + for offset in range(byte_lanes): + tb.log.info("length %d, offset %d", length, offset) + addr = offset+0x1000 + test_data = bytearray([x % 256 for x in range(length)]) + + tb.apb_ram.write(addr-128, b'\xaa'*(length+256)) + + await tb.apb_master.write(addr, test_data) + + tb.log.debug("%s", tb.apb_ram.hexdump_str((addr & ~0xf)-16, (((addr & 0xf)+length-1) & ~0xf)+48)) + + assert tb.apb_ram.read(addr, length) == test_data + assert tb.apb_ram.read(addr-1, 1) == b'\xaa' + assert tb.apb_ram.read(addr+length, 1) == b'\xaa' + + await RisingEdge(dut.clk) + await RisingEdge(dut.clk) + + +async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inserter=None): + + tb = TB(dut) + + byte_lanes = tb.apb_master.byte_lanes + + await tb.cycle_reset() + + tb.set_idle_generator(idle_inserter) + tb.set_backpressure_generator(backpressure_inserter) + + for length in range(1, byte_lanes*2): + for offset in range(byte_lanes): + tb.log.info("length %d, offset %d", length, offset) + addr = offset+0x1000 + test_data = bytearray([x % 256 for x in range(length)]) + + tb.apb_ram.write(addr, test_data) + + data = await tb.apb_master.read(addr, length) + + assert data.data == test_data + + await RisingEdge(dut.clk) + await RisingEdge(dut.clk) + + +async def run_test_write_words(dut): + + tb = TB(dut) + + byte_lanes = tb.apb_master.byte_lanes + + await tb.cycle_reset() + + for length in list(range(1, 4)): + for offset in list(range(byte_lanes)): + tb.log.info("length %d, offset %d", length, offset) + addr = offset+0x1000 + + test_data = bytearray([x % 256 for x in range(length)]) + event = tb.apb_master.init_write(addr, test_data) + await event.wait() + assert tb.apb_ram.read(addr, length) == test_data + + test_data = bytearray([x % 256 for x in range(length)]) + await tb.apb_master.write(addr, test_data) + assert tb.apb_ram.read(addr, length) == test_data + + test_data = [x * 0x1001 for x in range(length)] + await tb.apb_master.write_words(addr, test_data) + assert tb.apb_ram.read_words(addr, length) == test_data + + test_data = [x * 0x10200201 for x in range(length)] + await tb.apb_master.write_dwords(addr, test_data) + assert tb.apb_ram.read_dwords(addr, length) == test_data + + test_data = [x * 0x1020304004030201 for x in range(length)] + await tb.apb_master.write_qwords(addr, test_data) + assert tb.apb_ram.read_qwords(addr, length) == test_data + + test_data = 0x01*length + await tb.apb_master.write_byte(addr, test_data) + assert tb.apb_ram.read_byte(addr) == test_data + + test_data = 0x1001*length + await tb.apb_master.write_word(addr, test_data) + assert tb.apb_ram.read_word(addr) == test_data + + test_data = 0x10200201*length + await tb.apb_master.write_dword(addr, test_data) + assert tb.apb_ram.read_dword(addr) == test_data + + test_data = 0x1020304004030201*length + await tb.apb_master.write_qword(addr, test_data) + assert tb.apb_ram.read_qword(addr) == test_data + + await RisingEdge(dut.clk) + await RisingEdge(dut.clk) + + +async def run_test_read_words(dut): + + tb = TB(dut) + + byte_lanes = tb.apb_master.byte_lanes + + await tb.cycle_reset() + + for length in list(range(1, 4)): + for offset in list(range(byte_lanes)): + tb.log.info("length %d, offset %d", length, offset) + addr = offset+0x1000 + + test_data = bytearray([x % 256 for x in range(length)]) + tb.apb_ram.write(addr, test_data) + event = tb.apb_master.init_read(addr, length) + await event.wait() + assert event.data.data == test_data + + test_data = bytearray([x % 256 for x in range(length)]) + tb.apb_ram.write(addr, test_data) + assert (await tb.apb_master.read(addr, length)).data == test_data + + test_data = [x * 0x1001 for x in range(length)] + tb.apb_ram.write_words(addr, test_data) + assert await tb.apb_master.read_words(addr, length) == test_data + + test_data = [x * 0x10200201 for x in range(length)] + tb.apb_ram.write_dwords(addr, test_data) + assert await tb.apb_master.read_dwords(addr, length) == test_data + + test_data = [x * 0x1020304004030201 for x in range(length)] + tb.apb_ram.write_qwords(addr, test_data) + assert await tb.apb_master.read_qwords(addr, length) == test_data + + test_data = 0x01*length + tb.apb_ram.write_byte(addr, test_data) + assert await tb.apb_master.read_byte(addr) == test_data + + test_data = 0x1001*length + tb.apb_ram.write_word(addr, test_data) + assert await tb.apb_master.read_word(addr) == test_data + + test_data = 0x10200201*length + tb.apb_ram.write_dword(addr, test_data) + assert await tb.apb_master.read_dword(addr) == test_data + + test_data = 0x1020304004030201*length + tb.apb_ram.write_qword(addr, test_data) + assert await tb.apb_master.read_qword(addr) == test_data + + await RisingEdge(dut.clk) + await RisingEdge(dut.clk) + + +async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None): + + tb = TB(dut) + + await tb.cycle_reset() + + tb.set_idle_generator(idle_inserter) + tb.set_backpressure_generator(backpressure_inserter) + + async def worker(master, offset, aperture, count=16): + for k in range(count): + length = random.randint(1, min(32, aperture)) + addr = offset+random.randint(0, aperture-length) + test_data = bytearray([x % 256 for x in range(length)]) + + await Timer(random.randint(1, 100), 'ns') + + await master.write(addr, test_data) + + await Timer(random.randint(1, 100), 'ns') + + data = await master.read(addr, length) + assert data.data == test_data + + workers = [] + + for k in range(16): + workers.append(cocotb.start_soon(worker(tb.apb_master, k*0x1000, 0x1000, count=16))) + + while workers: + await workers.pop(0).join() + + await RisingEdge(dut.clk) + await RisingEdge(dut.clk) + + +def cycle_pause(): + return itertools.cycle([1, 1, 1, 0]) + + +if cocotb.SIM_NAME: + + for test in [run_test_write, run_test_read]: + + factory = TestFactory(test) + factory.add_option("idle_inserter", [None, cycle_pause]) + factory.add_option("backpressure_inserter", [None, cycle_pause]) + factory.generate_tests() + + for test in [run_test_write_words, run_test_read_words]: + + factory = TestFactory(test) + factory.generate_tests() + + factory = TestFactory(run_stress_test) + factory.generate_tests() + + +# cocotb-test + +tests_dir = os.path.dirname(__file__) + + +@pytest.mark.parametrize("data_w", [8, 16, 32]) +def test_apb(request, data_w): + dut = "test_apb" + module = os.path.splitext(os.path.basename(__file__))[0] + toplevel = dut + + verilog_sources = [ + os.path.join(os.path.dirname(__file__), f"{dut}.v"), + ] + + parameters = {} + + parameters['DATA_W'] = data_w + parameters['ADDR_W'] = 32 + parameters['STRB_W'] = parameters['DATA_W'] // 8 + + extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()} + + sim_build = os.path.join(tests_dir, "sim_build", + request.node.name.replace('[', '-').replace(']', '')) + + cocotb_test.simulator.run( + python_search=[tests_dir], + verilog_sources=verilog_sources, + toplevel=toplevel, + module=module, + parameters=parameters, + sim_build=sim_build, + extra_env=extra_env, + ) diff --git a/tests/apb/test_apb.v b/tests/apb/test_apb.v new file mode 100644 index 0000000..66cc16e --- /dev/null +++ b/tests/apb/test_apb.v @@ -0,0 +1,54 @@ +/* + +Copyright (c) 2025 Alex Forencich + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +*/ + +// Language: Verilog 2001 + +`timescale 1ns / 1ns + +/* + * APB test module + */ +module test_apb # +( + parameter DATA_W = 32, + parameter ADDR_W = 16, + parameter STRB_W = (DATA_W/8) +) +( + input wire clk, + input wire rst, + + inout wire [ADDR_W-1:0] apb_paddr, + inout wire [2:0] apb_pprot, + inout wire apb_psel, + inout wire apb_penable, + inout wire apb_pwrite, + inout wire [DATA_W-1:0] apb_pwdata, + inout wire [STRB_W-1:0] apb_pstrb, + inout wire apb_pready, + inout wire [DATA_W-1:0] apb_prdata, + inout wire apb_pslverr +); + +endmodule