Compare commits
19 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1608af26e5 | ||
|
|
31fb855311 | ||
|
|
cb4b0e1738 | ||
|
|
ea95eeaf0d | ||
|
|
079f4009b3 | ||
|
|
612a94c97a | ||
|
|
757e3a6f2d | ||
|
|
b9b9a2da72 | ||
|
|
f7660e9038 | ||
|
|
78693a63d5 | ||
|
|
34498f6e5d | ||
|
|
6329187ced | ||
|
|
da24857dd2 | ||
|
|
c08f22c710 | ||
|
|
d874d91d05 | ||
|
|
3fd016a84c | ||
|
|
43de2ea9b0 | ||
|
|
558ba51c91 | ||
|
|
f6426bd8f3 |
94
README.md
94
README.md
@@ -32,7 +32,7 @@ See the `tests` directory, [verilog-axi](https://github.com/alexforencich/verilo
|
||||
|
||||
### AXI and AXI lite master
|
||||
|
||||
The `AxiMaster` and `AxiLiteMaster` classes implement AXI masters and are capable of generating read and write operations against AXI slaves. Requested operations will be split and aligned according to the AXI specification. The `AxiMaster` module is capable of generating narrow bursts, handling multiple in-flight operations, and handling reordering and interleaving in responses across different transaction IDs.
|
||||
The `AxiMaster` and `AxiLiteMaster` classes implement AXI masters and are capable of generating read and write operations against AXI slaves. Requested operations will be split and aligned according to the AXI specification. The `AxiMaster` module is capable of generating narrow bursts, handling multiple in-flight operations, and handling reordering and interleaving in responses across different transaction IDs. `AxiMaster` and `AxiLiteMaster` and related objects all extend `Region`, so they can be attached to `AddressSpace` objects to handle memory operations in the specified region.
|
||||
|
||||
The `AxiMaster` is a wrapper around `AxiMasterWrite` and `AxiMasterRead`. Similarly, `AxiLiteMaster` is a wrapper around `AxiLiteMasterWrite` and `AxiLiteMasterRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same.
|
||||
|
||||
@@ -123,9 +123,39 @@ With this method, it is possible to start multiple concurrent operations from th
|
||||
|
||||
The `AxiBus`, `AxiLiteBus`, and related objects are containers for the interface signals. These hold instances of bus objects for the individual channels, which are currently extensions of `cocotb_bus.bus.Bus`. Class methods `from_entity` and `from_prefix` are provided to facilitate signal name matching. For AXI interfaces use `AxiBus`, `AxiReadBus`, or `AxiWriteBus`, as appropriate. For AXI lite interfaces, use `AxiLiteBus`, `AxiLiteReadBus`, or `AxiLiteWriteBus`, as appropriate.
|
||||
|
||||
### AXI and AXI lite slave
|
||||
|
||||
The `AxiSlave` and `AxiLiteSlave` classes implement AXI slaves and are capable of completing read and write operations from upstream AXI masters. The `AxiSlave` module is capable of handling narrow bursts. These modules can either be used to perform memory reads and writes on a `MemoryInterface` on behalf of the DUT, or they can be extended to implement customized functionality.
|
||||
|
||||
The `AxiSlave` is a wrapper around `AxiSlaveWrite` and `AxiSlaveRead`. Similarly, `AxiLiteSlave` is a wrapper around `AxiLiteSlaveWrite` and `AxiLiteSlaveRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same.
|
||||
|
||||
To use these modules, import the one you need and connect it to the DUT:
|
||||
|
||||
from cocotbext.axi import AxiBus, AxiSlave, MemoryRegion
|
||||
|
||||
axi_slave = AxiSlave(AxiBus.from_prefix(dut, "m_axi"), dut.clk, dut.rst)
|
||||
region = MemoryRegion(2**axi_slave.read_if.address_width)
|
||||
axi_slave.target = region
|
||||
|
||||
The first argument to the constructor accepts an `AxiBus` or `AxiLiteBus` object. These objects are containers for the interface signals and include class methods to automate connections.
|
||||
|
||||
It is also possible to extend these modules; operation can be customized by overriding the internal `_read()` and `_write()` methods. See `AxiRam` and `AxiLiteRam` for an example.
|
||||
|
||||
#### `AxiSlave` and `AxiLiteSlave` constructor parameters
|
||||
|
||||
* _bus_: `AxiBus` or `AxiLiteBus` object containing AXI interface signals
|
||||
* _clock_: clock signal
|
||||
* _reset_: reset signal (optional)
|
||||
* _reset_active_level_: reset active level (optional, default `True`)
|
||||
* _target_: target region (optional, default `None`)
|
||||
|
||||
#### Attributes:
|
||||
|
||||
* _target_: target region
|
||||
|
||||
### AXI and AXI lite RAM
|
||||
|
||||
The `AxiRam` and `AxiLiteRam` classes implement AXI RAMs and are capable of completing read and write operations from upstream AXI masters. The `AxiRam` module is capable of handling narrow bursts.
|
||||
The `AxiRam` and `AxiLiteRam` classes implement AXI RAMs and are capable of completing read and write operations from upstream AXI masters. The `AxiRam` module is capable of handling narrow bursts. These modules are extensions of the corresponding `AxiSlave` and `AxiLiteSlave` modules.
|
||||
|
||||
The `AxiRam` is a wrapper around `AxiRamWrite` and `AxiRamRead`. Similarly, `AxiLiteRam` is a wrapper around `AxiLiteRamWrite` and `AxiLiteRamRead`. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same.
|
||||
|
||||
@@ -290,6 +320,66 @@ Methods:
|
||||
* `normalize()`: pack `tkeep`, `tid`, `tdest`, and `tuser` to the same length as `tdata`, replicating last element if necessary, initialize `tkeep` to list of `1` and `tid`, `tdest`, and `tuser` to list of `0` if not specified.
|
||||
* `compact()`: remove `tdata`, `tid`, `tdest`, and `tuser` values based on `tkeep`, remove `tkeep`, compact `tid`, `tdest`, and `tuser` to an int if all values are identical.
|
||||
|
||||
### Address space abstraction
|
||||
|
||||
The address space abstraction provides a framework for cross-connecting multiple memory-mapped interfaces for testing components that interface with complex systems, including components with DMA engines.
|
||||
|
||||
`MemoryInterface` is the base class for all components in the address space abstraction. `MemoryInterface` provides the core `read()` and `write()` methods, which implement bounds checking, as well as word-access wrappers. Methods for creating `Window` and `WindowPool` objects are also provided. The function `get_absolute_address()` translates addresses to the system address space. `MemoryInterface` can be extended to implement custom functionality by overriding `_read()` and `_write()`.
|
||||
|
||||
`Window` objects represent views onto a parent address space with some length and offset. `read()` and `write()` operations on a `Window` are translated to the equivalent operations on the parent address space. Multiple `Window` instances can overlap and access the same portion of address space.
|
||||
|
||||
`WindowPool` provides a method for dynamically allocating windows from a section of address space. It uses a standard memory management algorithm to provide naturally-aligned `Window` objects of the requested size.
|
||||
|
||||
`Region` is the base class for all components which implement a portion of address space. `Region` objects can be registered with `AddressSpace` objects to handle `read()` and `write()` operations in a specified region. `Region` can be extended by components that implement a portion of address space.
|
||||
|
||||
`MemoryRegion` is an extension of `Region` that uses an `mmap` instance to handle memory operations. `MemoryRegion` also provides hex dump methods as well as indexing and slicing.
|
||||
|
||||
`PeripheralRegion` is an extension of `Region` that can wrap another object that implements `read()` and `write()`, as an alternative to extending `Region`.
|
||||
|
||||
`AddressSpace` is the core object for handling address spaces. `Region` objects can be registered with `AddressSpace` with specified base address, size, and offset. The `AddressSpace` object will then direct `read()` and `write()` operations to the appropriate `Region`s, splitting requests appropriately when necessary and translating addresses. Regions registered with `offset` other than `None` are translated such that accesses to base address + N map to N + offset. Regions registered with an `offset` of `None` are not translated. `Region` objects registered with the same `AddressSpace` cannot overlap, however the same `Region` can be registered multiple times. `AddressSpace` also provides a method for creating `Pool` objects.
|
||||
|
||||
`Pool` is an extension of `AddressSpace` that supports dynamic allocation of `MemoryRegion`s. It uses a standard memory management algorithm to provide naturally-aligned `MemoryRegion` objects of the requested size.
|
||||
|
||||
#### Example
|
||||
|
||||
This is a simple example that shows how the address space abstraction components can be used to connect a DUT to a simulated host system, including simulated RAM, an AXI interface from the DUT for DMA, and an AXI lite interface to the DUT for control.
|
||||
|
||||
from cocotbext.axi import AddressSpace, MemoryRegion
|
||||
from cocotbext.axi import AxiBus, AxiLiteMaster, AxiSlave
|
||||
|
||||
# system address space
|
||||
address_space = AddressSpace(2**32)
|
||||
|
||||
# RAM
|
||||
ram = MemoryRegion(2**24)
|
||||
address_space.register_region(ram, 0x0000_0000)
|
||||
ram_pool = address_space.create_window_pool(0x0000_0000, 2**20)
|
||||
|
||||
# DUT control register interface
|
||||
axil_master = AxiLiteMaster(AxiLiteBus.from_prefix(dut, "s_axil_ctrl"), dut.clk, dut.rst)
|
||||
address_space.register_region(axil_master, 0x8000_0000)
|
||||
ctrl_regs = address_space.create_window(0x8000_0000, axil_master.size)
|
||||
|
||||
# DMA from DUT
|
||||
axi_slave = AxiSlave(AxiBus.from_prefix(dut, "m_axi_dma"), dut.clk, dut.rst, target=address_space)
|
||||
|
||||
# exercise DUT DMA functionality
|
||||
src_block = ram_pool.alloc_window(1024)
|
||||
dst_block = ram_pool.alloc_window(1024)
|
||||
|
||||
test_data = b'test data'
|
||||
await src_block.write(0, test_data)
|
||||
|
||||
await ctrl_regs.write_dword(DMA_SRC_ADDR, src_block.get_absolute_address(0))
|
||||
await ctrl_regs.write_dword(DMA_DST_ADDR, dst_block.get_absolute_address(0))
|
||||
await ctrl_regs.write_dword(DMA_LEN, len(test_data))
|
||||
await ctrl_regs.write_dword(DMA_CONTROL, 1)
|
||||
|
||||
while await ctrl_regs.read_dword(DMA_STATUS) == 0:
|
||||
pass
|
||||
|
||||
assert await dst_block.read(0, len(test_data)) == test_data
|
||||
|
||||
### AXI signals
|
||||
|
||||
* Write address channel
|
||||
|
||||
@@ -26,14 +26,20 @@ from .version import __version__
|
||||
|
||||
from .constants import AxiBurstType, AxiBurstSize, AxiLockType, AxiCacheBit, AxiProt, AxiResp
|
||||
|
||||
from .address_space import MemoryInterface, Window, WindowPool
|
||||
from .address_space import Region, MemoryRegion, PeripheralRegion
|
||||
from .address_space import AddressSpace, Pool
|
||||
|
||||
from .axis import AxiStreamFrame, AxiStreamBus, AxiStreamSource, AxiStreamSink, AxiStreamMonitor
|
||||
|
||||
from .axil_channels import AxiLiteAWBus, AxiLiteWBus, AxiLiteBBus, AxiLiteARBus, AxiLiteRBus
|
||||
from .axil_channels import AxiLiteWriteBus, AxiLiteReadBus, AxiLiteBus
|
||||
from .axil_master import AxiLiteMasterWrite, AxiLiteMasterRead, AxiLiteMaster
|
||||
from .axil_slave import AxiLiteSlaveWrite, AxiLiteSlaveRead, AxiLiteSlave
|
||||
from .axil_ram import AxiLiteRamWrite, AxiLiteRamRead, AxiLiteRam
|
||||
|
||||
from .axi_channels import AxiAWBus, AxiWBus, AxiBBus, AxiARBus, AxiRBus
|
||||
from .axi_channels import AxiWriteBus, AxiReadBus, AxiBus
|
||||
from .axi_master import AxiMasterWrite, AxiMasterRead, AxiMaster
|
||||
from .axi_slave import AxiSlaveWrite, AxiSlaveRead, AxiSlave
|
||||
from .axi_ram import AxiRamWrite, AxiRamRead, AxiRam
|
||||
|
||||
320
cocotbext/axi/address_space.py
Normal file
320
cocotbext/axi/address_space.py
Normal file
@@ -0,0 +1,320 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2021 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
import mmap
|
||||
|
||||
from .buddy_allocator import BuddyAllocator
|
||||
from .utils import hexdump, hexdump_lines, hexdump_str
|
||||
|
||||
|
||||
class MemoryInterface:
|
||||
def __init__(self, size, base=0, parent=None, **kwargs):
|
||||
self._parent = parent
|
||||
self._size = size
|
||||
self._base = base
|
||||
super().__init__(**kwargs)
|
||||
|
||||
@property
|
||||
def parent(self):
|
||||
return self._parent
|
||||
|
||||
@property
|
||||
def size(self):
|
||||
return self._size
|
||||
|
||||
@property
|
||||
def base(self):
|
||||
return self._base
|
||||
|
||||
def check_range(self, address, length=0):
|
||||
if address < 0 or address >= self.size:
|
||||
raise ValueError("address out of range")
|
||||
if length < 0:
|
||||
raise ValueError("invalid length")
|
||||
if address+length > self.size:
|
||||
raise ValueError("operation out of range")
|
||||
|
||||
def get_absolute_address(self, address):
|
||||
if self.base is None:
|
||||
return None
|
||||
self.check_range(address)
|
||||
return address+self.base
|
||||
|
||||
async def _read(self, address, length, **kwargs):
|
||||
raise NotImplementedError()
|
||||
|
||||
async def read(self, address, length, **kwargs):
|
||||
self.check_range(address, length)
|
||||
return await self._read(address, length, **kwargs)
|
||||
|
||||
async def read_words(self, address, count, byteorder='little', ws=2, **kwargs):
|
||||
data = bytes(await self.read(address, count*ws, **kwargs))
|
||||
words = []
|
||||
for k in range(count):
|
||||
words.append(int.from_bytes(data[ws*k:ws*(k+1)], byteorder))
|
||||
return words
|
||||
|
||||
async def read_dwords(self, address, count, byteorder='little', **kwargs):
|
||||
return await self.read_words(address, count, byteorder, 4, **kwargs)
|
||||
|
||||
async def read_qwords(self, address, count, byteorder='little', **kwargs):
|
||||
return await self.read_words(address, count, byteorder, 8, **kwargs)
|
||||
|
||||
async def read_byte(self, address, **kwargs):
|
||||
return (await self.read(address, 1, **kwargs)).data[0]
|
||||
|
||||
async def read_word(self, address, byteorder='little', ws=2, **kwargs):
|
||||
return (await self.read_words(address, 1, byteorder, ws, **kwargs))[0]
|
||||
|
||||
async def read_dword(self, address, byteorder='little', **kwargs):
|
||||
return (await self.read_dwords(address, 1, byteorder, **kwargs))[0]
|
||||
|
||||
async def read_qword(self, address, byteorder='little', **kwargs):
|
||||
return (await self.read_qwords(address, 1, byteorder, **kwargs))[0]
|
||||
|
||||
async def _write(self, address, data, **kwargs):
|
||||
raise NotImplementedError()
|
||||
|
||||
async def write(self, address, data, **kwargs):
|
||||
self.check_range(address, len(data))
|
||||
await self._write(address, data, **kwargs)
|
||||
|
||||
async def write_words(self, address, data, byteorder='little', ws=2, **kwargs):
|
||||
words = data
|
||||
data = bytearray()
|
||||
for w in words:
|
||||
data.extend(w.to_bytes(ws, byteorder))
|
||||
await self.write(address, data, **kwargs)
|
||||
|
||||
async def write_dwords(self, address, data, byteorder='little', **kwargs):
|
||||
await self.write_words(address, data, byteorder, 4, **kwargs)
|
||||
|
||||
async def write_qwords(self, address, data, byteorder='little', **kwargs):
|
||||
await self.write_words(address, data, byteorder, 8, **kwargs)
|
||||
|
||||
async def write_byte(self, address, data, **kwargs):
|
||||
await self.write(address, [data], **kwargs)
|
||||
|
||||
async def write_word(self, address, data, byteorder='little', ws=2, **kwargs):
|
||||
await self.write_words(address, [data], byteorder, ws, **kwargs)
|
||||
|
||||
async def write_dword(self, address, data, byteorder='little', **kwargs):
|
||||
await self.write_dwords(address, [data], byteorder, **kwargs)
|
||||
|
||||
async def write_qword(self, address, data, byteorder='little', **kwargs):
|
||||
await self.write_qwords(address, [data], byteorder, **kwargs)
|
||||
|
||||
def create_window(self, offset, size):
|
||||
self.check_range(offset, size)
|
||||
return Window(self, offset, size, base=self.get_absolute_address(offset))
|
||||
|
||||
def create_window_pool(self, offset=None, size=None):
|
||||
if offset is None:
|
||||
offset = 0
|
||||
if size is None:
|
||||
size = self.size - offset
|
||||
self.check_range(offset, size)
|
||||
return WindowPool(self, offset, size, base=self.get_absolute_address(offset))
|
||||
|
||||
def __len__(self):
|
||||
return self._size
|
||||
|
||||
|
||||
class Window(MemoryInterface):
|
||||
def __init__(self, parent, offset, size, base=0, **kwargs):
|
||||
super().__init__(size, base=base, parent=parent, **kwargs)
|
||||
self._offset = offset
|
||||
|
||||
@property
|
||||
def offset(self):
|
||||
return self._offset
|
||||
|
||||
def get_parent_address(self, address):
|
||||
if address < 0 or address >= self.size:
|
||||
raise ValueError("address out of range")
|
||||
return address+self.offset
|
||||
|
||||
async def _read(self, address, length, **kwargs):
|
||||
return await self.parent.read(self.get_parent_address(address), length, **kwargs)
|
||||
|
||||
async def _write(self, address, data, **kwargs):
|
||||
await self.parent.write(self.get_parent_address(address), data, **kwargs)
|
||||
|
||||
|
||||
class WindowPool(Window):
|
||||
def __init__(self, parent, offset, size, base=None, **kwargs):
|
||||
super().__init__(parent, offset, size, base=base, **kwargs)
|
||||
self.allocator = BuddyAllocator(size)
|
||||
|
||||
def alloc_window(self, size):
|
||||
return self.create_window(self.allocator.alloc(size), size)
|
||||
|
||||
|
||||
class Region(MemoryInterface):
|
||||
def __init__(self, size, **kwargs):
|
||||
super().__init__(size, **kwargs)
|
||||
|
||||
|
||||
class MemoryRegion(Region):
|
||||
def __init__(self, size=4096, mem=None, **kwargs):
|
||||
super().__init__(size, **kwargs)
|
||||
if mem is None:
|
||||
mem = mmap.mmap(-1, size)
|
||||
self.mem = mem
|
||||
|
||||
async def _read(self, address, length, **kwargs):
|
||||
return self.mem[address:address+length]
|
||||
|
||||
async def _write(self, address, data, **kwargs):
|
||||
self.mem[address:address+len(data)] = data
|
||||
|
||||
def hexdump(self, address, length, prefix=""):
|
||||
hexdump(self.mem[address:address+length], prefix=prefix, offset=address)
|
||||
|
||||
def hexdump_lines(self, address, length, prefix=""):
|
||||
return hexdump_lines(self.mem[address:address+length], prefix=prefix, offset=address)
|
||||
|
||||
def hexdump_str(self, address, length, prefix=""):
|
||||
return hexdump_str(self.mem[address:address+length], prefix=prefix, offset=address)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.mem[key]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self.mem[key] = value
|
||||
|
||||
def __bytes__(self):
|
||||
return bytes(self.mem)
|
||||
|
||||
|
||||
class PeripheralRegion(Region):
|
||||
def __init__(self, obj, size, **kwargs):
|
||||
super().__init__(size, **kwargs)
|
||||
self.obj = obj
|
||||
|
||||
async def _read(self, address, length, **kwargs):
|
||||
try:
|
||||
return await self.obj.read(address, length, **kwargs)
|
||||
except TypeError:
|
||||
return self.obj.read(address, length, **kwargs)
|
||||
|
||||
async def _write(self, address, data, **kwargs):
|
||||
try:
|
||||
await self.obj.write(address, data, **kwargs)
|
||||
except TypeError:
|
||||
self.obj.write(address, data, **kwargs)
|
||||
|
||||
|
||||
class AddressSpace(Region):
|
||||
def __init__(self, size=2**64, base=0, parent=None, **kwargs):
|
||||
super().__init__(size=size, base=base, parent=parent, **kwargs)
|
||||
self.regions = []
|
||||
|
||||
def find_regions(self, address, length=1):
|
||||
regions = []
|
||||
if address < 0 or address >= self.size:
|
||||
raise ValueError("address out of range")
|
||||
if length < 0:
|
||||
raise ValueError("invalid length")
|
||||
length = max(length, 1)
|
||||
for (base, size, translate, region) in self.regions:
|
||||
if address < base+size and base < address+length:
|
||||
regions.append((base, size, translate, region))
|
||||
regions.sort()
|
||||
return regions
|
||||
|
||||
def register_region(self, region, base, size=None, offset=0):
|
||||
if size is None:
|
||||
size = region.size
|
||||
if self.find_regions(base, size):
|
||||
raise ValueError("overlaps existing region")
|
||||
region._parent = self
|
||||
if offset == 0:
|
||||
region._base = self.get_absolute_address(base)
|
||||
else:
|
||||
region._base = None
|
||||
self.regions.append((base, size, offset, region))
|
||||
|
||||
async def read(self, address, length, **kwargs):
|
||||
regions = self.find_regions(address, length)
|
||||
data = bytearray()
|
||||
if not regions:
|
||||
raise Exception("Invalid address")
|
||||
for base, size, offset, region in regions:
|
||||
if base > address:
|
||||
raise Exception("Invalid address")
|
||||
seg_addr = address - base
|
||||
seg_len = min(size-seg_addr, length)
|
||||
if offset is None:
|
||||
seg_addr = address
|
||||
offset = 0
|
||||
data.extend(bytes(await region.read(seg_addr+offset, seg_len, **kwargs)))
|
||||
address += seg_len
|
||||
length -= seg_len
|
||||
if length > 0:
|
||||
raise Exception("Invalid address")
|
||||
return bytes(data)
|
||||
|
||||
async def write(self, address, data, **kwargs):
|
||||
start = 0
|
||||
length = len(data)
|
||||
regions = self.find_regions(address, length)
|
||||
if not regions:
|
||||
raise Exception("Invalid address")
|
||||
for base, size, offset, region in regions:
|
||||
if base > address:
|
||||
raise Exception("Invalid address")
|
||||
seg_addr = address - base
|
||||
seg_len = min(size-seg_addr, length)
|
||||
if offset is None:
|
||||
seg_addr = address
|
||||
offset = 0
|
||||
await region.write(seg_addr+offset, data[start:start+seg_len], **kwargs)
|
||||
address += seg_len
|
||||
start += seg_len
|
||||
length -= seg_len
|
||||
if length > 0:
|
||||
raise Exception("Invalid address")
|
||||
|
||||
def create_pool(self, base=None, size=None):
|
||||
if base is None:
|
||||
base = 0
|
||||
if size is None:
|
||||
size = self.size - base
|
||||
self.check_range(base, size)
|
||||
pool = Pool(self, base, size)
|
||||
self.register_region(pool, base, size)
|
||||
return pool
|
||||
|
||||
|
||||
class Pool(AddressSpace):
|
||||
def __init__(self, parent, base, size, **kwargs):
|
||||
super().__init__(parent=parent, base=base, size=size, **kwargs)
|
||||
self.allocator = BuddyAllocator(size)
|
||||
|
||||
def alloc_region(self, size):
|
||||
base = self.allocator.alloc(size)
|
||||
region = MemoryRegion(size)
|
||||
self.register_region(region, base)
|
||||
return region
|
||||
@@ -23,7 +23,8 @@ THE SOFTWARE.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from collections import namedtuple, Counter
|
||||
from collections import Counter
|
||||
from typing import List, NamedTuple, Union
|
||||
|
||||
import cocotb
|
||||
from cocotb.queue import Queue
|
||||
@@ -32,21 +33,78 @@ from cocotb.triggers import Event
|
||||
from .version import __version__
|
||||
from .constants import AxiBurstType, AxiLockType, AxiProt, AxiResp
|
||||
from .axi_channels import AxiAWSource, AxiWSource, AxiBSink, AxiARSource, AxiRSink
|
||||
from .address_space import Region
|
||||
from .reset import Reset
|
||||
|
||||
|
||||
# AXI master write helper objects
|
||||
AxiWriteCmd = namedtuple("AxiWriteCmd", ["address", "data", "awid", "burst", "size",
|
||||
"lock", "cache", "prot", "qos", "region", "user", "wuser", "event"])
|
||||
AxiWriteRespCmd = namedtuple("AxiWriteRespCmd", ["address", "length", "size", "cycles",
|
||||
"prot", "burst_list", "event"])
|
||||
AxiWriteResp = namedtuple("AxiWriteResp", ["address", "length", "resp", "user"])
|
||||
class AxiWriteCmd(NamedTuple):
|
||||
address: int
|
||||
data: bytes
|
||||
awid: int
|
||||
burst: AxiBurstType
|
||||
size: int
|
||||
lock: AxiLockType
|
||||
cache: int
|
||||
prot: AxiProt
|
||||
qos: int
|
||||
region: int
|
||||
user: int
|
||||
wuser: Union[list, int, None]
|
||||
event: Event
|
||||
|
||||
|
||||
class AxiWriteRespCmd(NamedTuple):
|
||||
address: int
|
||||
length: int
|
||||
size: int
|
||||
cycles: int
|
||||
prot: AxiProt
|
||||
burst_list: List[int]
|
||||
event: Event
|
||||
|
||||
|
||||
class AxiWriteResp(NamedTuple):
|
||||
address: int
|
||||
length: int
|
||||
resp: AxiResp
|
||||
user: Union[list, None]
|
||||
|
||||
|
||||
# AXI master read helper objects
|
||||
AxiReadCmd = namedtuple("AxiReadCmd", ["address", "length", "arid", "burst", "size",
|
||||
"lock", "cache", "prot", "qos", "region", "user", "event"])
|
||||
AxiReadRespCmd = namedtuple("AxiReadRespCmd", ["address", "length", "size", "cycles",
|
||||
"prot", "burst_list", "event"])
|
||||
AxiReadResp = namedtuple("AxiReadResp", ["address", "data", "resp", "user"])
|
||||
class AxiReadCmd(NamedTuple):
|
||||
address: int
|
||||
length: int
|
||||
arid: int
|
||||
burst: AxiBurstType
|
||||
size: int
|
||||
lock: AxiLockType
|
||||
cache: int
|
||||
prot: AxiProt
|
||||
qos: int
|
||||
region: int
|
||||
user: int
|
||||
event: Event
|
||||
|
||||
|
||||
class AxiReadRespCmd(NamedTuple):
|
||||
address: int
|
||||
length: int
|
||||
size: int
|
||||
cycles: int
|
||||
prot: AxiProt
|
||||
burst_list: List[int]
|
||||
event: Event
|
||||
|
||||
|
||||
class AxiReadResp(NamedTuple):
|
||||
address: int
|
||||
data: bytes
|
||||
resp: AxiResp
|
||||
user: Union[list, None]
|
||||
|
||||
def __bytes__(self):
|
||||
return self.data
|
||||
|
||||
|
||||
class TagContext:
|
||||
@@ -135,8 +193,8 @@ class TagContextManager:
|
||||
return flushed_cmds
|
||||
|
||||
|
||||
class AxiMasterWrite(Reset):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256):
|
||||
class AxiMasterWrite(Region, Reset):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256, **kwargs):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
@@ -167,6 +225,8 @@ class AxiMasterWrite(Reset):
|
||||
self._idle = Event()
|
||||
self._idle.set()
|
||||
|
||||
self.address_width = len(self.aw_channel.bus.awaddr)
|
||||
self.id_width = len(self.aw_channel.bus.awid)
|
||||
self.width = len(self.w_channel.bus.wdata)
|
||||
self.byte_size = 8
|
||||
self.byte_lanes = self.width // self.byte_size
|
||||
@@ -184,9 +244,11 @@ class AxiMasterWrite(Reset):
|
||||
self.wuser_present = hasattr(self.bus.w, "wuser")
|
||||
self.buser_present = hasattr(self.bus.b, "buser")
|
||||
|
||||
super().__init__(2**self.address_width, **kwargs)
|
||||
|
||||
self.log.info("AXI master configuration:")
|
||||
self.log.info(" Address width: %d bits", len(self.aw_channel.bus.awaddr))
|
||||
self.log.info(" ID width: %d bits", len(self.aw_channel.bus.awid))
|
||||
self.log.info(" Address width: %d bits", self.address_width)
|
||||
self.log.info(" ID width: %d bits", self.id_width)
|
||||
self.log.info(" Byte size: %d bits", self.byte_size)
|
||||
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
|
||||
self.log.info(" Max burst size: %d (%d bytes)", self.max_burst_size, 2**self.max_burst_size)
|
||||
@@ -220,6 +282,12 @@ class AxiMasterWrite(Reset):
|
||||
if not isinstance(event, Event):
|
||||
raise ValueError("Expected event object")
|
||||
|
||||
if address < 0 or address >= 2**self.address_width:
|
||||
raise ValueError("Address out of range")
|
||||
|
||||
if isinstance(data, int):
|
||||
raise ValueError("Expected bytes or bytearray for data")
|
||||
|
||||
if awid is None or awid < 0:
|
||||
awid = None
|
||||
elif awid > self.id_count:
|
||||
@@ -266,7 +334,7 @@ class AxiMasterWrite(Reset):
|
||||
self.in_flight_operations += 1
|
||||
self._idle.clear()
|
||||
|
||||
cmd = AxiWriteCmd(address, bytearray(data), awid, burst, size, lock,
|
||||
cmd = AxiWriteCmd(address, bytes(data), awid, burst, size, lock,
|
||||
cache, prot, qos, region, user, wuser, event)
|
||||
self.write_command_queue.put_nowait(cmd)
|
||||
|
||||
@@ -285,43 +353,6 @@ class AxiMasterWrite(Reset):
|
||||
await event.wait()
|
||||
return event.data
|
||||
|
||||
async def write_words(self, address, data, byteorder='little', ws=2, awid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
|
||||
words = data
|
||||
data = bytearray()
|
||||
for w in words:
|
||||
data.extend(w.to_bytes(ws, byteorder))
|
||||
await self.write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser)
|
||||
|
||||
async def write_dwords(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
|
||||
await self.write_words(address, data, byteorder, 4, awid, burst, size,
|
||||
lock, cache, prot, qos, region, user, wuser)
|
||||
|
||||
async def write_qwords(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
|
||||
await self.write_words(address, data, byteorder, 8, awid, burst, size,
|
||||
lock, cache, prot, qos, region, user, wuser)
|
||||
|
||||
async def write_byte(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
|
||||
await self.write(address, [data], awid, burst, size, lock, cache, prot, qos, region, user, wuser)
|
||||
|
||||
async def write_word(self, address, data, byteorder='little', ws=2, awid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
|
||||
await self.write_words(address, [data], byteorder, ws, awid, burst, size,
|
||||
lock, cache, prot, qos, region, user, wuser)
|
||||
|
||||
async def write_dword(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
|
||||
await self.write_dwords(address, [data], byteorder, awid, burst, size,
|
||||
lock, cache, prot, qos, region, user, wuser)
|
||||
|
||||
async def write_qword(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
|
||||
await self.write_qwords(address, [data], byteorder, awid, burst, size,
|
||||
lock, cache, prot, qos, region, user, wuser)
|
||||
|
||||
def _handle_reset(self, state):
|
||||
if state:
|
||||
self.log.info("Reset asserted")
|
||||
@@ -397,8 +428,9 @@ class AxiMasterWrite(Reset):
|
||||
|
||||
wuser = cmd.wuser
|
||||
|
||||
self.log.info("Write start addr: 0x%08x awid: 0x%x prot: %s data: %s",
|
||||
cmd.address, awid, cmd.prot, ' '.join((f'{c:02x}' for c in cmd.data)))
|
||||
if self.log.isEnabledFor(logging.INFO):
|
||||
self.log.info("Write start addr: 0x%08x awid: 0x%x prot: %s data: %s",
|
||||
cmd.address, awid, cmd.prot, ' '.join((f'{c:02x}' for c in cmd.data)))
|
||||
|
||||
for k in range(cycles):
|
||||
start = cycle_offset
|
||||
@@ -444,7 +476,7 @@ class AxiMasterWrite(Reset):
|
||||
await self.aw_channel.send(aw)
|
||||
|
||||
self.log.info("Write burst start awid: 0x%x awaddr: 0x%08x awlen: %d awsize: %d awprot: %s",
|
||||
awid, cur_addr, burst_length-1, cmd.size, cmd.prot)
|
||||
awid, cur_addr, burst_length-1, cmd.size, cmd.prot)
|
||||
|
||||
n += 1
|
||||
|
||||
@@ -475,7 +507,7 @@ class AxiMasterWrite(Reset):
|
||||
while True:
|
||||
b = await self.b_channel.recv()
|
||||
|
||||
bid = int(b.bid)
|
||||
bid = int(getattr(b, 'bid', 0))
|
||||
|
||||
if self.active_id[bid] <= 0:
|
||||
raise Exception(f"Unexpected burst ID {bid}")
|
||||
@@ -491,8 +523,8 @@ class AxiMasterWrite(Reset):
|
||||
for burst_length in cmd.burst_list:
|
||||
b = await context.get_resp()
|
||||
|
||||
burst_resp = AxiResp(b.bresp)
|
||||
burst_user = int(b.buser)
|
||||
burst_resp = AxiResp(getattr(b, 'bresp', AxiResp.OKAY))
|
||||
burst_user = int(getattr(b, 'buser', 0))
|
||||
|
||||
if burst_resp != AxiResp.OKAY:
|
||||
resp = burst_resp
|
||||
@@ -511,7 +543,7 @@ class AxiMasterWrite(Reset):
|
||||
user = None
|
||||
|
||||
self.log.info("Write complete addr: 0x%08x prot: %s resp: %s length: %d",
|
||||
cmd.address, cmd.prot, resp, cmd.length)
|
||||
cmd.address, cmd.prot, resp, cmd.length)
|
||||
|
||||
write_resp = AxiWriteResp(cmd.address, cmd.length, resp, user)
|
||||
|
||||
@@ -523,8 +555,8 @@ class AxiMasterWrite(Reset):
|
||||
self._idle.set()
|
||||
|
||||
|
||||
class AxiMasterRead(Reset):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256):
|
||||
class AxiMasterRead(Region, Reset):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256, **kwargs):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
@@ -553,6 +585,8 @@ class AxiMasterRead(Reset):
|
||||
self._idle = Event()
|
||||
self._idle.set()
|
||||
|
||||
self.address_width = len(self.ar_channel.bus.araddr)
|
||||
self.id_width = len(self.ar_channel.bus.arid)
|
||||
self.width = len(self.r_channel.bus.rdata)
|
||||
self.byte_size = 8
|
||||
self.byte_lanes = self.width // self.byte_size
|
||||
@@ -568,9 +602,11 @@ class AxiMasterRead(Reset):
|
||||
self.aruser_present = hasattr(self.bus.ar, "aruser")
|
||||
self.ruser_present = hasattr(self.bus.r, "ruser")
|
||||
|
||||
super().__init__(2**self.address_width, **kwargs)
|
||||
|
||||
self.log.info("AXI master configuration:")
|
||||
self.log.info(" Address width: %d bits", len(self.ar_channel.bus.araddr))
|
||||
self.log.info(" ID width: %d bits", len(self.ar_channel.bus.arid))
|
||||
self.log.info(" Address width: %d bits", self.address_width)
|
||||
self.log.info(" ID width: %d bits", self.id_width)
|
||||
self.log.info(" Byte size: %d bits", self.byte_size)
|
||||
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
|
||||
self.log.info(" Max burst size: %d (%d bytes)", self.max_burst_size, 2**self.max_burst_size)
|
||||
@@ -603,6 +639,9 @@ class AxiMasterRead(Reset):
|
||||
if not isinstance(event, Event):
|
||||
raise ValueError("Expected event object")
|
||||
|
||||
if address < 0 or address >= 2**self.address_width:
|
||||
raise ValueError("Address out of range")
|
||||
|
||||
if length < 0:
|
||||
raise ValueError("Read length must be positive")
|
||||
|
||||
@@ -660,43 +699,6 @@ class AxiMasterRead(Reset):
|
||||
await event.wait()
|
||||
return event.data
|
||||
|
||||
async def read_words(self, address, count, byteorder='little', ws=2, arid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
data = await self.read(address, count*ws, arid, burst, size, lock, cache, prot, qos, region, user)
|
||||
words = []
|
||||
for k in range(count):
|
||||
words.append(int.from_bytes(data.data[ws*k:ws*(k+1)], byteorder))
|
||||
return words
|
||||
|
||||
async def read_dwords(self, address, count, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_words(address, count, byteorder, 4, arid, burst, size,
|
||||
lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_qwords(self, address, count, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_words(address, count, byteorder, 8, arid, burst, size,
|
||||
lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_byte(self, address, arid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return (await self.read(address, 1, arid, burst, size, lock, cache, prot, qos, region, user)).data[0]
|
||||
|
||||
async def read_word(self, address, byteorder='little', ws=2, arid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return (await self.read_words(address, 1, byteorder, ws, arid, burst, size,
|
||||
lock, cache, prot, qos, region, user))[0]
|
||||
|
||||
async def read_dword(self, address, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return (await self.read_dwords(address, 1, byteorder, arid, burst, size,
|
||||
lock, cache, prot, qos, region, user))[0]
|
||||
|
||||
async def read_qword(self, address, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return (await self.read_qwords(address, 1, byteorder, arid, burst, size,
|
||||
lock, cache, prot, qos, region, user))[0]
|
||||
|
||||
def _handle_reset(self, state):
|
||||
if state:
|
||||
self.log.info("Reset asserted")
|
||||
@@ -778,7 +780,7 @@ class AxiMasterRead(Reset):
|
||||
|
||||
burst_list.append(burst_length)
|
||||
|
||||
ar = self.r_channel._transaction_obj()
|
||||
ar = self.ar_channel._transaction_obj()
|
||||
ar.arid = arid
|
||||
ar.araddr = cur_addr
|
||||
ar.arlen = burst_length-1
|
||||
@@ -795,7 +797,7 @@ class AxiMasterRead(Reset):
|
||||
await self.ar_channel.send(ar)
|
||||
|
||||
self.log.info("Read burst start arid: 0x%x araddr: 0x%08x arlen: %d arsize: %d arprot: %s",
|
||||
arid, cur_addr, burst_length-1, cmd.size, cmd.prot)
|
||||
arid, cur_addr, burst_length-1, cmd.size, cmd.prot)
|
||||
|
||||
cur_addr += num_bytes
|
||||
|
||||
@@ -811,7 +813,7 @@ class AxiMasterRead(Reset):
|
||||
while True:
|
||||
r = await self.r_channel.recv()
|
||||
|
||||
rid = int(r.rid)
|
||||
rid = int(getattr(r, 'rid', 0))
|
||||
|
||||
if cur_rid is not None and cur_rid != rid:
|
||||
raise Exception(f"ID not constant within burst (expected {cur_rid}, got {rid})")
|
||||
@@ -853,8 +855,8 @@ class AxiMasterRead(Reset):
|
||||
|
||||
for r in burst:
|
||||
cycle_data = int(r.rdata)
|
||||
cycle_resp = AxiResp(r.rresp)
|
||||
cycle_user = int(r.ruser)
|
||||
cycle_resp = AxiResp(getattr(r, "rresp", AxiResp.OKAY))
|
||||
cycle_user = int(getattr(r, "ruser", 0))
|
||||
|
||||
if cycle_resp != AxiResp.OKAY:
|
||||
resp = cycle_resp
|
||||
@@ -884,10 +886,11 @@ class AxiMasterRead(Reset):
|
||||
if not self.ruser_present:
|
||||
user = None
|
||||
|
||||
self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s",
|
||||
cmd.address, cmd.prot, resp, ' '.join((f'{c:02x}' for c in data)))
|
||||
if self.log.isEnabledFor(logging.INFO):
|
||||
self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s",
|
||||
cmd.address, cmd.prot, resp, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
read_resp = AxiReadResp(cmd.address, data, resp, user)
|
||||
read_resp = AxiReadResp(cmd.address, bytes(data), resp, user)
|
||||
|
||||
cmd.event.set(read_resp)
|
||||
|
||||
@@ -897,13 +900,15 @@ class AxiMasterRead(Reset):
|
||||
self._idle.set()
|
||||
|
||||
|
||||
class AxiMaster:
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256):
|
||||
class AxiMaster(Region):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256, **kwargs):
|
||||
self.write_if = None
|
||||
self.read_if = None
|
||||
|
||||
self.write_if = AxiMasterWrite(bus.write, clock, reset, reset_active_level, max_burst_len)
|
||||
self.read_if = AxiMasterRead(bus.read, clock, reset, reset_active_level, max_burst_len)
|
||||
self.write_if = AxiMasterWrite(bus.write, clock, reset, reset_active_level, max_burst_len, **kwargs)
|
||||
self.read_if = AxiMasterRead(bus.read, clock, reset, reset_active_level, max_burst_len, **kwargs)
|
||||
|
||||
super().__init__(max(self.write_if.size, self.read_if.size), **kwargs)
|
||||
|
||||
def init_read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, event=None):
|
||||
@@ -932,77 +937,7 @@ class AxiMaster:
|
||||
return await self.read_if.read(address, length, arid,
|
||||
burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_words(self, address, count, byteorder='little', ws=2, arid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_if.read_words(address, count, byteorder, ws, arid,
|
||||
burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_dwords(self, address, count, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_if.read_dwords(address, count, byteorder, arid,
|
||||
burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_qwords(self, address, count, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_if.read_qwords(address, count, byteorder, arid,
|
||||
burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_byte(self, address, arid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_if.read_byte(address, arid,
|
||||
burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_word(self, address, byteorder='little', ws=2, arid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_if.read_word(address, byteorder, ws, arid,
|
||||
burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_dword(self, address, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_if.read_dword(address, byteorder, arid,
|
||||
burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_qword(self, address, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_if.read_qword(address, byteorder, arid,
|
||||
burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
|
||||
return await self.write_if.write(address, data, awid,
|
||||
burst, size, lock, cache, prot, qos, region, user, wuser)
|
||||
|
||||
async def write_words(self, address, data, byteorder='little', ws=2, awid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
|
||||
return await self.write_if.write_words(address, data, byteorder, ws, awid,
|
||||
burst, size, lock, cache, prot, qos, region, user, wuser)
|
||||
|
||||
async def write_dwords(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
|
||||
return await self.write_if.write_dwords(address, data, byteorder, awid,
|
||||
burst, size, lock, cache, prot, qos, region, user, wuser)
|
||||
|
||||
async def write_qwords(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
|
||||
return await self.write_if.write_qwords(address, data, byteorder, awid,
|
||||
burst, size, lock, cache, prot, qos, region, user, wuser)
|
||||
|
||||
async def write_byte(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
|
||||
return await self.write_if.write_byte(address, data, awid,
|
||||
burst, size, lock, cache, prot, qos, region, user, wuser)
|
||||
|
||||
async def write_word(self, address, data, byteorder='little', ws=2, awid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
|
||||
return await self.write_if.write_word(address, data, byteorder, ws, awid,
|
||||
burst, size, lock, cache, prot, qos, region, user, wuser)
|
||||
|
||||
async def write_dword(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
|
||||
return await self.write_if.write_dword(address, data, byteorder, awid,
|
||||
burst, size, lock, cache, prot, qos, region, user, wuser)
|
||||
|
||||
async def write_qword(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
|
||||
return await self.write_if.write_qword(address, data, byteorder, awid,
|
||||
burst, size, lock, cache, prot, qos, region, user, wuser)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2021 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -22,278 +22,32 @@ THE SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
import cocotb
|
||||
|
||||
from .version import __version__
|
||||
from .constants import AxiBurstType, AxiProt, AxiResp
|
||||
from .axi_channels import AxiAWSink, AxiWSink, AxiBSource, AxiARSink, AxiRSource
|
||||
from .axi_slave import AxiSlaveWrite, AxiSlaveRead
|
||||
from .memory import Memory
|
||||
from .reset import Reset
|
||||
|
||||
|
||||
class AxiRamWrite(Memory, Reset):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
|
||||
class AxiRamWrite(AxiSlaveWrite, Memory):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs):
|
||||
super().__init__(bus, clock, reset, reset_active_level=reset_active_level, size=size, mem=mem, **kwargs)
|
||||
|
||||
self.log.info("AXI RAM model (write)")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2020 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
async def _write(self, address, data):
|
||||
self.write(address % self.size, data)
|
||||
|
||||
super().__init__(size, mem, *args, **kwargs)
|
||||
|
||||
self.aw_channel = AxiAWSink(bus.aw, clock, reset, reset_active_level)
|
||||
self.aw_channel.queue_occupancy_limit = 2
|
||||
self.w_channel = AxiWSink(bus.w, clock, reset, reset_active_level)
|
||||
self.w_channel.queue_occupancy_limit = 2
|
||||
self.b_channel = AxiBSource(bus.b, clock, reset, reset_active_level)
|
||||
self.b_channel.queue_occupancy_limit = 2
|
||||
class AxiRamRead(AxiSlaveRead, Memory):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs):
|
||||
super().__init__(bus, clock, reset, reset_active_level=reset_active_level, size=size, mem=mem, **kwargs)
|
||||
|
||||
self.width = len(self.w_channel.bus.wdata)
|
||||
self.byte_size = 8
|
||||
self.byte_lanes = self.width // self.byte_size
|
||||
self.strb_mask = 2**self.byte_lanes-1
|
||||
|
||||
self.log.info("AXI RAM model configuration:")
|
||||
self.log.info(" Memory size: %d bytes", len(self.mem))
|
||||
self.log.info(" Address width: %d bits", len(self.aw_channel.bus.awaddr))
|
||||
self.log.info(" ID width: %d bits", len(self.aw_channel.bus.awid))
|
||||
self.log.info(" Byte size: %d bits", self.byte_size)
|
||||
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
|
||||
|
||||
self.log.info("AXI RAM model signals:")
|
||||
for bus in (self.bus.aw, self.bus.w, self.bus.b):
|
||||
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
|
||||
if hasattr(bus, sig):
|
||||
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
|
||||
else:
|
||||
self.log.info(" %s: not present", sig)
|
||||
|
||||
assert self.byte_lanes == len(self.w_channel.bus.wstrb)
|
||||
assert self.byte_lanes * self.byte_size == self.width
|
||||
|
||||
assert len(self.b_channel.bus.bid) == len(self.aw_channel.bus.awid)
|
||||
|
||||
self._process_write_cr = None
|
||||
|
||||
self._init_reset(reset, reset_active_level)
|
||||
|
||||
def _handle_reset(self, state):
|
||||
if state:
|
||||
self.log.info("Reset asserted")
|
||||
if self._process_write_cr is not None:
|
||||
self._process_write_cr.kill()
|
||||
self._process_write_cr = None
|
||||
|
||||
self.aw_channel.clear()
|
||||
self.w_channel.clear()
|
||||
self.b_channel.clear()
|
||||
else:
|
||||
self.log.info("Reset de-asserted")
|
||||
if self._process_write_cr is None:
|
||||
self._process_write_cr = cocotb.fork(self._process_write())
|
||||
|
||||
async def _process_write(self):
|
||||
while True:
|
||||
aw = await self.aw_channel.recv()
|
||||
|
||||
awid = int(aw.awid)
|
||||
addr = int(aw.awaddr)
|
||||
length = int(aw.awlen)
|
||||
size = int(aw.awsize)
|
||||
burst = int(aw.awburst)
|
||||
prot = AxiProt(int(aw.awprot))
|
||||
|
||||
self.log.info("Write burst awid: 0x%x awaddr: 0x%08x awlen: %d awsize: %d awprot: %s",
|
||||
awid, addr, length, size, prot)
|
||||
|
||||
num_bytes = 2**size
|
||||
assert 0 < num_bytes <= self.byte_lanes
|
||||
|
||||
aligned_addr = (addr // num_bytes) * num_bytes
|
||||
length += 1
|
||||
|
||||
transfer_size = num_bytes*length
|
||||
|
||||
if burst == AxiBurstType.WRAP:
|
||||
lower_wrap_boundary = (addr // transfer_size) * transfer_size
|
||||
upper_wrap_boundary = lower_wrap_boundary + transfer_size
|
||||
|
||||
if burst == AxiBurstType.INCR:
|
||||
# check 4k boundary crossing
|
||||
assert 0x1000-(aligned_addr & 0xfff) >= transfer_size
|
||||
|
||||
cur_addr = aligned_addr
|
||||
|
||||
for n in range(length):
|
||||
cur_word_addr = (cur_addr // self.byte_lanes) * self.byte_lanes
|
||||
|
||||
w = await self.w_channel.recv()
|
||||
|
||||
data = int(w.wdata)
|
||||
strb = int(w.wstrb)
|
||||
last = int(w.wlast)
|
||||
|
||||
# todo latency
|
||||
|
||||
self.mem.seek(cur_word_addr % self.size)
|
||||
|
||||
data = data.to_bytes(self.byte_lanes, 'little')
|
||||
|
||||
self.log.debug("Write word awid: 0x%x addr: 0x%08x wstrb: 0x%02x data: %s",
|
||||
awid, cur_addr, strb, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
for i in range(self.byte_lanes):
|
||||
if strb & (1 << i):
|
||||
self.mem.write(data[i:i+1])
|
||||
else:
|
||||
self.mem.seek(1, 1)
|
||||
|
||||
assert last == (n == length-1)
|
||||
|
||||
if burst != AxiBurstType.FIXED:
|
||||
cur_addr += num_bytes
|
||||
|
||||
if burst == AxiBurstType.WRAP:
|
||||
if cur_addr == upper_wrap_boundary:
|
||||
cur_addr = lower_wrap_boundary
|
||||
|
||||
b = self.b_channel._transaction_obj()
|
||||
b.bid = awid
|
||||
b.bresp = AxiResp.OKAY
|
||||
|
||||
await self.b_channel.send(b)
|
||||
|
||||
|
||||
class AxiRamRead(Memory, Reset):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
|
||||
|
||||
self.log.info("AXI RAM model (read)")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2020 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
super().__init__(size, mem, *args, **kwargs)
|
||||
|
||||
self.ar_channel = AxiARSink(bus.ar, clock, reset, reset_active_level)
|
||||
self.ar_channel.queue_occupancy_limit = 2
|
||||
self.r_channel = AxiRSource(bus.r, clock, reset, reset_active_level)
|
||||
self.r_channel.queue_occupancy_limit = 2
|
||||
|
||||
self.width = len(self.r_channel.bus.rdata)
|
||||
self.byte_size = 8
|
||||
self.byte_lanes = self.width // self.byte_size
|
||||
|
||||
self.log.info("AXI RAM model configuration:")
|
||||
self.log.info(" Memory size: %d bytes", len(self.mem))
|
||||
self.log.info(" Address width: %d bits", len(self.ar_channel.bus.araddr))
|
||||
self.log.info(" ID width: %d bits", len(self.ar_channel.bus.arid))
|
||||
self.log.info(" Byte size: %d bits", self.byte_size)
|
||||
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
|
||||
|
||||
self.log.info("AXI RAM model signals:")
|
||||
for bus in (self.bus.ar, self.bus.r):
|
||||
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
|
||||
if hasattr(bus, sig):
|
||||
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
|
||||
else:
|
||||
self.log.info(" %s: not present", sig)
|
||||
|
||||
assert self.byte_lanes * self.byte_size == self.width
|
||||
|
||||
assert len(self.r_channel.bus.rid) == len(self.ar_channel.bus.arid)
|
||||
|
||||
self._process_read_cr = None
|
||||
|
||||
self._init_reset(reset, reset_active_level)
|
||||
|
||||
def _handle_reset(self, state):
|
||||
if state:
|
||||
self.log.info("Reset asserted")
|
||||
if self._process_read_cr is not None:
|
||||
self._process_read_cr.kill()
|
||||
self._process_read_cr = None
|
||||
|
||||
self.ar_channel.clear()
|
||||
self.r_channel.clear()
|
||||
else:
|
||||
self.log.info("Reset de-asserted")
|
||||
if self._process_read_cr is None:
|
||||
self._process_read_cr = cocotb.fork(self._process_read())
|
||||
|
||||
async def _process_read(self):
|
||||
while True:
|
||||
ar = await self.ar_channel.recv()
|
||||
|
||||
arid = int(ar.arid)
|
||||
addr = int(ar.araddr)
|
||||
length = int(ar.arlen)
|
||||
size = int(ar.arsize)
|
||||
burst = int(ar.arburst)
|
||||
prot = AxiProt(ar.arprot)
|
||||
|
||||
self.log.info("Read burst arid: 0x%x araddr: 0x%08x arlen: %d arsize: %d arprot: %s",
|
||||
arid, addr, length, size, prot)
|
||||
|
||||
num_bytes = 2**size
|
||||
assert 0 < num_bytes <= self.byte_lanes
|
||||
|
||||
aligned_addr = (addr // num_bytes) * num_bytes
|
||||
length += 1
|
||||
|
||||
transfer_size = num_bytes*length
|
||||
|
||||
if burst == AxiBurstType.WRAP:
|
||||
lower_wrap_boundary = (addr // transfer_size) * transfer_size
|
||||
upper_wrap_boundary = lower_wrap_boundary + transfer_size
|
||||
|
||||
if burst == AxiBurstType.INCR:
|
||||
# check 4k boundary crossing
|
||||
assert 0x1000-(aligned_addr & 0xfff) >= transfer_size
|
||||
|
||||
cur_addr = aligned_addr
|
||||
|
||||
for n in range(length):
|
||||
cur_word_addr = (cur_addr // self.byte_lanes) * self.byte_lanes
|
||||
|
||||
self.mem.seek(cur_word_addr % self.size)
|
||||
|
||||
data = self.mem.read(self.byte_lanes)
|
||||
|
||||
r = self.r_channel._transaction_obj()
|
||||
r.rid = arid
|
||||
r.rdata = int.from_bytes(data, 'little')
|
||||
r.rlast = n == length-1
|
||||
r.rresp = AxiResp.OKAY
|
||||
|
||||
await self.r_channel.send(r)
|
||||
|
||||
self.log.debug("Read word awid: 0x%x addr: 0x%08x data: %s",
|
||||
arid, cur_addr, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
if burst != AxiBurstType.FIXED:
|
||||
cur_addr += num_bytes
|
||||
|
||||
if burst == AxiBurstType.WRAP:
|
||||
if cur_addr == upper_wrap_boundary:
|
||||
cur_addr = lower_wrap_boundary
|
||||
async def _read(self, address, length):
|
||||
return self.read(address % self.size, length)
|
||||
|
||||
|
||||
class AxiRam(Memory):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs):
|
||||
self.write_if = None
|
||||
self.read_if = None
|
||||
|
||||
super().__init__(size, mem, *args, **kwargs)
|
||||
super().__init__(size, mem, **kwargs)
|
||||
|
||||
self.write_if = AxiRamWrite(bus.write, clock, reset, reset_active_level, mem=self.mem)
|
||||
self.read_if = AxiRamRead(bus.read, clock, reset, reset_active_level, mem=self.mem)
|
||||
|
||||
335
cocotbext/axi/axi_slave.py
Normal file
335
cocotbext/axi/axi_slave.py
Normal file
@@ -0,0 +1,335 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2021 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
import cocotb
|
||||
|
||||
from .version import __version__
|
||||
from .constants import AxiBurstType, AxiProt, AxiResp
|
||||
from .axi_channels import AxiAWSink, AxiWSink, AxiBSource, AxiARSink, AxiRSource
|
||||
from .reset import Reset
|
||||
|
||||
|
||||
class AxiSlaveWrite(Reset):
|
||||
def __init__(self, bus, clock, reset=None, target=None, reset_active_level=True, **kwargs):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.target = target
|
||||
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
|
||||
|
||||
self.log.info("AXI slave model (write)")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2021 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.aw_channel = AxiAWSink(bus.aw, clock, reset, reset_active_level)
|
||||
self.aw_channel.queue_occupancy_limit = 2
|
||||
self.w_channel = AxiWSink(bus.w, clock, reset, reset_active_level)
|
||||
self.w_channel.queue_occupancy_limit = 2
|
||||
self.b_channel = AxiBSource(bus.b, clock, reset, reset_active_level)
|
||||
self.b_channel.queue_occupancy_limit = 2
|
||||
|
||||
self.address_width = len(self.aw_channel.bus.awaddr)
|
||||
self.id_width = len(self.aw_channel.bus.awid)
|
||||
self.width = len(self.w_channel.bus.wdata)
|
||||
self.byte_size = 8
|
||||
self.byte_lanes = self.width // self.byte_size
|
||||
self.strb_mask = 2**self.byte_lanes-1
|
||||
|
||||
self.max_burst_size = (self.byte_lanes-1).bit_length()
|
||||
|
||||
self.log.info("AXI slave model configuration:")
|
||||
self.log.info(" Address width: %d bits", self.address_width)
|
||||
self.log.info(" ID width: %d bits", self.id_width)
|
||||
self.log.info(" Byte size: %d bits", self.byte_size)
|
||||
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
|
||||
|
||||
self.log.info("AXI slave model signals:")
|
||||
for bus in (self.bus.aw, self.bus.w, self.bus.b):
|
||||
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
|
||||
if hasattr(bus, sig):
|
||||
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
|
||||
else:
|
||||
self.log.info(" %s: not present", sig)
|
||||
|
||||
assert self.byte_lanes == len(self.w_channel.bus.wstrb)
|
||||
assert self.byte_lanes * self.byte_size == self.width
|
||||
|
||||
assert len(self.b_channel.bus.bid) == len(self.aw_channel.bus.awid)
|
||||
|
||||
self._process_write_cr = None
|
||||
|
||||
self._init_reset(reset, reset_active_level)
|
||||
|
||||
async def _write(self, address, data):
|
||||
await self.target.write(address, data)
|
||||
|
||||
def _handle_reset(self, state):
|
||||
if state:
|
||||
self.log.info("Reset asserted")
|
||||
if self._process_write_cr is not None:
|
||||
self._process_write_cr.kill()
|
||||
self._process_write_cr = None
|
||||
|
||||
self.aw_channel.clear()
|
||||
self.w_channel.clear()
|
||||
self.b_channel.clear()
|
||||
else:
|
||||
self.log.info("Reset de-asserted")
|
||||
if self._process_write_cr is None:
|
||||
self._process_write_cr = cocotb.fork(self._process_write())
|
||||
|
||||
async def _process_write(self):
|
||||
while True:
|
||||
aw = await self.aw_channel.recv()
|
||||
|
||||
awid = int(getattr(aw, 'awid', 0))
|
||||
addr = int(aw.awaddr)
|
||||
length = int(getattr(aw, 'awlen', 0))
|
||||
size = int(getattr(aw, 'awsize', self.max_burst_size))
|
||||
burst = AxiBurstType(getattr(aw, 'awburst', AxiBurstType.INCR))
|
||||
prot = AxiProt(getattr(aw, 'awprot', AxiProt.NONSECURE))
|
||||
|
||||
self.log.info("Write burst awid: 0x%x awaddr: 0x%08x awlen: %d awsize: %d awprot: %s",
|
||||
awid, addr, length, size, prot)
|
||||
|
||||
num_bytes = 2**size
|
||||
assert 0 < num_bytes <= self.byte_lanes
|
||||
|
||||
aligned_addr = (addr // num_bytes) * num_bytes
|
||||
length += 1
|
||||
|
||||
transfer_size = num_bytes*length
|
||||
|
||||
if burst == AxiBurstType.WRAP:
|
||||
lower_wrap_boundary = (addr // transfer_size) * transfer_size
|
||||
upper_wrap_boundary = lower_wrap_boundary + transfer_size
|
||||
|
||||
if burst == AxiBurstType.INCR:
|
||||
# check 4k boundary crossing
|
||||
assert 0x1000-(aligned_addr & 0xfff) >= transfer_size
|
||||
|
||||
cur_addr = aligned_addr
|
||||
|
||||
b = self.b_channel._transaction_obj()
|
||||
b.bid = awid
|
||||
b.bresp = AxiResp.OKAY
|
||||
|
||||
for n in range(length):
|
||||
cur_word_addr = (cur_addr // self.byte_lanes) * self.byte_lanes
|
||||
|
||||
w = await self.w_channel.recv()
|
||||
|
||||
data = int(w.wdata)
|
||||
strb = int(getattr(w, 'wstrb', self.strb_mask))
|
||||
last = int(w.wlast)
|
||||
|
||||
# generate operation list
|
||||
offset = 0
|
||||
start_offset = None
|
||||
write_ops = []
|
||||
|
||||
data = data.to_bytes(self.byte_lanes, 'little')
|
||||
|
||||
if self.log.isEnabledFor(logging.DEBUG):
|
||||
self.log.debug("Write word awid: 0x%x addr: 0x%08x wstrb: 0x%02x data: %s",
|
||||
awid, cur_addr, strb, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
for i in range(self.byte_lanes):
|
||||
if strb & (1 << i):
|
||||
if start_offset is None:
|
||||
start_offset = offset
|
||||
else:
|
||||
if start_offset is not None and offset != start_offset:
|
||||
write_ops.append((cur_word_addr+start_offset, data[start_offset:offset]))
|
||||
start_offset = None
|
||||
|
||||
offset += 1
|
||||
|
||||
if start_offset is not None and offset != start_offset:
|
||||
write_ops.append((cur_word_addr+start_offset, data[start_offset:offset]))
|
||||
|
||||
# perform writes
|
||||
try:
|
||||
for addr, data in write_ops:
|
||||
await self._write(addr, data)
|
||||
except Exception:
|
||||
self.log.warning("Write operation failed")
|
||||
b.bresp = AxiResp.SLVERR
|
||||
|
||||
assert last == (n == length-1)
|
||||
|
||||
if burst != AxiBurstType.FIXED:
|
||||
cur_addr += num_bytes
|
||||
|
||||
if burst == AxiBurstType.WRAP:
|
||||
if cur_addr == upper_wrap_boundary:
|
||||
cur_addr = lower_wrap_boundary
|
||||
|
||||
await self.b_channel.send(b)
|
||||
|
||||
|
||||
class AxiSlaveRead(Reset):
|
||||
def __init__(self, bus, clock, reset=None, target=None, reset_active_level=True, **kwargs):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.target = target
|
||||
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
|
||||
|
||||
self.log.info("AXI slave model (read)")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2021 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.ar_channel = AxiARSink(bus.ar, clock, reset, reset_active_level)
|
||||
self.ar_channel.queue_occupancy_limit = 2
|
||||
self.r_channel = AxiRSource(bus.r, clock, reset, reset_active_level)
|
||||
self.r_channel.queue_occupancy_limit = 2
|
||||
|
||||
self.address_width = len(self.ar_channel.bus.araddr)
|
||||
self.id_width = len(self.ar_channel.bus.arid)
|
||||
self.width = len(self.r_channel.bus.rdata)
|
||||
self.byte_size = 8
|
||||
self.byte_lanes = self.width // self.byte_size
|
||||
|
||||
self.max_burst_size = (self.byte_lanes-1).bit_length()
|
||||
|
||||
self.log.info("AXI slave model configuration:")
|
||||
self.log.info(" Address width: %d bits", self.address_width)
|
||||
self.log.info(" ID width: %d bits", self.id_width)
|
||||
self.log.info(" Byte size: %d bits", self.byte_size)
|
||||
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
|
||||
|
||||
self.log.info("AXI slave model signals:")
|
||||
for bus in (self.bus.ar, self.bus.r):
|
||||
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
|
||||
if hasattr(bus, sig):
|
||||
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
|
||||
else:
|
||||
self.log.info(" %s: not present", sig)
|
||||
|
||||
assert self.byte_lanes * self.byte_size == self.width
|
||||
|
||||
assert len(self.r_channel.bus.rid) == len(self.ar_channel.bus.arid)
|
||||
|
||||
self._process_read_cr = None
|
||||
|
||||
self._init_reset(reset, reset_active_level)
|
||||
|
||||
async def _read(self, address, length):
|
||||
return await self.target.read(address, length)
|
||||
|
||||
def _handle_reset(self, state):
|
||||
if state:
|
||||
self.log.info("Reset asserted")
|
||||
if self._process_read_cr is not None:
|
||||
self._process_read_cr.kill()
|
||||
self._process_read_cr = None
|
||||
|
||||
self.ar_channel.clear()
|
||||
self.r_channel.clear()
|
||||
else:
|
||||
self.log.info("Reset de-asserted")
|
||||
if self._process_read_cr is None:
|
||||
self._process_read_cr = cocotb.fork(self._process_read())
|
||||
|
||||
async def _process_read(self):
|
||||
while True:
|
||||
ar = await self.ar_channel.recv()
|
||||
|
||||
arid = int(getattr(ar, 'arid', 0))
|
||||
addr = int(ar.araddr)
|
||||
length = int(getattr(ar, 'arlen', 0))
|
||||
size = int(getattr(ar, 'arsize', self.max_burst_size))
|
||||
burst = AxiBurstType(getattr(ar, 'arburst', AxiBurstType.INCR))
|
||||
prot = AxiProt(getattr(ar, 'arprot', AxiProt.NONSECURE))
|
||||
|
||||
self.log.info("Read burst arid: 0x%x araddr: 0x%08x arlen: %d arsize: %d arprot: %s",
|
||||
arid, addr, length, size, prot)
|
||||
|
||||
num_bytes = 2**size
|
||||
assert 0 < num_bytes <= self.byte_lanes
|
||||
|
||||
aligned_addr = (addr // num_bytes) * num_bytes
|
||||
length += 1
|
||||
|
||||
transfer_size = num_bytes*length
|
||||
|
||||
if burst == AxiBurstType.WRAP:
|
||||
lower_wrap_boundary = (addr // transfer_size) * transfer_size
|
||||
upper_wrap_boundary = lower_wrap_boundary + transfer_size
|
||||
|
||||
if burst == AxiBurstType.INCR:
|
||||
# check 4k boundary crossing
|
||||
assert 0x1000-(aligned_addr & 0xfff) >= transfer_size
|
||||
|
||||
cur_addr = aligned_addr
|
||||
|
||||
for n in range(length):
|
||||
cur_word_addr = (cur_addr // self.byte_lanes) * self.byte_lanes
|
||||
|
||||
r = self.r_channel._transaction_obj()
|
||||
r.rid = arid
|
||||
r.rlast = n == length-1
|
||||
r.rresp = AxiResp.OKAY
|
||||
|
||||
try:
|
||||
data = await self._read(cur_word_addr, self.byte_lanes)
|
||||
except Exception:
|
||||
self.log.warning("Read operation failed")
|
||||
data = bytes(self.byte_lanes)
|
||||
r.rresp = AxiResp.SLVERR
|
||||
|
||||
r.rdata = int.from_bytes(data, 'little')
|
||||
|
||||
await self.r_channel.send(r)
|
||||
|
||||
if self.log.isEnabledFor(logging.DEBUG):
|
||||
self.log.debug("Read word awid: 0x%x addr: 0x%08x data: %s",
|
||||
arid, cur_addr, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
if burst != AxiBurstType.FIXED:
|
||||
cur_addr += num_bytes
|
||||
|
||||
if burst == AxiBurstType.WRAP:
|
||||
if cur_addr == upper_wrap_boundary:
|
||||
cur_addr = lower_wrap_boundary
|
||||
|
||||
|
||||
class AxiSlave:
|
||||
def __init__(self, bus, clock, reset=None, target=None, reset_active_level=True, **kwargs):
|
||||
self.write_if = None
|
||||
self.read_if = None
|
||||
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.write_if = AxiSlaveWrite(bus.write, clock, reset, target, reset_active_level)
|
||||
self.read_if = AxiSlaveRead(bus.read, clock, reset, target, reset_active_level)
|
||||
@@ -23,7 +23,7 @@ THE SOFTWARE.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
from typing import NamedTuple
|
||||
|
||||
import cocotb
|
||||
from cocotb.queue import Queue
|
||||
@@ -32,21 +32,59 @@ from cocotb.triggers import Event
|
||||
from .version import __version__
|
||||
from .constants import AxiProt, AxiResp
|
||||
from .axil_channels import AxiLiteAWSource, AxiLiteWSource, AxiLiteBSink, AxiLiteARSource, AxiLiteRSink
|
||||
from .address_space import Region
|
||||
from .reset import Reset
|
||||
|
||||
# AXI lite master write
|
||||
AxiLiteWriteCmd = namedtuple("AxiLiteWriteCmd", ["address", "data", "prot", "event"])
|
||||
AxiLiteWriteRespCmd = namedtuple("AxiLiteWriteRespCmd", ["address", "length", "cycles", "prot", "event"])
|
||||
AxiLiteWriteResp = namedtuple("AxiLiteWriteResp", ["address", "length", "resp"])
|
||||
|
||||
# AXI lite master read
|
||||
AxiLiteReadCmd = namedtuple("AxiLiteReadCmd", ["address", "length", "prot", "event"])
|
||||
AxiLiteReadRespCmd = namedtuple("AxiLiteReadRespCmd", ["address", "length", "cycles", "prot", "event"])
|
||||
AxiLiteReadResp = namedtuple("AxiLiteReadResp", ["address", "data", "resp"])
|
||||
# AXI lite master write helper objects
|
||||
class AxiLiteWriteCmd(NamedTuple):
|
||||
address: int
|
||||
data: bytes
|
||||
prot: AxiProt
|
||||
event: Event
|
||||
|
||||
|
||||
class AxiLiteMasterWrite(Reset):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True):
|
||||
class AxiLiteWriteRespCmd(NamedTuple):
|
||||
address: int
|
||||
length: int
|
||||
cycles: int
|
||||
prot: AxiProt
|
||||
event: Event
|
||||
|
||||
|
||||
class AxiLiteWriteResp(NamedTuple):
|
||||
address: int
|
||||
length: int
|
||||
resp: AxiResp
|
||||
|
||||
|
||||
# AXI lite master read helper objects
|
||||
class AxiLiteReadCmd(NamedTuple):
|
||||
address: int
|
||||
length: int
|
||||
prot: AxiProt
|
||||
event: Event
|
||||
|
||||
|
||||
class AxiLiteReadRespCmd(NamedTuple):
|
||||
address: int
|
||||
length: int
|
||||
cycles: int
|
||||
prot: AxiProt
|
||||
event: Event
|
||||
|
||||
|
||||
class AxiLiteReadResp(NamedTuple):
|
||||
address: int
|
||||
data: bytes
|
||||
resp: AxiResp
|
||||
|
||||
def __bytes__(self):
|
||||
return self.data
|
||||
|
||||
|
||||
class AxiLiteMasterWrite(Region, Reset):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, **kwargs):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
@@ -74,6 +112,7 @@ class AxiLiteMasterWrite(Reset):
|
||||
self._idle = Event()
|
||||
self._idle.set()
|
||||
|
||||
self.address_width = len(self.aw_channel.bus.awaddr)
|
||||
self.width = len(self.w_channel.bus.wdata)
|
||||
self.byte_size = 8
|
||||
self.byte_lanes = self.width // self.byte_size
|
||||
@@ -81,8 +120,10 @@ class AxiLiteMasterWrite(Reset):
|
||||
|
||||
self.awprot_present = hasattr(self.bus.aw, "awprot")
|
||||
|
||||
super().__init__(2**self.address_width, **kwargs)
|
||||
|
||||
self.log.info("AXI lite master configuration:")
|
||||
self.log.info(" Address width: %d bits", len(self.aw_channel.bus.awaddr))
|
||||
self.log.info(" Address width: %d bits", self.address_width)
|
||||
self.log.info(" Byte size: %d bits", self.byte_size)
|
||||
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
|
||||
|
||||
@@ -109,13 +150,19 @@ class AxiLiteMasterWrite(Reset):
|
||||
if not isinstance(event, Event):
|
||||
raise ValueError("Expected event object")
|
||||
|
||||
if address < 0 or address >= 2**self.address_width:
|
||||
raise ValueError("Address out of range")
|
||||
|
||||
if isinstance(data, int):
|
||||
raise ValueError("Expected bytes or bytearray for data")
|
||||
|
||||
if not self.awprot_present and prot != AxiProt.NONSECURE:
|
||||
raise ValueError("awprot sideband signal value specified, but signal is not connected")
|
||||
|
||||
self.in_flight_operations += 1
|
||||
self._idle.clear()
|
||||
|
||||
self.write_command_queue.put_nowait(AxiLiteWriteCmd(address, bytearray(data), prot, event))
|
||||
self.write_command_queue.put_nowait(AxiLiteWriteCmd(address, bytes(data), prot, event))
|
||||
|
||||
return event
|
||||
|
||||
@@ -131,31 +178,6 @@ class AxiLiteMasterWrite(Reset):
|
||||
await event.wait()
|
||||
return event.data
|
||||
|
||||
async def write_words(self, address, data, byteorder='little', ws=2, prot=AxiProt.NONSECURE):
|
||||
words = data
|
||||
data = bytearray()
|
||||
for w in words:
|
||||
data.extend(w.to_bytes(ws, byteorder))
|
||||
await self.write(address, data, prot)
|
||||
|
||||
async def write_dwords(self, address, data, byteorder='little', prot=AxiProt.NONSECURE):
|
||||
await self.write_words(address, data, byteorder, 4, prot)
|
||||
|
||||
async def write_qwords(self, address, data, byteorder='little', prot=AxiProt.NONSECURE):
|
||||
await self.write_words(address, data, byteorder, 8, prot)
|
||||
|
||||
async def write_byte(self, address, data, prot=AxiProt.NONSECURE):
|
||||
await self.write(address, [data], prot)
|
||||
|
||||
async def write_word(self, address, data, byteorder='little', ws=2, prot=AxiProt.NONSECURE):
|
||||
await self.write_words(address, [data], byteorder, ws, prot)
|
||||
|
||||
async def write_dword(self, address, data, byteorder='little', prot=AxiProt.NONSECURE):
|
||||
await self.write_dwords(address, [data], byteorder, prot)
|
||||
|
||||
async def write_qword(self, address, data, byteorder='little', prot=AxiProt.NONSECURE):
|
||||
await self.write_qwords(address, [data], byteorder, prot)
|
||||
|
||||
def _handle_reset(self, state):
|
||||
if state:
|
||||
self.log.info("Reset asserted")
|
||||
@@ -222,8 +244,9 @@ class AxiLiteMasterWrite(Reset):
|
||||
|
||||
offset = 0
|
||||
|
||||
self.log.info("Write start addr: 0x%08x prot: %s data: %s",
|
||||
cmd.address, cmd.prot, ' '.join((f'{c:02x}' for c in cmd.data)))
|
||||
if self.log.isEnabledFor(logging.INFO):
|
||||
self.log.info("Write start addr: 0x%08x prot: %s data: %s",
|
||||
cmd.address, cmd.prot, ' '.join((f'{c:02x}' for c in cmd.data)))
|
||||
|
||||
for k in range(cycles):
|
||||
start = 0
|
||||
@@ -265,13 +288,13 @@ class AxiLiteMasterWrite(Reset):
|
||||
for k in range(cmd.cycles):
|
||||
b = await self.b_channel.recv()
|
||||
|
||||
cycle_resp = AxiResp(b.bresp)
|
||||
cycle_resp = AxiResp(getattr(b, 'bresp', AxiResp.OKAY))
|
||||
|
||||
if cycle_resp != AxiResp.OKAY:
|
||||
resp = cycle_resp
|
||||
|
||||
self.log.info("Write complete addr: 0x%08x prot: %s resp: %s length: %d",
|
||||
cmd.address, cmd.prot, resp, cmd.length)
|
||||
cmd.address, cmd.prot, resp, cmd.length)
|
||||
|
||||
write_resp = AxiLiteWriteResp(cmd.address, cmd.length, resp)
|
||||
|
||||
@@ -285,8 +308,8 @@ class AxiLiteMasterWrite(Reset):
|
||||
self._idle.set()
|
||||
|
||||
|
||||
class AxiLiteMasterRead(Reset):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True):
|
||||
class AxiLiteMasterRead(Region, Reset):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, **kwargs):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
@@ -312,14 +335,17 @@ class AxiLiteMasterRead(Reset):
|
||||
self._idle = Event()
|
||||
self._idle.set()
|
||||
|
||||
self.address_width = len(self.ar_channel.bus.araddr)
|
||||
self.width = len(self.r_channel.bus.rdata)
|
||||
self.byte_size = 8
|
||||
self.byte_lanes = self.width // self.byte_size
|
||||
|
||||
self.arprot_present = hasattr(self.bus.ar, "arprot")
|
||||
|
||||
super().__init__(2**self.address_width, **kwargs)
|
||||
|
||||
self.log.info("AXI lite master configuration:")
|
||||
self.log.info(" Address width: %d bits", len(self.ar_channel.bus.araddr))
|
||||
self.log.info(" Address width: %d bits", self.address_width)
|
||||
self.log.info(" Byte size: %d bits", self.byte_size)
|
||||
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
|
||||
|
||||
@@ -345,6 +371,9 @@ class AxiLiteMasterRead(Reset):
|
||||
if not isinstance(event, Event):
|
||||
raise ValueError("Expected event object")
|
||||
|
||||
if address < 0 or address >= 2**self.address_width:
|
||||
raise ValueError("Address out of range")
|
||||
|
||||
if not self.arprot_present and prot != AxiProt.NONSECURE:
|
||||
raise ValueError("arprot sideband signal value specified, but signal is not connected")
|
||||
|
||||
@@ -367,31 +396,6 @@ class AxiLiteMasterRead(Reset):
|
||||
await event.wait()
|
||||
return event.data
|
||||
|
||||
async def read_words(self, address, count, byteorder='little', ws=2, prot=AxiProt.NONSECURE):
|
||||
data = await self.read(address, count*ws, prot)
|
||||
words = []
|
||||
for k in range(count):
|
||||
words.append(int.from_bytes(data.data[ws*k:ws*(k+1)], byteorder))
|
||||
return words
|
||||
|
||||
async def read_dwords(self, address, count, byteorder='little', prot=AxiProt.NONSECURE):
|
||||
return await self.read_words(address, count, byteorder, 4, prot)
|
||||
|
||||
async def read_qwords(self, address, count, byteorder='little', prot=AxiProt.NONSECURE):
|
||||
return await self.read_words(address, count, byteorder, 8, prot)
|
||||
|
||||
async def read_byte(self, address, prot=AxiProt.NONSECURE):
|
||||
return (await self.read(address, 1, prot)).data[0]
|
||||
|
||||
async def read_word(self, address, byteorder='little', ws=2, prot=AxiProt.NONSECURE):
|
||||
return (await self.read_words(address, 1, byteorder, ws, prot))[0]
|
||||
|
||||
async def read_dword(self, address, byteorder='little', prot=AxiProt.NONSECURE):
|
||||
return (await self.read_dwords(address, 1, byteorder, prot))[0]
|
||||
|
||||
async def read_qword(self, address, byteorder='little', prot=AxiProt.NONSECURE):
|
||||
return (await self.read_qwords(address, 1, byteorder, prot))[0]
|
||||
|
||||
def _handle_reset(self, state):
|
||||
if state:
|
||||
self.log.info("Reset asserted")
|
||||
@@ -450,7 +454,7 @@ class AxiLiteMasterRead(Reset):
|
||||
await self.int_read_resp_command_queue.put(resp_cmd)
|
||||
|
||||
self.log.info("Read start addr: 0x%08x prot: %s length: %d",
|
||||
cmd.address, cmd.prot, cmd.length)
|
||||
cmd.address, cmd.prot, cmd.length)
|
||||
|
||||
for k in range(cycles):
|
||||
ar = self.ar_channel._transaction_obj()
|
||||
@@ -477,7 +481,7 @@ class AxiLiteMasterRead(Reset):
|
||||
r = await self.r_channel.recv()
|
||||
|
||||
cycle_data = int(r.rdata)
|
||||
cycle_resp = AxiResp(r.rresp)
|
||||
cycle_resp = AxiResp(getattr(r, 'rresp', AxiResp.OKAY))
|
||||
|
||||
if cycle_resp != AxiResp.OKAY:
|
||||
resp = cycle_resp
|
||||
@@ -493,10 +497,11 @@ class AxiLiteMasterRead(Reset):
|
||||
for j in range(start, stop):
|
||||
data.extend(bytearray([(cycle_data >> j*8) & 0xff]))
|
||||
|
||||
self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s",
|
||||
cmd.address, cmd.prot, resp, ' '.join((f'{c:02x}' for c in data)))
|
||||
if self.log.isEnabledFor(logging.INFO):
|
||||
self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s",
|
||||
cmd.address, cmd.prot, resp, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
read_resp = AxiLiteReadResp(cmd.address, data, resp)
|
||||
read_resp = AxiLiteReadResp(cmd.address, bytes(data), resp)
|
||||
|
||||
cmd.event.set(read_resp)
|
||||
|
||||
@@ -508,13 +513,15 @@ class AxiLiteMasterRead(Reset):
|
||||
self._idle.set()
|
||||
|
||||
|
||||
class AxiLiteMaster:
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True):
|
||||
class AxiLiteMaster(Region):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, **kwargs):
|
||||
self.write_if = None
|
||||
self.read_if = None
|
||||
|
||||
self.write_if = AxiLiteMasterWrite(bus.write, clock, reset, reset_active_level)
|
||||
self.read_if = AxiLiteMasterRead(bus.read, clock, reset, reset_active_level)
|
||||
self.write_if = AxiLiteMasterWrite(bus.write, clock, reset, reset_active_level, **kwargs)
|
||||
self.read_if = AxiLiteMasterRead(bus.read, clock, reset, reset_active_level, **kwargs)
|
||||
|
||||
super().__init__(max(self.write_if.size, self.read_if.size), **kwargs)
|
||||
|
||||
def init_read(self, address, length, prot=AxiProt.NONSECURE, event=None):
|
||||
return self.read_if.init_read(address, length, prot, event)
|
||||
@@ -539,47 +546,5 @@ class AxiLiteMaster:
|
||||
async def read(self, address, length, prot=AxiProt.NONSECURE):
|
||||
return await self.read_if.read(address, length, prot)
|
||||
|
||||
async def read_words(self, address, count, byteorder='little', ws=2, prot=AxiProt.NONSECURE):
|
||||
return await self.read_if.read_words(address, count, byteorder, ws, prot)
|
||||
|
||||
async def read_dwords(self, address, count, byteorder='little', prot=AxiProt.NONSECURE):
|
||||
return await self.read_if.read_dwords(address, count, byteorder, prot)
|
||||
|
||||
async def read_qwords(self, address, count, byteorder='little', prot=AxiProt.NONSECURE):
|
||||
return await self.read_if.read_qwords(address, count, byteorder, prot)
|
||||
|
||||
async def read_byte(self, address, prot=AxiProt.NONSECURE):
|
||||
return await self.read_if.read_byte(address, prot)
|
||||
|
||||
async def read_word(self, address, byteorder='little', ws=2, prot=AxiProt.NONSECURE):
|
||||
return await self.read_if.read_word(address, byteorder, ws, prot)
|
||||
|
||||
async def read_dword(self, address, byteorder='little', prot=AxiProt.NONSECURE):
|
||||
return await self.read_if.read_dword(address, byteorder, prot)
|
||||
|
||||
async def read_qword(self, address, byteorder='little', prot=AxiProt.NONSECURE):
|
||||
return await self.read_if.read_qword(address, byteorder, prot)
|
||||
|
||||
async def write(self, address, data, prot=AxiProt.NONSECURE):
|
||||
return await self.write_if.write(address, data, prot)
|
||||
|
||||
async def write_words(self, address, data, byteorder='little', ws=2, prot=AxiProt.NONSECURE):
|
||||
return await self.write_if.write_words(address, data, byteorder, ws, prot)
|
||||
|
||||
async def write_dwords(self, address, data, byteorder='little', prot=AxiProt.NONSECURE):
|
||||
return await self.write_if.write_dwords(address, data, byteorder, prot)
|
||||
|
||||
async def write_qwords(self, address, data, byteorder='little', prot=AxiProt.NONSECURE):
|
||||
return await self.write_if.write_qwords(address, data, byteorder, prot)
|
||||
|
||||
async def write_byte(self, address, data, prot=AxiProt.NONSECURE):
|
||||
return await self.write_if.write_byte(address, data, prot)
|
||||
|
||||
async def write_word(self, address, data, byteorder='little', ws=2, prot=AxiProt.NONSECURE):
|
||||
return await self.write_if.write_word(address, data, byteorder, ws, prot)
|
||||
|
||||
async def write_dword(self, address, data, byteorder='little', prot=AxiProt.NONSECURE):
|
||||
return await self.write_if.write_dword(address, data, byteorder, prot)
|
||||
|
||||
async def write_qword(self, address, data, byteorder='little', prot=AxiProt.NONSECURE):
|
||||
return await self.write_if.write_qword(address, data, byteorder, prot)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2020 Alex Forencich
|
||||
Copyright (c) 2021 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
@@ -22,198 +22,32 @@ THE SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
import cocotb
|
||||
|
||||
from .version import __version__
|
||||
from .constants import AxiProt, AxiResp
|
||||
from .axil_channels import AxiLiteAWSink, AxiLiteWSink, AxiLiteBSource, AxiLiteARSink, AxiLiteRSource
|
||||
from .axil_slave import AxiLiteSlaveWrite, AxiLiteSlaveRead
|
||||
from .memory import Memory
|
||||
from .reset import Reset
|
||||
|
||||
|
||||
class AxiLiteRamWrite(Memory, Reset):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
|
||||
class AxiLiteRamWrite(AxiLiteSlaveWrite, Memory):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs):
|
||||
super().__init__(bus, clock, reset, reset_active_level=reset_active_level, size=size, mem=mem, **kwargs)
|
||||
|
||||
self.log.info("AXI lite RAM model (write)")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2020 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
super().__init__(size, mem, *args, **kwargs)
|
||||
|
||||
self.aw_channel = AxiLiteAWSink(bus.aw, clock, reset, reset_active_level)
|
||||
self.aw_channel.queue_occupancy_limit = 2
|
||||
self.w_channel = AxiLiteWSink(bus.w, clock, reset, reset_active_level)
|
||||
self.w_channel.queue_occupancy_limit = 2
|
||||
self.b_channel = AxiLiteBSource(bus.b, clock, reset, reset_active_level)
|
||||
self.b_channel.queue_occupancy_limit = 2
|
||||
|
||||
self.width = len(self.w_channel.bus.wdata)
|
||||
self.byte_size = 8
|
||||
self.byte_lanes = self.width // self.byte_size
|
||||
self.strb_mask = 2**self.byte_lanes-1
|
||||
|
||||
self.log.info("AXI lite RAM model configuration:")
|
||||
self.log.info(" Memory size: %d bytes", len(self.mem))
|
||||
self.log.info(" Address width: %d bits", len(self.aw_channel.bus.awaddr))
|
||||
self.log.info(" Byte size: %d bits", self.byte_size)
|
||||
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
|
||||
|
||||
self.log.info("AXI lite RAM model signals:")
|
||||
for bus in (self.bus.aw, self.bus.w, self.bus.b):
|
||||
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
|
||||
if hasattr(bus, sig):
|
||||
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
|
||||
else:
|
||||
self.log.info(" %s: not present", sig)
|
||||
|
||||
assert self.byte_lanes == len(self.w_channel.bus.wstrb)
|
||||
assert self.byte_lanes * self.byte_size == self.width
|
||||
|
||||
self._process_write_cr = None
|
||||
|
||||
self._init_reset(reset, reset_active_level)
|
||||
|
||||
def _handle_reset(self, state):
|
||||
if state:
|
||||
self.log.info("Reset asserted")
|
||||
if self._process_write_cr is not None:
|
||||
self._process_write_cr.kill()
|
||||
self._process_write_cr = None
|
||||
|
||||
self.aw_channel.clear()
|
||||
self.w_channel.clear()
|
||||
self.b_channel.clear()
|
||||
else:
|
||||
self.log.info("Reset de-asserted")
|
||||
if self._process_write_cr is None:
|
||||
self._process_write_cr = cocotb.fork(self._process_write())
|
||||
|
||||
async def _process_write(self):
|
||||
while True:
|
||||
aw = await self.aw_channel.recv()
|
||||
|
||||
addr = (int(aw.awaddr) // self.byte_lanes) * self.byte_lanes
|
||||
prot = AxiProt(aw.awprot)
|
||||
|
||||
w = await self.w_channel.recv()
|
||||
|
||||
data = int(w.wdata)
|
||||
strb = int(w.wstrb)
|
||||
|
||||
# todo latency
|
||||
|
||||
self.mem.seek(addr % self.size)
|
||||
|
||||
data = data.to_bytes(self.byte_lanes, 'little')
|
||||
|
||||
self.log.info("Write data awaddr: 0x%08x awprot: %s wstrb: 0x%02x data: %s",
|
||||
addr, prot, strb, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
for i in range(self.byte_lanes):
|
||||
if strb & (1 << i):
|
||||
self.mem.write(data[i:i+1])
|
||||
else:
|
||||
self.mem.seek(1, 1)
|
||||
|
||||
b = self.b_channel._transaction_obj()
|
||||
b.bresp = AxiResp.OKAY
|
||||
|
||||
await self.b_channel.send(b)
|
||||
async def _write(self, address, data):
|
||||
self.write(address % self.size, data)
|
||||
|
||||
|
||||
class AxiLiteRamRead(Memory, Reset):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
|
||||
class AxiLiteRamRead(AxiLiteSlaveRead, Memory):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs):
|
||||
super().__init__(bus, clock, reset, reset_active_level=reset_active_level, size=size, mem=mem, **kwargs)
|
||||
|
||||
self.log.info("AXI lite RAM model (read)")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2020 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
super().__init__(size, mem, *args, **kwargs)
|
||||
|
||||
self.ar_channel = AxiLiteARSink(bus.ar, clock, reset, reset_active_level)
|
||||
self.ar_channel.queue_occupancy_limit = 2
|
||||
self.r_channel = AxiLiteRSource(bus.r, clock, reset, reset_active_level)
|
||||
self.r_channel.queue_occupancy_limit = 2
|
||||
|
||||
self.width = len(self.r_channel.bus.rdata)
|
||||
self.byte_size = 8
|
||||
self.byte_lanes = self.width // self.byte_size
|
||||
|
||||
self.log.info("AXI lite RAM model configuration:")
|
||||
self.log.info(" Memory size: %d bytes", len(self.mem))
|
||||
self.log.info(" Address width: %d bits", len(self.ar_channel.bus.araddr))
|
||||
self.log.info(" Byte size: %d bits", self.byte_size)
|
||||
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
|
||||
|
||||
self.log.info("AXI lite RAM model signals:")
|
||||
for bus in (self.bus.ar, self.bus.r):
|
||||
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
|
||||
if hasattr(bus, sig):
|
||||
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
|
||||
else:
|
||||
self.log.info(" %s: not present", sig)
|
||||
|
||||
assert self.byte_lanes * self.byte_size == self.width
|
||||
|
||||
self._process_read_cr = None
|
||||
|
||||
self._init_reset(reset, reset_active_level)
|
||||
|
||||
def _handle_reset(self, state):
|
||||
if state:
|
||||
self.log.info("Reset asserted")
|
||||
if self._process_read_cr is not None:
|
||||
self._process_read_cr.kill()
|
||||
self._process_read_cr = None
|
||||
|
||||
self.ar_channel.clear()
|
||||
self.r_channel.clear()
|
||||
else:
|
||||
self.log.info("Reset de-asserted")
|
||||
if self._process_read_cr is None:
|
||||
self._process_read_cr = cocotb.fork(self._process_read())
|
||||
|
||||
async def _process_read(self):
|
||||
while True:
|
||||
ar = await self.ar_channel.recv()
|
||||
|
||||
addr = (int(ar.araddr) // self.byte_lanes) * self.byte_lanes
|
||||
prot = AxiProt(ar.arprot)
|
||||
|
||||
# todo latency
|
||||
|
||||
self.mem.seek(addr % self.size)
|
||||
|
||||
data = self.mem.read(self.byte_lanes)
|
||||
|
||||
r = self.r_channel._transaction_obj()
|
||||
r.rdata = int.from_bytes(data, 'little')
|
||||
r.rresp = AxiResp.OKAY
|
||||
|
||||
await self.r_channel.send(r)
|
||||
|
||||
self.log.info("Read data araddr: 0x%08x arprot: %s data: %s",
|
||||
addr, prot, ' '.join((f'{c:02x}' for c in data)))
|
||||
async def _read(self, address, length):
|
||||
return self.read(address % self.size, length)
|
||||
|
||||
|
||||
class AxiLiteRam(Memory):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs):
|
||||
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs):
|
||||
self.write_if = None
|
||||
self.read_if = None
|
||||
|
||||
super().__init__(size, mem, *args, **kwargs)
|
||||
super().__init__(size, mem, **kwargs)
|
||||
|
||||
self.write_if = AxiLiteRamWrite(bus.write, clock, reset, reset_active_level, mem=self.mem)
|
||||
self.read_if = AxiLiteRamRead(bus.read, clock, reset, reset_active_level, mem=self.mem)
|
||||
|
||||
249
cocotbext/axi/axil_slave.py
Normal file
249
cocotbext/axi/axil_slave.py
Normal file
@@ -0,0 +1,249 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2021 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
import cocotb
|
||||
|
||||
from .version import __version__
|
||||
from .constants import AxiProt, AxiResp
|
||||
from .axil_channels import AxiLiteAWSink, AxiLiteWSink, AxiLiteBSource, AxiLiteARSink, AxiLiteRSource
|
||||
from .reset import Reset
|
||||
|
||||
|
||||
class AxiLiteSlaveWrite(Reset):
|
||||
def __init__(self, bus, clock, reset=None, target=None, reset_active_level=True, **kwargs):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.target = target
|
||||
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
|
||||
|
||||
self.log.info("AXI lite slave model (write)")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2021 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.aw_channel = AxiLiteAWSink(bus.aw, clock, reset, reset_active_level)
|
||||
self.aw_channel.queue_occupancy_limit = 2
|
||||
self.w_channel = AxiLiteWSink(bus.w, clock, reset, reset_active_level)
|
||||
self.w_channel.queue_occupancy_limit = 2
|
||||
self.b_channel = AxiLiteBSource(bus.b, clock, reset, reset_active_level)
|
||||
self.b_channel.queue_occupancy_limit = 2
|
||||
|
||||
self.address_width = len(self.aw_channel.bus.awaddr)
|
||||
self.width = len(self.w_channel.bus.wdata)
|
||||
self.byte_size = 8
|
||||
self.byte_lanes = self.width // self.byte_size
|
||||
self.strb_mask = 2**self.byte_lanes-1
|
||||
|
||||
self.log.info("AXI lite slave model configuration:")
|
||||
self.log.info(" Memory size: %d bytes", len(self.mem))
|
||||
self.log.info(" Address width: %d bits", self.address_width)
|
||||
self.log.info(" Byte size: %d bits", self.byte_size)
|
||||
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
|
||||
|
||||
self.log.info("AXI lite slave model signals:")
|
||||
for bus in (self.bus.aw, self.bus.w, self.bus.b):
|
||||
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
|
||||
if hasattr(bus, sig):
|
||||
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
|
||||
else:
|
||||
self.log.info(" %s: not present", sig)
|
||||
|
||||
assert self.byte_lanes == len(self.w_channel.bus.wstrb)
|
||||
assert self.byte_lanes * self.byte_size == self.width
|
||||
|
||||
self._process_write_cr = None
|
||||
|
||||
self._init_reset(reset, reset_active_level)
|
||||
|
||||
async def _write(self, address, data):
|
||||
await self.target.write(address, data)
|
||||
|
||||
def _handle_reset(self, state):
|
||||
if state:
|
||||
self.log.info("Reset asserted")
|
||||
if self._process_write_cr is not None:
|
||||
self._process_write_cr.kill()
|
||||
self._process_write_cr = None
|
||||
|
||||
self.aw_channel.clear()
|
||||
self.w_channel.clear()
|
||||
self.b_channel.clear()
|
||||
else:
|
||||
self.log.info("Reset de-asserted")
|
||||
if self._process_write_cr is None:
|
||||
self._process_write_cr = cocotb.fork(self._process_write())
|
||||
|
||||
async def _process_write(self):
|
||||
while True:
|
||||
aw = await self.aw_channel.recv()
|
||||
|
||||
addr = (int(aw.awaddr) // self.byte_lanes) * self.byte_lanes
|
||||
prot = AxiProt(getattr(aw, 'awprot', AxiProt.NONSECURE))
|
||||
|
||||
w = await self.w_channel.recv()
|
||||
|
||||
data = int(w.wdata)
|
||||
strb = int(getattr(w, 'wstrb', self.strb_mask))
|
||||
|
||||
# generate operation list
|
||||
offset = 0
|
||||
start_offset = None
|
||||
write_ops = []
|
||||
|
||||
data = data.to_bytes(self.byte_lanes, 'little')
|
||||
|
||||
b = self.b_channel._transaction_obj()
|
||||
b.bresp = AxiResp.OKAY
|
||||
|
||||
if self.log.isEnabledFor(logging.INFO):
|
||||
self.log.info("Write data awaddr: 0x%08x awprot: %s wstrb: 0x%02x data: %s",
|
||||
addr, prot, strb, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
for i in range(self.byte_lanes):
|
||||
if strb & (1 << i):
|
||||
if start_offset is None:
|
||||
start_offset = offset
|
||||
else:
|
||||
if start_offset is not None and offset != start_offset:
|
||||
write_ops.append((addr+start_offset, data[start_offset:offset]))
|
||||
start_offset = None
|
||||
|
||||
offset += 1
|
||||
|
||||
if start_offset is not None and offset != start_offset:
|
||||
write_ops.append((addr+start_offset, data[start_offset:offset]))
|
||||
|
||||
# perform writes
|
||||
try:
|
||||
for addr, data in write_ops:
|
||||
await self._write(addr, data)
|
||||
except Exception:
|
||||
self.log.warning("Write operation failed")
|
||||
b.bresp = AxiResp.SLVERR
|
||||
|
||||
await self.b_channel.send(b)
|
||||
|
||||
|
||||
class AxiLiteSlaveRead(Reset):
|
||||
def __init__(self, bus, clock, reset=None, target=None, reset_active_level=True, **kwargs):
|
||||
self.bus = bus
|
||||
self.clock = clock
|
||||
self.reset = reset
|
||||
self.target = target
|
||||
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
|
||||
|
||||
self.log.info("AXI lite slave model (read)")
|
||||
self.log.info("cocotbext-axi version %s", __version__)
|
||||
self.log.info("Copyright (c) 2021 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.ar_channel = AxiLiteARSink(bus.ar, clock, reset, reset_active_level)
|
||||
self.ar_channel.queue_occupancy_limit = 2
|
||||
self.r_channel = AxiLiteRSource(bus.r, clock, reset, reset_active_level)
|
||||
self.r_channel.queue_occupancy_limit = 2
|
||||
|
||||
self.address_width = len(self.ar_channel.bus.araddr)
|
||||
self.width = len(self.r_channel.bus.rdata)
|
||||
self.byte_size = 8
|
||||
self.byte_lanes = self.width // self.byte_size
|
||||
|
||||
self.log.info("AXI lite slave model configuration:")
|
||||
self.log.info(" Memory size: %d bytes", len(self.mem))
|
||||
self.log.info(" Address width: %d bits", self.address_width)
|
||||
self.log.info(" Byte size: %d bits", self.byte_size)
|
||||
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
|
||||
|
||||
self.log.info("AXI lite slave model signals:")
|
||||
for bus in (self.bus.ar, self.bus.r):
|
||||
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
|
||||
if hasattr(bus, sig):
|
||||
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
|
||||
else:
|
||||
self.log.info(" %s: not present", sig)
|
||||
|
||||
assert self.byte_lanes * self.byte_size == self.width
|
||||
|
||||
self._process_read_cr = None
|
||||
|
||||
self._init_reset(reset, reset_active_level)
|
||||
|
||||
async def _read(self, address, length):
|
||||
return await self.target.read(address, length)
|
||||
|
||||
def _handle_reset(self, state):
|
||||
if state:
|
||||
self.log.info("Reset asserted")
|
||||
if self._process_read_cr is not None:
|
||||
self._process_read_cr.kill()
|
||||
self._process_read_cr = None
|
||||
|
||||
self.ar_channel.clear()
|
||||
self.r_channel.clear()
|
||||
else:
|
||||
self.log.info("Reset de-asserted")
|
||||
if self._process_read_cr is None:
|
||||
self._process_read_cr = cocotb.fork(self._process_read())
|
||||
|
||||
async def _process_read(self):
|
||||
while True:
|
||||
ar = await self.ar_channel.recv()
|
||||
|
||||
addr = (int(ar.araddr) // self.byte_lanes) * self.byte_lanes
|
||||
prot = AxiProt(getattr(ar, 'arprot', AxiProt.NONSECURE))
|
||||
|
||||
r = self.r_channel._transaction_obj()
|
||||
r.rresp = AxiResp.OKAY
|
||||
|
||||
try:
|
||||
data = await self._read(addr, self.byte_lanes)
|
||||
except Exception:
|
||||
self.log.warning("Read operation failed")
|
||||
data = bytes(self.byte_lanes)
|
||||
r.rresp = AxiResp.SLVERR
|
||||
|
||||
r.rdata = int.from_bytes(data, 'little')
|
||||
|
||||
await self.r_channel.send(r)
|
||||
|
||||
if self.log.isEnabledFor(logging.INFO):
|
||||
self.log.info("Read data araddr: 0x%08x arprot: %s data: %s",
|
||||
addr, prot, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
|
||||
class AxiLiteSlave:
|
||||
def __init__(self, bus, clock, reset=None, target=None, reset_active_level=True, **kwargs):
|
||||
self.write_if = None
|
||||
self.read_if = None
|
||||
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.write_if = AxiLiteSlaveWrite(target, bus.write, clock, reset, reset_active_level)
|
||||
self.read_if = AxiLiteSlaveRead(target, bus.read, clock, reset, reset_active_level)
|
||||
92
cocotbext/axi/buddy_allocator.py
Normal file
92
cocotbext/axi/buddy_allocator.py
Normal file
@@ -0,0 +1,92 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2021 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class BuddyAllocator:
|
||||
def __init__(self, size, min_alloc=1):
|
||||
self.size = size
|
||||
self.min_alloc = min_alloc
|
||||
|
||||
self.free_lists = [[] for x in range((self.size-1).bit_length())]
|
||||
self.free_lists.append([0])
|
||||
self.allocations = {}
|
||||
|
||||
def alloc(self, size):
|
||||
if size < 1 or size > self.size:
|
||||
raise ValueError("size out of range")
|
||||
|
||||
size = max(size, self.min_alloc)
|
||||
|
||||
bucket = (size-1).bit_length()
|
||||
orig_bucket = bucket
|
||||
|
||||
while bucket < len(self.free_lists):
|
||||
if not self.free_lists[bucket]:
|
||||
# find free block
|
||||
bucket += 1
|
||||
continue
|
||||
|
||||
while bucket > orig_bucket:
|
||||
# split block
|
||||
block = self.free_lists[bucket].pop(0)
|
||||
bucket -= 1
|
||||
self.free_lists[bucket].append(block)
|
||||
self.free_lists[bucket].append(block+2**bucket)
|
||||
|
||||
if self.free_lists[bucket]:
|
||||
# allocate
|
||||
block = self.free_lists[bucket].pop(0)
|
||||
self.allocations[block] = bucket
|
||||
return block
|
||||
|
||||
break
|
||||
|
||||
raise Exception("out of memory")
|
||||
|
||||
def free(self, addr):
|
||||
if addr not in self.allocations:
|
||||
raise ValueError("unknown allocation")
|
||||
|
||||
bucket = self.allocations.pop(addr)
|
||||
|
||||
while bucket < len(self.free_lists):
|
||||
size = 2**bucket
|
||||
|
||||
# find buddy
|
||||
if (addr // size) % 2:
|
||||
buddy = addr - size
|
||||
else:
|
||||
buddy = addr + size
|
||||
|
||||
if buddy in self.free_lists[bucket]:
|
||||
# buddy is free, merge
|
||||
self.free_lists[bucket].remove(buddy)
|
||||
addr = min(addr, buddy)
|
||||
bucket += 1
|
||||
else:
|
||||
# buddy is not free, so add to free list
|
||||
self.free_lists[bucket].append(addr)
|
||||
return
|
||||
|
||||
raise Exception("failed to free memory")
|
||||
@@ -28,13 +28,13 @@ from .utils import hexdump, hexdump_lines, hexdump_str
|
||||
|
||||
|
||||
class Memory:
|
||||
def __init__(self, size=1024, mem=None, *args, **kwargs):
|
||||
def __init__(self, size=1024, mem=None, **kwargs):
|
||||
if mem is not None:
|
||||
self.mem = mem
|
||||
else:
|
||||
self.mem = mmap.mmap(-1, size)
|
||||
self.size = len(self.mem)
|
||||
super().__init__(*args, **kwargs)
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def read(self, address, length):
|
||||
self.mem.seek(address)
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "0.1.14"
|
||||
__version__ = "0.1.16"
|
||||
|
||||
44
tests/test_buddy_allocator.py
Normal file
44
tests/test_buddy_allocator.py
Normal file
@@ -0,0 +1,44 @@
|
||||
"""
|
||||
|
||||
Copyright (c) 2021 Alex Forencich
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
from cocotbext.axi.buddy_allocator import BuddyAllocator
|
||||
|
||||
|
||||
def test_allocator():
|
||||
ba = BuddyAllocator(1024)
|
||||
|
||||
lst = []
|
||||
|
||||
for k in range(1, 32):
|
||||
print(f"Alloc {k} bytes")
|
||||
addr = ba.alloc(k)
|
||||
print(f"Got address {addr}")
|
||||
assert addr & (2**((k-1).bit_length())-1) == 0
|
||||
lst.append(addr)
|
||||
|
||||
for addr in lst:
|
||||
print(f"Free {addr}")
|
||||
ba.free(addr)
|
||||
|
||||
assert ba.free_lists[-1] == [0]
|
||||
Reference in New Issue
Block a user