49 Commits

Author SHA1 Message Date
Alex Forencich
f3a7652362 Release v0.1.20
Signed-off-by: Alex Forencich <alex@alexforencich.com>
2023-01-25 17:55:54 -08:00
Alex Forencich
a84ce5447d Put sinks to sleep when idle
Signed-off-by: Alex Forencich <alex@alexforencich.com>
2023-01-25 17:46:46 -08:00
Alex Forencich
1c03ec4697 Pass through full address for unaligned operations
Signed-off-by: Alex Forencich <alex@alexforencich.com>
2023-01-25 16:27:14 -08:00
Alex Forencich
824eba793d Update package versions
Signed-off-by: Alex Forencich <alex@alexforencich.com>
2023-01-24 12:47:15 -08:00
Alex Forencich
a0aad34698 Fix path issue so latest coverage works
Signed-off-by: Alex Forencich <alex@alexforencich.com>
2023-01-20 20:57:30 -08:00
Alex Forencich
ede6270ed7 Put source to sleep when there is no data to send
Signed-off-by: Alex Forencich <alex@alexforencich.com>
2023-01-20 15:49:16 -08:00
Alex Forencich
cd1a8b47a5 Fix init sequence
Signed-off-by: Alex Forencich <alex@alexforencich.com>
2023-01-20 15:48:46 -08:00
Alex Forencich
be6d490adb Cache signal presence
Signed-off-by: Alex Forencich <alex@alexforencich.com>
2023-01-20 15:48:23 -08:00
Alex Forencich
39686b849a Update github actions versions
Signed-off-by: Alex Forencich <alex@alexforencich.com>
2023-01-20 15:30:32 -08:00
Alex Forencich
706051cb89 Fix tox config and lock package versions
Signed-off-by: Alex Forencich <alex@alexforencich.com>
2023-01-20 15:29:11 -08:00
Alex Forencich
3e4f8d7e92 Python 3.6 is EOL; remove from CI tests
Signed-off-by: Alex Forencich <alex@alexforencich.com>
2023-01-18 14:55:25 -08:00
Leon Woestenberg
afae9e69ff Fix AxiStreamFrame default for self.byte_lanes from 1 to all.
If I connect a AXIS source to an AXIS sink, the #byte_lanes is incorrectly 1 rather than all lanes enabled.
self.source = AxiStreamSource(AxiStreamBus.from_prefix(dut, "sink"), dut.clk, dut.reset)

Root cause is AxiStreamFrame assumes byte width 1 without TKEEP, but it should default to self.width // 8
because the AXIS specification mentions "when TKEEP is absent, TKEEP defaults to all bits HIGH" and "The width of the data payload is an integer number of bytes."

Fix: https://github.com/alexforencich/cocotbext-axi/blob/master/cocotbext/axi/axis.py#L290

self.byte_lanes = 1
self.byte_lanes = self.width // 8

Relevant AXIS Specification:
https://developer.arm.com/documentation/ihi0051/a/Default-Signaling-Requirements/Default-value-signaling/Optional-TKEEP-and-TSTRB?lang=en

Signed-off-by: Leon Woestenberg <leon@sidebranch.com>
2023-01-18 12:55:19 -08:00
Alex Forencich
035c1ba803 Support interleaved read data in AXI master 2022-02-01 00:25:01 -08:00
Alex Forencich
873bb1a034 Explicit cast to integer before converting to enum or flag type 2022-01-07 12:52:41 -08:00
Alex Forencich
2d70e5cbe5 Fix AxiLiteSlave wrapper 2022-01-04 15:29:04 -08:00
Alex Forencich
35d9742ae8 Remove extraneous code 2022-01-04 15:28:48 -08:00
Alex Forencich
0f20e2e9bf Bump to dev version 2021-12-28 20:08:44 -08:00
Alex Forencich
7606d7d7bd Release v0.1.18 2021-12-28 17:23:06 -08:00
Alex Forencich
dd345e87c3 Call write from init_write via start_soon so command FIFO size can be limited 2021-12-27 23:29:29 -08:00
Alex Forencich
9c0592c16a Make wstrb optional 2021-12-27 19:44:30 -08:00
Alex Forencich
8aab5a7294 Support overriding allocated region and window types 2021-12-27 17:58:52 -08:00
Alex Forencich
4f26621e2b Make size optional when creating windows 2021-12-27 17:31:08 -08:00
Alex Forencich
1b6993d80d Use start_soon instead of fork 2021-12-27 17:10:37 -08:00
Alex Forencich
6d9ed8a2d2 Specify min package versions 2021-12-27 17:03:19 -08:00
Alex Forencich
d772b73eb2 Specify min tox and venv versions 2021-12-27 17:02:59 -08:00
Alex Forencich
bd88eda17b Skip missing interpreters 2021-12-27 17:02:39 -08:00
Alex Forencich
53313699a9 Test on Python 3.10 2021-12-27 17:00:44 -08:00
Alex Forencich
3f7193b77c Use start_soon instead of fork 2021-12-08 21:38:12 -08:00
Alex Forencich
2b0b12c68d Cache clock edge event objects 2021-12-03 18:40:04 -08:00
Alex Forencich
4a91212f37 Bump to dev version 2021-11-17 00:06:34 -08:00
Alex Forencich
1608af26e5 Release v0.1.16 2021-11-16 22:54:37 -08:00
Alex Forencich
31fb855311 Update readme 2021-11-16 22:40:42 -08:00
Alex Forencich
cb4b0e1738 Wrap access on RAM size 2021-11-16 17:13:18 -08:00
Alex Forencich
ea95eeaf0d Don't pass through extra positional args 2021-11-16 17:01:31 -08:00
Alex Forencich
079f4009b3 Rewrite RAM modules to use common slave implementation 2021-11-16 17:00:48 -08:00
Alex Forencich
612a94c97a Add AXI and AXI lite slave modules 2021-11-16 17:00:05 -08:00
Alex Forencich
757e3a6f2d AXI master modules extend Region 2021-11-16 16:58:50 -08:00
Alex Forencich
b9b9a2da72 Add address space abstraction 2021-11-16 16:55:53 -08:00
Alex Forencich
f7660e9038 Add buddy allocator 2021-11-16 16:53:40 -08:00
Alex Forencich
78693a63d5 Fix types 2021-11-10 23:59:11 -08:00
Alex Forencich
34498f6e5d Add write data type check 2021-11-10 23:49:13 -08:00
Alex Forencich
6329187ced Add address range checks 2021-11-10 23:48:47 -08:00
Alex Forencich
da24857dd2 Cast write data to bytes instead of bytearray 2021-11-10 23:45:46 -08:00
Alex Forencich
c08f22c710 Bring out address and ID signal widths 2021-11-10 21:56:08 -08:00
Alex Forencich
d874d91d05 Use typing.NamedTuple instead of collections.namedtuple to add __bytes__ cast 2021-11-10 21:49:58 -08:00
Alex Forencich
3fd016a84c Lazy logging 2021-11-09 00:53:37 -08:00
Alex Forencich
43de2ea9b0 Use getattr with default value when accessing optional signals 2021-11-09 00:46:37 -08:00
Alex Forencich
558ba51c91 Use correct transaction object 2021-11-09 00:13:19 -08:00
Alex Forencich
f6426bd8f3 Bump to dev version 2021-11-07 13:12:32 -08:00
23 changed files with 1848 additions and 874 deletions

View File

@@ -9,13 +9,13 @@ jobs:
strategy:
matrix:
python-version: [3.6, 3.7, 3.8, 3.9]
python-version: ["3.7", "3.8", "3.9", "3.10"]
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

View File

@@ -32,7 +32,7 @@ See the `tests` directory, [verilog-axi](https://github.com/alexforencich/verilo
### AXI and AXI lite master
The `AxiMaster` and `AxiLiteMaster` classes implement AXI masters and are capable of generating read and write operations against AXI slaves. Requested operations will be split and aligned according to the AXI specification. The `AxiMaster` module is capable of generating narrow bursts, handling multiple in-flight operations, and handling reordering and interleaving in responses across different transaction IDs.
The `AxiMaster` and `AxiLiteMaster` classes implement AXI masters and are capable of generating read and write operations against AXI slaves. Requested operations will be split and aligned according to the AXI specification. The `AxiMaster` module is capable of generating narrow bursts, handling multiple in-flight operations, and handling reordering and interleaving in responses across different transaction IDs. `AxiMaster` and `AxiLiteMaster` and related objects all extend `Region`, so they can be attached to `AddressSpace` objects to handle memory operations in the specified region.
The `AxiMaster` is a wrapper around `AxiMasterWrite` and `AxiMasterRead`. Similarly, `AxiLiteMaster` is a wrapper around `AxiLiteMasterWrite` and `AxiLiteMasterRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same.
@@ -123,9 +123,39 @@ With this method, it is possible to start multiple concurrent operations from th
The `AxiBus`, `AxiLiteBus`, and related objects are containers for the interface signals. These hold instances of bus objects for the individual channels, which are currently extensions of `cocotb_bus.bus.Bus`. Class methods `from_entity` and `from_prefix` are provided to facilitate signal name matching. For AXI interfaces use `AxiBus`, `AxiReadBus`, or `AxiWriteBus`, as appropriate. For AXI lite interfaces, use `AxiLiteBus`, `AxiLiteReadBus`, or `AxiLiteWriteBus`, as appropriate.
### AXI and AXI lite slave
The `AxiSlave` and `AxiLiteSlave` classes implement AXI slaves and are capable of completing read and write operations from upstream AXI masters. The `AxiSlave` module is capable of handling narrow bursts. These modules can either be used to perform memory reads and writes on a `MemoryInterface` on behalf of the DUT, or they can be extended to implement customized functionality.
The `AxiSlave` is a wrapper around `AxiSlaveWrite` and `AxiSlaveRead`. Similarly, `AxiLiteSlave` is a wrapper around `AxiLiteSlaveWrite` and `AxiLiteSlaveRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same.
To use these modules, import the one you need and connect it to the DUT:
from cocotbext.axi import AxiBus, AxiSlave, MemoryRegion
axi_slave = AxiSlave(AxiBus.from_prefix(dut, "m_axi"), dut.clk, dut.rst)
region = MemoryRegion(2**axi_slave.read_if.address_width)
axi_slave.target = region
The first argument to the constructor accepts an `AxiBus` or `AxiLiteBus` object. These objects are containers for the interface signals and include class methods to automate connections.
It is also possible to extend these modules; operation can be customized by overriding the internal `_read()` and `_write()` methods. See `AxiRam` and `AxiLiteRam` for an example.
#### `AxiSlave` and `AxiLiteSlave` constructor parameters
* _bus_: `AxiBus` or `AxiLiteBus` object containing AXI interface signals
* _clock_: clock signal
* _reset_: reset signal (optional)
* _reset_active_level_: reset active level (optional, default `True`)
* _target_: target region (optional, default `None`)
#### Attributes:
* _target_: target region
### AXI and AXI lite RAM
The `AxiRam` and `AxiLiteRam` classes implement AXI RAMs and are capable of completing read and write operations from upstream AXI masters. The `AxiRam` module is capable of handling narrow bursts.
The `AxiRam` and `AxiLiteRam` classes implement AXI RAMs and are capable of completing read and write operations from upstream AXI masters. The `AxiRam` module is capable of handling narrow bursts. These modules are extensions of the corresponding `AxiSlave` and `AxiLiteSlave` modules.
The `AxiRam` is a wrapper around `AxiRamWrite` and `AxiRamRead`. Similarly, `AxiLiteRam` is a wrapper around `AxiLiteRamWrite` and `AxiLiteRamRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same.
@@ -290,6 +320,66 @@ Methods:
* `normalize()`: pack `tkeep`, `tid`, `tdest`, and `tuser` to the same length as `tdata`, replicating last element if necessary, initialize `tkeep` to list of `1` and `tid`, `tdest`, and `tuser` to list of `0` if not specified.
* `compact()`: remove `tdata`, `tid`, `tdest`, and `tuser` values based on `tkeep`, remove `tkeep`, compact `tid`, `tdest`, and `tuser` to an int if all values are identical.
### Address space abstraction
The address space abstraction provides a framework for cross-connecting multiple memory-mapped interfaces for testing components that interface with complex systems, including components with DMA engines.
`MemoryInterface` is the base class for all components in the address space abstraction. `MemoryInterface` provides the core `read()` and `write()` methods, which implement bounds checking, as well as word-access wrappers. Methods for creating `Window` and `WindowPool` objects are also provided. The function `get_absolute_address()` translates addresses to the system address space. `MemoryInterface` can be extended to implement custom functionality by overriding `_read()` and `_write()`.
`Window` objects represent views onto a parent address space with some length and offset. `read()` and `write()` operations on a `Window` are translated to the equivalent operations on the parent address space. Multiple `Window` instances can overlap and access the same portion of address space.
`WindowPool` provides a method for dynamically allocating windows from a section of address space. It uses a standard memory management algorithm to provide naturally-aligned `Window` objects of the requested size.
`Region` is the base class for all components which implement a portion of address space. `Region` objects can be registered with `AddressSpace` objects to handle `read()` and `write()` operations in a specified region. `Region` can be extended by components that implement a portion of address space.
`MemoryRegion` is an extension of `Region` that uses an `mmap` instance to handle memory operations. `MemoryRegion` also provides hex dump methods as well as indexing and slicing.
`PeripheralRegion` is an extension of `Region` that can wrap another object that implements `read()` and `write()`, as an alternative to extending `Region`.
`AddressSpace` is the core object for handling address spaces. `Region` objects can be registered with `AddressSpace` with specified base address, size, and offset. The `AddressSpace` object will then direct `read()` and `write()` operations to the appropriate `Region`s, splitting requests appropriately when necessary and translating addresses. Regions registered with `offset` other than `None` are translated such that accesses to base address + N map to N + offset. Regions registered with an `offset` of `None` are not translated. `Region` objects registered with the same `AddressSpace` cannot overlap, however the same `Region` can be registered multiple times. `AddressSpace` also provides a method for creating `Pool` objects.
`Pool` is an extension of `AddressSpace` that supports dynamic allocation of `MemoryRegion`s. It uses a standard memory management algorithm to provide naturally-aligned `MemoryRegion` objects of the requested size.
#### Example
This is a simple example that shows how the address space abstraction components can be used to connect a DUT to a simulated host system, including simulated RAM, an AXI interface from the DUT for DMA, and an AXI lite interface to the DUT for control.
from cocotbext.axi import AddressSpace, MemoryRegion
from cocotbext.axi import AxiBus, AxiLiteMaster, AxiSlave
# system address space
address_space = AddressSpace(2**32)
# RAM
ram = MemoryRegion(2**24)
address_space.register_region(ram, 0x0000_0000)
ram_pool = address_space.create_window_pool(0x0000_0000, 2**20)
# DUT control register interface
axil_master = AxiLiteMaster(AxiLiteBus.from_prefix(dut, "s_axil_ctrl"), dut.clk, dut.rst)
address_space.register_region(axil_master, 0x8000_0000)
ctrl_regs = address_space.create_window(0x8000_0000, axil_master.size)
# DMA from DUT
axi_slave = AxiSlave(AxiBus.from_prefix(dut, "m_axi_dma"), dut.clk, dut.rst, target=address_space)
# exercise DUT DMA functionality
src_block = ram_pool.alloc_window(1024)
dst_block = ram_pool.alloc_window(1024)
test_data = b'test data'
await src_block.write(0, test_data)
await ctrl_regs.write_dword(DMA_SRC_ADDR, src_block.get_absolute_address(0))
await ctrl_regs.write_dword(DMA_DST_ADDR, dst_block.get_absolute_address(0))
await ctrl_regs.write_dword(DMA_LEN, len(test_data))
await ctrl_regs.write_dword(DMA_CONTROL, 1)
while await ctrl_regs.read_dword(DMA_STATUS) == 0:
pass
assert await dst_block.read(0, len(test_data)) == test_data
### AXI signals
* Write address channel

View File

@@ -26,14 +26,20 @@ from .version import __version__
from .constants import AxiBurstType, AxiBurstSize, AxiLockType, AxiCacheBit, AxiProt, AxiResp
from .address_space import MemoryInterface, Window, WindowPool
from .address_space import Region, MemoryRegion, PeripheralRegion
from .address_space import AddressSpace, Pool
from .axis import AxiStreamFrame, AxiStreamBus, AxiStreamSource, AxiStreamSink, AxiStreamMonitor
from .axil_channels import AxiLiteAWBus, AxiLiteWBus, AxiLiteBBus, AxiLiteARBus, AxiLiteRBus
from .axil_channels import AxiLiteWriteBus, AxiLiteReadBus, AxiLiteBus
from .axil_master import AxiLiteMasterWrite, AxiLiteMasterRead, AxiLiteMaster
from .axil_slave import AxiLiteSlaveWrite, AxiLiteSlaveRead, AxiLiteSlave
from .axil_ram import AxiLiteRamWrite, AxiLiteRamRead, AxiLiteRam
from .axi_channels import AxiAWBus, AxiWBus, AxiBBus, AxiARBus, AxiRBus
from .axi_channels import AxiWriteBus, AxiReadBus, AxiBus
from .axi_master import AxiMasterWrite, AxiMasterRead, AxiMaster
from .axi_slave import AxiSlaveWrite, AxiSlaveRead, AxiSlave
from .axi_ram import AxiRamWrite, AxiRamRead, AxiRam

View File

@@ -0,0 +1,332 @@
"""
Copyright (c) 2021 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import mmap
from .buddy_allocator import BuddyAllocator
from .utils import hexdump, hexdump_lines, hexdump_str
class MemoryInterface:
def __init__(self, size, base=0, parent=None, **kwargs):
self._parent = parent
self._size = size
self._base = base
self.window_type = Window
self.window_pool_type = WindowPool
super().__init__(**kwargs)
@property
def parent(self):
return self._parent
@property
def size(self):
return self._size
@property
def base(self):
return self._base
def check_range(self, address, length=0):
if address < 0 or address >= self.size:
raise ValueError("address out of range")
if length < 0:
raise ValueError("invalid length")
if address+length > self.size:
raise ValueError("operation out of range")
def get_absolute_address(self, address):
if self.base is None:
return None
self.check_range(address)
return address+self.base
async def _read(self, address, length, **kwargs):
raise NotImplementedError()
async def read(self, address, length, **kwargs):
self.check_range(address, length)
return await self._read(address, length, **kwargs)
async def read_words(self, address, count, byteorder='little', ws=2, **kwargs):
data = bytes(await self.read(address, count*ws, **kwargs))
words = []
for k in range(count):
words.append(int.from_bytes(data[ws*k:ws*(k+1)], byteorder))
return words
async def read_dwords(self, address, count, byteorder='little', **kwargs):
return await self.read_words(address, count, byteorder, 4, **kwargs)
async def read_qwords(self, address, count, byteorder='little', **kwargs):
return await self.read_words(address, count, byteorder, 8, **kwargs)
async def read_byte(self, address, **kwargs):
return (await self.read(address, 1, **kwargs)).data[0]
async def read_word(self, address, byteorder='little', ws=2, **kwargs):
return (await self.read_words(address, 1, byteorder, ws, **kwargs))[0]
async def read_dword(self, address, byteorder='little', **kwargs):
return (await self.read_dwords(address, 1, byteorder, **kwargs))[0]
async def read_qword(self, address, byteorder='little', **kwargs):
return (await self.read_qwords(address, 1, byteorder, **kwargs))[0]
async def _write(self, address, data, **kwargs):
raise NotImplementedError()
async def write(self, address, data, **kwargs):
self.check_range(address, len(data))
await self._write(address, data, **kwargs)
async def write_words(self, address, data, byteorder='little', ws=2, **kwargs):
words = data
data = bytearray()
for w in words:
data.extend(w.to_bytes(ws, byteorder))
await self.write(address, data, **kwargs)
async def write_dwords(self, address, data, byteorder='little', **kwargs):
await self.write_words(address, data, byteorder, 4, **kwargs)
async def write_qwords(self, address, data, byteorder='little', **kwargs):
await self.write_words(address, data, byteorder, 8, **kwargs)
async def write_byte(self, address, data, **kwargs):
await self.write(address, [data], **kwargs)
async def write_word(self, address, data, byteorder='little', ws=2, **kwargs):
await self.write_words(address, [data], byteorder, ws, **kwargs)
async def write_dword(self, address, data, byteorder='little', **kwargs):
await self.write_dwords(address, [data], byteorder, **kwargs)
async def write_qword(self, address, data, byteorder='little', **kwargs):
await self.write_qwords(address, [data], byteorder, **kwargs)
def create_window(self, offset, size=None, window_type=None):
if not size or size < 0:
size = self.size - offset
window_type = window_type or self.window_type or Window
self.check_range(offset, size)
return window_type(self, offset, size, base=self.get_absolute_address(offset))
def create_window_pool(self, offset=None, size=None, window_pool_type=None, window_type=None):
if offset is None:
offset = 0
if size is None:
size = self.size - offset
window_pool_type = window_pool_type or self.window_pool_type or WindowPool
window_type = window_type or self.window_type
self.check_range(offset, size)
return window_pool_type(self, offset, size, base=self.get_absolute_address(offset), window_type=window_type)
def __len__(self):
return self._size
class Window(MemoryInterface):
def __init__(self, parent, offset, size, base=0, **kwargs):
super().__init__(size, base=base, parent=parent, **kwargs)
self._offset = offset
@property
def offset(self):
return self._offset
def get_parent_address(self, address):
if address < 0 or address >= self.size:
raise ValueError("address out of range")
return address+self.offset
async def _read(self, address, length, **kwargs):
return await self.parent.read(self.get_parent_address(address), length, **kwargs)
async def _write(self, address, data, **kwargs):
await self.parent.write(self.get_parent_address(address), data, **kwargs)
class WindowPool(Window):
def __init__(self, parent, offset, size, base=None, window_type=None, **kwargs):
super().__init__(parent, offset, size, base=base, **kwargs)
self.window_type = window_type or Window
self.allocator = BuddyAllocator(size)
def alloc_window(self, size, window_type=None):
return self.create_window(self.allocator.alloc(size), size, window_type)
class Region(MemoryInterface):
def __init__(self, size, **kwargs):
super().__init__(size, **kwargs)
class MemoryRegion(Region):
def __init__(self, size=4096, mem=None, **kwargs):
super().__init__(size, **kwargs)
if mem is None:
mem = mmap.mmap(-1, size)
self.mem = mem
async def _read(self, address, length, **kwargs):
return self.mem[address:address+length]
async def _write(self, address, data, **kwargs):
self.mem[address:address+len(data)] = data
def hexdump(self, address, length, prefix=""):
hexdump(self.mem[address:address+length], prefix=prefix, offset=address)
def hexdump_lines(self, address, length, prefix=""):
return hexdump_lines(self.mem[address:address+length], prefix=prefix, offset=address)
def hexdump_str(self, address, length, prefix=""):
return hexdump_str(self.mem[address:address+length], prefix=prefix, offset=address)
def __getitem__(self, key):
return self.mem[key]
def __setitem__(self, key, value):
self.mem[key] = value
def __bytes__(self):
return bytes(self.mem)
class PeripheralRegion(Region):
def __init__(self, obj, size, **kwargs):
super().__init__(size, **kwargs)
self.obj = obj
async def _read(self, address, length, **kwargs):
try:
return await self.obj.read(address, length, **kwargs)
except TypeError:
return self.obj.read(address, length, **kwargs)
async def _write(self, address, data, **kwargs):
try:
await self.obj.write(address, data, **kwargs)
except TypeError:
self.obj.write(address, data, **kwargs)
class AddressSpace(Region):
def __init__(self, size=2**64, base=0, parent=None, **kwargs):
super().__init__(size=size, base=base, parent=parent, **kwargs)
self.pool_type = Pool
self.regions = []
def find_regions(self, address, length=1):
regions = []
if address < 0 or address >= self.size:
raise ValueError("address out of range")
if length < 0:
raise ValueError("invalid length")
length = max(length, 1)
for (base, size, translate, region) in self.regions:
if address < base+size and base < address+length:
regions.append((base, size, translate, region))
regions.sort()
return regions
def register_region(self, region, base, size=None, offset=0):
if size is None:
size = region.size
if self.find_regions(base, size):
raise ValueError("overlaps existing region")
region._parent = self
if offset == 0:
region._base = self.get_absolute_address(base)
else:
region._base = None
self.regions.append((base, size, offset, region))
async def read(self, address, length, **kwargs):
regions = self.find_regions(address, length)
data = bytearray()
if not regions:
raise Exception("Invalid address")
for base, size, offset, region in regions:
if base > address:
raise Exception("Invalid address")
seg_addr = address - base
seg_len = min(size-seg_addr, length)
if offset is None:
seg_addr = address
offset = 0
data.extend(bytes(await region.read(seg_addr+offset, seg_len, **kwargs)))
address += seg_len
length -= seg_len
if length > 0:
raise Exception("Invalid address")
return bytes(data)
async def write(self, address, data, **kwargs):
start = 0
length = len(data)
regions = self.find_regions(address, length)
if not regions:
raise Exception("Invalid address")
for base, size, offset, region in regions:
if base > address:
raise Exception("Invalid address")
seg_addr = address - base
seg_len = min(size-seg_addr, length)
if offset is None:
seg_addr = address
offset = 0
await region.write(seg_addr+offset, data[start:start+seg_len], **kwargs)
address += seg_len
start += seg_len
length -= seg_len
if length > 0:
raise Exception("Invalid address")
def create_pool(self, base=None, size=None, pool_type=None, region_type=None):
if base is None:
base = 0
if size is None:
size = self.size - base
pool_type = pool_type or self.pool_type or Pool
self.check_range(base, size)
pool = pool_type(self, base, size, region_type=region_type)
self.register_region(pool, base, size)
return pool
class Pool(AddressSpace):
def __init__(self, parent, base, size, region_type=None, **kwargs):
super().__init__(parent=parent, base=base, size=size, **kwargs)
self.region_type = region_type or MemoryRegion
self.allocator = BuddyAllocator(size)
def alloc_region(self, size, region_type=None):
region_type = region_type or self.region_type or MemoryRegion
base = self.allocator.alloc(size)
region = region_type(size)
self.register_region(region, base)
return region

View File

@@ -34,8 +34,8 @@ AxiAWBus, AxiAWTransaction, AxiAWSource, AxiAWSink, AxiAWMonitor = define_stream
# Write data channel
AxiWBus, AxiWTransaction, AxiWSource, AxiWSink, AxiWMonitor = define_stream("AxiW",
signals=["wdata", "wstrb", "wlast", "wvalid", "wready"],
optional_signals=["wuser"],
signals=["wdata", "wlast", "wvalid", "wready"],
optional_signals=["wstrb", "wuser"],
signal_widths={"wlast": 1}
)

View File

@@ -23,7 +23,8 @@ THE SOFTWARE.
"""
import logging
from collections import namedtuple, Counter
from collections import Counter
from typing import List, NamedTuple, Union
import cocotb
from cocotb.queue import Queue
@@ -32,21 +33,78 @@ from cocotb.triggers import Event
from .version import __version__
from .constants import AxiBurstType, AxiLockType, AxiProt, AxiResp
from .axi_channels import AxiAWSource, AxiWSource, AxiBSink, AxiARSource, AxiRSink
from .address_space import Region
from .reset import Reset
# AXI master write helper objects
AxiWriteCmd = namedtuple("AxiWriteCmd", ["address", "data", "awid", "burst", "size",
"lock", "cache", "prot", "qos", "region", "user", "wuser", "event"])
AxiWriteRespCmd = namedtuple("AxiWriteRespCmd", ["address", "length", "size", "cycles",
"prot", "burst_list", "event"])
AxiWriteResp = namedtuple("AxiWriteResp", ["address", "length", "resp", "user"])
class AxiWriteCmd(NamedTuple):
address: int
data: bytes
awid: int
burst: AxiBurstType
size: int
lock: AxiLockType
cache: int
prot: AxiProt
qos: int
region: int
user: int
wuser: Union[list, int, None]
event: Event
class AxiWriteRespCmd(NamedTuple):
address: int
length: int
size: int
cycles: int
prot: AxiProt
burst_list: List[int]
event: Event
class AxiWriteResp(NamedTuple):
address: int
length: int
resp: AxiResp
user: Union[list, None]
# AXI master read helper objects
AxiReadCmd = namedtuple("AxiReadCmd", ["address", "length", "arid", "burst", "size",
"lock", "cache", "prot", "qos", "region", "user", "event"])
AxiReadRespCmd = namedtuple("AxiReadRespCmd", ["address", "length", "size", "cycles",
"prot", "burst_list", "event"])
AxiReadResp = namedtuple("AxiReadResp", ["address", "data", "resp", "user"])
class AxiReadCmd(NamedTuple):
address: int
length: int
arid: int
burst: AxiBurstType
size: int
lock: AxiLockType
cache: int
prot: AxiProt
qos: int
region: int
user: int
event: Event
class AxiReadRespCmd(NamedTuple):
address: int
length: int
size: int
cycles: int
prot: AxiProt
burst_list: List[int]
event: Event
class AxiReadResp(NamedTuple):
address: int
data: bytes
resp: AxiResp
user: Union[list, None]
def __bytes__(self):
return self.data
class TagContext:
@@ -66,7 +124,7 @@ class TagContext:
def _start(self):
if self._cr is None:
self._cr = cocotb.fork(self._process_queue())
self._cr = cocotb.start_soon(self._process_queue())
def _flush(self):
flushed_cmds = []
@@ -135,8 +193,8 @@ class TagContextManager:
return flushed_cmds
class AxiMasterWrite(Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256):
class AxiMasterWrite(Region, Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
@@ -155,6 +213,7 @@ class AxiMasterWrite(Reset):
self.b_channel.queue_occupancy_limit = 2
self.write_command_queue = Queue()
self.write_command_queue.queue_occupancy_limit = 2
self.current_write_command = None
self.id_count = 2**len(self.aw_channel.bus.awid)
@@ -167,6 +226,8 @@ class AxiMasterWrite(Reset):
self._idle = Event()
self._idle.set()
self.address_width = len(self.aw_channel.bus.awaddr)
self.id_width = len(self.aw_channel.bus.awid)
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
self.byte_lanes = self.width // self.byte_size
@@ -181,12 +242,15 @@ class AxiMasterWrite(Reset):
self.awqos_present = hasattr(self.bus.aw, "awqos")
self.awregion_present = hasattr(self.bus.aw, "awregion")
self.awuser_present = hasattr(self.bus.aw, "awuser")
self.wstrb_present = hasattr(self.bus.w, "wstrb")
self.wuser_present = hasattr(self.bus.w, "wuser")
self.buser_present = hasattr(self.bus.b, "buser")
super().__init__(2**self.address_width, **kwargs)
self.log.info("AXI master configuration:")
self.log.info(" Address width: %d bits", len(self.aw_channel.bus.awaddr))
self.log.info(" ID width: %d bits", len(self.aw_channel.bus.awid))
self.log.info(" Address width: %d bits", self.address_width)
self.log.info(" ID width: %d bits", self.id_width)
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info(" Max burst size: %d (%d bytes)", self.max_burst_size, 2**self.max_burst_size)
@@ -201,7 +265,8 @@ class AxiMasterWrite(Reset):
else:
self.log.info(" %s: not present", sig)
assert self.byte_lanes == len(self.w_channel.bus.wstrb)
if self.wstrb_present:
assert self.byte_lanes == len(self.w_channel.bus.wstrb)
assert self.byte_lanes * self.byte_size == self.width
assert len(self.b_channel.bus.bid) == len(self.aw_channel.bus.awid)
@@ -220,6 +285,12 @@ class AxiMasterWrite(Reset):
if not isinstance(event, Event):
raise ValueError("Expected event object")
if address < 0 or address >= 2**self.address_width:
raise ValueError("Address out of range")
if isinstance(data, int):
raise ValueError("Expected bytes or bytearray for data")
if awid is None or awid < 0:
awid = None
elif awid > self.id_count:
@@ -263,12 +334,10 @@ class AxiMasterWrite(Reset):
else:
wuser = list(wuser)
self.in_flight_operations += 1
self._idle.clear()
data = bytes(data)
cmd = AxiWriteCmd(address, bytearray(data), awid, burst, size, lock,
cache, prot, qos, region, user, wuser, event)
self.write_command_queue.put_nowait(cmd)
cocotb.start_soon(self._write_wrapper(address, data, awid, burst, size,
lock, cache, prot, qos, region, user, wuser, event))
return event
@@ -281,46 +350,73 @@ class AxiMasterWrite(Reset):
async def write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
event = self.init_write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser)
if address < 0 or address >= 2**self.address_width:
raise ValueError("Address out of range")
if isinstance(data, int):
raise ValueError("Expected bytes or bytearray for data")
if awid is None or awid < 0:
awid = None
elif awid > self.id_count:
raise ValueError("Requested ID exceeds maximum ID allowed for ID signal width")
burst = AxiBurstType(burst)
if size is None or size < 0:
size = self.max_burst_size
elif size > self.max_burst_size:
raise ValueError("Requested burst size exceeds maximum burst size allowed for bus width")
lock = AxiLockType(lock)
prot = AxiProt(prot)
if not self.awlock_present and lock != AxiLockType.NORMAL:
raise ValueError("awlock sideband signal value specified, but signal is not connected")
if not self.awcache_present and cache != 0b0011:
raise ValueError("awcache sideband signal value specified, but signal is not connected")
if not self.awprot_present and prot != AxiProt.NONSECURE:
raise ValueError("awprot sideband signal value specified, but signal is not connected")
if not self.awqos_present and qos != 0:
raise ValueError("awqos sideband signal value specified, but signal is not connected")
if not self.awregion_present and region != 0:
raise ValueError("awregion sideband signal value specified, but signal is not connected")
if not self.awuser_present and user != 0:
raise ValueError("awuser sideband signal value specified, but signal is not connected")
if not self.wuser_present and wuser != 0:
raise ValueError("wuser sideband signal value specified, but signal is not connected")
if wuser is None:
wuser = 0
elif isinstance(wuser, int):
pass
else:
wuser = list(wuser)
event = Event()
data = bytes(data)
self.in_flight_operations += 1
self._idle.clear()
cmd = AxiWriteCmd(address, data, awid, burst, size, lock,
cache, prot, qos, region, user, wuser, event)
await self.write_command_queue.put(cmd)
await event.wait()
return event.data
async def write_words(self, address, data, byteorder='little', ws=2, awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
words = data
data = bytearray()
for w in words:
data.extend(w.to_bytes(ws, byteorder))
await self.write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser)
async def write_dwords(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
await self.write_words(address, data, byteorder, 4, awid, burst, size,
lock, cache, prot, qos, region, user, wuser)
async def write_qwords(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
await self.write_words(address, data, byteorder, 8, awid, burst, size,
lock, cache, prot, qos, region, user, wuser)
async def write_byte(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
await self.write(address, [data], awid, burst, size, lock, cache, prot, qos, region, user, wuser)
async def write_word(self, address, data, byteorder='little', ws=2, awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
await self.write_words(address, [data], byteorder, ws, awid, burst, size,
lock, cache, prot, qos, region, user, wuser)
async def write_dword(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
await self.write_dwords(address, [data], byteorder, awid, burst, size,
lock, cache, prot, qos, region, user, wuser)
async def write_qword(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
await self.write_qwords(address, [data], byteorder, awid, burst, size,
lock, cache, prot, qos, region, user, wuser)
async def _write_wrapper(self, address, data, awid, burst, size,
lock, cache, prot, qos, region, user, wuser, event):
event.set(await self.write(address, data, awid, burst, size,
lock, cache, prot, qos, region, user, wuser))
def _handle_reset(self, state):
if state:
@@ -361,9 +457,9 @@ class AxiMasterWrite(Reset):
else:
self.log.info("Reset de-asserted")
if self._process_write_cr is None:
self._process_write_cr = cocotb.fork(self._process_write())
self._process_write_cr = cocotb.start_soon(self._process_write())
if self._process_write_resp_cr is None:
self._process_write_resp_cr = cocotb.fork(self._process_write_resp())
self._process_write_resp_cr = cocotb.start_soon(self._process_write_resp())
async def _process_write(self):
while True:
@@ -380,7 +476,7 @@ class AxiMasterWrite(Reset):
cycles = (len(cmd.data) + (cmd.address % num_bytes) + num_bytes-1) // num_bytes
cur_addr = aligned_addr
cur_addr = cmd.address
offset = 0
cycle_offset = aligned_addr-word_addr
n = 0
@@ -397,8 +493,9 @@ class AxiMasterWrite(Reset):
wuser = cmd.wuser
self.log.info("Write start addr: 0x%08x awid: 0x%x prot: %s data: %s",
cmd.address, awid, cmd.prot, ' '.join((f'{c:02x}' for c in cmd.data)))
if self.log.isEnabledFor(logging.INFO):
self.log.info("Write start addr: 0x%08x awid: 0x%x prot: %s data: %s",
cmd.address, awid, cmd.prot, ' '.join((f'{c:02x}' for c in cmd.data)))
for k in range(cycles):
start = cycle_offset
@@ -444,10 +541,13 @@ class AxiMasterWrite(Reset):
await self.aw_channel.send(aw)
self.log.info("Write burst start awid: 0x%x awaddr: 0x%08x awlen: %d awsize: %d awprot: %s",
awid, cur_addr, burst_length-1, cmd.size, cmd.prot)
awid, cur_addr, burst_length-1, cmd.size, cmd.prot)
n += 1
if not self.wstrb_present and strb != self.strb_mask:
self.log.warning("Partial operation requested with wstrb not connected, write will be zero-padded (0x%x != 0x%x)", strb, self.strb_mask)
w = self.w_channel._transaction_obj()
w.wdata = val
w.wstrb = strb
@@ -463,7 +563,10 @@ class AxiMasterWrite(Reset):
await self.w_channel.send(w)
cur_addr += num_bytes
if k == 0:
cur_addr = aligned_addr + num_bytes
else:
cur_addr += num_bytes
cycle_offset = (cycle_offset + num_bytes) % self.byte_lanes
resp_cmd = AxiWriteRespCmd(cmd.address, len(cmd.data), cmd.size, cycles, cmd.prot, burst_list, cmd.event)
@@ -475,10 +578,9 @@ class AxiMasterWrite(Reset):
while True:
b = await self.b_channel.recv()
bid = int(b.bid)
bid = int(getattr(b, 'bid', 0))
if self.active_id[bid] <= 0:
raise Exception(f"Unexpected burst ID {bid}")
assert self.active_id[bid] > 0, "unexpected burst ID"
self.tag_context_manager.put_resp(bid, b)
@@ -491,8 +593,8 @@ class AxiMasterWrite(Reset):
for burst_length in cmd.burst_list:
b = await context.get_resp()
burst_resp = AxiResp(b.bresp)
burst_user = int(b.buser)
burst_resp = AxiResp(int(getattr(b, 'bresp', AxiResp.OKAY)))
burst_user = int(getattr(b, 'buser', 0))
if burst_resp != AxiResp.OKAY:
resp = burst_resp
@@ -500,8 +602,7 @@ class AxiMasterWrite(Reset):
if burst_user is not None:
user.append(burst_user)
if self.active_id[bid] <= 0:
raise Exception(f"Unexpected burst ID {bid}")
assert self.active_id[bid] > 0, "unexpected burst ID"
self.active_id[bid] -= 1
@@ -511,7 +612,7 @@ class AxiMasterWrite(Reset):
user = None
self.log.info("Write complete addr: 0x%08x prot: %s resp: %s length: %d",
cmd.address, cmd.prot, resp, cmd.length)
cmd.address, cmd.prot, resp, cmd.length)
write_resp = AxiWriteResp(cmd.address, cmd.length, resp, user)
@@ -523,8 +624,8 @@ class AxiMasterWrite(Reset):
self._idle.set()
class AxiMasterRead(Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256):
class AxiMasterRead(Region, Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
@@ -541,6 +642,7 @@ class AxiMasterRead(Reset):
self.r_channel.queue_occupancy_limit = 2
self.read_command_queue = Queue()
self.read_command_queue.queue_occupancy_limit = 2
self.current_read_command = None
self.id_count = 2**len(self.ar_channel.bus.arid)
@@ -553,6 +655,8 @@ class AxiMasterRead(Reset):
self._idle = Event()
self._idle.set()
self.address_width = len(self.ar_channel.bus.araddr)
self.id_width = len(self.ar_channel.bus.arid)
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
self.byte_lanes = self.width // self.byte_size
@@ -568,9 +672,11 @@ class AxiMasterRead(Reset):
self.aruser_present = hasattr(self.bus.ar, "aruser")
self.ruser_present = hasattr(self.bus.r, "ruser")
super().__init__(2**self.address_width, **kwargs)
self.log.info("AXI master configuration:")
self.log.info(" Address width: %d bits", len(self.ar_channel.bus.araddr))
self.log.info(" ID width: %d bits", len(self.ar_channel.bus.arid))
self.log.info(" Address width: %d bits", self.address_width)
self.log.info(" ID width: %d bits", self.id_width)
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info(" Max burst size: %d (%d bytes)", self.max_burst_size, 2**self.max_burst_size)
@@ -603,6 +709,9 @@ class AxiMasterRead(Reset):
if not isinstance(event, Event):
raise ValueError("Expected event object")
if address < 0 or address >= 2**self.address_width:
raise ValueError("Address out of range")
if length < 0:
raise ValueError("Read length must be positive")
@@ -639,11 +748,8 @@ class AxiMasterRead(Reset):
if not self.aruser_present and user != 0:
raise ValueError("aruser sideband signal value specified, but signal is not connected")
self.in_flight_operations += 1
self._idle.clear()
cmd = AxiReadCmd(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event)
self.read_command_queue.put_nowait(cmd)
cocotb.start_soon(self._read_wrapper(address, length, arid, burst, size,
lock, cache, prot, qos, region, user, event))
return event
@@ -656,46 +762,61 @@ class AxiMasterRead(Reset):
async def read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
event = self.init_read(address, length, arid, burst, size, lock, cache, prot, qos, region, user)
if address < 0 or address >= 2**self.address_width:
raise ValueError("Address out of range")
if length < 0:
raise ValueError("Read length must be positive")
if arid is None or arid < 0:
arid = None
elif arid > self.id_count:
raise ValueError("Requested ID exceeds maximum ID allowed for ID signal width")
burst = AxiBurstType(burst)
if size is None or size < 0:
size = self.max_burst_size
elif size > self.max_burst_size:
raise ValueError("Requested burst size exceeds maximum burst size allowed for bus width")
lock = AxiLockType(lock)
prot = AxiProt(prot)
if not self.arlock_present and lock != AxiLockType.NORMAL:
raise ValueError("arlock sideband signal value specified, but signal is not connected")
if not self.arcache_present and cache != 0b0011:
raise ValueError("arcache sideband signal value specified, but signal is not connected")
if not self.arprot_present and prot != AxiProt.NONSECURE:
raise ValueError("arprot sideband signal value specified, but signal is not connected")
if not self.arqos_present and qos != 0:
raise ValueError("arqos sideband signal value specified, but signal is not connected")
if not self.arregion_present and region != 0:
raise ValueError("arregion sideband signal value specified, but signal is not connected")
if not self.aruser_present and user != 0:
raise ValueError("aruser sideband signal value specified, but signal is not connected")
event = Event()
self.in_flight_operations += 1
self._idle.clear()
cmd = AxiReadCmd(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event)
await self.read_command_queue.put(cmd)
await event.wait()
return event.data
async def read_words(self, address, count, byteorder='little', ws=2, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
data = await self.read(address, count*ws, arid, burst, size, lock, cache, prot, qos, region, user)
words = []
for k in range(count):
words.append(int.from_bytes(data.data[ws*k:ws*(k+1)], byteorder))
return words
async def read_dwords(self, address, count, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
return await self.read_words(address, count, byteorder, 4, arid, burst, size,
lock, cache, prot, qos, region, user)
async def read_qwords(self, address, count, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
return await self.read_words(address, count, byteorder, 8, arid, burst, size,
lock, cache, prot, qos, region, user)
async def read_byte(self, address, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
return (await self.read(address, 1, arid, burst, size, lock, cache, prot, qos, region, user)).data[0]
async def read_word(self, address, byteorder='little', ws=2, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
return (await self.read_words(address, 1, byteorder, ws, arid, burst, size,
lock, cache, prot, qos, region, user))[0]
async def read_dword(self, address, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
return (await self.read_dwords(address, 1, byteorder, arid, burst, size,
lock, cache, prot, qos, region, user))[0]
async def read_qword(self, address, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
return (await self.read_qwords(address, 1, byteorder, arid, burst, size,
lock, cache, prot, qos, region, user))[0]
async def _read_wrapper(self, address, length, arid, burst, size,
lock, cache, prot, qos, region, user, event):
event.set(await self.read(address, length, arid, burst, size,
lock, cache, prot, qos, region, user))
def _handle_reset(self, state):
if state:
@@ -735,9 +856,9 @@ class AxiMasterRead(Reset):
else:
self.log.info("Reset de-asserted")
if self._process_read_cr is None:
self._process_read_cr = cocotb.fork(self._process_read())
self._process_read_cr = cocotb.start_soon(self._process_read())
if self._process_read_resp_cr is None:
self._process_read_resp_cr = cocotb.fork(self._process_read_resp())
self._process_read_resp_cr = cocotb.start_soon(self._process_read_resp())
async def _process_read(self):
while True:
@@ -752,7 +873,7 @@ class AxiMasterRead(Reset):
burst_list = []
cur_addr = aligned_addr
cur_addr = cmd.address
n = 0
burst_length = 0
@@ -778,7 +899,7 @@ class AxiMasterRead(Reset):
burst_list.append(burst_length)
ar = self.r_channel._transaction_obj()
ar = self.ar_channel._transaction_obj()
ar.arid = arid
ar.araddr = cur_addr
ar.arlen = burst_length-1
@@ -795,9 +916,12 @@ class AxiMasterRead(Reset):
await self.ar_channel.send(ar)
self.log.info("Read burst start arid: 0x%x araddr: 0x%08x arlen: %d arsize: %d arprot: %s",
arid, cur_addr, burst_length-1, cmd.size, cmd.prot)
arid, cur_addr, burst_length-1, cmd.size, cmd.prot)
cur_addr += num_bytes
if k == 0:
cur_addr = aligned_addr + num_bytes
else:
cur_addr += num_bytes
resp_cmd = AxiReadRespCmd(cmd.address, cmd.length, cmd.size, cycles, cmd.prot, burst_list, cmd.event)
self.tag_context_manager.start_cmd(arid, resp_cmd)
@@ -805,27 +929,14 @@ class AxiMasterRead(Reset):
self.current_read_command = None
async def _process_read_resp(self):
burst = []
cur_rid = None
while True:
r = await self.r_channel.recv()
rid = int(r.rid)
rid = int(getattr(r, 'rid', 0))
if cur_rid is not None and cur_rid != rid:
raise Exception(f"ID not constant within burst (expected {cur_rid}, got {rid})")
assert self.active_id[rid] > 0, "unexpected burst ID"
if self.active_id[rid] <= 0:
raise Exception(f"Unexpected burst ID {rid}")
burst.append(r)
cur_rid = rid
if int(r.rlast):
self.tag_context_manager.put_resp(rid, burst)
burst = []
cur_rid = None
self.tag_context_manager.put_resp(rid, r)
async def _process_read_resp_id(self, context, cmd):
rid = context.current_tag
@@ -846,15 +957,19 @@ class AxiMasterRead(Reset):
first = True
for burst_length in cmd.burst_list:
burst = await context.get_resp()
for k in range(burst_length):
r = await context.get_resp()
if len(burst) != burst_length:
raise Exception(f"Burst length incorrect (ID {rid}, expected {burst_length}, got {len(burst)}")
assert self.active_id[rid] > 0, "unexpected burst ID"
if k == burst_length-1:
assert int(r.rlast), "missing rlast at end of burst"
else:
assert not int(r.rlast), "unexpected rlast within burst"
for r in burst:
cycle_data = int(r.rdata)
cycle_resp = AxiResp(r.rresp)
cycle_user = int(r.ruser)
cycle_resp = AxiResp(int(getattr(r, "rresp", AxiResp.OKAY)))
cycle_user = int(getattr(r, "ruser", 0))
if cycle_resp != AxiResp.OKAY:
resp = cycle_resp
@@ -884,10 +999,11 @@ class AxiMasterRead(Reset):
if not self.ruser_present:
user = None
self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s",
cmd.address, cmd.prot, resp, ' '.join((f'{c:02x}' for c in data)))
if self.log.isEnabledFor(logging.INFO):
self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s",
cmd.address, cmd.prot, resp, ' '.join((f'{c:02x}' for c in data)))
read_resp = AxiReadResp(cmd.address, data, resp, user)
read_resp = AxiReadResp(cmd.address, bytes(data), resp, user)
cmd.event.set(read_resp)
@@ -897,13 +1013,15 @@ class AxiMasterRead(Reset):
self._idle.set()
class AxiMaster:
def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256):
class AxiMaster(Region):
def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256, **kwargs):
self.write_if = None
self.read_if = None
self.write_if = AxiMasterWrite(bus.write, clock, reset, reset_active_level, max_burst_len)
self.read_if = AxiMasterRead(bus.read, clock, reset, reset_active_level, max_burst_len)
self.write_if = AxiMasterWrite(bus.write, clock, reset, reset_active_level, max_burst_len, **kwargs)
self.read_if = AxiMasterRead(bus.read, clock, reset, reset_active_level, max_burst_len, **kwargs)
super().__init__(max(self.write_if.size, self.read_if.size), **kwargs)
def init_read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, event=None):
@@ -932,77 +1050,7 @@ class AxiMaster:
return await self.read_if.read(address, length, arid,
burst, size, lock, cache, prot, qos, region, user)
async def read_words(self, address, count, byteorder='little', ws=2, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
return await self.read_if.read_words(address, count, byteorder, ws, arid,
burst, size, lock, cache, prot, qos, region, user)
async def read_dwords(self, address, count, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
return await self.read_if.read_dwords(address, count, byteorder, arid,
burst, size, lock, cache, prot, qos, region, user)
async def read_qwords(self, address, count, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
return await self.read_if.read_qwords(address, count, byteorder, arid,
burst, size, lock, cache, prot, qos, region, user)
async def read_byte(self, address, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
return await self.read_if.read_byte(address, arid,
burst, size, lock, cache, prot, qos, region, user)
async def read_word(self, address, byteorder='little', ws=2, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
return await self.read_if.read_word(address, byteorder, ws, arid,
burst, size, lock, cache, prot, qos, region, user)
async def read_dword(self, address, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
return await self.read_if.read_dword(address, byteorder, arid,
burst, size, lock, cache, prot, qos, region, user)
async def read_qword(self, address, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
return await self.read_if.read_qword(address, byteorder, arid,
burst, size, lock, cache, prot, qos, region, user)
async def write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
return await self.write_if.write(address, data, awid,
burst, size, lock, cache, prot, qos, region, user, wuser)
async def write_words(self, address, data, byteorder='little', ws=2, awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
return await self.write_if.write_words(address, data, byteorder, ws, awid,
burst, size, lock, cache, prot, qos, region, user, wuser)
async def write_dwords(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
return await self.write_if.write_dwords(address, data, byteorder, awid,
burst, size, lock, cache, prot, qos, region, user, wuser)
async def write_qwords(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
return await self.write_if.write_qwords(address, data, byteorder, awid,
burst, size, lock, cache, prot, qos, region, user, wuser)
async def write_byte(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
return await self.write_if.write_byte(address, data, awid,
burst, size, lock, cache, prot, qos, region, user, wuser)
async def write_word(self, address, data, byteorder='little', ws=2, awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
return await self.write_if.write_word(address, data, byteorder, ws, awid,
burst, size, lock, cache, prot, qos, region, user, wuser)
async def write_dword(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
return await self.write_if.write_dword(address, data, byteorder, awid,
burst, size, lock, cache, prot, qos, region, user, wuser)
async def write_qword(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
return await self.write_if.write_qword(address, data, byteorder, awid,
burst, size, lock, cache, prot, qos, region, user, wuser)

View File

@@ -1,6 +1,6 @@
"""
Copyright (c) 2020 Alex Forencich
Copyright (c) 2021 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
@@ -22,278 +22,32 @@ THE SOFTWARE.
"""
import logging
import cocotb
from .version import __version__
from .constants import AxiBurstType, AxiProt, AxiResp
from .axi_channels import AxiAWSink, AxiWSink, AxiBSource, AxiARSink, AxiRSource
from .axi_slave import AxiSlaveWrite, AxiSlaveRead
from .memory import Memory
from .reset import Reset
class AxiRamWrite(Memory, Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
class AxiRamWrite(AxiSlaveWrite, Memory):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs):
super().__init__(bus, clock, reset, reset_active_level=reset_active_level, size=size, mem=mem, **kwargs)
self.log.info("AXI RAM model (write)")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2020 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
async def _write(self, address, data):
self.write(address % self.size, data)
super().__init__(size, mem, *args, **kwargs)
self.aw_channel = AxiAWSink(bus.aw, clock, reset, reset_active_level)
self.aw_channel.queue_occupancy_limit = 2
self.w_channel = AxiWSink(bus.w, clock, reset, reset_active_level)
self.w_channel.queue_occupancy_limit = 2
self.b_channel = AxiBSource(bus.b, clock, reset, reset_active_level)
self.b_channel.queue_occupancy_limit = 2
class AxiRamRead(AxiSlaveRead, Memory):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs):
super().__init__(bus, clock, reset, reset_active_level=reset_active_level, size=size, mem=mem, **kwargs)
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
self.byte_lanes = self.width // self.byte_size
self.strb_mask = 2**self.byte_lanes-1
self.log.info("AXI RAM model configuration:")
self.log.info(" Memory size: %d bytes", len(self.mem))
self.log.info(" Address width: %d bits", len(self.aw_channel.bus.awaddr))
self.log.info(" ID width: %d bits", len(self.aw_channel.bus.awid))
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info("AXI RAM model signals:")
for bus in (self.bus.aw, self.bus.w, self.bus.b):
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
assert self.byte_lanes == len(self.w_channel.bus.wstrb)
assert self.byte_lanes * self.byte_size == self.width
assert len(self.b_channel.bus.bid) == len(self.aw_channel.bus.awid)
self._process_write_cr = None
self._init_reset(reset, reset_active_level)
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._process_write_cr is not None:
self._process_write_cr.kill()
self._process_write_cr = None
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
else:
self.log.info("Reset de-asserted")
if self._process_write_cr is None:
self._process_write_cr = cocotb.fork(self._process_write())
async def _process_write(self):
while True:
aw = await self.aw_channel.recv()
awid = int(aw.awid)
addr = int(aw.awaddr)
length = int(aw.awlen)
size = int(aw.awsize)
burst = int(aw.awburst)
prot = AxiProt(int(aw.awprot))
self.log.info("Write burst awid: 0x%x awaddr: 0x%08x awlen: %d awsize: %d awprot: %s",
awid, addr, length, size, prot)
num_bytes = 2**size
assert 0 < num_bytes <= self.byte_lanes
aligned_addr = (addr // num_bytes) * num_bytes
length += 1
transfer_size = num_bytes*length
if burst == AxiBurstType.WRAP:
lower_wrap_boundary = (addr // transfer_size) * transfer_size
upper_wrap_boundary = lower_wrap_boundary + transfer_size
if burst == AxiBurstType.INCR:
# check 4k boundary crossing
assert 0x1000-(aligned_addr & 0xfff) >= transfer_size
cur_addr = aligned_addr
for n in range(length):
cur_word_addr = (cur_addr // self.byte_lanes) * self.byte_lanes
w = await self.w_channel.recv()
data = int(w.wdata)
strb = int(w.wstrb)
last = int(w.wlast)
# todo latency
self.mem.seek(cur_word_addr % self.size)
data = data.to_bytes(self.byte_lanes, 'little')
self.log.debug("Write word awid: 0x%x addr: 0x%08x wstrb: 0x%02x data: %s",
awid, cur_addr, strb, ' '.join((f'{c:02x}' for c in data)))
for i in range(self.byte_lanes):
if strb & (1 << i):
self.mem.write(data[i:i+1])
else:
self.mem.seek(1, 1)
assert last == (n == length-1)
if burst != AxiBurstType.FIXED:
cur_addr += num_bytes
if burst == AxiBurstType.WRAP:
if cur_addr == upper_wrap_boundary:
cur_addr = lower_wrap_boundary
b = self.b_channel._transaction_obj()
b.bid = awid
b.bresp = AxiResp.OKAY
await self.b_channel.send(b)
class AxiRamRead(Memory, Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
self.log.info("AXI RAM model (read)")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2020 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
super().__init__(size, mem, *args, **kwargs)
self.ar_channel = AxiARSink(bus.ar, clock, reset, reset_active_level)
self.ar_channel.queue_occupancy_limit = 2
self.r_channel = AxiRSource(bus.r, clock, reset, reset_active_level)
self.r_channel.queue_occupancy_limit = 2
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
self.byte_lanes = self.width // self.byte_size
self.log.info("AXI RAM model configuration:")
self.log.info(" Memory size: %d bytes", len(self.mem))
self.log.info(" Address width: %d bits", len(self.ar_channel.bus.araddr))
self.log.info(" ID width: %d bits", len(self.ar_channel.bus.arid))
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info("AXI RAM model signals:")
for bus in (self.bus.ar, self.bus.r):
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
assert self.byte_lanes * self.byte_size == self.width
assert len(self.r_channel.bus.rid) == len(self.ar_channel.bus.arid)
self._process_read_cr = None
self._init_reset(reset, reset_active_level)
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._process_read_cr is not None:
self._process_read_cr.kill()
self._process_read_cr = None
self.ar_channel.clear()
self.r_channel.clear()
else:
self.log.info("Reset de-asserted")
if self._process_read_cr is None:
self._process_read_cr = cocotb.fork(self._process_read())
async def _process_read(self):
while True:
ar = await self.ar_channel.recv()
arid = int(ar.arid)
addr = int(ar.araddr)
length = int(ar.arlen)
size = int(ar.arsize)
burst = int(ar.arburst)
prot = AxiProt(ar.arprot)
self.log.info("Read burst arid: 0x%x araddr: 0x%08x arlen: %d arsize: %d arprot: %s",
arid, addr, length, size, prot)
num_bytes = 2**size
assert 0 < num_bytes <= self.byte_lanes
aligned_addr = (addr // num_bytes) * num_bytes
length += 1
transfer_size = num_bytes*length
if burst == AxiBurstType.WRAP:
lower_wrap_boundary = (addr // transfer_size) * transfer_size
upper_wrap_boundary = lower_wrap_boundary + transfer_size
if burst == AxiBurstType.INCR:
# check 4k boundary crossing
assert 0x1000-(aligned_addr & 0xfff) >= transfer_size
cur_addr = aligned_addr
for n in range(length):
cur_word_addr = (cur_addr // self.byte_lanes) * self.byte_lanes
self.mem.seek(cur_word_addr % self.size)
data = self.mem.read(self.byte_lanes)
r = self.r_channel._transaction_obj()
r.rid = arid
r.rdata = int.from_bytes(data, 'little')
r.rlast = n == length-1
r.rresp = AxiResp.OKAY
await self.r_channel.send(r)
self.log.debug("Read word awid: 0x%x addr: 0x%08x data: %s",
arid, cur_addr, ' '.join((f'{c:02x}' for c in data)))
if burst != AxiBurstType.FIXED:
cur_addr += num_bytes
if burst == AxiBurstType.WRAP:
if cur_addr == upper_wrap_boundary:
cur_addr = lower_wrap_boundary
async def _read(self, address, length):
return self.read(address % self.size, length)
class AxiRam(Memory):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs):
self.write_if = None
self.read_if = None
super().__init__(size, mem, *args, **kwargs)
super().__init__(size, mem, **kwargs)
self.write_if = AxiRamWrite(bus.write, clock, reset, reset_active_level, mem=self.mem)
self.read_if = AxiRamRead(bus.read, clock, reset, reset_active_level, mem=self.mem)

341
cocotbext/axi/axi_slave.py Normal file
View File

@@ -0,0 +1,341 @@
"""
Copyright (c) 2021 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import logging
import cocotb
from .version import __version__
from .constants import AxiBurstType, AxiProt, AxiResp
from .axi_channels import AxiAWSink, AxiWSink, AxiBSource, AxiARSink, AxiRSource
from .reset import Reset
class AxiSlaveWrite(Reset):
def __init__(self, bus, clock, reset=None, target=None, reset_active_level=True, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
self.target = target
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
self.log.info("AXI slave model (write)")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2021 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
super().__init__(**kwargs)
self.aw_channel = AxiAWSink(bus.aw, clock, reset, reset_active_level)
self.aw_channel.queue_occupancy_limit = 2
self.w_channel = AxiWSink(bus.w, clock, reset, reset_active_level)
self.w_channel.queue_occupancy_limit = 2
self.b_channel = AxiBSource(bus.b, clock, reset, reset_active_level)
self.b_channel.queue_occupancy_limit = 2
self.address_width = len(self.aw_channel.bus.awaddr)
self.id_width = len(self.aw_channel.bus.awid)
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
self.byte_lanes = self.width // self.byte_size
self.strb_mask = 2**self.byte_lanes-1
self.max_burst_size = (self.byte_lanes-1).bit_length()
self.wstrb_present = hasattr(self.bus.w, "wstrb")
self.log.info("AXI slave model configuration:")
self.log.info(" Address width: %d bits", self.address_width)
self.log.info(" ID width: %d bits", self.id_width)
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info("AXI slave model signals:")
for bus in (self.bus.aw, self.bus.w, self.bus.b):
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
if self.wstrb_present:
assert self.byte_lanes == len(self.w_channel.bus.wstrb)
assert self.byte_lanes * self.byte_size == self.width
assert len(self.b_channel.bus.bid) == len(self.aw_channel.bus.awid)
self._process_write_cr = None
self._init_reset(reset, reset_active_level)
async def _write(self, address, data):
await self.target.write(address, data)
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._process_write_cr is not None:
self._process_write_cr.kill()
self._process_write_cr = None
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
else:
self.log.info("Reset de-asserted")
if self._process_write_cr is None:
self._process_write_cr = cocotb.start_soon(self._process_write())
async def _process_write(self):
while True:
aw = await self.aw_channel.recv()
awid = int(getattr(aw, 'awid', 0))
addr = int(aw.awaddr)
length = int(getattr(aw, 'awlen', 0))
size = int(getattr(aw, 'awsize', self.max_burst_size))
burst = AxiBurstType(int(getattr(aw, 'awburst', AxiBurstType.INCR)))
prot = AxiProt(int(getattr(aw, 'awprot', AxiProt.NONSECURE)))
self.log.info("Write burst awid: 0x%x awaddr: 0x%08x awlen: %d awsize: %d awprot: %s",
awid, addr, length, size, prot)
num_bytes = 2**size
assert 0 < num_bytes <= self.byte_lanes
aligned_addr = (addr // num_bytes) * num_bytes
length += 1
transfer_size = num_bytes*length
if burst == AxiBurstType.WRAP:
lower_wrap_boundary = (addr // transfer_size) * transfer_size
upper_wrap_boundary = lower_wrap_boundary + transfer_size
if burst == AxiBurstType.INCR:
# check 4k boundary crossing
assert 0x1000-(aligned_addr & 0xfff) >= transfer_size
cur_addr = aligned_addr
b = self.b_channel._transaction_obj()
b.bid = awid
b.bresp = AxiResp.OKAY
for n in range(length):
cur_word_addr = (cur_addr // self.byte_lanes) * self.byte_lanes
w = await self.w_channel.recv()
data = int(w.wdata)
if self.wstrb_present:
strb = int(getattr(w, 'wstrb', self.strb_mask))
else:
strb = self.strb_mask
last = int(w.wlast)
# generate operation list
offset = 0
start_offset = None
write_ops = []
data = data.to_bytes(self.byte_lanes, 'little')
if self.log.isEnabledFor(logging.DEBUG):
self.log.debug("Write word awid: 0x%x addr: 0x%08x wstrb: 0x%02x data: %s",
awid, cur_addr, strb, ' '.join((f'{c:02x}' for c in data)))
for i in range(self.byte_lanes):
if strb & (1 << i):
if start_offset is None:
start_offset = offset
else:
if start_offset is not None and offset != start_offset:
write_ops.append((cur_word_addr+start_offset, data[start_offset:offset]))
start_offset = None
offset += 1
if start_offset is not None and offset != start_offset:
write_ops.append((cur_word_addr+start_offset, data[start_offset:offset]))
# perform writes
try:
for addr, data in write_ops:
await self._write(addr, data)
except Exception:
self.log.warning("Write operation failed")
b.bresp = AxiResp.SLVERR
assert last == (n == length-1)
if burst != AxiBurstType.FIXED:
cur_addr += num_bytes
if burst == AxiBurstType.WRAP:
if cur_addr == upper_wrap_boundary:
cur_addr = lower_wrap_boundary
await self.b_channel.send(b)
class AxiSlaveRead(Reset):
def __init__(self, bus, clock, reset=None, target=None, reset_active_level=True, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
self.target = target
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
self.log.info("AXI slave model (read)")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2021 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
super().__init__(**kwargs)
self.ar_channel = AxiARSink(bus.ar, clock, reset, reset_active_level)
self.ar_channel.queue_occupancy_limit = 2
self.r_channel = AxiRSource(bus.r, clock, reset, reset_active_level)
self.r_channel.queue_occupancy_limit = 2
self.address_width = len(self.ar_channel.bus.araddr)
self.id_width = len(self.ar_channel.bus.arid)
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
self.byte_lanes = self.width // self.byte_size
self.max_burst_size = (self.byte_lanes-1).bit_length()
self.log.info("AXI slave model configuration:")
self.log.info(" Address width: %d bits", self.address_width)
self.log.info(" ID width: %d bits", self.id_width)
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info("AXI slave model signals:")
for bus in (self.bus.ar, self.bus.r):
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
assert self.byte_lanes * self.byte_size == self.width
assert len(self.r_channel.bus.rid) == len(self.ar_channel.bus.arid)
self._process_read_cr = None
self._init_reset(reset, reset_active_level)
async def _read(self, address, length):
return await self.target.read(address, length)
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._process_read_cr is not None:
self._process_read_cr.kill()
self._process_read_cr = None
self.ar_channel.clear()
self.r_channel.clear()
else:
self.log.info("Reset de-asserted")
if self._process_read_cr is None:
self._process_read_cr = cocotb.start_soon(self._process_read())
async def _process_read(self):
while True:
ar = await self.ar_channel.recv()
arid = int(getattr(ar, 'arid', 0))
addr = int(ar.araddr)
length = int(getattr(ar, 'arlen', 0))
size = int(getattr(ar, 'arsize', self.max_burst_size))
burst = AxiBurstType(int(getattr(ar, 'arburst', AxiBurstType.INCR)))
prot = AxiProt(int(getattr(ar, 'arprot', AxiProt.NONSECURE)))
self.log.info("Read burst arid: 0x%x araddr: 0x%08x arlen: %d arsize: %d arprot: %s",
arid, addr, length, size, prot)
num_bytes = 2**size
assert 0 < num_bytes <= self.byte_lanes
aligned_addr = (addr // num_bytes) * num_bytes
length += 1
transfer_size = num_bytes*length
if burst == AxiBurstType.WRAP:
lower_wrap_boundary = (addr // transfer_size) * transfer_size
upper_wrap_boundary = lower_wrap_boundary + transfer_size
if burst == AxiBurstType.INCR:
# check 4k boundary crossing
assert 0x1000-(aligned_addr & 0xfff) >= transfer_size
cur_addr = aligned_addr
for n in range(length):
cur_word_addr = (cur_addr // self.byte_lanes) * self.byte_lanes
r = self.r_channel._transaction_obj()
r.rid = arid
r.rlast = n == length-1
r.rresp = AxiResp.OKAY
try:
data = await self._read(cur_word_addr, self.byte_lanes)
except Exception:
self.log.warning("Read operation failed")
data = bytes(self.byte_lanes)
r.rresp = AxiResp.SLVERR
r.rdata = int.from_bytes(data, 'little')
await self.r_channel.send(r)
if self.log.isEnabledFor(logging.DEBUG):
self.log.debug("Read word awid: 0x%x addr: 0x%08x data: %s",
arid, cur_addr, ' '.join((f'{c:02x}' for c in data)))
if burst != AxiBurstType.FIXED:
cur_addr += num_bytes
if burst == AxiBurstType.WRAP:
if cur_addr == upper_wrap_boundary:
cur_addr = lower_wrap_boundary
class AxiSlave:
def __init__(self, bus, clock, reset=None, target=None, reset_active_level=True, **kwargs):
self.write_if = None
self.read_if = None
super().__init__(**kwargs)
self.write_if = AxiSlaveWrite(bus.write, clock, reset, target, reset_active_level)
self.read_if = AxiSlaveRead(bus.read, clock, reset, target, reset_active_level)

View File

@@ -33,7 +33,8 @@ AxiLiteAWBus, AxiLiteAWTransaction, AxiLiteAWSource, AxiLiteAWSink, AxiLiteAWMon
# Write data channel
AxiLiteWBus, AxiLiteWTransaction, AxiLiteWSource, AxiLiteWSink, AxiLiteWMonitor = define_stream("AxiLiteW",
signals=["wdata", "wstrb", "wvalid", "wready"]
signals=["wdata", "wvalid", "wready"],
optional_signals=["wstrb"]
)
# Write response channel

View File

@@ -23,7 +23,7 @@ THE SOFTWARE.
"""
import logging
from collections import namedtuple
from typing import NamedTuple
import cocotb
from cocotb.queue import Queue
@@ -32,21 +32,59 @@ from cocotb.triggers import Event
from .version import __version__
from .constants import AxiProt, AxiResp
from .axil_channels import AxiLiteAWSource, AxiLiteWSource, AxiLiteBSink, AxiLiteARSource, AxiLiteRSink
from .address_space import Region
from .reset import Reset
# AXI lite master write
AxiLiteWriteCmd = namedtuple("AxiLiteWriteCmd", ["address", "data", "prot", "event"])
AxiLiteWriteRespCmd = namedtuple("AxiLiteWriteRespCmd", ["address", "length", "cycles", "prot", "event"])
AxiLiteWriteResp = namedtuple("AxiLiteWriteResp", ["address", "length", "resp"])
# AXI lite master read
AxiLiteReadCmd = namedtuple("AxiLiteReadCmd", ["address", "length", "prot", "event"])
AxiLiteReadRespCmd = namedtuple("AxiLiteReadRespCmd", ["address", "length", "cycles", "prot", "event"])
AxiLiteReadResp = namedtuple("AxiLiteReadResp", ["address", "data", "resp"])
# AXI lite master write helper objects
class AxiLiteWriteCmd(NamedTuple):
address: int
data: bytes
prot: AxiProt
event: Event
class AxiLiteMasterWrite(Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True):
class AxiLiteWriteRespCmd(NamedTuple):
address: int
length: int
cycles: int
prot: AxiProt
event: Event
class AxiLiteWriteResp(NamedTuple):
address: int
length: int
resp: AxiResp
# AXI lite master read helper objects
class AxiLiteReadCmd(NamedTuple):
address: int
length: int
prot: AxiProt
event: Event
class AxiLiteReadRespCmd(NamedTuple):
address: int
length: int
cycles: int
prot: AxiProt
event: Event
class AxiLiteReadResp(NamedTuple):
address: int
data: bytes
resp: AxiResp
def __bytes__(self):
return self.data
class AxiLiteMasterWrite(Region, Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
@@ -65,6 +103,7 @@ class AxiLiteMasterWrite(Reset):
self.b_channel.queue_occupancy_limit = 2
self.write_command_queue = Queue()
self.write_command_queue.queue_occupancy_limit = 2
self.current_write_command = None
self.int_write_resp_command_queue = Queue()
@@ -74,15 +113,19 @@ class AxiLiteMasterWrite(Reset):
self._idle = Event()
self._idle.set()
self.address_width = len(self.aw_channel.bus.awaddr)
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
self.byte_lanes = self.width // self.byte_size
self.strb_mask = 2**self.byte_lanes-1
self.awprot_present = hasattr(self.bus.aw, "awprot")
self.wstrb_present = hasattr(self.bus.w, "wstrb")
super().__init__(2**self.address_width, **kwargs)
self.log.info("AXI lite master configuration:")
self.log.info(" Address width: %d bits", len(self.aw_channel.bus.awaddr))
self.log.info(" Address width: %d bits", self.address_width)
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
@@ -94,7 +137,8 @@ class AxiLiteMasterWrite(Reset):
else:
self.log.info(" %s: not present", sig)
assert self.byte_lanes == len(self.w_channel.bus.wstrb)
if self.wstrb_present:
assert self.byte_lanes == len(self.w_channel.bus.wstrb)
assert self.byte_lanes * self.byte_size == self.width
self._process_write_cr = None
@@ -109,13 +153,18 @@ class AxiLiteMasterWrite(Reset):
if not isinstance(event, Event):
raise ValueError("Expected event object")
if address < 0 or address >= 2**self.address_width:
raise ValueError("Address out of range")
if isinstance(data, int):
raise ValueError("Expected bytes or bytearray for data")
if not self.awprot_present and prot != AxiProt.NONSECURE:
raise ValueError("awprot sideband signal value specified, but signal is not connected")
self.in_flight_operations += 1
self._idle.clear()
data = bytes(data)
self.write_command_queue.put_nowait(AxiLiteWriteCmd(address, bytearray(data), prot, event))
cocotb.start_soon(self._write_wrapper(address, bytes(data), prot, event))
return event
@@ -127,34 +176,27 @@ class AxiLiteMasterWrite(Reset):
await self._idle.wait()
async def write(self, address, data, prot=AxiProt.NONSECURE):
event = self.init_write(address, data, prot)
if address < 0 or address >= 2**self.address_width:
raise ValueError("Address out of range")
if isinstance(data, int):
raise ValueError("Expected bytes or bytearray for data")
if not self.awprot_present and prot != AxiProt.NONSECURE:
raise ValueError("awprot sideband signal value specified, but signal is not connected")
event = Event()
data = bytes(data)
self.in_flight_operations += 1
self._idle.clear()
await self.write_command_queue.put(AxiLiteWriteCmd(address, data, prot, event))
await event.wait()
return event.data
async def write_words(self, address, data, byteorder='little', ws=2, prot=AxiProt.NONSECURE):
words = data
data = bytearray()
for w in words:
data.extend(w.to_bytes(ws, byteorder))
await self.write(address, data, prot)
async def write_dwords(self, address, data, byteorder='little', prot=AxiProt.NONSECURE):
await self.write_words(address, data, byteorder, 4, prot)
async def write_qwords(self, address, data, byteorder='little', prot=AxiProt.NONSECURE):
await self.write_words(address, data, byteorder, 8, prot)
async def write_byte(self, address, data, prot=AxiProt.NONSECURE):
await self.write(address, [data], prot)
async def write_word(self, address, data, byteorder='little', ws=2, prot=AxiProt.NONSECURE):
await self.write_words(address, [data], byteorder, ws, prot)
async def write_dword(self, address, data, byteorder='little', prot=AxiProt.NONSECURE):
await self.write_dwords(address, [data], byteorder, prot)
async def write_qword(self, address, data, byteorder='little', prot=AxiProt.NONSECURE):
await self.write_qwords(address, [data], byteorder, prot)
async def _write_wrapper(self, address, data, prot, event):
event.set(await self.write(address, data, prot))
def _handle_reset(self, state):
if state:
@@ -198,9 +240,9 @@ class AxiLiteMasterWrite(Reset):
else:
self.log.info("Reset de-asserted")
if self._process_write_cr is None:
self._process_write_cr = cocotb.fork(self._process_write())
self._process_write_cr = cocotb.start_soon(self._process_write())
if self._process_write_resp_cr is None:
self._process_write_resp_cr = cocotb.fork(self._process_write_resp())
self._process_write_resp_cr = cocotb.start_soon(self._process_write_resp())
async def _process_write(self):
while True:
@@ -222,8 +264,9 @@ class AxiLiteMasterWrite(Reset):
offset = 0
self.log.info("Write start addr: 0x%08x prot: %s data: %s",
cmd.address, cmd.prot, ' '.join((f'{c:02x}' for c in cmd.data)))
if self.log.isEnabledFor(logging.INFO):
self.log.info("Write start addr: 0x%08x prot: %s data: %s",
cmd.address, cmd.prot, ' '.join((f'{c:02x}' for c in cmd.data)))
for k in range(cycles):
start = 0
@@ -243,9 +286,15 @@ class AxiLiteMasterWrite(Reset):
offset += 1
aw = self.aw_channel._transaction_obj()
aw.awaddr = word_addr + k*self.byte_lanes
if k == 0:
aw.awaddr = cmd.address
else:
aw.awaddr = word_addr + k*self.byte_lanes
aw.awprot = cmd.prot
if not self.wstrb_present and strb != self.strb_mask:
self.log.warning("Partial operation requested with wstrb not connected, write will be zero-padded (0x%x != 0x%x)", strb, self.strb_mask)
w = self.w_channel._transaction_obj()
w.wdata = val
w.wstrb = strb
@@ -265,13 +314,13 @@ class AxiLiteMasterWrite(Reset):
for k in range(cmd.cycles):
b = await self.b_channel.recv()
cycle_resp = AxiResp(b.bresp)
cycle_resp = AxiResp(int(getattr(b, 'bresp', AxiResp.OKAY)))
if cycle_resp != AxiResp.OKAY:
resp = cycle_resp
self.log.info("Write complete addr: 0x%08x prot: %s resp: %s length: %d",
cmd.address, cmd.prot, resp, cmd.length)
cmd.address, cmd.prot, resp, cmd.length)
write_resp = AxiLiteWriteResp(cmd.address, cmd.length, resp)
@@ -285,8 +334,8 @@ class AxiLiteMasterWrite(Reset):
self._idle.set()
class AxiLiteMasterRead(Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True):
class AxiLiteMasterRead(Region, Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
@@ -303,6 +352,7 @@ class AxiLiteMasterRead(Reset):
self.r_channel.queue_occupancy_limit = 2
self.read_command_queue = Queue()
self.read_command_queue.queue_occupancy_limit = 2
self.current_read_command = None
self.int_read_resp_command_queue = Queue()
@@ -312,14 +362,17 @@ class AxiLiteMasterRead(Reset):
self._idle = Event()
self._idle.set()
self.address_width = len(self.ar_channel.bus.araddr)
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
self.byte_lanes = self.width // self.byte_size
self.arprot_present = hasattr(self.bus.ar, "arprot")
super().__init__(2**self.address_width, **kwargs)
self.log.info("AXI lite master configuration:")
self.log.info(" Address width: %d bits", len(self.ar_channel.bus.araddr))
self.log.info(" Address width: %d bits", self.address_width)
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
@@ -345,13 +398,13 @@ class AxiLiteMasterRead(Reset):
if not isinstance(event, Event):
raise ValueError("Expected event object")
if address < 0 or address >= 2**self.address_width:
raise ValueError("Address out of range")
if not self.arprot_present and prot != AxiProt.NONSECURE:
raise ValueError("arprot sideband signal value specified, but signal is not connected")
self.in_flight_operations += 1
self._idle.clear()
self.read_command_queue.put_nowait(AxiLiteReadCmd(address, length, prot, event))
cocotb.start_soon(self._read_wrapper(address, length, prot, event))
return event
@@ -363,34 +416,24 @@ class AxiLiteMasterRead(Reset):
await self._idle.wait()
async def read(self, address, length, prot=AxiProt.NONSECURE):
event = self.init_read(address, length, prot)
if address < 0 or address >= 2**self.address_width:
raise ValueError("Address out of range")
if not self.arprot_present and prot != AxiProt.NONSECURE:
raise ValueError("arprot sideband signal value specified, but signal is not connected")
event = Event()
self.in_flight_operations += 1
self._idle.clear()
await self.read_command_queue.put(AxiLiteReadCmd(address, length, prot, event))
await event.wait()
return event.data
async def read_words(self, address, count, byteorder='little', ws=2, prot=AxiProt.NONSECURE):
data = await self.read(address, count*ws, prot)
words = []
for k in range(count):
words.append(int.from_bytes(data.data[ws*k:ws*(k+1)], byteorder))
return words
async def read_dwords(self, address, count, byteorder='little', prot=AxiProt.NONSECURE):
return await self.read_words(address, count, byteorder, 4, prot)
async def read_qwords(self, address, count, byteorder='little', prot=AxiProt.NONSECURE):
return await self.read_words(address, count, byteorder, 8, prot)
async def read_byte(self, address, prot=AxiProt.NONSECURE):
return (await self.read(address, 1, prot)).data[0]
async def read_word(self, address, byteorder='little', ws=2, prot=AxiProt.NONSECURE):
return (await self.read_words(address, 1, byteorder, ws, prot))[0]
async def read_dword(self, address, byteorder='little', prot=AxiProt.NONSECURE):
return (await self.read_dwords(address, 1, byteorder, prot))[0]
async def read_qword(self, address, byteorder='little', prot=AxiProt.NONSECURE):
return (await self.read_qwords(address, 1, byteorder, prot))[0]
async def _read_wrapper(self, address, length, prot, event):
event.set(await self.read(address, length, prot))
def _handle_reset(self, state):
if state:
@@ -433,9 +476,9 @@ class AxiLiteMasterRead(Reset):
else:
self.log.info("Reset de-asserted")
if self._process_read_cr is None:
self._process_read_cr = cocotb.fork(self._process_read())
self._process_read_cr = cocotb.start_soon(self._process_read())
if self._process_read_resp_cr is None:
self._process_read_resp_cr = cocotb.fork(self._process_read_resp())
self._process_read_resp_cr = cocotb.start_soon(self._process_read_resp())
async def _process_read(self):
while True:
@@ -450,11 +493,14 @@ class AxiLiteMasterRead(Reset):
await self.int_read_resp_command_queue.put(resp_cmd)
self.log.info("Read start addr: 0x%08x prot: %s length: %d",
cmd.address, cmd.prot, cmd.length)
cmd.address, cmd.prot, cmd.length)
for k in range(cycles):
ar = self.ar_channel._transaction_obj()
ar.araddr = word_addr + k*self.byte_lanes
if k == 0:
ar.araddr = cmd.address
else:
ar.araddr = word_addr + k*self.byte_lanes
ar.arprot = cmd.prot
await self.ar_channel.send(ar)
@@ -477,7 +523,7 @@ class AxiLiteMasterRead(Reset):
r = await self.r_channel.recv()
cycle_data = int(r.rdata)
cycle_resp = AxiResp(r.rresp)
cycle_resp = AxiResp(int(getattr(r, 'rresp', AxiResp.OKAY)))
if cycle_resp != AxiResp.OKAY:
resp = cycle_resp
@@ -493,10 +539,11 @@ class AxiLiteMasterRead(Reset):
for j in range(start, stop):
data.extend(bytearray([(cycle_data >> j*8) & 0xff]))
self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s",
cmd.address, cmd.prot, resp, ' '.join((f'{c:02x}' for c in data)))
if self.log.isEnabledFor(logging.INFO):
self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s",
cmd.address, cmd.prot, resp, ' '.join((f'{c:02x}' for c in data)))
read_resp = AxiLiteReadResp(cmd.address, data, resp)
read_resp = AxiLiteReadResp(cmd.address, bytes(data), resp)
cmd.event.set(read_resp)
@@ -508,13 +555,15 @@ class AxiLiteMasterRead(Reset):
self._idle.set()
class AxiLiteMaster:
def __init__(self, bus, clock, reset=None, reset_active_level=True):
class AxiLiteMaster(Region):
def __init__(self, bus, clock, reset=None, reset_active_level=True, **kwargs):
self.write_if = None
self.read_if = None
self.write_if = AxiLiteMasterWrite(bus.write, clock, reset, reset_active_level)
self.read_if = AxiLiteMasterRead(bus.read, clock, reset, reset_active_level)
self.write_if = AxiLiteMasterWrite(bus.write, clock, reset, reset_active_level, **kwargs)
self.read_if = AxiLiteMasterRead(bus.read, clock, reset, reset_active_level, **kwargs)
super().__init__(max(self.write_if.size, self.read_if.size), **kwargs)
def init_read(self, address, length, prot=AxiProt.NONSECURE, event=None):
return self.read_if.init_read(address, length, prot, event)
@@ -539,47 +588,5 @@ class AxiLiteMaster:
async def read(self, address, length, prot=AxiProt.NONSECURE):
return await self.read_if.read(address, length, prot)
async def read_words(self, address, count, byteorder='little', ws=2, prot=AxiProt.NONSECURE):
return await self.read_if.read_words(address, count, byteorder, ws, prot)
async def read_dwords(self, address, count, byteorder='little', prot=AxiProt.NONSECURE):
return await self.read_if.read_dwords(address, count, byteorder, prot)
async def read_qwords(self, address, count, byteorder='little', prot=AxiProt.NONSECURE):
return await self.read_if.read_qwords(address, count, byteorder, prot)
async def read_byte(self, address, prot=AxiProt.NONSECURE):
return await self.read_if.read_byte(address, prot)
async def read_word(self, address, byteorder='little', ws=2, prot=AxiProt.NONSECURE):
return await self.read_if.read_word(address, byteorder, ws, prot)
async def read_dword(self, address, byteorder='little', prot=AxiProt.NONSECURE):
return await self.read_if.read_dword(address, byteorder, prot)
async def read_qword(self, address, byteorder='little', prot=AxiProt.NONSECURE):
return await self.read_if.read_qword(address, byteorder, prot)
async def write(self, address, data, prot=AxiProt.NONSECURE):
return await self.write_if.write(address, data, prot)
async def write_words(self, address, data, byteorder='little', ws=2, prot=AxiProt.NONSECURE):
return await self.write_if.write_words(address, data, byteorder, ws, prot)
async def write_dwords(self, address, data, byteorder='little', prot=AxiProt.NONSECURE):
return await self.write_if.write_dwords(address, data, byteorder, prot)
async def write_qwords(self, address, data, byteorder='little', prot=AxiProt.NONSECURE):
return await self.write_if.write_qwords(address, data, byteorder, prot)
async def write_byte(self, address, data, prot=AxiProt.NONSECURE):
return await self.write_if.write_byte(address, data, prot)
async def write_word(self, address, data, byteorder='little', ws=2, prot=AxiProt.NONSECURE):
return await self.write_if.write_word(address, data, byteorder, ws, prot)
async def write_dword(self, address, data, byteorder='little', prot=AxiProt.NONSECURE):
return await self.write_if.write_dword(address, data, byteorder, prot)
async def write_qword(self, address, data, byteorder='little', prot=AxiProt.NONSECURE):
return await self.write_if.write_qword(address, data, byteorder, prot)

View File

@@ -1,6 +1,6 @@
"""
Copyright (c) 2020 Alex Forencich
Copyright (c) 2021 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
@@ -22,198 +22,32 @@ THE SOFTWARE.
"""
import logging
import cocotb
from .version import __version__
from .constants import AxiProt, AxiResp
from .axil_channels import AxiLiteAWSink, AxiLiteWSink, AxiLiteBSource, AxiLiteARSink, AxiLiteRSource
from .axil_slave import AxiLiteSlaveWrite, AxiLiteSlaveRead
from .memory import Memory
from .reset import Reset
class AxiLiteRamWrite(Memory, Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
class AxiLiteRamWrite(AxiLiteSlaveWrite, Memory):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs):
super().__init__(bus, clock, reset, reset_active_level=reset_active_level, size=size, mem=mem, **kwargs)
self.log.info("AXI lite RAM model (write)")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2020 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
super().__init__(size, mem, *args, **kwargs)
self.aw_channel = AxiLiteAWSink(bus.aw, clock, reset, reset_active_level)
self.aw_channel.queue_occupancy_limit = 2
self.w_channel = AxiLiteWSink(bus.w, clock, reset, reset_active_level)
self.w_channel.queue_occupancy_limit = 2
self.b_channel = AxiLiteBSource(bus.b, clock, reset, reset_active_level)
self.b_channel.queue_occupancy_limit = 2
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
self.byte_lanes = self.width // self.byte_size
self.strb_mask = 2**self.byte_lanes-1
self.log.info("AXI lite RAM model configuration:")
self.log.info(" Memory size: %d bytes", len(self.mem))
self.log.info(" Address width: %d bits", len(self.aw_channel.bus.awaddr))
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info("AXI lite RAM model signals:")
for bus in (self.bus.aw, self.bus.w, self.bus.b):
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
assert self.byte_lanes == len(self.w_channel.bus.wstrb)
assert self.byte_lanes * self.byte_size == self.width
self._process_write_cr = None
self._init_reset(reset, reset_active_level)
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._process_write_cr is not None:
self._process_write_cr.kill()
self._process_write_cr = None
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
else:
self.log.info("Reset de-asserted")
if self._process_write_cr is None:
self._process_write_cr = cocotb.fork(self._process_write())
async def _process_write(self):
while True:
aw = await self.aw_channel.recv()
addr = (int(aw.awaddr) // self.byte_lanes) * self.byte_lanes
prot = AxiProt(aw.awprot)
w = await self.w_channel.recv()
data = int(w.wdata)
strb = int(w.wstrb)
# todo latency
self.mem.seek(addr % self.size)
data = data.to_bytes(self.byte_lanes, 'little')
self.log.info("Write data awaddr: 0x%08x awprot: %s wstrb: 0x%02x data: %s",
addr, prot, strb, ' '.join((f'{c:02x}' for c in data)))
for i in range(self.byte_lanes):
if strb & (1 << i):
self.mem.write(data[i:i+1])
else:
self.mem.seek(1, 1)
b = self.b_channel._transaction_obj()
b.bresp = AxiResp.OKAY
await self.b_channel.send(b)
async def _write(self, address, data):
self.write(address % self.size, data)
class AxiLiteRamRead(Memory, Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
class AxiLiteRamRead(AxiLiteSlaveRead, Memory):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs):
super().__init__(bus, clock, reset, reset_active_level=reset_active_level, size=size, mem=mem, **kwargs)
self.log.info("AXI lite RAM model (read)")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2020 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
super().__init__(size, mem, *args, **kwargs)
self.ar_channel = AxiLiteARSink(bus.ar, clock, reset, reset_active_level)
self.ar_channel.queue_occupancy_limit = 2
self.r_channel = AxiLiteRSource(bus.r, clock, reset, reset_active_level)
self.r_channel.queue_occupancy_limit = 2
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
self.byte_lanes = self.width // self.byte_size
self.log.info("AXI lite RAM model configuration:")
self.log.info(" Memory size: %d bytes", len(self.mem))
self.log.info(" Address width: %d bits", len(self.ar_channel.bus.araddr))
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info("AXI lite RAM model signals:")
for bus in (self.bus.ar, self.bus.r):
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
assert self.byte_lanes * self.byte_size == self.width
self._process_read_cr = None
self._init_reset(reset, reset_active_level)
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._process_read_cr is not None:
self._process_read_cr.kill()
self._process_read_cr = None
self.ar_channel.clear()
self.r_channel.clear()
else:
self.log.info("Reset de-asserted")
if self._process_read_cr is None:
self._process_read_cr = cocotb.fork(self._process_read())
async def _process_read(self):
while True:
ar = await self.ar_channel.recv()
addr = (int(ar.araddr) // self.byte_lanes) * self.byte_lanes
prot = AxiProt(ar.arprot)
# todo latency
self.mem.seek(addr % self.size)
data = self.mem.read(self.byte_lanes)
r = self.r_channel._transaction_obj()
r.rdata = int.from_bytes(data, 'little')
r.rresp = AxiResp.OKAY
await self.r_channel.send(r)
self.log.info("Read data araddr: 0x%08x arprot: %s data: %s",
addr, prot, ' '.join((f'{c:02x}' for c in data)))
async def _read(self, address, length):
return self.read(address % self.size, length)
class AxiLiteRam(Memory):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs):
self.write_if = None
self.read_if = None
super().__init__(size, mem, *args, **kwargs)
super().__init__(size, mem, **kwargs)
self.write_if = AxiLiteRamWrite(bus.write, clock, reset, reset_active_level, mem=self.mem)
self.read_if = AxiLiteRamRead(bus.read, clock, reset, reset_active_level, mem=self.mem)

253
cocotbext/axi/axil_slave.py Normal file
View File

@@ -0,0 +1,253 @@
"""
Copyright (c) 2021 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import logging
import cocotb
from .version import __version__
from .constants import AxiProt, AxiResp
from .axil_channels import AxiLiteAWSink, AxiLiteWSink, AxiLiteBSource, AxiLiteARSink, AxiLiteRSource
from .reset import Reset
class AxiLiteSlaveWrite(Reset):
def __init__(self, bus, clock, reset=None, target=None, reset_active_level=True, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
self.target = target
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
self.log.info("AXI lite slave model (write)")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2021 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
super().__init__(**kwargs)
self.aw_channel = AxiLiteAWSink(bus.aw, clock, reset, reset_active_level)
self.aw_channel.queue_occupancy_limit = 2
self.w_channel = AxiLiteWSink(bus.w, clock, reset, reset_active_level)
self.w_channel.queue_occupancy_limit = 2
self.b_channel = AxiLiteBSource(bus.b, clock, reset, reset_active_level)
self.b_channel.queue_occupancy_limit = 2
self.address_width = len(self.aw_channel.bus.awaddr)
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
self.byte_lanes = self.width // self.byte_size
self.strb_mask = 2**self.byte_lanes-1
self.wstrb_present = hasattr(self.bus.w, "wstrb")
self.log.info("AXI lite slave model configuration:")
self.log.info(" Address width: %d bits", self.address_width)
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info("AXI lite slave model signals:")
for bus in (self.bus.aw, self.bus.w, self.bus.b):
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
if self.wstrb_present:
assert self.byte_lanes == len(self.w_channel.bus.wstrb)
assert self.byte_lanes * self.byte_size == self.width
self._process_write_cr = None
self._init_reset(reset, reset_active_level)
async def _write(self, address, data):
await self.target.write(address, data)
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._process_write_cr is not None:
self._process_write_cr.kill()
self._process_write_cr = None
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
else:
self.log.info("Reset de-asserted")
if self._process_write_cr is None:
self._process_write_cr = cocotb.start_soon(self._process_write())
async def _process_write(self):
while True:
aw = await self.aw_channel.recv()
addr = (int(aw.awaddr) // self.byte_lanes) * self.byte_lanes
prot = AxiProt(int(getattr(aw, 'awprot', AxiProt.NONSECURE)))
w = await self.w_channel.recv()
data = int(w.wdata)
if self.wstrb_present:
strb = int(getattr(w, 'wstrb', self.strb_mask))
else:
strb = self.strb_mask
# generate operation list
offset = 0
start_offset = None
write_ops = []
data = data.to_bytes(self.byte_lanes, 'little')
b = self.b_channel._transaction_obj()
b.bresp = AxiResp.OKAY
if self.log.isEnabledFor(logging.INFO):
self.log.info("Write data awaddr: 0x%08x awprot: %s wstrb: 0x%02x data: %s",
addr, prot, strb, ' '.join((f'{c:02x}' for c in data)))
for i in range(self.byte_lanes):
if strb & (1 << i):
if start_offset is None:
start_offset = offset
else:
if start_offset is not None and offset != start_offset:
write_ops.append((addr+start_offset, data[start_offset:offset]))
start_offset = None
offset += 1
if start_offset is not None and offset != start_offset:
write_ops.append((addr+start_offset, data[start_offset:offset]))
# perform writes
try:
for addr, data in write_ops:
await self._write(addr, data)
except Exception:
self.log.warning("Write operation failed")
b.bresp = AxiResp.SLVERR
await self.b_channel.send(b)
class AxiLiteSlaveRead(Reset):
def __init__(self, bus, clock, reset=None, target=None, reset_active_level=True, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
self.target = target
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
self.log.info("AXI lite slave model (read)")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2021 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
super().__init__(**kwargs)
self.ar_channel = AxiLiteARSink(bus.ar, clock, reset, reset_active_level)
self.ar_channel.queue_occupancy_limit = 2
self.r_channel = AxiLiteRSource(bus.r, clock, reset, reset_active_level)
self.r_channel.queue_occupancy_limit = 2
self.address_width = len(self.ar_channel.bus.araddr)
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
self.byte_lanes = self.width // self.byte_size
self.log.info("AXI lite slave model configuration:")
self.log.info(" Address width: %d bits", self.address_width)
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info("AXI lite slave model signals:")
for bus in (self.bus.ar, self.bus.r):
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
assert self.byte_lanes * self.byte_size == self.width
self._process_read_cr = None
self._init_reset(reset, reset_active_level)
async def _read(self, address, length):
return await self.target.read(address, length)
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._process_read_cr is not None:
self._process_read_cr.kill()
self._process_read_cr = None
self.ar_channel.clear()
self.r_channel.clear()
else:
self.log.info("Reset de-asserted")
if self._process_read_cr is None:
self._process_read_cr = cocotb.start_soon(self._process_read())
async def _process_read(self):
while True:
ar = await self.ar_channel.recv()
addr = (int(ar.araddr) // self.byte_lanes) * self.byte_lanes
prot = AxiProt(int(getattr(ar, 'arprot', AxiProt.NONSECURE)))
r = self.r_channel._transaction_obj()
r.rresp = AxiResp.OKAY
try:
data = await self._read(addr, self.byte_lanes)
except Exception:
self.log.warning("Read operation failed")
data = bytes(self.byte_lanes)
r.rresp = AxiResp.SLVERR
r.rdata = int.from_bytes(data, 'little')
await self.r_channel.send(r)
if self.log.isEnabledFor(logging.INFO):
self.log.info("Read data araddr: 0x%08x arprot: %s data: %s",
addr, prot, ' '.join((f'{c:02x}' for c in data)))
class AxiLiteSlave:
def __init__(self, bus, clock, reset=None, target=None, reset_active_level=True, **kwargs):
self.write_if = None
self.read_if = None
super().__init__(**kwargs)
self.write_if = AxiLiteSlaveWrite(bus.write, clock, reset, target, reset_active_level)
self.read_if = AxiLiteSlaveRead(bus.read, clock, reset, target, reset_active_level)

View File

@@ -282,12 +282,13 @@ class AxiStreamBase(Reset):
self.idle_event = Event()
self.idle_event.set()
self.active_event = Event()
self.wake_event = Event()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
self.width = len(self.bus.tdata)
self.byte_lanes = 1
self.byte_lanes = self.width // 8
if self._valid_init is not None and hasattr(self.bus, "tvalid"):
self.bus.tvalid.setimmediatevalue(self._valid_init)
@@ -366,7 +367,7 @@ class AxiStreamBase(Reset):
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
self._run_cr = cocotb.start_soon(self._run())
async def _run(self):
raise NotImplementedError()
@@ -376,10 +377,23 @@ class AxiStreamPause:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.pause = False
self._pause = False
self._pause_generator = None
self._pause_cr = None
def _pause_update(self, val):
pass
@property
def pause(self):
return self._pause
@pause.setter
def pause(self, val):
if self._pause != val:
self._pause_update(val)
self._pause = val
def set_pause_generator(self, generator=None):
if self._pause_cr is not None:
self._pause_cr.kill()
@@ -388,15 +402,17 @@ class AxiStreamPause:
self._pause_generator = generator
if self._pause_generator is not None:
self._pause_cr = cocotb.fork(self._run_pause())
self._pause_cr = cocotb.start_soon(self._run_pause())
def clear_pause_generator(self):
self.set_pause_generator(None)
async def _run_pause(self):
clock_edge_event = RisingEdge(self.clock)
for val in self._pause_generator:
self.pause = val
await RisingEdge(self.clock)
await clock_edge_event
class AxiStreamSource(AxiStreamBase, AxiStreamPause):
@@ -423,6 +439,7 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
frame = AxiStreamFrame(frame)
await self.queue.put(frame)
self.idle_event.clear()
self.active_event.set()
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
@@ -432,6 +449,7 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
frame = AxiStreamFrame(frame)
self.queue.put_nowait(frame)
self.idle_event.clear()
self.active_event.set()
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
@@ -483,15 +501,25 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
frame_offset = 0
self.active = False
has_tready = hasattr(self.bus, "tready")
has_tvalid = hasattr(self.bus, "tvalid")
has_tlast = hasattr(self.bus, "tlast")
has_tkeep = hasattr(self.bus, "tkeep")
has_tid = hasattr(self.bus, "tid")
has_tdest = hasattr(self.bus, "tdest")
has_tuser = hasattr(self.bus, "tuser")
clock_edge_event = RisingEdge(self.clock)
while True:
await RisingEdge(self.clock)
await clock_edge_event
# read handshake signals
tready_sample = (not hasattr(self.bus, "tready")) or self.bus.tready.value
tvalid_sample = (not hasattr(self.bus, "tvalid")) or self.bus.tvalid.value
tready_sample = (not has_tready) or self.bus.tready.value
tvalid_sample = (not has_tvalid) or self.bus.tvalid.value
if (tready_sample and tvalid_sample) or not tvalid_sample:
if frame is None and not self.queue.empty():
if not frame and not self.queue.empty():
frame = self.queue.get_nowait()
self.dequeue_event.set()
self.queue_occupancy_bytes -= len(frame)
@@ -529,26 +557,29 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
break
self.bus.tdata.value = tdata_val
if hasattr(self.bus, "tvalid"):
if has_tvalid:
self.bus.tvalid.value = 1
if hasattr(self.bus, "tlast"):
if has_tlast:
self.bus.tlast.value = tlast_val
if hasattr(self.bus, "tkeep"):
if has_tkeep:
self.bus.tkeep.value = tkeep_val
if hasattr(self.bus, "tid"):
if has_tid:
self.bus.tid.value = tid_val
if hasattr(self.bus, "tdest"):
if has_tdest:
self.bus.tdest.value = tdest_val
if hasattr(self.bus, "tuser"):
if has_tuser:
self.bus.tuser.value = tuser_val
else:
if hasattr(self.bus, "tvalid"):
if has_tvalid:
self.bus.tvalid.value = 0
if hasattr(self.bus, "tlast"):
if has_tlast:
self.bus.tlast.value = 0
self.active = bool(frame)
if not frame and self.queue.empty():
self.idle_event.set()
self.active_event.clear()
await self.active_event.wait()
class AxiStreamMonitor(AxiStreamBase):
@@ -567,11 +598,20 @@ class AxiStreamMonitor(AxiStreamBase):
self.read_queue = []
if hasattr(self.bus, "tvalid"):
cocotb.start_soon(self._run_tvalid_monitor())
if hasattr(self.bus, "tready"):
cocotb.start_soon(self._run_tready_monitor())
def _dequeue(self, frame):
pass
def _recv(self, frame, compact=True):
if self.queue.empty():
self.active_event.clear()
self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1
self._dequeue(frame)
if compact:
frame.compact()
return frame
@@ -611,19 +651,45 @@ class AxiStreamMonitor(AxiStreamBase):
else:
await self.active_event.wait()
async def _run_tvalid_monitor(self):
event = RisingEdge(self.bus.tvalid)
while True:
await event
self.wake_event.set()
async def _run_tready_monitor(self):
event = RisingEdge(self.bus.tready)
while True:
await event
self.wake_event.set()
async def _run(self):
frame = None
self.active = False
has_tready = hasattr(self.bus, "tready")
has_tvalid = hasattr(self.bus, "tvalid")
has_tlast = hasattr(self.bus, "tlast")
has_tkeep = hasattr(self.bus, "tkeep")
has_tid = hasattr(self.bus, "tid")
has_tdest = hasattr(self.bus, "tdest")
has_tuser = hasattr(self.bus, "tuser")
clock_edge_event = RisingEdge(self.clock)
wake_event = self.wake_event.wait()
while True:
await RisingEdge(self.clock)
await clock_edge_event
# read handshake signals
tready_sample = (not hasattr(self.bus, "tready")) or self.bus.tready.value
tvalid_sample = (not hasattr(self.bus, "tvalid")) or self.bus.tvalid.value
tready_sample = (not has_tready) or self.bus.tready.value
tvalid_sample = (not has_tvalid) or self.bus.tvalid.value
if tready_sample and tvalid_sample:
if frame is None:
if not frame:
if self.byte_size == 8:
frame = AxiStreamFrame(bytearray(), [], [], [], [])
else:
@@ -633,16 +699,16 @@ class AxiStreamMonitor(AxiStreamBase):
for offset in range(self.byte_lanes):
frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask)
if hasattr(self.bus, "tkeep"):
if has_tkeep:
frame.tkeep.append((self.bus.tkeep.value.integer >> offset) & 1)
if hasattr(self.bus, "tid"):
if has_tid:
frame.tid.append(self.bus.tid.value.integer)
if hasattr(self.bus, "tdest"):
if has_tdest:
frame.tdest.append(self.bus.tdest.value.integer)
if hasattr(self.bus, "tuser"):
if has_tuser:
frame.tuser.append(self.bus.tuser.value.integer)
if not hasattr(self.bus, "tlast") or self.bus.tlast.value:
if not has_tlast or self.bus.tlast.value:
frame.sim_time_end = get_sim_time()
self.log.info("RX frame: %s", frame)
@@ -656,6 +722,9 @@ class AxiStreamMonitor(AxiStreamBase):
else:
self.active = bool(frame)
self.wake_event.clear()
await wake_event
class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
@@ -669,11 +738,11 @@ class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
def __init__(self, bus, clock, reset=None, reset_active_level=True,
byte_size=None, byte_lanes=None, *args, **kwargs):
super().__init__(bus, clock, reset, reset_active_level, byte_size, byte_lanes, *args, **kwargs)
self.queue_occupancy_limit_bytes = -1
self.queue_occupancy_limit_frames = -1
super().__init__(bus, clock, reset, reset_active_level, byte_size, byte_lanes, *args, **kwargs)
def full(self):
if self.queue_occupancy_limit_bytes > 0 and self.queue_occupancy_bytes > self.queue_occupancy_limit_bytes:
return True
@@ -689,19 +758,39 @@ class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
if hasattr(self.bus, "tready"):
self.bus.tready.value = 0
def _pause_update(self, val):
self.wake_event.set()
def _dequeue(self, frame):
self.wake_event.set()
async def _run(self):
frame = None
self.active = False
has_tready = hasattr(self.bus, "tready")
has_tvalid = hasattr(self.bus, "tvalid")
has_tlast = hasattr(self.bus, "tlast")
has_tkeep = hasattr(self.bus, "tkeep")
has_tid = hasattr(self.bus, "tid")
has_tdest = hasattr(self.bus, "tdest")
has_tuser = hasattr(self.bus, "tuser")
clock_edge_event = RisingEdge(self.clock)
wake_event = self.wake_event.wait()
while True:
await RisingEdge(self.clock)
pause_sample = self.pause
await clock_edge_event
# read handshake signals
tready_sample = (not hasattr(self.bus, "tready")) or self.bus.tready.value
tvalid_sample = (not hasattr(self.bus, "tvalid")) or self.bus.tvalid.value
tready_sample = (not has_tready) or self.bus.tready.value
tvalid_sample = (not has_tvalid) or self.bus.tvalid.value
if tready_sample and tvalid_sample:
if frame is None:
if not frame:
if self.byte_size == 8:
frame = AxiStreamFrame(bytearray(), [], [], [], [])
else:
@@ -711,16 +800,16 @@ class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
for offset in range(self.byte_lanes):
frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask)
if hasattr(self.bus, "tkeep"):
if has_tkeep:
frame.tkeep.append((self.bus.tkeep.value.integer >> offset) & 1)
if hasattr(self.bus, "tid"):
if has_tid:
frame.tid.append(self.bus.tid.value.integer)
if hasattr(self.bus, "tdest"):
if has_tdest:
frame.tdest.append(self.bus.tdest.value.integer)
if hasattr(self.bus, "tuser"):
if has_tuser:
frame.tuser.append(self.bus.tuser.value.integer)
if not hasattr(self.bus, "tlast") or self.bus.tlast.value:
if not has_tlast or self.bus.tlast.value:
frame.sim_time_end = get_sim_time()
self.log.info("RX frame: %s", frame)
@@ -734,5 +823,9 @@ class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
else:
self.active = bool(frame)
if hasattr(self.bus, "tready"):
self.bus.tready.value = (not self.full() and not self.pause)
if has_tready:
self.bus.tready.value = (not self.full() and not pause_sample)
if not tvalid_sample or (self.pause and pause_sample) or self.full():
self.wake_event.clear()
await wake_event

View File

@@ -0,0 +1,92 @@
"""
Copyright (c) 2021 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
class BuddyAllocator:
def __init__(self, size, min_alloc=1):
self.size = size
self.min_alloc = min_alloc
self.free_lists = [[] for x in range((self.size-1).bit_length())]
self.free_lists.append([0])
self.allocations = {}
def alloc(self, size):
if size < 1 or size > self.size:
raise ValueError("size out of range")
size = max(size, self.min_alloc)
bucket = (size-1).bit_length()
orig_bucket = bucket
while bucket < len(self.free_lists):
if not self.free_lists[bucket]:
# find free block
bucket += 1
continue
while bucket > orig_bucket:
# split block
block = self.free_lists[bucket].pop(0)
bucket -= 1
self.free_lists[bucket].append(block)
self.free_lists[bucket].append(block+2**bucket)
if self.free_lists[bucket]:
# allocate
block = self.free_lists[bucket].pop(0)
self.allocations[block] = bucket
return block
break
raise Exception("out of memory")
def free(self, addr):
if addr not in self.allocations:
raise ValueError("unknown allocation")
bucket = self.allocations.pop(addr)
while bucket < len(self.free_lists):
size = 2**bucket
# find buddy
if (addr // size) % 2:
buddy = addr - size
else:
buddy = addr + size
if buddy in self.free_lists[bucket]:
# buddy is free, merge
self.free_lists[bucket].remove(buddy)
addr = min(addr, buddy)
bucket += 1
else:
# buddy is not free, so add to free list
self.free_lists[bucket].append(addr)
return
raise Exception("failed to free memory")

View File

@@ -28,13 +28,13 @@ from .utils import hexdump, hexdump_lines, hexdump_str
class Memory:
def __init__(self, size=1024, mem=None, *args, **kwargs):
def __init__(self, size=1024, mem=None, **kwargs):
if mem is not None:
self.mem = mem
else:
self.mem = mmap.mmap(-1, size)
self.size = len(self.mem)
super().__init__(*args, **kwargs)
super().__init__(**kwargs)
def read(self, address, length):
self.mem.seek(address)

View File

@@ -33,7 +33,7 @@ class Reset:
self._reset_state = True
if reset_signal is not None:
cocotb.fork(self._run_reset(reset_signal, bool(active_level)))
cocotb.start_soon(self._run_reset(reset_signal, bool(active_level)))
self._update_reset()

View File

@@ -99,6 +99,7 @@ class StreamBase(Reset):
self.idle_event = Event()
self.idle_event.set()
self.active_event = Event()
self.wake_event = Event()
self.ready = None
self.valid = None
@@ -153,7 +154,7 @@ class StreamBase(Reset):
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
self._run_cr = cocotb.start_soon(self._run())
async def _run(self):
raise NotImplementedError()
@@ -163,10 +164,23 @@ class StreamPause:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.pause = False
self._pause = False
self._pause_generator = None
self._pause_cr = None
def _pause_update(self, val):
pass
@property
def pause(self):
return self._pause
@pause.setter
def pause(self, val):
if self._pause != val:
self._pause_update(val)
self._pause = val
def set_pause_generator(self, generator=None):
if self._pause_cr is not None:
self._pause_cr.kill()
@@ -175,15 +189,17 @@ class StreamPause:
self._pause_generator = generator
if self._pause_generator is not None:
self._pause_cr = cocotb.fork(self._run_pause())
self._pause_cr = cocotb.start_soon(self._run_pause())
def clear_pause_generator(self):
self.set_pause_generator(None)
async def _run_pause(self):
clock_edge_event = RisingEdge(self.clock)
for val in self._pause_generator:
self.pause = val
await RisingEdge(self.clock)
await clock_edge_event
class StreamSource(StreamBase, StreamPause):
@@ -204,12 +220,14 @@ class StreamSource(StreamBase, StreamPause):
await self.dequeue_event.wait()
await self.queue.put(obj)
self.idle_event.clear()
self.active_event.set()
def send_nowait(self, obj):
if self.full():
raise QueueFull()
self.queue.put_nowait(obj)
self.idle_event.clear()
self.active_event.set()
def full(self):
if self.queue_occupancy_limit > 0 and self.count() >= self.queue_occupancy_limit:
@@ -231,8 +249,10 @@ class StreamSource(StreamBase, StreamPause):
self.valid.value = 0
async def _run(self):
clock_edge_event = RisingEdge(self.clock)
while True:
await RisingEdge(self.clock)
await clock_edge_event
# read handshake signals
ready_sample = self.ready is None or self.ready.value
@@ -251,6 +271,9 @@ class StreamSource(StreamBase, StreamPause):
self.active = not self.queue.empty()
if self.queue.empty():
self.idle_event.set()
self.active_event.clear()
await self.active_event.wait()
class StreamMonitor(StreamBase):
@@ -260,9 +283,21 @@ class StreamMonitor(StreamBase):
_valid_init = None
_ready_init = None
def __init__(self, bus, clock, reset=None, reset_active_level=True, *args, **kwargs):
super().__init__(bus, clock, reset, reset_active_level, *args, **kwargs)
if self.valid is not None:
cocotb.start_soon(self._run_valid_monitor())
if self.ready is not None:
cocotb.start_soon(self._run_ready_monitor())
def _dequeue(self, item):
pass
def _recv(self, item):
if self.queue.empty():
self.active_event.clear()
self._dequeue(item)
return item
async def recv(self):
@@ -281,9 +316,27 @@ class StreamMonitor(StreamBase):
else:
await self.active_event.wait()
async def _run(self):
async def _run_valid_monitor(self):
event = RisingEdge(self.valid)
while True:
await RisingEdge(self.clock)
await event
self.wake_event.set()
async def _run_ready_monitor(self):
event = RisingEdge(self.ready)
while True:
await event
self.wake_event.set()
async def _run(self):
clock_edge_event = RisingEdge(self.clock)
wake_event = self.wake_event.wait()
while True:
await clock_edge_event
# read handshake signals
ready_sample = self.ready is None or self.ready.value
@@ -294,6 +347,9 @@ class StreamMonitor(StreamBase):
self.bus.sample(obj)
self.queue.put_nowait(obj)
self.active_event.set()
else:
self.wake_event.clear()
await wake_event
class StreamSink(StreamMonitor, StreamPause):
@@ -321,9 +377,21 @@ class StreamSink(StreamMonitor, StreamPause):
if self.ready is not None:
self.ready.value = 0
def _pause_update(self, val):
self.wake_event.set()
def _dequeue(self, item):
self.wake_event.set()
async def _run(self):
clock_edge_event = RisingEdge(self.clock)
wake_event = self.wake_event.wait()
while True:
await RisingEdge(self.clock)
pause_sample = self.pause
await clock_edge_event
# read handshake signals
ready_sample = self.ready is None or self.ready.value
@@ -336,7 +404,11 @@ class StreamSink(StreamMonitor, StreamPause):
self.active_event.set()
if self.ready is not None:
self.ready.value = (not self.full() and not self.pause)
self.ready.value = (not self.full() and not pause_sample)
if not valid_sample or (self.pause and pause_sample) or self.full():
self.wake_event.clear()
await wake_event
def define_stream(name, signals, optional_signals=None, valid_signal=None, ready_signal=None, signal_widths=None):

View File

@@ -1 +1 @@
__version__ = "0.1.14"
__version__ = "0.1.20"

View File

@@ -27,7 +27,7 @@ classifiers =
packages = find_namespace:
python_requires = >=3.6
install_requires =
cocotb
cocotb >= 1.6.0
cocotb-bus
[options.extras_require]
@@ -47,31 +47,38 @@ addopts =
# tox configuration
[tox:tox]
envlist = py36, py37, py38, py39
envlist = py37, py38, py39, py310
skip_missing_interpreters = true
minversion = 3.18.0
requires = virtualenv >= 16.1
[gh-actions]
python =
3.6: py36
3.7: py37
3.8: py38
3.9: py39
3.10: py310
[testenv]
setenv =
COVERAGE=1
usedevelop = True
deps =
pytest
pytest-xdist
cocotb-test
coverage
pytest-cov
pytest == 7.2.1
pytest-xdist == 3.1.0
cocotb == 1.7.2
cocotb-bus == 0.2.1
cocotb-test == 0.2.4
coverage == 7.0.5
pytest-cov == 4.0.0
commands =
pytest --cov=cocotbext --cov=tests --cov-branch -n auto
pytest --cov=cocotbext --cov=tests --cov-branch {posargs:-n auto --verbose}
bash -c 'find . -type f -name "\.coverage" | xargs coverage combine --append'
coverage report
whitelist_externals =
allowlist_externals =
bash
# combine if paths are different

View File

@@ -45,7 +45,7 @@ class TB:
self.log = logging.getLogger("cocotb.tb")
self.log.setLevel(logging.DEBUG)
cocotb.fork(Clock(dut.clk, 2, units="ns").start())
cocotb.start_soon(Clock(dut.clk, 2, units="ns").start())
self.axi_master = AxiMaster(AxiBus.from_prefix(dut, "axi"), dut.clk, dut.rst)
self.axi_ram = AxiRam(AxiBus.from_prefix(dut, "axi"), dut.clk, dut.rst, size=2**16)
@@ -283,7 +283,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
workers = []
for k in range(16):
workers.append(cocotb.fork(worker(tb.axi_master, k*0x1000, 0x1000, count=16)))
workers.append(cocotb.start_soon(worker(tb.axi_master, k*0x1000, 0x1000, count=16)))
while workers:
await workers.pop(0).join()

View File

@@ -45,7 +45,7 @@ class TB:
self.log = logging.getLogger("cocotb.tb")
self.log.setLevel(logging.DEBUG)
cocotb.fork(Clock(dut.clk, 2, units="ns").start())
cocotb.start_soon(Clock(dut.clk, 2, units="ns").start())
self.axil_master = AxiLiteMaster(AxiLiteBus.from_prefix(dut, "axil"), dut.clk, dut.rst)
self.axil_ram = AxiLiteRam(AxiLiteBus.from_prefix(dut, "axil"), dut.clk, dut.rst, size=2**16)
@@ -272,7 +272,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
workers = []
for k in range(16):
workers.append(cocotb.fork(worker(tb.axil_master, k*0x1000, 0x1000, count=16)))
workers.append(cocotb.start_soon(worker(tb.axil_master, k*0x1000, 0x1000, count=16)))
while workers:
await workers.pop(0).join()

View File

@@ -45,7 +45,7 @@ class TB:
self.log = logging.getLogger("cocotb.tb")
self.log.setLevel(logging.DEBUG)
cocotb.fork(Clock(dut.clk, 2, units="ns").start())
cocotb.start_soon(Clock(dut.clk, 2, units="ns").start())
self.source = AxiStreamSource(AxiStreamBus.from_prefix(dut, "axis"), dut.clk, dut.rst)
self.sink = AxiStreamSink(AxiStreamBus.from_prefix(dut, "axis"), dut.clk, dut.rst)

View File

@@ -0,0 +1,44 @@
"""
Copyright (c) 2021 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
from cocotbext.axi.buddy_allocator import BuddyAllocator
def test_allocator():
ba = BuddyAllocator(1024)
lst = []
for k in range(1, 32):
print(f"Alloc {k} bytes")
addr = ba.alloc(k)
print(f"Got address {addr}")
assert addr & (2**((k-1).bit_length())-1) == 0
lst.append(addr)
for addr in lst:
print(f"Free {addr}")
ba.free(addr)
assert ba.free_lists[-1] == [0]