Compare commits
24 Commits
v0.1.22
...
first_depl
| Author | SHA1 | Date | |
|---|---|---|---|
| 0496147ddf | |||
|
|
3e1e7fc1ec | ||
|
|
698c29b05f | ||
|
|
33f510688a | ||
|
|
da00960112 | ||
|
|
88b6624a93 | ||
|
|
dcb9a6bd02 | ||
|
|
7136dddd0a | ||
|
|
6c15d7d57d | ||
|
|
4595bd8a08 | ||
|
|
204ad7a517 | ||
|
|
f3dbc07100 | ||
|
|
a0a5b7ee55 | ||
|
|
a28ec41f79 | ||
|
|
f2bf8c0ed8 | ||
|
|
28f4585c08 | ||
|
|
775301c6fe | ||
|
|
39b4ca4a93 | ||
|
|
f70731a8d8 | ||
|
|
28bc97f226 | ||
|
|
e816d6a088 | ||
|
|
af377b2c11 | ||
|
|
cfb52c6130 | ||
|
|
7e32e584ff |
63
.github/workflows/build.yaml
vendored
Normal file
63
.github/workflows/build.yaml
vendored
Normal file
@@ -0,0 +1,63 @@
|
||||
name: build
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- 'dev/**'
|
||||
pull_request:
|
||||
branches: [ master ]
|
||||
release:
|
||||
types:
|
||||
- published
|
||||
|
||||
# Allows you to run this workflow manually from the Actions tab
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
build:
|
||||
name: Build distributions
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
- uses: actions/setup-python@v4
|
||||
name: Install Python
|
||||
with:
|
||||
python-version: "3.10"
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install build
|
||||
|
||||
- name: Build sdist
|
||||
run: python -m build
|
||||
|
||||
- uses: actions/upload-artifact@v3
|
||||
with:
|
||||
name: dist
|
||||
path: |
|
||||
dist/*.tar.gz
|
||||
dist/*.whl
|
||||
|
||||
#-------------------------------------------------------------------------------
|
||||
deploy:
|
||||
needs:
|
||||
- build
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
environment: release
|
||||
permissions:
|
||||
id-token: write
|
||||
|
||||
# Only publish when a Gitea Release is created.
|
||||
if: gitea.event_name == 'release'
|
||||
steps:
|
||||
- uses: actions/download-artifact@v3
|
||||
with:
|
||||
name: dist
|
||||
path: dist
|
||||
|
||||
- run: python3 -m pip install twine --user --break-system-packages
|
||||
- run: python3 -m pip install -U packaging --user --break-system-packages
|
||||
- run: TWINE_PASSWORD=${{ secrets.PYPI_PAT }} TWINE_USERNAME=bslathi19 python -m twine upload --repository-url ${{ vars.CI_API_URL }} dist/*
|
||||
4
.github/workflows/regression-tests.yml
vendored
4
.github/workflows/regression-tests.yml
vendored
@@ -5,11 +5,11 @@ on: [push, pull_request]
|
||||
jobs:
|
||||
build:
|
||||
name: Python ${{matrix.python-version}}
|
||||
runs-on: ubuntu-22.04
|
||||
runs-on: ubuntu-24.04
|
||||
|
||||
strategy:
|
||||
matrix:
|
||||
python-version: ["3.7", "3.8", "3.9", "3.10"]
|
||||
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
2
LICENSE
2
LICENSE
@@ -1,4 +1,4 @@
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
61
README.md
61
README.md
@@ -1,6 +1,6 @@
|
||||
# AXI interface modules for Cocotb
|
||||
|
||||
[](https://github.com/alexforencich/cocotbext-axi/actions/)
|
||||
[](https://github.com/alexforencich/cocotbext-axi/actions/workflows/regression-tests.yml)
|
||||
[](https://codecov.io/gh/alexforencich/cocotbext-axi)
|
||||
[](https://pypi.org/project/cocotbext-axi)
|
||||
[](https://pepy.tech/project/cocotbext-axi)
|
||||
@@ -9,7 +9,7 @@ GitHub repository: https://github.com/alexforencich/cocotbext-axi
|
||||
|
||||
## Introduction
|
||||
|
||||
AXI, AXI lite, and AXI stream simulation models for [cocotb](https://github.com/cocotb/cocotb).
|
||||
AXI, AXI lite, AXI stream, and APB simulation models for [cocotb](https://github.com/cocotb/cocotb).
|
||||
|
||||
## Installation
|
||||
|
||||
@@ -28,13 +28,13 @@ Installation for active development:
|
||||
|
||||
## Documentation and usage examples
|
||||
|
||||
See the `tests` directory, [verilog-axi](https://github.com/alexforencich/verilog-axi), and [verilog-axis](https://github.com/alexforencich/verilog-axis) for complete testbenches using these modules.
|
||||
See the `tests` directory, [taxi](https://github.com/fpganinja/taxi), [verilog-axi](https://github.com/alexforencich/verilog-axi), and [verilog-axis](https://github.com/alexforencich/verilog-axis) for complete testbenches using these modules.
|
||||
|
||||
### AXI and AXI lite master
|
||||
### AXI, AXI lite, and APB master
|
||||
|
||||
The `AxiMaster` and `AxiLiteMaster` classes implement AXI masters and are capable of generating read and write operations against AXI slaves. Requested operations will be split and aligned according to the AXI specification. The `AxiMaster` module is capable of generating narrow bursts, handling multiple in-flight operations, and handling reordering and interleaving in responses across different transaction IDs. `AxiMaster` and `AxiLiteMaster` and related objects all extend `Region`, so they can be attached to `AddressSpace` objects to handle memory operations in the specified region.
|
||||
The `AxiMaster`, `AxiLiteMaster`, and `ApbMaster` classes implement AXI, AXI-lite, and APB masters and are capable of generating read and write operations against the corresponding slaves. Requested operations will be split and aligned according to the AXI specification. The `AxiMaster` module is capable of generating narrow bursts, handling multiple in-flight operations, and handling reordering and interleaving in responses across different transaction IDs. `AxiMaster` and `AxiLiteMaster` and related objects all extend `Region`, so they can be attached to `AddressSpace` objects to handle memory operations in the specified region.
|
||||
|
||||
The `AxiMaster` is a wrapper around `AxiMasterWrite` and `AxiMasterRead`. Similarly, `AxiLiteMaster` is a wrapper around `AxiLiteMasterWrite` and `AxiLiteMasterRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same.
|
||||
The `AxiMaster` is a wrapper around `AxiMasterWrite` and `AxiMasterRead`. Similarly, `AxiLiteMaster` is a wrapper around `AxiLiteMasterWrite` and `AxiLiteMasterRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same. APB is not channelized, so only `ApbSlave` is available.
|
||||
|
||||
To use these modules, import the one you need and connect it to the DUT:
|
||||
|
||||
@@ -64,9 +64,9 @@ Alternatively, operations can be initiated with non-blocking `init_read()` and `
|
||||
|
||||
With this method, it is possible to start multiple concurrent operations from the same coroutine. It is also possible to use the events with `Combine`, `First`, and `with_timeout`.
|
||||
|
||||
#### `AxiMaster` and `AxiLiteMaster` constructor parameters
|
||||
#### `AxiMaster`, `AxiLiteMaster`, and `ApbMaster` constructor parameters
|
||||
|
||||
* _bus_: `AxiBus` or `AxiLiteBus` object containing AXI interface signals
|
||||
* _bus_: `AxiBus`, `AxiLiteBus`, or `ApbBus` object containing interface signals
|
||||
* _clock_: clock signal
|
||||
* _reset_: reset signal (optional)
|
||||
* _reset_active_level_: reset active level (optional, default `True`)
|
||||
@@ -114,20 +114,20 @@ With this method, it is possible to start multiple concurrent operations from th
|
||||
* _wuser_: AXI wuser signal, default `0` (write-related methods only)
|
||||
* _event_: `Event` object used to wait on and retrieve result for specific operation, default `None`. The event will be triggered when the operation completes and the result returned via `Event.data`. (`init_read()` and `init_write()` only)
|
||||
|
||||
#### Additional optional arguments for `AxiLiteMaster`
|
||||
#### Additional optional arguments for `AxiLiteMaster` and `ApbMaster`
|
||||
|
||||
* _prot_: AXI protection flags, default `AxiProt.NONSECURE`
|
||||
* _event_: `Event` object used to wait on and retrieve result for specific operation, default `None`. The event will be triggered when the operation completes and the result returned via `Event.data`. (`init_read()` and `init_write()` only)
|
||||
|
||||
#### `AxiBus` and `AxiLiteBus` objects
|
||||
#### `AxiBus`, `AxiLiteBus`, and `ApbBus` objects
|
||||
|
||||
The `AxiBus`, `AxiLiteBus`, and related objects are containers for the interface signals. These hold instances of bus objects for the individual channels, which are currently extensions of `cocotb_bus.bus.Bus`. Class methods `from_entity` and `from_prefix` are provided to facilitate signal name matching. For AXI interfaces use `AxiBus`, `AxiReadBus`, or `AxiWriteBus`, as appropriate. For AXI lite interfaces, use `AxiLiteBus`, `AxiLiteReadBus`, or `AxiLiteWriteBus`, as appropriate.
|
||||
The `AxiBus`, `AxiLiteBus`, `ApbBus`, and related objects are containers for the interface signals. These hold instances of bus objects for the individual channels, which are currently extensions of `cocotb_bus.bus.Bus`. Class methods `from_entity` and `from_prefix` are provided to facilitate signal name matching. For AXI interfaces use `AxiBus`, `AxiReadBus`, or `AxiWriteBus`, as appropriate. For AXI lite interfaces, use `AxiLiteBus`, `AxiLiteReadBus`, or `AxiLiteWriteBus`, as appropriate. For APB interfaces, use `ApbBus`.
|
||||
|
||||
### AXI and AXI lite slave
|
||||
### AXI, AXI lite, and APB slave
|
||||
|
||||
The `AxiSlave` and `AxiLiteSlave` classes implement AXI slaves and are capable of completing read and write operations from upstream AXI masters. The `AxiSlave` module is capable of handling narrow bursts. These modules can either be used to perform memory reads and writes on a `MemoryInterface` on behalf of the DUT, or they can be extended to implement customized functionality.
|
||||
The `AxiSlave`, `AxiLiteSlave`, and `ApbSlave` classes implement AXI, AXI-lite, and APB slaves and are capable of completing read and write operations from upstream AXI masters. The `AxiSlave` module is capable of handling narrow bursts. These modules can either be used to perform memory reads and writes on a `MemoryInterface` on behalf of the DUT, or they can be extended to implement customized functionality.
|
||||
|
||||
The `AxiSlave` is a wrapper around `AxiSlaveWrite` and `AxiSlaveRead`. Similarly, `AxiLiteSlave` is a wrapper around `AxiLiteSlaveWrite` and `AxiLiteSlaveRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same.
|
||||
The `AxiSlave` is a wrapper around `AxiSlaveWrite` and `AxiSlaveRead`. Similarly, `AxiLiteSlave` is a wrapper around `AxiLiteSlaveWrite` and `AxiLiteSlaveRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same. APB is not channelized, so only `ApbSlave` is available.
|
||||
|
||||
To use these modules, import the one you need and connect it to the DUT:
|
||||
|
||||
@@ -137,13 +137,13 @@ To use these modules, import the one you need and connect it to the DUT:
|
||||
region = MemoryRegion(2**axi_slave.read_if.address_width)
|
||||
axi_slave.target = region
|
||||
|
||||
The first argument to the constructor accepts an `AxiBus` or `AxiLiteBus` object. These objects are containers for the interface signals and include class methods to automate connections.
|
||||
The first argument to the constructor accepts an `AxiBus`, `AxiLiteBus`, or `ApbBus` object. These objects are containers for the interface signals and include class methods to automate connections.
|
||||
|
||||
It is also possible to extend these modules; operation can be customized by overriding the internal `_read()` and `_write()` methods. See `AxiRam` and `AxiLiteRam` for an example.
|
||||
|
||||
#### `AxiSlave` and `AxiLiteSlave` constructor parameters
|
||||
#### `AxiSlave`, `AxiLiteSlave`, and `ApbSlave` constructor parameters
|
||||
|
||||
* _bus_: `AxiBus` or `AxiLiteBus` object containing AXI interface signals
|
||||
* _bus_: `AxiBus`, `AxiLiteBus`, or `ApbBus` object containing interface signals
|
||||
* _clock_: clock signal
|
||||
* _reset_: reset signal (optional)
|
||||
* _reset_active_level_: reset active level (optional, default `True`)
|
||||
@@ -153,11 +153,11 @@ It is also possible to extend these modules; operation can be customized by over
|
||||
|
||||
* _target_: target region
|
||||
|
||||
### AXI and AXI lite RAM
|
||||
### AXI, AXI lite, and APB RAM
|
||||
|
||||
The `AxiRam` and `AxiLiteRam` classes implement AXI RAMs and are capable of completing read and write operations from upstream AXI masters. The `AxiRam` module is capable of handling narrow bursts. These modules are extensions of the corresponding `AxiSlave` and `AxiLiteSlave` modules. Internally, `SparseMemory` is used to support emulating very large memories.
|
||||
The `AxiRam`, `AxiLiteRam`, and `ApbRam` classes implement AXI, AXI-lite, and APB RAMs and are capable of completing read and write operations from upstream AXI masters. The `AxiRam` module is capable of handling narrow bursts. These modules are extensions of the corresponding `AxiSlave`, `AxiLiteSlave`, and `ApbSlave` modules. Internally, `SparseMemory` is used to support emulating very large memories.
|
||||
|
||||
The `AxiRam` is a wrapper around `AxiRamWrite` and `AxiRamRead`. Similarly, `AxiLiteRam` is a wrapper around `AxiLiteRamWrite` and `AxiLiteRamRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same.
|
||||
The `AxiRam` is a wrapper around `AxiRamWrite` and `AxiRamRead`. Similarly, `AxiLiteRam` is a wrapper around `AxiLiteRamWrite` and `AxiLiteRamRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same. APB is not channelized, so only `ApbRam` is available.
|
||||
|
||||
To use these modules, import the one you need and connect it to the DUT:
|
||||
|
||||
@@ -165,9 +165,9 @@ To use these modules, import the one you need and connect it to the DUT:
|
||||
|
||||
axi_ram = AxiRam(AxiBus.from_prefix(dut, "m_axi"), dut.clk, dut.rst, size=2**32)
|
||||
|
||||
The first argument to the constructor accepts an `AxiBus` or `AxiLiteBus` object. These objects are containers for the interface signals and include class methods to automate connections.
|
||||
The first argument to the constructor accepts an `AxiBus`, `AxiLiteBus`, or `ApbBus` object. These objects are containers for the interface signals and include class methods to automate connections.
|
||||
|
||||
Once the module is instantiated, the memory contents can be accessed in a couple of different ways. First, the `mmap` object can be accessed directly via the `mem` attribute. Second, `read()`, `write()`, and various word-access wrappers are available. Hex dump helper methods are also provided for debugging. For example:
|
||||
Once the module is instantiated, the memory contents can be accessed in a couple of different ways. First, the `mmap`/`SparseMemory` object can be accessed directly via the `mem` attribute. Second, `read()`, `write()`, and various word-access wrappers are available. Hex dump helper methods are also provided for debugging. For example:
|
||||
|
||||
axi_ram.write(0x0000, b'test')
|
||||
data = axi_ram.read(0x0000, 4)
|
||||
@@ -180,9 +180,9 @@ Multi-port memories can be constructed by passing the `mem` object of the first
|
||||
axi_ram_p3 = AxiRam(AxiBus.from_prefix(dut, "m02_axi"), dut.clk, dut.rst, mem=axi_ram_p1.mem)
|
||||
axi_ram_p4 = AxiRam(AxiBus.from_prefix(dut, "m03_axi"), dut.clk, dut.rst, mem=axi_ram_p1.mem)
|
||||
|
||||
#### `AxiRam` and `AxiLiteRam` constructor parameters
|
||||
#### `AxiRam`, `AxiLiteRam`, and `ApbRam` constructor parameters
|
||||
|
||||
* _bus_: `AxiBus` or `AxiLiteBus` object containing AXI interface signals
|
||||
* _bus_: `AxiBus`, `AxiLiteBus`, or `ApbBus` object containing interface signals
|
||||
* _clock_: clock signal
|
||||
* _reset_: reset signal (optional)
|
||||
* _reset_active_level_: reset active level (optional, default `True`)
|
||||
@@ -471,3 +471,16 @@ This is a simple example that shows how the address space abstraction components
|
||||
* `tid`: ID signal, can be used for routing
|
||||
* `tdest`: destination signal, can be used for routing
|
||||
* `tuser`: additional sideband data
|
||||
|
||||
### APB signals
|
||||
|
||||
* `paddr`: address
|
||||
* `pprot`: protection bits
|
||||
* `psel`: select signal, for selecting a target device
|
||||
* `penable`: enable signal, for performing an operation
|
||||
* `pwrite`: read/write control signal
|
||||
* `pwdata`: write data
|
||||
* `pstrb`: write strobe
|
||||
* `pready`: ready signal to stall bus
|
||||
* `prdata`: read data
|
||||
* `pslverr`: read/write response, indicating SLVERR
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -43,3 +43,5 @@ from .axi_channels import AxiWriteBus, AxiReadBus, AxiBus
|
||||
from .axi_master import AxiMasterWrite, AxiMasterRead, AxiMaster
|
||||
from .axi_slave import AxiSlaveWrite, AxiSlaveRead, AxiSlave
|
||||
from .axi_ram import AxiRamWrite, AxiRamRead, AxiRam
|
||||
|
||||
from .apb import ApbBus, ApbMaster, ApbSlave, ApbRam
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2021 Alex Forencich
|
||||
Copyright (c) 2021-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
622
cocotbext/axi/apb.py
Normal file
622
cocotbext/axi/apb.py
Normal file
@@ -0,0 +1,622 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import NamedTuple
|
||||
|
||||
import cocotb
|
||||
from cocotb.queue import Queue
|
||||
from cocotb.triggers import RisingEdge, Event
|
||||
from cocotb_bus.bus import Bus
|
||||
|
||||
from .version import __version__
|
||||
from .constants import AxiResp, AxiProt
|
||||
from .address_space import Region
|
||||
from .reset import Reset
|
||||
from .memory import Memory
|
||||
|
||||
|
||||
# APB master write helper objects
|
||||
class ApbWriteCmd(NamedTuple):
|
||||
address: int
|
||||
data: bytes
|
||||
prot: AxiProt
|
||||
event: Event
|
||||
|
||||
|
||||
class ApbWriteResp(NamedTuple):
|
||||
address: int
|
||||
length: int
|
||||
resp: AxiResp
|
||||
|
||||
|
||||
# APB master read helper objects
|
||||
class ApbReadCmd(NamedTuple):
|
||||
address: int
|
||||
length: int
|
||||
prot: AxiProt
|
||||
event: Event
|
||||
|
||||
|
||||
class ApbReadResp(NamedTuple):
|
||||
address: int
|
||||
data: bytes
|
||||
resp: AxiResp
|
||||
|
||||
def __bytes__(self):
|
||||
return self.data
|
||||
|
||||
|
||||
class ApbBus(Bus):
|
||||
|
||||
_signals = ["paddr", "psel", "penable", "pwrite", "pwdata", "pstrb", "pready", "prdata"]
|
||||
_optional_signals = ["pprot", "pslverr"]
|
||||
|
||||
def __init__(self, entity=None, prefix=None, **kwargs):
|
||||
super().__init__(entity, prefix, self._signals, optional_signals=self._optional_signals, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def from_entity(cls, entity, **kwargs):
|
||||
return cls(entity, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def from_prefix(cls, entity, prefix, **kwargs):
|
||||
return cls(entity, prefix, **kwargs)
|
||||
|
||||
|
||||
class ApbPause:
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
self._pause = False
|
||||
self._pause_generator = None
|
||||
self._pause_cr = None
|
||||
|
||||
def _pause_update(self, val):
|
||||
pass
|
||||
|
||||
@property
|
||||
def pause(self):
|
||||
return self._pause
|
||||
|
||||
@pause.setter
|
||||
def pause(self, val):
|
||||
if self._pause != val:
|
||||
self._pause_update(val)
|
||||
self._pause = val
|
||||
|
||||
def set_pause_generator(self, generator=None):
|
||||
if self._pause_cr is not None:
|
||||
self._pause_cr.kill()
|
||||
self._pause_cr = None
|
||||
|
||||
self._pause_generator = generator
|
||||
|
||||
if self._pause_generator is not None:
|
||||
self._pause_cr = cocotb.start_soon(self._run_pause())
|
||||
|
||||
def clear_pause_generator(self):
|
||||
self.set_pause_generator(None)
|
||||
|
||||
async def _run_pause(self):
|
||||
clock_edge_event = RisingEdge(self.clock)
|
||||
|
||||
for val in self._pause_generator:
|
||||
self.pause = val
|
||||
await clock_edge_event
|
||||
|
||||
|
||||
class ApbMaster(ApbPause, Region, Reset):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, **kwargs):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
if bus._name:
|
||||
self.log = logging.getLogger(f"cocotb.{bus._entity._name}.{bus._name}")
|
||||
else:
|
||||
self.log = logging.getLogger(f"cocotb.{bus._entity._name}")
|
||||
|
||||
self.log.info("APB master")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2025 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
self.command_queue = Queue()
|
||||
self.command_queue.queue_occupancy_limit = 2
|
||||
self.current_command = None
|
||||
|
||||
self.in_flight_operations = 0
|
||||
self._idle = Event()
|
||||
self._idle.set()
|
||||
|
||||
self.address_width = len(self.bus.paddr)
|
||||
self.width = len(self.bus.pwdata)
|
||||
self.byte_size = 8
|
||||
self.byte_lanes = self.width // self.byte_size
|
||||
self.strb_mask = 2**self.byte_lanes-1
|
||||
|
||||
self.pprot_present = hasattr(self.bus, "pprot")
|
||||
self.pstrb_present = hasattr(self.bus, "pstrb")
|
||||
self.pslverr_present = hasattr(self.bus, "pslverr")
|
||||
|
||||
super().__init__(2**self.address_width, **kwargs)
|
||||
|
||||
self.log.info("APB master configuration:")
|
||||
self.log.info(" Address width: %d bits", self.address_width)
|
||||
self.log.info(" Byte size: %d bits", self.byte_size)
|
||||
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
|
||||
|
||||
self.log.info("APB master signals:")
|
||||
for sig in sorted(list(set().union(self.bus._signals, self.bus._optional_signals))):
|
||||
if hasattr(bus, sig):
|
||||
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
|
||||
else:
|
||||
self.log.info(" %s: not present", sig)
|
||||
|
||||
if self.pstrb_present:
|
||||
assert self.byte_lanes == len(self.bus.pstrb)
|
||||
assert self.byte_lanes * self.byte_size == self.width
|
||||
|
||||
self.bus.paddr.setimmediatevalue(0)
|
||||
if self.pprot_present:
|
||||
self.bus.pprot.setimmediatevalue(0)
|
||||
self.bus.psel.setimmediatevalue(False)
|
||||
self.bus.penable.setimmediatevalue(False)
|
||||
self.bus.pwrite.setimmediatevalue(False)
|
||||
self.bus.pwdata.setimmediatevalue(0)
|
||||
if self.pstrb_present:
|
||||
self.bus.pstrb.setimmediatevalue(0)
|
||||
|
||||
self._run_cr = None
|
||||
|
||||
self._init_reset(reset, reset_active_level)
|
||||
|
||||
def init_write(self, address, data, prot=AxiProt.NONSECURE, event=None):
|
||||
if event is None:
|
||||
event = Event()
|
||||
|
||||
if not isinstance(event, Event):
|
||||
raise ValueError("Expected event object")
|
||||
|
||||
if address < 0 or address >= 2**self.address_width:
|
||||
raise ValueError("Address out of range")
|
||||
|
||||
if isinstance(data, int):
|
||||
raise ValueError("Expected bytes or bytearray for data")
|
||||
|
||||
if address+len(data) > 2**self.address_width:
|
||||
raise ValueError("Requested transfer overruns end of address space")
|
||||
|
||||
if not self.pprot_present and prot != AxiProt.NONSECURE:
|
||||
raise ValueError("pprot sideband signal value specified, but signal is not connected")
|
||||
|
||||
data = bytes(data)
|
||||
|
||||
cocotb.start_soon(self._write_wrapper(address, bytes(data), prot, event))
|
||||
|
||||
return event
|
||||
|
||||
def init_read(self, address, length, prot=AxiProt.NONSECURE, event=None):
|
||||
if event is None:
|
||||
event = Event()
|
||||
|
||||
if not isinstance(event, Event):
|
||||
raise ValueError("Expected event object")
|
||||
|
||||
if address < 0 or address >= 2**self.address_width:
|
||||
raise ValueError("Address out of range")
|
||||
|
||||
if length < 0:
|
||||
raise ValueError("Read length must be positive")
|
||||
|
||||
if address+length > 2**self.address_width:
|
||||
raise ValueError("Requested transfer overruns end of address space")
|
||||
|
||||
if not self.pprot_present and prot != AxiProt.NONSECURE:
|
||||
raise ValueError("arprot sideband signal value specified, but signal is not connected")
|
||||
|
||||
cocotb.start_soon(self._read_wrapper(address, length, prot, event))
|
||||
|
||||
return event
|
||||
|
||||
def idle(self):
|
||||
return not self.in_flight_operations
|
||||
|
||||
async def wait(self):
|
||||
while not self.idle():
|
||||
await self._idle.wait()
|
||||
|
||||
async def write(self, address, data, prot=AxiProt.NONSECURE):
|
||||
if address < 0 or address >= 2**self.address_width:
|
||||
raise ValueError("Address out of range")
|
||||
|
||||
if isinstance(data, int):
|
||||
raise ValueError("Expected bytes or bytearray for data")
|
||||
|
||||
if address+len(data) > 2**self.address_width:
|
||||
raise ValueError("Requested transfer overruns end of address space")
|
||||
|
||||
if not self.pprot_present and prot != AxiProt.NONSECURE:
|
||||
raise ValueError("pprot sideband signal value specified, but signal is not connected")
|
||||
|
||||
event = Event()
|
||||
data = bytes(data)
|
||||
|
||||
self.in_flight_operations += 1
|
||||
self._idle.clear()
|
||||
|
||||
await self.command_queue.put(ApbWriteCmd(address, data, prot, event))
|
||||
await event.wait()
|
||||
return event.data
|
||||
|
||||
async def _write_wrapper(self, address, data, prot, event):
|
||||
event.set(await self.write(address, data, prot))
|
||||
|
||||
async def read(self, address, length, prot=AxiProt.NONSECURE):
|
||||
if address < 0 or address >= 2**self.address_width:
|
||||
raise ValueError("Address out of range")
|
||||
|
||||
if length < 0:
|
||||
raise ValueError("Read length must be positive")
|
||||
|
||||
if address+length > 2**self.address_width:
|
||||
raise ValueError("Requested transfer overruns end of address space")
|
||||
|
||||
if not self.pprot_present and prot != AxiProt.NONSECURE:
|
||||
raise ValueError("arprot sideband signal value specified, but signal is not connected")
|
||||
|
||||
event = Event()
|
||||
|
||||
self.in_flight_operations += 1
|
||||
self._idle.clear()
|
||||
|
||||
await self.command_queue.put(ApbReadCmd(address, length, prot, event))
|
||||
|
||||
await event.wait()
|
||||
return event.data
|
||||
|
||||
async def _read_wrapper(self, address, length, prot, event):
|
||||
event.set(await self.read(address, length, prot))
|
||||
|
||||
def _handle_reset(self, state):
|
||||
if state:
|
||||
self.log.info("Reset asserted")
|
||||
|
||||
self.bus.psel.value = False
|
||||
self.bus.penable.value = False
|
||||
|
||||
if self._run_cr is not None:
|
||||
self._run_cr.kill()
|
||||
self._run_cr = None
|
||||
|
||||
def flush_cmd(cmd):
|
||||
self.log.warning("Flushed write operation during reset: %s", cmd)
|
||||
if cmd.event:
|
||||
cmd.event.set(None)
|
||||
|
||||
while not self.command_queue.empty():
|
||||
cmd = self.command_queue.get_nowait()
|
||||
flush_cmd(cmd)
|
||||
|
||||
if self.current_command:
|
||||
cmd = self.current_command
|
||||
self.current_command = None
|
||||
flush_cmd(cmd)
|
||||
|
||||
self.in_flight_operations = 0
|
||||
self._idle.set()
|
||||
else:
|
||||
self.log.info("Reset de-asserted")
|
||||
if self._run_cr is None:
|
||||
self._run_cr = cocotb.start_soon(self._run())
|
||||
|
||||
async def _run(self):
|
||||
clock_edge_event = RisingEdge(self.clock)
|
||||
|
||||
while True:
|
||||
cmd = await self.command_queue.get()
|
||||
self.current_command = cmd
|
||||
|
||||
length = 0
|
||||
pwrite = False
|
||||
|
||||
if isinstance(cmd, ApbWriteCmd):
|
||||
length = len(cmd.data)
|
||||
pwrite = True
|
||||
else:
|
||||
length = cmd.length
|
||||
pwrite = False
|
||||
|
||||
word_addr = (cmd.address // self.byte_lanes) * self.byte_lanes
|
||||
|
||||
start_offset = cmd.address % self.byte_lanes
|
||||
end_offset = ((cmd.address + length - 1) % self.byte_lanes) + 1
|
||||
|
||||
strb_start = (self.strb_mask << start_offset) & self.strb_mask
|
||||
strb_end = self.strb_mask >> (self.byte_lanes - end_offset)
|
||||
|
||||
cycles = (length + (cmd.address % self.byte_lanes) + self.byte_lanes-1) // self.byte_lanes
|
||||
|
||||
offset = 0
|
||||
read_data = bytearray()
|
||||
resp = AxiResp.OKAY
|
||||
|
||||
if self.log.isEnabledFor(logging.INFO):
|
||||
if pwrite:
|
||||
self.log.info("Write start addr: 0x%08x prot: %s data: %s",
|
||||
cmd.address, cmd.prot, ' '.join((f'{c:02x}' for c in cmd.data)))
|
||||
else:
|
||||
self.log.info("Read start addr: 0x%08x prot: %s length: %d",
|
||||
cmd.address, cmd.prot, cmd.length)
|
||||
|
||||
await clock_edge_event
|
||||
self.bus.psel.value = True
|
||||
|
||||
for k in range(cycles):
|
||||
start = 0
|
||||
stop = self.byte_lanes
|
||||
strb = self.strb_mask
|
||||
|
||||
if k == 0:
|
||||
start = start_offset
|
||||
strb &= strb_start
|
||||
if k == cycles-1:
|
||||
stop = end_offset
|
||||
strb &= strb_end
|
||||
|
||||
val = 0
|
||||
if pwrite:
|
||||
for j in range(start, stop):
|
||||
val |= cmd.data[offset] << j*8
|
||||
offset += 1
|
||||
|
||||
if not self.pstrb_present and strb != self.strb_mask:
|
||||
self.log.warning("Partial operation requested with pstrb not connected, write will be zero-padded (0x%x != 0x%x)", strb, self.strb_mask)
|
||||
else:
|
||||
strb = 0
|
||||
|
||||
while self.pause:
|
||||
await clock_edge_event
|
||||
|
||||
await clock_edge_event
|
||||
|
||||
if k == 0:
|
||||
self.bus.paddr.value = cmd.address
|
||||
else:
|
||||
self.bus.paddr.value = word_addr + k*self.byte_lanes
|
||||
self.bus.pprot.value = cmd.prot
|
||||
self.bus.penable.value = True
|
||||
self.bus.pwrite.value = pwrite
|
||||
self.bus.pwdata.value = val
|
||||
self.bus.pstrb.value = strb
|
||||
|
||||
await clock_edge_event
|
||||
|
||||
while not int(self.bus.pready.value):
|
||||
await clock_edge_event
|
||||
|
||||
self.bus.penable.value = False
|
||||
|
||||
cycle_data = int(self.bus.prdata.value)
|
||||
if self.pslverr_present and int(self.bus.pslverr.value):
|
||||
resp = AxiResp.SLVERR
|
||||
|
||||
start = 0
|
||||
stop = self.byte_lanes
|
||||
|
||||
if k == 0:
|
||||
start = start_offset
|
||||
if k == cycles-1:
|
||||
stop = end_offset
|
||||
|
||||
for j in range(start, stop):
|
||||
read_data.append((cycle_data >> j*8) & 0xff)
|
||||
|
||||
self.bus.psel.value = False
|
||||
|
||||
if pwrite:
|
||||
self.log.info("Write complete addr: 0x%08x prot: %s resp: %s length: %d",
|
||||
cmd.address, cmd.prot, resp, length)
|
||||
write_resp = ApbWriteResp(cmd.address, length, resp)
|
||||
cmd.event.set(write_resp)
|
||||
else:
|
||||
if self.log.isEnabledFor(logging.INFO):
|
||||
self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s",
|
||||
cmd.address, cmd.prot, resp, ' '.join((f'{c:02x}' for c in read_data)))
|
||||
read_resp = ApbReadResp(cmd.address, bytes(read_data), resp)
|
||||
cmd.event.set(read_resp)
|
||||
|
||||
self.current_write_command = None
|
||||
|
||||
self.in_flight_operations -= 1
|
||||
|
||||
if self.in_flight_operations == 0:
|
||||
self._idle.set()
|
||||
|
||||
|
||||
class ApbSlave(ApbPause, Reset):
|
||||
def __init__(self, bus, clock, reset=None, target=None, reset_active_level=True, **kwargs):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.target = target
|
||||
if bus._name:
|
||||
self.log = logging.getLogger(f"cocotb.{bus._entity._name}.{bus._name}")
|
||||
else:
|
||||
self.log = logging.getLogger(f"cocotb.{bus._entity._name}")
|
||||
|
||||
self.log.info("APB slave model")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2025 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.address_width = len(self.bus.paddr)
|
||||
self.width = len(self.bus.pwdata)
|
||||
self.byte_size = 8
|
||||
self.byte_lanes = self.width // self.byte_size
|
||||
self.strb_mask = 2**self.byte_lanes-1
|
||||
|
||||
self.pprot_present = hasattr(self.bus, "pprot")
|
||||
self.pstrb_present = hasattr(self.bus, "pstrb")
|
||||
self.pslverr_present = hasattr(self.bus, "pslverr")
|
||||
|
||||
self.log.info("APB slave model configuration:")
|
||||
self.log.info(" Address width: %d bits", self.address_width)
|
||||
self.log.info(" Byte size: %d bits", self.byte_size)
|
||||
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
|
||||
|
||||
self.log.info("APB slave model signals:")
|
||||
for sig in sorted(list(set().union(self.bus._signals, self.bus._optional_signals))):
|
||||
if hasattr(bus, sig):
|
||||
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
|
||||
else:
|
||||
self.log.info(" %s: not present", sig)
|
||||
|
||||
if self.pstrb_present:
|
||||
assert self.byte_lanes == len(self.bus.pstrb)
|
||||
assert self.byte_lanes * self.byte_size == self.width
|
||||
|
||||
self.bus.pready.setimmediatevalue(False)
|
||||
self.bus.prdata.setimmediatevalue(0)
|
||||
if self.pslverr_present:
|
||||
self.bus.pslverr.setimmediatevalue(0)
|
||||
|
||||
self._run_cr = None
|
||||
|
||||
self._init_reset(reset, reset_active_level)
|
||||
|
||||
async def _write(self, address, data):
|
||||
await self.target.write(address, data)
|
||||
|
||||
def _handle_reset(self, state):
|
||||
if state:
|
||||
self.log.info("Reset asserted")
|
||||
|
||||
self.bus.pready.value = False
|
||||
|
||||
if self._run_cr is not None:
|
||||
self._run_cr.kill()
|
||||
self._run_cr = None
|
||||
else:
|
||||
self.log.info("Reset de-asserted")
|
||||
if self._run_cr is None:
|
||||
self._run_cr = cocotb.start_soon(self._run())
|
||||
|
||||
async def _run(self):
|
||||
clock_edge_event = RisingEdge(self.clock)
|
||||
|
||||
self.bus.pready.value = False
|
||||
|
||||
while True:
|
||||
await clock_edge_event
|
||||
|
||||
if self.pause:
|
||||
continue
|
||||
|
||||
if not int(self.bus.psel.value) or not int(self.bus.penable.value):
|
||||
continue
|
||||
|
||||
addr = (int(self.bus.paddr.value) // self.byte_lanes) * self.byte_lanes
|
||||
if self.pprot_present:
|
||||
prot = AxiProt(int(self.bus.pprot.value))
|
||||
else:
|
||||
prot = AxiProt.NONSECURE
|
||||
|
||||
pslverr = False
|
||||
|
||||
if (int(self.bus.pwrite.value)):
|
||||
data = int(self.bus.pwdata.value)
|
||||
|
||||
if self.pstrb_present:
|
||||
strb = int(self.bus.pstrb.value)
|
||||
else:
|
||||
strb = self.strb_mask
|
||||
|
||||
# generate operation list
|
||||
offset = 0
|
||||
start_offset = None
|
||||
write_ops = []
|
||||
|
||||
data = data.to_bytes(self.byte_lanes, 'little')
|
||||
|
||||
if self.log.isEnabledFor(logging.INFO):
|
||||
self.log.info("Write data paddr: 0x%08x pprot: %s pstrb: 0x%02x data: %s",
|
||||
addr, prot, strb, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
for i in range(self.byte_lanes):
|
||||
if strb & (1 << i):
|
||||
if start_offset is None:
|
||||
start_offset = offset
|
||||
else:
|
||||
if start_offset is not None and offset != start_offset:
|
||||
write_ops.append((addr+start_offset, data[start_offset:offset]))
|
||||
start_offset = None
|
||||
|
||||
offset += 1
|
||||
|
||||
if start_offset is not None and offset != start_offset:
|
||||
write_ops.append((addr+start_offset, data[start_offset:offset]))
|
||||
|
||||
# perform writes
|
||||
try:
|
||||
for addr, data in write_ops:
|
||||
await self._write(addr, data)
|
||||
except Exception:
|
||||
self.log.warning("Write operation failed")
|
||||
pslverr = True
|
||||
else:
|
||||
try:
|
||||
data = await self._read(addr, self.byte_lanes)
|
||||
except Exception:
|
||||
self.log.warning("Read operation failed")
|
||||
data = bytes(self.byte_lanes)
|
||||
pslverr = True
|
||||
|
||||
if self.log.isEnabledFor(logging.INFO):
|
||||
self.log.info("Read data paddr: 0x%08x pprot: %s data: %s",
|
||||
addr, prot, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
self.bus.prdata.value = int.from_bytes(data, 'little')
|
||||
|
||||
await clock_edge_event
|
||||
if self.pslverr_present:
|
||||
self.bus.pslverr.value = pslverr
|
||||
self.bus.pready.value = True
|
||||
await clock_edge_event
|
||||
self.bus.pready.value = False
|
||||
if self.pslverr_present:
|
||||
self.bus.pslverr.value = False
|
||||
|
||||
|
||||
class ApbRam(ApbSlave, Memory):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=2**64, mem=None, **kwargs):
|
||||
super().__init__(bus, clock, reset, reset_active_level=reset_active_level, size=size, mem=mem, **kwargs)
|
||||
|
||||
async def _write(self, address, data):
|
||||
self.write(address % self.size, data)
|
||||
|
||||
async def _read(self, address, length):
|
||||
return self.read(address % self.size, length)
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -198,11 +198,14 @@ class AxiMasterWrite(Region, Reset):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
|
||||
if bus.aw._name:
|
||||
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
|
||||
else:
|
||||
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}")
|
||||
|
||||
self.log.info("AXI master (write)")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2020 Alex Forencich")
|
||||
self.log.info("Copyright (c) 2020-2025 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
self.aw_channel = AxiAWSource(bus.aw, clock, reset, reset_active_level)
|
||||
@@ -291,7 +294,7 @@ class AxiMasterWrite(Region, Reset):
|
||||
if isinstance(data, int):
|
||||
raise ValueError("Expected bytes or bytearray for data")
|
||||
|
||||
if burst != AxiBurstType.FIXED and address+len(data) >= 2**self.address_width:
|
||||
if burst != AxiBurstType.FIXED and address+len(data) > 2**self.address_width:
|
||||
raise ValueError("Requested transfer overruns end of address space")
|
||||
|
||||
if awid is None or awid < 0:
|
||||
@@ -360,7 +363,7 @@ class AxiMasterWrite(Region, Reset):
|
||||
if isinstance(data, int):
|
||||
raise ValueError("Expected bytes or bytearray for data")
|
||||
|
||||
if burst != AxiBurstType.FIXED and address+len(data) >= 2**self.address_width:
|
||||
if burst != AxiBurstType.FIXED and address+len(data) > 2**self.address_width:
|
||||
raise ValueError("Requested transfer overruns end of address space")
|
||||
|
||||
if awid is None or awid < 0:
|
||||
@@ -637,11 +640,14 @@ class AxiMasterRead(Region, Reset):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
|
||||
if bus.ar._name:
|
||||
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
|
||||
else:
|
||||
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}")
|
||||
|
||||
self.log.info("AXI master (read)")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2020 Alex Forencich")
|
||||
self.log.info("Copyright (c) 2020-2025 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
self.ar_channel = AxiARSource(bus.ar, clock, reset, reset_active_level)
|
||||
@@ -723,7 +729,7 @@ class AxiMasterRead(Region, Reset):
|
||||
if length < 0:
|
||||
raise ValueError("Read length must be positive")
|
||||
|
||||
if burst != AxiBurstType.FIXED and address+length >= 2**self.address_width:
|
||||
if burst != AxiBurstType.FIXED and address+length > 2**self.address_width:
|
||||
raise ValueError("Requested transfer overruns end of address space")
|
||||
|
||||
if arid is None or arid < 0:
|
||||
@@ -780,7 +786,7 @@ class AxiMasterRead(Region, Reset):
|
||||
if length < 0:
|
||||
raise ValueError("Read length must be positive")
|
||||
|
||||
if burst != AxiBurstType.FIXED and address+length >= 2**self.address_width:
|
||||
if burst != AxiBurstType.FIXED and address+length > 2**self.address_width:
|
||||
raise ValueError("Requested transfer overruns end of address space")
|
||||
|
||||
if arid is None or arid < 0:
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2021 Alex Forencich
|
||||
Copyright (c) 2021-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2021 Alex Forencich
|
||||
Copyright (c) 2021-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -38,11 +38,14 @@ class AxiSlaveWrite(Reset):
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.target = target
|
||||
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
|
||||
if bus.aw._name:
|
||||
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
|
||||
else:
|
||||
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}")
|
||||
|
||||
self.log.info("AXI slave model (write)")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2021 Alex Forencich")
|
||||
self.log.info("Copyright (c) 2021-2025 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
super().__init__(**kwargs)
|
||||
@@ -206,11 +209,14 @@ class AxiSlaveRead(Reset):
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.target = target
|
||||
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
|
||||
if bus.ar._name:
|
||||
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
|
||||
else:
|
||||
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}")
|
||||
|
||||
self.log.info("AXI slave model (read)")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2021 Alex Forencich")
|
||||
self.log.info("Copyright (c) 2021-2025 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
super().__init__(**kwargs)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -88,11 +88,14 @@ class AxiLiteMasterWrite(Region, Reset):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
|
||||
if bus.aw._name:
|
||||
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
|
||||
else:
|
||||
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}")
|
||||
|
||||
self.log.info("AXI lite master (write)")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2020 Alex Forencich")
|
||||
self.log.info("Copyright (c) 2020-2025 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
self.aw_channel = AxiLiteAWSource(bus.aw, clock, reset, reset_active_level)
|
||||
@@ -159,6 +162,9 @@ class AxiLiteMasterWrite(Region, Reset):
|
||||
if isinstance(data, int):
|
||||
raise ValueError("Expected bytes or bytearray for data")
|
||||
|
||||
if address+len(data) > 2**self.address_width:
|
||||
raise ValueError("Requested transfer overruns end of address space")
|
||||
|
||||
if not self.awprot_present and prot != AxiProt.NONSECURE:
|
||||
raise ValueError("awprot sideband signal value specified, but signal is not connected")
|
||||
|
||||
@@ -182,7 +188,7 @@ class AxiLiteMasterWrite(Region, Reset):
|
||||
if isinstance(data, int):
|
||||
raise ValueError("Expected bytes or bytearray for data")
|
||||
|
||||
if address+len(data) >= 2**self.address_width:
|
||||
if address+len(data) > 2**self.address_width:
|
||||
raise ValueError("Requested transfer overruns end of address space")
|
||||
|
||||
if not self.awprot_present and prot != AxiProt.NONSECURE:
|
||||
@@ -342,11 +348,14 @@ class AxiLiteMasterRead(Region, Reset):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
|
||||
if bus.ar._name:
|
||||
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
|
||||
else:
|
||||
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}")
|
||||
|
||||
self.log.info("AXI lite master (read)")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2020 Alex Forencich")
|
||||
self.log.info("Copyright (c) 2020-2025 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
self.ar_channel = AxiLiteARSource(bus.ar, clock, reset, reset_active_level)
|
||||
@@ -404,6 +413,12 @@ class AxiLiteMasterRead(Region, Reset):
|
||||
if address < 0 or address >= 2**self.address_width:
|
||||
raise ValueError("Address out of range")
|
||||
|
||||
if length < 0:
|
||||
raise ValueError("Read length must be positive")
|
||||
|
||||
if address+length > 2**self.address_width:
|
||||
raise ValueError("Requested transfer overruns end of address space")
|
||||
|
||||
if not self.arprot_present and prot != AxiProt.NONSECURE:
|
||||
raise ValueError("arprot sideband signal value specified, but signal is not connected")
|
||||
|
||||
@@ -425,7 +440,7 @@ class AxiLiteMasterRead(Region, Reset):
|
||||
if length < 0:
|
||||
raise ValueError("Read length must be positive")
|
||||
|
||||
if address+length >= 2**self.address_width:
|
||||
if address+length > 2**self.address_width:
|
||||
raise ValueError("Requested transfer overruns end of address space")
|
||||
|
||||
if not self.arprot_present and prot != AxiProt.NONSECURE:
|
||||
@@ -546,7 +561,7 @@ class AxiLiteMasterRead(Region, Reset):
|
||||
stop = end_offset
|
||||
|
||||
for j in range(start, stop):
|
||||
data.extend(bytearray([(cycle_data >> j*8) & 0xff]))
|
||||
data.append((cycle_data >> j*8) & 0xff)
|
||||
|
||||
if self.log.isEnabledFor(logging.INFO):
|
||||
self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2021 Alex Forencich
|
||||
Copyright (c) 2021-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2021 Alex Forencich
|
||||
Copyright (c) 2021-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -38,11 +38,14 @@ class AxiLiteSlaveWrite(Reset):
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.target = target
|
||||
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
|
||||
if bus.aw._name:
|
||||
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
|
||||
else:
|
||||
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}")
|
||||
|
||||
self.log.info("AXI lite slave model (write)")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2021 Alex Forencich")
|
||||
self.log.info("Copyright (c) 2021-2025 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
super().__init__(**kwargs)
|
||||
@@ -161,11 +164,14 @@ class AxiLiteSlaveRead(Reset):
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.target = target
|
||||
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
|
||||
if bus.ar._name:
|
||||
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
|
||||
else:
|
||||
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}")
|
||||
|
||||
self.log.info("AXI lite slave model (read)")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2021 Alex Forencich")
|
||||
self.log.info("Copyright (c) 2021-2025 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
super().__init__(**kwargs)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -30,6 +30,11 @@ from cocotb.triggers import RisingEdge, Timer, First, Event
|
||||
from cocotb.utils import get_sim_time
|
||||
from cocotb_bus.bus import Bus
|
||||
|
||||
try:
|
||||
from cocotb.types import LogicArray
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
from .version import __version__
|
||||
from .reset import Reset
|
||||
|
||||
@@ -266,11 +271,14 @@ class AxiStreamBase(Reset):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.log = logging.getLogger(f"cocotb.{bus._entity._name}.{bus._name}")
|
||||
if bus._name:
|
||||
self.log = logging.getLogger(f"cocotb.{bus._entity._name}.{bus._name}")
|
||||
else:
|
||||
self.log = logging.getLogger(f"cocotb.{bus._entity._name}")
|
||||
|
||||
self.log.info("AXI stream %s", self._type)
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2020 Alex Forencich")
|
||||
self.log.info("Copyright (c) 2020-2025 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
super().__init__(*args, **kwargs)
|
||||
@@ -298,9 +306,13 @@ class AxiStreamBase(Reset):
|
||||
for sig in self._signals+self._optional_signals:
|
||||
if hasattr(self.bus, sig):
|
||||
if self._init_x and sig not in ("tvalid", "tready"):
|
||||
v = getattr(self.bus, sig).value
|
||||
v.binstr = 'x'*len(v)
|
||||
getattr(self.bus, sig).setimmediatevalue(v)
|
||||
s = getattr(self.bus, sig)
|
||||
try:
|
||||
v = LogicArray("x"*len(s.value))
|
||||
except NameError:
|
||||
v = s.value
|
||||
v.binstr = 'x'*len(v)
|
||||
s.setimmediatevalue(v)
|
||||
|
||||
if hasattr(self.bus, "tkeep"):
|
||||
self.byte_lanes = len(self.bus.tkeep)
|
||||
@@ -698,15 +710,15 @@ class AxiStreamMonitor(AxiStreamBase):
|
||||
self.active = True
|
||||
|
||||
for offset in range(self.byte_lanes):
|
||||
frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask)
|
||||
frame.tdata.append((int(self.bus.tdata.value) >> (offset * self.byte_size)) & self.byte_mask)
|
||||
if has_tkeep:
|
||||
frame.tkeep.append((self.bus.tkeep.value.integer >> offset) & 1)
|
||||
frame.tkeep.append((int(self.bus.tkeep.value) >> offset) & 1)
|
||||
if has_tid:
|
||||
frame.tid.append(self.bus.tid.value.integer)
|
||||
frame.tid.append(int(self.bus.tid.value))
|
||||
if has_tdest:
|
||||
frame.tdest.append(self.bus.tdest.value.integer)
|
||||
frame.tdest.append(int(self.bus.tdest.value))
|
||||
if has_tuser:
|
||||
frame.tuser.append(self.bus.tuser.value.integer)
|
||||
frame.tuser.append(int(self.bus.tuser.value))
|
||||
|
||||
if not has_tlast or self.bus.tlast.value:
|
||||
frame.sim_time_end = get_sim_time()
|
||||
@@ -781,7 +793,7 @@ class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
|
||||
wake_event = self.wake_event.wait()
|
||||
|
||||
while True:
|
||||
pause_sample = self.pause
|
||||
pause_sample = bool(self.pause)
|
||||
|
||||
await clock_edge_event
|
||||
|
||||
@@ -799,15 +811,15 @@ class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
|
||||
self.active = True
|
||||
|
||||
for offset in range(self.byte_lanes):
|
||||
frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask)
|
||||
frame.tdata.append((int(self.bus.tdata.value) >> (offset * self.byte_size)) & self.byte_mask)
|
||||
if has_tkeep:
|
||||
frame.tkeep.append((self.bus.tkeep.value.integer >> offset) & 1)
|
||||
frame.tkeep.append((int(self.bus.tkeep.value) >> offset) & 1)
|
||||
if has_tid:
|
||||
frame.tid.append(self.bus.tid.value.integer)
|
||||
frame.tid.append(int(self.bus.tid.value))
|
||||
if has_tdest:
|
||||
frame.tdest.append(self.bus.tdest.value.integer)
|
||||
frame.tdest.append(int(self.bus.tdest.value))
|
||||
if has_tuser:
|
||||
frame.tuser.append(self.bus.tuser.value.integer)
|
||||
frame.tuser.append(int(self.bus.tuser.value))
|
||||
|
||||
if not has_tlast or self.bus.tlast.value:
|
||||
frame.sim_time_end = get_sim_time()
|
||||
@@ -824,8 +836,14 @@ class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
|
||||
self.active = bool(frame)
|
||||
|
||||
if has_tready:
|
||||
self.bus.tready.value = (not self.full() and not pause_sample)
|
||||
paused = self.full() or pause_sample
|
||||
|
||||
if not tvalid_sample or (self.pause and pause_sample) or self.full():
|
||||
self.wake_event.clear()
|
||||
await wake_event
|
||||
self.bus.tready.value = not paused
|
||||
|
||||
if (not tvalid_sample or paused) and (pause_sample == bool(self.pause)):
|
||||
self.wake_event.clear()
|
||||
await wake_event
|
||||
else:
|
||||
if not tvalid_sample:
|
||||
self.wake_event.clear()
|
||||
await wake_event
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2021 Alex Forencich
|
||||
Copyright (c) 2021-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -23,7 +23,7 @@ THE SOFTWARE.
|
||||
"""
|
||||
|
||||
import cocotb
|
||||
from cocotb.triggers import RisingEdge, FallingEdge
|
||||
from cocotb.triggers import Edge
|
||||
|
||||
|
||||
class Reset:
|
||||
@@ -56,11 +56,14 @@ class Reset:
|
||||
|
||||
async def _run_reset(self, reset_signal, active_level):
|
||||
while True:
|
||||
if bool(reset_signal.value):
|
||||
await FallingEdge(reset_signal)
|
||||
self._ext_reset = not active_level
|
||||
self._update_reset()
|
||||
else:
|
||||
await RisingEdge(reset_signal)
|
||||
await Edge(reset_signal)
|
||||
try:
|
||||
level = bool(int(reset_signal.value))
|
||||
except ValueError:
|
||||
continue
|
||||
if level:
|
||||
self._ext_reset = active_level
|
||||
self._update_reset()
|
||||
else:
|
||||
self._ext_reset = not active_level
|
||||
self._update_reset()
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2023 Alex Forencich
|
||||
Copyright (c) 2023-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -29,6 +29,11 @@ from cocotb.queue import Queue, QueueFull
|
||||
from cocotb.triggers import RisingEdge, Event, First, Timer
|
||||
from cocotb_bus.bus import Bus
|
||||
|
||||
try:
|
||||
from cocotb.types import LogicArray
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
from .reset import Reset
|
||||
|
||||
|
||||
@@ -119,9 +124,13 @@ class StreamBase(Reset):
|
||||
if sig in self._signal_widths:
|
||||
assert len(getattr(self.bus, sig)) == self._signal_widths[sig]
|
||||
if self._init_x and sig not in (self._valid_signal, self._ready_signal):
|
||||
v = getattr(self.bus, sig).value
|
||||
v.binstr = 'x'*len(v)
|
||||
getattr(self.bus, sig).setimmediatevalue(v)
|
||||
s = getattr(self.bus, sig)
|
||||
try:
|
||||
v = LogicArray("x"*len(s.value))
|
||||
except NameError:
|
||||
v = s.value
|
||||
v.binstr = 'x'*len(v)
|
||||
s.setimmediatevalue(v)
|
||||
|
||||
self._run_cr = None
|
||||
|
||||
@@ -249,24 +258,27 @@ class StreamSource(StreamBase, StreamPause):
|
||||
self.valid.value = 0
|
||||
|
||||
async def _run(self):
|
||||
has_valid = self.valid is not None
|
||||
has_ready = self.ready is not None
|
||||
|
||||
clock_edge_event = RisingEdge(self.clock)
|
||||
|
||||
while True:
|
||||
await clock_edge_event
|
||||
|
||||
# read handshake signals
|
||||
ready_sample = self.ready is None or self.ready.value
|
||||
valid_sample = self.valid is None or self.valid.value
|
||||
ready_sample = not has_ready or self.ready.value
|
||||
valid_sample = not has_valid or self.valid.value
|
||||
|
||||
if (ready_sample and valid_sample) or (not valid_sample):
|
||||
if not self.queue.empty() and not self.pause:
|
||||
self.bus.drive(self.queue.get_nowait())
|
||||
self.dequeue_event.set()
|
||||
if self.valid is not None:
|
||||
if has_valid:
|
||||
self.valid.value = 1
|
||||
self.active = True
|
||||
else:
|
||||
if self.valid is not None:
|
||||
if has_valid:
|
||||
self.valid.value = 0
|
||||
self.active = not self.queue.empty()
|
||||
if self.queue.empty():
|
||||
@@ -331,6 +343,9 @@ class StreamMonitor(StreamBase):
|
||||
self.wake_event.set()
|
||||
|
||||
async def _run(self):
|
||||
has_valid = self.valid is not None
|
||||
has_ready = self.ready is not None
|
||||
|
||||
clock_edge_event = RisingEdge(self.clock)
|
||||
|
||||
wake_event = self.wake_event.wait()
|
||||
@@ -339,8 +354,8 @@ class StreamMonitor(StreamBase):
|
||||
await clock_edge_event
|
||||
|
||||
# read handshake signals
|
||||
ready_sample = self.ready is None or self.ready.value
|
||||
valid_sample = self.valid is None or self.valid.value
|
||||
ready_sample = not has_ready or self.ready.value
|
||||
valid_sample = not has_valid or self.valid.value
|
||||
|
||||
if ready_sample and valid_sample:
|
||||
obj = self._transaction_obj()
|
||||
@@ -384,18 +399,21 @@ class StreamSink(StreamMonitor, StreamPause):
|
||||
self.wake_event.set()
|
||||
|
||||
async def _run(self):
|
||||
has_valid = self.valid is not None
|
||||
has_ready = self.ready is not None
|
||||
|
||||
clock_edge_event = RisingEdge(self.clock)
|
||||
|
||||
wake_event = self.wake_event.wait()
|
||||
|
||||
while True:
|
||||
pause_sample = self.pause
|
||||
pause_sample = bool(self.pause)
|
||||
|
||||
await clock_edge_event
|
||||
|
||||
# read handshake signals
|
||||
ready_sample = self.ready is None or self.ready.value
|
||||
valid_sample = self.valid is None or self.valid.value
|
||||
ready_sample = not has_ready or self.ready.value
|
||||
valid_sample = not has_valid or self.valid.value
|
||||
|
||||
if ready_sample and valid_sample:
|
||||
obj = self._transaction_obj()
|
||||
@@ -403,12 +421,18 @@ class StreamSink(StreamMonitor, StreamPause):
|
||||
self.queue.put_nowait(obj)
|
||||
self.active_event.set()
|
||||
|
||||
if self.ready is not None:
|
||||
self.ready.value = (not self.full() and not pause_sample)
|
||||
if has_ready:
|
||||
paused = self.full() or pause_sample
|
||||
|
||||
if not valid_sample or (self.pause and pause_sample) or self.full():
|
||||
self.wake_event.clear()
|
||||
await wake_event
|
||||
self.ready.value = not paused
|
||||
|
||||
if (not valid_sample or paused) and (pause_sample == bool(self.pause)):
|
||||
self.wake_event.clear()
|
||||
await wake_event
|
||||
else:
|
||||
if not valid_sample:
|
||||
self.wake_event.clear()
|
||||
await wake_event
|
||||
|
||||
|
||||
def define_stream(name, signals, optional_signals=None, valid_signal=None, ready_signal=None, signal_widths=None):
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "0.1.22"
|
||||
__version__ = "0.1.27"
|
||||
|
||||
16
setup.cfg
16
setup.cfg
@@ -13,7 +13,7 @@ project_urls =
|
||||
Source Code = https://github.com/alexforencich/cocotbext-axi
|
||||
download_url = https://github.com/alexforencich/cocotbext-axi/tarball/master
|
||||
long_description = file: README.md
|
||||
long-description-content-type = text/markdown
|
||||
long_description_content_type = text/markdown
|
||||
platforms = any
|
||||
classifiers =
|
||||
Development Status :: 3 - Alpha
|
||||
@@ -47,17 +47,19 @@ addopts =
|
||||
|
||||
# tox configuration
|
||||
[tox:tox]
|
||||
envlist = py37, py38, py39, py310
|
||||
envlist = py38, py39, py310, py311, py312, py313
|
||||
skip_missing_interpreters = true
|
||||
minversion = 3.18.0
|
||||
requires = virtualenv >= 16.1
|
||||
|
||||
[gh-actions]
|
||||
python =
|
||||
3.7: py37
|
||||
3.8: py38
|
||||
3.9: py39
|
||||
3.10: py310
|
||||
3.11: py311
|
||||
3.12: py312
|
||||
3.13: py313
|
||||
|
||||
[testenv]
|
||||
setenv =
|
||||
@@ -65,11 +67,11 @@ setenv =
|
||||
usedevelop = True
|
||||
|
||||
deps =
|
||||
pytest == 7.2.1
|
||||
pytest-xdist == 3.1.0
|
||||
cocotb == 1.7.2
|
||||
pytest == 8.3.4
|
||||
pytest-xdist == 3.6.1
|
||||
cocotb == 1.9.2
|
||||
cocotb-bus == 0.2.1
|
||||
cocotb-test == 0.2.4
|
||||
cocotb-test == 0.2.6
|
||||
coverage == 7.0.5
|
||||
pytest-cov == 4.0.0
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
# Copyright (c) 2020 Alex Forencich
|
||||
# Copyright (c) 2020-2025 Alex Forencich
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
54
tests/apb/Makefile
Normal file
54
tests/apb/Makefile
Normal file
@@ -0,0 +1,54 @@
|
||||
# Copyright (c) 2025 Alex Forencich
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in
|
||||
# all copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
# THE SOFTWARE.
|
||||
|
||||
TOPLEVEL_LANG = verilog
|
||||
|
||||
SIM ?= icarus
|
||||
WAVES ?= 0
|
||||
|
||||
COCOTB_HDL_TIMEUNIT = 1ns
|
||||
COCOTB_HDL_TIMEPRECISION = 1ns
|
||||
|
||||
DUT = test_apb
|
||||
COCOTB_TEST_MODULES = $(DUT)
|
||||
COCOTB_TOPLEVEL = $(DUT)
|
||||
MODULE = $(COCOTB_TEST_MODULES)
|
||||
TOPLEVEL = $(COCOTB_TOPLEVEL)
|
||||
VERILOG_SOURCES += $(DUT).v
|
||||
|
||||
# module parameters
|
||||
export PARAM_DATA_W := 32
|
||||
export PARAM_ADDR_W := 32
|
||||
export PARAM_STRB_W := $(shell expr $(PARAM_DATA_W) / 8 )
|
||||
|
||||
ifeq ($(SIM), icarus)
|
||||
PLUSARGS += -fst
|
||||
|
||||
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-P $(COCOTB_TOPLEVEL).$(subst PARAM_,,$(v))=$($(v)))
|
||||
else ifeq ($(SIM), verilator)
|
||||
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-G$(subst PARAM_,,$(v))=$($(v)))
|
||||
|
||||
ifeq ($(WAVES), 1)
|
||||
COMPILE_ARGS += --trace-fst
|
||||
VERILATOR_TRACE = 1
|
||||
endif
|
||||
endif
|
||||
|
||||
include $(shell cocotb-config --makefiles)/Makefile.sim
|
||||
332
tests/apb/test_apb.py
Normal file
332
tests/apb/test_apb.py
Normal file
@@ -0,0 +1,332 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
|
||||
import cocotb_test.simulator
|
||||
import pytest
|
||||
|
||||
import cocotb
|
||||
from cocotb.clock import Clock
|
||||
from cocotb.triggers import RisingEdge, Timer
|
||||
from cocotb.regression import TestFactory
|
||||
|
||||
from cocotbext.axi import ApbBus, ApbMaster, ApbRam
|
||||
|
||||
|
||||
class TB:
|
||||
def __init__(self, dut):
|
||||
self.dut = dut
|
||||
|
||||
self.log = logging.getLogger("cocotb.tb")
|
||||
self.log.setLevel(logging.DEBUG)
|
||||
|
||||
cocotb.start_soon(Clock(dut.clk, 2, units="ns").start())
|
||||
|
||||
self.apb_master = ApbMaster(ApbBus.from_prefix(dut, "apb"), dut.clk, dut.rst)
|
||||
self.apb_ram = ApbRam(ApbBus.from_prefix(dut, "apb"), dut.clk, dut.rst, size=2**16)
|
||||
|
||||
def set_idle_generator(self, generator=None):
|
||||
if generator:
|
||||
self.apb_master.set_pause_generator(generator())
|
||||
|
||||
def set_backpressure_generator(self, generator=None):
|
||||
if generator:
|
||||
self.apb_ram.set_pause_generator(generator())
|
||||
|
||||
async def cycle_reset(self):
|
||||
self.dut.rst.setimmediatevalue(0)
|
||||
await RisingEdge(self.dut.clk)
|
||||
await RisingEdge(self.dut.clk)
|
||||
self.dut.rst.value = 1
|
||||
await RisingEdge(self.dut.clk)
|
||||
await RisingEdge(self.dut.clk)
|
||||
self.dut.rst.value = 0
|
||||
await RisingEdge(self.dut.clk)
|
||||
await RisingEdge(self.dut.clk)
|
||||
|
||||
|
||||
async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_inserter=None):
|
||||
|
||||
tb = TB(dut)
|
||||
|
||||
byte_lanes = tb.apb_master.byte_lanes
|
||||
|
||||
await tb.cycle_reset()
|
||||
|
||||
tb.set_idle_generator(idle_inserter)
|
||||
tb.set_backpressure_generator(backpressure_inserter)
|
||||
|
||||
for length in range(1, byte_lanes*2):
|
||||
for offset in range(byte_lanes):
|
||||
tb.log.info("length %d, offset %d", length, offset)
|
||||
addr = offset+0x1000
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
|
||||
tb.apb_ram.write(addr-128, b'\xaa'*(length+256))
|
||||
|
||||
await tb.apb_master.write(addr, test_data)
|
||||
|
||||
tb.log.debug("%s", tb.apb_ram.hexdump_str((addr & ~0xf)-16, (((addr & 0xf)+length-1) & ~0xf)+48))
|
||||
|
||||
assert tb.apb_ram.read(addr, length) == test_data
|
||||
assert tb.apb_ram.read(addr-1, 1) == b'\xaa'
|
||||
assert tb.apb_ram.read(addr+length, 1) == b'\xaa'
|
||||
|
||||
await RisingEdge(dut.clk)
|
||||
await RisingEdge(dut.clk)
|
||||
|
||||
|
||||
async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inserter=None):
|
||||
|
||||
tb = TB(dut)
|
||||
|
||||
byte_lanes = tb.apb_master.byte_lanes
|
||||
|
||||
await tb.cycle_reset()
|
||||
|
||||
tb.set_idle_generator(idle_inserter)
|
||||
tb.set_backpressure_generator(backpressure_inserter)
|
||||
|
||||
for length in range(1, byte_lanes*2):
|
||||
for offset in range(byte_lanes):
|
||||
tb.log.info("length %d, offset %d", length, offset)
|
||||
addr = offset+0x1000
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
|
||||
tb.apb_ram.write(addr, test_data)
|
||||
|
||||
data = await tb.apb_master.read(addr, length)
|
||||
|
||||
assert data.data == test_data
|
||||
|
||||
await RisingEdge(dut.clk)
|
||||
await RisingEdge(dut.clk)
|
||||
|
||||
|
||||
async def run_test_write_words(dut):
|
||||
|
||||
tb = TB(dut)
|
||||
|
||||
byte_lanes = tb.apb_master.byte_lanes
|
||||
|
||||
await tb.cycle_reset()
|
||||
|
||||
for length in list(range(1, 4)):
|
||||
for offset in list(range(byte_lanes)):
|
||||
tb.log.info("length %d, offset %d", length, offset)
|
||||
addr = offset+0x1000
|
||||
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
event = tb.apb_master.init_write(addr, test_data)
|
||||
await event.wait()
|
||||
assert tb.apb_ram.read(addr, length) == test_data
|
||||
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
await tb.apb_master.write(addr, test_data)
|
||||
assert tb.apb_ram.read(addr, length) == test_data
|
||||
|
||||
test_data = [x * 0x1001 for x in range(length)]
|
||||
await tb.apb_master.write_words(addr, test_data)
|
||||
assert tb.apb_ram.read_words(addr, length) == test_data
|
||||
|
||||
test_data = [x * 0x10200201 for x in range(length)]
|
||||
await tb.apb_master.write_dwords(addr, test_data)
|
||||
assert tb.apb_ram.read_dwords(addr, length) == test_data
|
||||
|
||||
test_data = [x * 0x1020304004030201 for x in range(length)]
|
||||
await tb.apb_master.write_qwords(addr, test_data)
|
||||
assert tb.apb_ram.read_qwords(addr, length) == test_data
|
||||
|
||||
test_data = 0x01*length
|
||||
await tb.apb_master.write_byte(addr, test_data)
|
||||
assert tb.apb_ram.read_byte(addr) == test_data
|
||||
|
||||
test_data = 0x1001*length
|
||||
await tb.apb_master.write_word(addr, test_data)
|
||||
assert tb.apb_ram.read_word(addr) == test_data
|
||||
|
||||
test_data = 0x10200201*length
|
||||
await tb.apb_master.write_dword(addr, test_data)
|
||||
assert tb.apb_ram.read_dword(addr) == test_data
|
||||
|
||||
test_data = 0x1020304004030201*length
|
||||
await tb.apb_master.write_qword(addr, test_data)
|
||||
assert tb.apb_ram.read_qword(addr) == test_data
|
||||
|
||||
await RisingEdge(dut.clk)
|
||||
await RisingEdge(dut.clk)
|
||||
|
||||
|
||||
async def run_test_read_words(dut):
|
||||
|
||||
tb = TB(dut)
|
||||
|
||||
byte_lanes = tb.apb_master.byte_lanes
|
||||
|
||||
await tb.cycle_reset()
|
||||
|
||||
for length in list(range(1, 4)):
|
||||
for offset in list(range(byte_lanes)):
|
||||
tb.log.info("length %d, offset %d", length, offset)
|
||||
addr = offset+0x1000
|
||||
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
tb.apb_ram.write(addr, test_data)
|
||||
event = tb.apb_master.init_read(addr, length)
|
||||
await event.wait()
|
||||
assert event.data.data == test_data
|
||||
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
tb.apb_ram.write(addr, test_data)
|
||||
assert (await tb.apb_master.read(addr, length)).data == test_data
|
||||
|
||||
test_data = [x * 0x1001 for x in range(length)]
|
||||
tb.apb_ram.write_words(addr, test_data)
|
||||
assert await tb.apb_master.read_words(addr, length) == test_data
|
||||
|
||||
test_data = [x * 0x10200201 for x in range(length)]
|
||||
tb.apb_ram.write_dwords(addr, test_data)
|
||||
assert await tb.apb_master.read_dwords(addr, length) == test_data
|
||||
|
||||
test_data = [x * 0x1020304004030201 for x in range(length)]
|
||||
tb.apb_ram.write_qwords(addr, test_data)
|
||||
assert await tb.apb_master.read_qwords(addr, length) == test_data
|
||||
|
||||
test_data = 0x01*length
|
||||
tb.apb_ram.write_byte(addr, test_data)
|
||||
assert await tb.apb_master.read_byte(addr) == test_data
|
||||
|
||||
test_data = 0x1001*length
|
||||
tb.apb_ram.write_word(addr, test_data)
|
||||
assert await tb.apb_master.read_word(addr) == test_data
|
||||
|
||||
test_data = 0x10200201*length
|
||||
tb.apb_ram.write_dword(addr, test_data)
|
||||
assert await tb.apb_master.read_dword(addr) == test_data
|
||||
|
||||
test_data = 0x1020304004030201*length
|
||||
tb.apb_ram.write_qword(addr, test_data)
|
||||
assert await tb.apb_master.read_qword(addr) == test_data
|
||||
|
||||
await RisingEdge(dut.clk)
|
||||
await RisingEdge(dut.clk)
|
||||
|
||||
|
||||
async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
|
||||
|
||||
tb = TB(dut)
|
||||
|
||||
await tb.cycle_reset()
|
||||
|
||||
tb.set_idle_generator(idle_inserter)
|
||||
tb.set_backpressure_generator(backpressure_inserter)
|
||||
|
||||
async def worker(master, offset, aperture, count=16):
|
||||
for k in range(count):
|
||||
length = random.randint(1, min(32, aperture))
|
||||
addr = offset+random.randint(0, aperture-length)
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
|
||||
await Timer(random.randint(1, 100), 'ns')
|
||||
|
||||
await master.write(addr, test_data)
|
||||
|
||||
await Timer(random.randint(1, 100), 'ns')
|
||||
|
||||
data = await master.read(addr, length)
|
||||
assert data.data == test_data
|
||||
|
||||
workers = []
|
||||
|
||||
for k in range(16):
|
||||
workers.append(cocotb.start_soon(worker(tb.apb_master, k*0x1000, 0x1000, count=16)))
|
||||
|
||||
while workers:
|
||||
await workers.pop(0).join()
|
||||
|
||||
await RisingEdge(dut.clk)
|
||||
await RisingEdge(dut.clk)
|
||||
|
||||
|
||||
def cycle_pause():
|
||||
return itertools.cycle([1, 1, 1, 0])
|
||||
|
||||
|
||||
if getattr(cocotb, 'top', None) is not None:
|
||||
|
||||
for test in [run_test_write, run_test_read]:
|
||||
|
||||
factory = TestFactory(test)
|
||||
factory.add_option("idle_inserter", [None, cycle_pause])
|
||||
factory.add_option("backpressure_inserter", [None, cycle_pause])
|
||||
factory.generate_tests()
|
||||
|
||||
for test in [run_test_write_words, run_test_read_words]:
|
||||
|
||||
factory = TestFactory(test)
|
||||
factory.generate_tests()
|
||||
|
||||
factory = TestFactory(run_stress_test)
|
||||
factory.generate_tests()
|
||||
|
||||
|
||||
# cocotb-test
|
||||
|
||||
tests_dir = os.path.dirname(__file__)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("data_w", [8, 16, 32])
|
||||
def test_apb(request, data_w):
|
||||
dut = "test_apb"
|
||||
module = os.path.splitext(os.path.basename(__file__))[0]
|
||||
toplevel = dut
|
||||
|
||||
verilog_sources = [
|
||||
os.path.join(os.path.dirname(__file__), f"{dut}.v"),
|
||||
]
|
||||
|
||||
parameters = {}
|
||||
|
||||
parameters['DATA_W'] = data_w
|
||||
parameters['ADDR_W'] = 32
|
||||
parameters['STRB_W'] = parameters['DATA_W'] // 8
|
||||
|
||||
extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}
|
||||
|
||||
sim_build = os.path.join(tests_dir, "sim_build",
|
||||
request.node.name.replace('[', '-').replace(']', ''))
|
||||
|
||||
cocotb_test.simulator.run(
|
||||
python_search=[tests_dir],
|
||||
verilog_sources=verilog_sources,
|
||||
toplevel=toplevel,
|
||||
module=module,
|
||||
parameters=parameters,
|
||||
sim_build=sim_build,
|
||||
extra_env=extra_env,
|
||||
)
|
||||
54
tests/apb/test_apb.v
Normal file
54
tests/apb/test_apb.v
Normal file
@@ -0,0 +1,54 @@
|
||||
/*
|
||||
|
||||
Copyright (c) 2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
|
||||
*/
|
||||
|
||||
// Language: Verilog 2001
|
||||
|
||||
`timescale 1ns / 1ns
|
||||
|
||||
/*
|
||||
* APB test module
|
||||
*/
|
||||
module test_apb #
|
||||
(
|
||||
parameter DATA_W = 32,
|
||||
parameter ADDR_W = 16,
|
||||
parameter STRB_W = (DATA_W/8)
|
||||
)
|
||||
(
|
||||
input wire clk,
|
||||
input wire rst,
|
||||
|
||||
inout wire [ADDR_W-1:0] apb_paddr,
|
||||
inout wire [2:0] apb_pprot,
|
||||
inout wire apb_psel,
|
||||
inout wire apb_penable,
|
||||
inout wire apb_pwrite,
|
||||
inout wire [DATA_W-1:0] apb_pwdata,
|
||||
inout wire [STRB_W-1:0] apb_pstrb,
|
||||
inout wire apb_pready,
|
||||
inout wire [DATA_W-1:0] apb_prdata,
|
||||
inout wire apb_pslverr
|
||||
);
|
||||
|
||||
endmodule
|
||||
@@ -1,4 +1,4 @@
|
||||
# Copyright (c) 2020 Alex Forencich
|
||||
# Copyright (c) 2020-2025 Alex Forencich
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -27,50 +27,34 @@ COCOTB_HDL_TIMEUNIT = 1ns
|
||||
COCOTB_HDL_TIMEPRECISION = 1ns
|
||||
|
||||
DUT = test_axi
|
||||
TOPLEVEL = $(DUT)
|
||||
MODULE = $(DUT)
|
||||
COCOTB_TEST_MODULES = $(DUT)
|
||||
COCOTB_TOPLEVEL = $(DUT)
|
||||
MODULE = $(COCOTB_TEST_MODULES)
|
||||
TOPLEVEL = $(COCOTB_TOPLEVEL)
|
||||
VERILOG_SOURCES += $(DUT).v
|
||||
|
||||
# module parameters
|
||||
export PARAM_DATA_WIDTH ?= 32
|
||||
export PARAM_ADDR_WIDTH ?= 32
|
||||
export PARAM_STRB_WIDTH ?= $(shell expr $(PARAM_DATA_WIDTH) / 8 )
|
||||
export PARAM_ID_WIDTH ?= 8
|
||||
export PARAM_AWUSER_WIDTH ?= 1
|
||||
export PARAM_WUSER_WIDTH ?= 1
|
||||
export PARAM_BUSER_WIDTH ?= 1
|
||||
export PARAM_ARUSER_WIDTH ?= 1
|
||||
export PARAM_RUSER_WIDTH ?= 1
|
||||
export PARAM_DATA_WIDTH := 32
|
||||
export PARAM_ADDR_WIDTH := 32
|
||||
export PARAM_STRB_WIDTH := $(shell expr $(PARAM_DATA_WIDTH) / 8 )
|
||||
export PARAM_ID_WIDTH := 8
|
||||
export PARAM_AWUSER_WIDTH := 1
|
||||
export PARAM_WUSER_WIDTH := 1
|
||||
export PARAM_BUSER_WIDTH := 1
|
||||
export PARAM_ARUSER_WIDTH := 1
|
||||
export PARAM_RUSER_WIDTH := 1
|
||||
|
||||
ifeq ($(SIM), icarus)
|
||||
PLUSARGS += -fst
|
||||
|
||||
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-P $(TOPLEVEL).$(subst PARAM_,,$(v))=$($(v)))
|
||||
|
||||
ifeq ($(WAVES), 1)
|
||||
VERILOG_SOURCES += iverilog_dump.v
|
||||
COMPILE_ARGS += -s iverilog_dump
|
||||
endif
|
||||
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-P $(COCOTB_TOPLEVEL).$(subst PARAM_,,$(v))=$($(v)))
|
||||
else ifeq ($(SIM), verilator)
|
||||
COMPILE_ARGS += -Wno-SELRANGE -Wno-WIDTH -Wno-CASEINCOMPLETE
|
||||
|
||||
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-G$(subst PARAM_,,$(v))=$($(v)))
|
||||
|
||||
ifeq ($(WAVES), 1)
|
||||
COMPILE_ARGS += --trace-fst
|
||||
VERILATOR_TRACE = 1
|
||||
endif
|
||||
endif
|
||||
|
||||
include $(shell cocotb-config --makefiles)/Makefile.sim
|
||||
|
||||
iverilog_dump.v:
|
||||
echo 'module iverilog_dump();' > $@
|
||||
echo 'initial begin' >> $@
|
||||
echo ' $$dumpfile("$(TOPLEVEL).fst");' >> $@
|
||||
echo ' $$dumpvars(0, $(TOPLEVEL));' >> $@
|
||||
echo 'end' >> $@
|
||||
echo 'endmodule' >> $@
|
||||
|
||||
clean::
|
||||
@rm -rf iverilog_dump.v
|
||||
@rm -rf dump.fst $(TOPLEVEL).fst
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -296,7 +296,7 @@ def cycle_pause():
|
||||
return itertools.cycle([1, 1, 1, 0])
|
||||
|
||||
|
||||
if cocotb.SIM_NAME:
|
||||
if getattr(cocotb, 'top', None) is not None:
|
||||
|
||||
data_width = len(cocotb.top.axi_wdata)
|
||||
byte_lanes = data_width // 8
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
/*
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
# Copyright (c) 2020 Alex Forencich
|
||||
# Copyright (c) 2020-2025 Alex Forencich
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -27,44 +27,28 @@ COCOTB_HDL_TIMEUNIT = 1ns
|
||||
COCOTB_HDL_TIMEPRECISION = 1ns
|
||||
|
||||
DUT = test_axil
|
||||
TOPLEVEL = $(DUT)
|
||||
MODULE = $(DUT)
|
||||
COCOTB_TEST_MODULES = $(DUT)
|
||||
COCOTB_TOPLEVEL = $(DUT)
|
||||
MODULE = $(COCOTB_TEST_MODULES)
|
||||
TOPLEVEL = $(COCOTB_TOPLEVEL)
|
||||
VERILOG_SOURCES += $(DUT).v
|
||||
|
||||
# module parameters
|
||||
export PARAM_DATA_WIDTH ?= 32
|
||||
export PARAM_ADDR_WIDTH ?= 32
|
||||
export PARAM_STRB_WIDTH ?= $(shell expr $(PARAM_DATA_WIDTH) / 8 )
|
||||
export PARAM_DATA_WIDTH := 32
|
||||
export PARAM_ADDR_WIDTH := 32
|
||||
export PARAM_STRB_WIDTH := $(shell expr $(PARAM_DATA_WIDTH) / 8 )
|
||||
|
||||
ifeq ($(SIM), icarus)
|
||||
PLUSARGS += -fst
|
||||
|
||||
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-P $(TOPLEVEL).$(subst PARAM_,,$(v))=$($(v)))
|
||||
|
||||
ifeq ($(WAVES), 1)
|
||||
VERILOG_SOURCES += iverilog_dump.v
|
||||
COMPILE_ARGS += -s iverilog_dump
|
||||
endif
|
||||
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-P $(COCOTB_TOPLEVEL).$(subst PARAM_,,$(v))=$($(v)))
|
||||
else ifeq ($(SIM), verilator)
|
||||
COMPILE_ARGS += -Wno-SELRANGE -Wno-WIDTH
|
||||
|
||||
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-G$(subst PARAM_,,$(v))=$($(v)))
|
||||
|
||||
ifeq ($(WAVES), 1)
|
||||
COMPILE_ARGS += --trace-fst
|
||||
VERILATOR_TRACE = 1
|
||||
endif
|
||||
endif
|
||||
|
||||
include $(shell cocotb-config --makefiles)/Makefile.sim
|
||||
|
||||
iverilog_dump.v:
|
||||
echo 'module iverilog_dump();' > $@
|
||||
echo 'initial begin' >> $@
|
||||
echo ' $$dumpfile("$(TOPLEVEL).fst");' >> $@
|
||||
echo ' $$dumpvars(0, $(TOPLEVEL));' >> $@
|
||||
echo 'end' >> $@
|
||||
echo 'endmodule' >> $@
|
||||
|
||||
clean::
|
||||
@rm -rf iverilog_dump.v
|
||||
@rm -rf dump.fst $(TOPLEVEL).fst
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -285,7 +285,7 @@ def cycle_pause():
|
||||
return itertools.cycle([1, 1, 1, 0])
|
||||
|
||||
|
||||
if cocotb.SIM_NAME:
|
||||
if getattr(cocotb, 'top', None) is not None:
|
||||
|
||||
for test in [run_test_write, run_test_read]:
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
/*
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
# Copyright (c) 2020 Alex Forencich
|
||||
# Copyright (c) 2020-2025 Alex Forencich
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -27,46 +27,30 @@ COCOTB_HDL_TIMEUNIT = 1ns
|
||||
COCOTB_HDL_TIMEPRECISION = 1ns
|
||||
|
||||
DUT = test_axis
|
||||
TOPLEVEL = $(DUT)
|
||||
MODULE = $(DUT)
|
||||
COCOTB_TEST_MODULES = $(DUT)
|
||||
COCOTB_TOPLEVEL = $(DUT)
|
||||
MODULE = $(COCOTB_TEST_MODULES)
|
||||
TOPLEVEL = $(COCOTB_TOPLEVEL)
|
||||
VERILOG_SOURCES += $(DUT).v
|
||||
|
||||
# module parameters
|
||||
export PARAM_DATA_WIDTH ?= 8
|
||||
export PARAM_KEEP_WIDTH ?= $(shell expr $(PARAM_DATA_WIDTH) / 8 )
|
||||
export PARAM_ID_WIDTH ?= 8
|
||||
export PARAM_DEST_WIDTH ?= 8
|
||||
export PARAM_USER_WIDTH ?= 1
|
||||
export PARAM_DATA_WIDTH := 8
|
||||
export PARAM_KEEP_WIDTH := $(shell expr $(PARAM_DATA_WIDTH) / 8 )
|
||||
export PARAM_ID_WIDTH := 8
|
||||
export PARAM_DEST_WIDTH := 8
|
||||
export PARAM_USER_WIDTH := 1
|
||||
|
||||
ifeq ($(SIM), icarus)
|
||||
PLUSARGS += -fst
|
||||
|
||||
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-P $(TOPLEVEL).$(subst PARAM_,,$(v))=$($(v)))
|
||||
|
||||
ifeq ($(WAVES), 1)
|
||||
VERILOG_SOURCES += iverilog_dump.v
|
||||
COMPILE_ARGS += -s iverilog_dump
|
||||
endif
|
||||
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-P $(COCOTB_TOPLEVEL).$(subst PARAM_,,$(v))=$($(v)))
|
||||
else ifeq ($(SIM), verilator)
|
||||
COMPILE_ARGS += -Wno-SELRANGE -Wno-WIDTH
|
||||
|
||||
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-G$(subst PARAM_,,$(v))=$($(v)))
|
||||
|
||||
ifeq ($(WAVES), 1)
|
||||
COMPILE_ARGS += --trace-fst
|
||||
VERILATOR_TRACE = 1
|
||||
endif
|
||||
endif
|
||||
|
||||
include $(shell cocotb-config --makefiles)/Makefile.sim
|
||||
|
||||
iverilog_dump.v:
|
||||
echo 'module iverilog_dump();' > $@
|
||||
echo 'initial begin' >> $@
|
||||
echo ' $$dumpfile("$(TOPLEVEL).fst");' >> $@
|
||||
echo ' $$dumpvars(0, $(TOPLEVEL));' >> $@
|
||||
echo 'end' >> $@
|
||||
echo 'endmodule' >> $@
|
||||
|
||||
clean::
|
||||
@rm -rf iverilog_dump.v
|
||||
@rm -rf dump.fst $(TOPLEVEL).fst
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#!/usr/bin/env python
|
||||
"""
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -135,7 +135,7 @@ def incrementing_payload(length):
|
||||
return bytearray(itertools.islice(itertools.cycle(range(256)), length))
|
||||
|
||||
|
||||
if cocotb.SIM_NAME:
|
||||
if getattr(cocotb, 'top', None) is not None:
|
||||
|
||||
factory = TestFactory(run_test)
|
||||
factory.add_option("payload_lengths", [size_list])
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
/*
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2020-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2021 Alex Forencich
|
||||
Copyright (c) 2021-2025 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
Reference in New Issue
Block a user