45 Commits

Author SHA1 Message Date
Alex Forencich
74dd47ca99 Release v0.1.14 2021-11-07 12:39:42 -08:00
Reto Meier
cde2056bb0 Remove deprecated <= assignments
Starting from cocotb v1.6 the use of <= syntax has been deprecated. This
commit replaces all use of this syntax with the ``.value =`` syntax.
2021-10-25 18:27:18 +02:00
Alex Forencich
b6870716ed Fix active state tracking for AXI stream sink/monitor 2021-09-15 00:46:01 -07:00
Alex Forencich
44da562db9 Add cocotb framework classifier 2021-08-31 14:43:59 -07:00
Alex Forencich
8dcdbfefb8 Bump to dev version 2021-04-12 22:56:51 -07:00
Alex Forencich
b8919a095b Release v0.1.12 2021-04-12 22:46:32 -07:00
Alex Forencich
bc7edec289 Make resp and prot signals optional 2021-04-12 22:04:22 -07:00
Alex Forencich
e7c3a31eb0 Improve handling for optional signals 2021-04-12 21:24:33 -07:00
Alex Forencich
ce907ffbb9 Print out signal summary 2021-04-12 19:29:41 -07:00
Alex Forencich
95e2d5800d Store parameters 2021-04-12 19:27:38 -07:00
Alex Forencich
82853b31ff Rename byte_width to byte_lanes 2021-04-12 15:08:30 -07:00
Alex Forencich
8bbabd92df Update readme 2021-04-12 15:07:26 -07:00
Alex Forencich
c060f6c963 Transmit data without using pop 2021-04-12 13:53:28 -07:00
Alex Forencich
a767e00ce5 Transmit frames without using pop 2021-04-12 13:49:20 -07:00
Alex Forencich
d1d7313b98 Add tag context manager to AXI master to reuse per-ID processing components 2021-04-08 19:03:46 -07:00
Alex Forencich
01b43b97f2 Update readme 2021-04-08 19:01:27 -07:00
Alex Forencich
b5b6df84fe Improve burst handling in AXI master 2021-03-25 18:03:36 -07:00
Alex Forencich
babe69f4d3 Bump to dev version 2021-03-24 21:50:10 -07:00
Alex Forencich
c4873ad14c Release v0.1.10 2021-03-24 21:03:16 -07:00
Alex Forencich
77a40bdc8f Limit channel queue depth 2021-03-24 17:55:07 -07:00
Alex Forencich
f991096272 Separate processing coroutines for each ID 2021-03-24 17:07:16 -07:00
Alex Forencich
9e28bd7fbb Revert back to cocotb.fork 2021-03-24 16:20:08 -07:00
Alex Forencich
8c74f747a4 Update readme 2021-03-22 23:19:14 -07:00
Alex Forencich
a285f008ca Refactor reset handling code 2021-03-22 22:02:53 -07:00
Alex Forencich
c677ab245c Reset more internal state 2021-03-22 22:02:06 -07:00
Alex Forencich
11f9db8b06 Add test cases for init_read and init_write 2021-03-22 21:22:13 -07:00
Alex Forencich
344ec8d4ce Return event object from init_read and init_write; remove get_write_resp and get_read_data 2021-03-22 21:21:34 -07:00
Alex Forencich
4ff390481e Extract parameter values from cocotb.top 2021-03-22 15:51:07 -07:00
Alex Forencich
4bee96ea9a Enforce max queue depth on streaming sources 2021-03-21 22:24:59 -07:00
Alex Forencich
a66dfea6f7 Factor out common recv code; throw QueueEmpty exception in get_nowait 2021-03-21 21:02:28 -07:00
Alex Forencich
f1a89e6c12 Trigger transmit complete events when flushing queue to prevent deadlocks 2021-03-21 18:46:30 -07:00
Alex Forencich
11205bde46 Handle dropped transmit frames during reset 2021-03-21 18:39:35 -07:00
Alex Forencich
bce364eef5 Ensure idle event is set when queue is empty 2021-03-21 18:39:10 -07:00
Alex Forencich
e934b69776 Use start_soon instead of fork 2021-03-21 12:22:22 -07:00
Alex Forencich
7fb8c4e28b Reset processing on assert edge only to permit operations to be queued while reset is asserted 2021-03-21 12:13:19 -07:00
Alex Forencich
156fada616 Store commands currently being processed so they can be released when the processing coroutines are killed 2021-03-21 12:04:30 -07:00
Alex Forencich
d88ba7caf3 Warn when operations are dropped during reset 2021-03-21 11:41:25 -07:00
Alex Forencich
6c66776518 Use start_soon instead of fork 2021-03-21 11:40:25 -07:00
Alex Forencich
a71678c7e3 Bump to dev version 2021-03-17 18:32:42 -07:00
Alex Forencich
c0ebb90cd4 Release v0.1.8 2021-03-17 18:31:54 -07:00
Alex Forencich
56caf57fa4 Fix method name 2021-03-17 18:19:30 -07:00
Alex Forencich
abb78308ff Defer idle event until completion of transfer 2021-03-17 18:02:11 -07:00
Alex Forencich
f19ca9f651 Use cocotb async queues 2021-03-17 17:34:26 -07:00
Alex Forencich
1c40b8fa58 Use cocotb-bus 2021-03-16 18:47:32 -07:00
Alex Forencich
cfd5dae6ea Bump to dev version v0.1.7 2021-03-06 18:30:19 -08:00
14 changed files with 929 additions and 612 deletions

View File

@@ -42,36 +42,27 @@ To use these modules, import the one you need and connect it to the DUT:
axi_master = AxiMaster(AxiBus.from_prefix(dut, "s_axi"), dut.clk, dut.rst)
The first argument to the constructor accepts an `AxiBus` or `AxiLiteBus` object. These objects are containers for the interface signals and include class methods to automate connections.
The first argument to the constructor accepts an `AxiBus` or `AxiLiteBus` object, as appropriate. These objects are containers for the interface signals and include class methods to automate connections.
Once the module is instantiated, read and write operations can be initiated in a few different ways.
Once the module is instantiated, read and write operations can be initiated in a couple of different ways.
First, non-blocking operations can be started with `init_read()` and `init_write()`. These methods will queue up a read or write operation to be carried out over the interface. The result of the operation can be retrieved with `get_read_data()` and `get_write_resp()`. To monitor the status of the module, `idle()`, `wait()`, `wait_read()`, and `wait_write()` can be used. For example:
axi_master.init_write(0x0000, b'test')
await axi_master.wait()
resp = axi_master.get_write_resp()
axi_master.init_read(0x0000, 4)
await axi_master.wait()
data = axi_master.get_read_data()
Alternatively, an event object can be provided as an argument to `init_read()` and `init_write()`, and the result can be retrieved from `Event.data`. For example:
event = Event()
axi_master.init_write(0x0000, b'test', event=event)
await event.wait()
resp = event.data
event = Event()
axi_master.init_read(0x0000, 4, event=event)
await event.wait()
resp = event.data
Second, blocking operations can be carried out with `read()` and `write()` and their associated word-access wrappers. Multiple concurrent operations started from different coroutines are handled correctly. For example:
First, operations can be carried out with async blocking `read()`, `write()`, and their associated word-access wrappers. Multiple concurrent operations started from different coroutines are handled correctly, with results returned in the order that the operations complete. For example:
await axi_master.write(0x0000, b'test')
data = await axi_master.read(0x0000, 4)
`read()`, `write()`, `get_read_data()`, and `get_write_resp()` return `namedtuple` objects containing _address_, _data_ or _length_, and _resp_.
Additional parameters can be specified to control sideband signals and burst settings. The transfer will be split into one or more bursts according to the AXI specification. All bursts generated from the same call to `read()` or `write()` will use the same ID, which will be automatically generated if not specified. `read()` and `write()` return `namedtuple` objects containing _address_, _data_ or _length_, and _resp_. This is the preferred style, and this is the only style supported by the word-access wrappers.
Alternatively, operations can be initiated with non-blocking `init_read()` and `init_write()`. These functions return `Event` objects which are triggered when the operation completes, and the result can be retrieved from `Event.data`. For example:
write_op = axi_master.init_write(0x0000, b'test')
await write_op.wait()
resp = write_op.data
read_op = axi_master.init_read(0x0000, 4)
await read_op.wait()
resp = read_op.data
With this method, it is possible to start multiple concurrent operations from the same coroutine. It is also possible to use the events with `Combine`, `First`, and `with_timeout`.
#### `AxiMaster` and `AxiLiteMaster` constructor parameters
@@ -86,16 +77,12 @@ Second, blocking operations can be carried out with `read()` and `write()` and t
#### Methods
* `init_read(address, length, ...)`: initiate reading _length_ bytes, starting at _address_
* `init_write(address, data, ...)`: initiate writing _data_ (bytes), starting from _address_
* `init_read(address, length, ...)`: initiate reading _length_ bytes, starting at _address_. Returns an `Event` object.
* `init_write(address, data, ...)`: initiate writing _data_ (bytes), starting from _address_. Returns an `Event` object.
* `idle()`: returns _True_ when there are no outstanding operations in progress
* `wait()`: blocking wait until all outstanding operations complete
* `wait_read()`: wait until all outstanding read operations complete
* `wait_write()`: wait until all outstanding write operations complete
* `read_data_ready()`: determine if any read read data is available
* `get_read_data()`: fetch first available read data
* `write_resp_ready()`: determine if any write response is available
* `get_write_resp()`: fetch first available write response
* `read(address, length, ...)`: read _length_ bytes, starting at _address_
* `read_words(address, count, byteorder='little', ws=2, ...)`: read _count_ _ws_-byte words, starting at _address_
* `read_dwords(address, count, byteorder='little', ...)`: read _count_ 4-byte dwords, starting at _address_
@@ -125,16 +112,16 @@ Second, blocking operations can be carried out with `read()` and `write()` and t
* _region_: AXI region field, default `0`
* _user_: AXI user signal (awuser/aruser), default `0`
* _wuser_: AXI wuser signal, default `0` (write-related methods only)
* _event_: `Event` object used to wait on and retrieve result for specific operation, default `None` (`init_read()` and `init_write()` only). If provided, the event will be triggered when the operation completes and the result returned via `Event.data` instead of `get_read_data()` or `get_write_resp()`.
* _event_: `Event` object used to wait on and retrieve result for specific operation, default `None`. The event will be triggered when the operation completes and the result returned via `Event.data`. (`init_read()` and `init_write()` only)
#### Additional optional arguments for `AxiLiteMaster`
* _prot_: AXI protection flags, default `AxiProt.NONSECURE`
* _event_: `Event` object used to wait on and retrieve result for specific operation, default `None` (`init_read()` and `init_write()` only). If provided, the event will be triggered when the operation completes and the result returned via `Event.data` instead of `get_read_data()` or `get_write_resp()`.
* _event_: `Event` object used to wait on and retrieve result for specific operation, default `None`. The event will be triggered when the operation completes and the result returned via `Event.data`. (`init_read()` and `init_write()` only)
#### `AxiBus` and `AxiLiteBus` objects
The `AxiBus`, `AxiLiteBus`, and related objects are containers for the interface signals. These hold instances of bus objects for the individual channels, which are extensions of `cocotb.bus.Bus`. Class methods `from_entity` and `from_prefix` are provided to facilitate signal name matching. For AXI interfaces use `AxiBus`, `AxiReadBus`, or `AxiWriteBus`, as appropriate. For AXI lite interfaces, use `AxiLiteBus`, `AxiLiteReadBus`, or `AxiLiteWriteBus`, as appropriate.
The `AxiBus`, `AxiLiteBus`, and related objects are containers for the interface signals. These hold instances of bus objects for the individual channels, which are currently extensions of `cocotb_bus.bus.Bus`. Class methods `from_entity` and `from_prefix` are provided to facilitate signal name matching. For AXI interfaces use `AxiBus`, `AxiReadBus`, or `AxiWriteBus`, as appropriate. For AXI lite interfaces, use `AxiLiteBus`, `AxiLiteReadBus`, or `AxiLiteWriteBus`, as appropriate.
### AXI and AXI lite RAM
@@ -154,6 +141,7 @@ Once the module is instantiated, the memory contents can be accessed in a couple
axi_ram.write(0x0000, b'test')
data = axi_ram.read(0x0000, 4)
axi_ram.hexdump(0x0000, 4, prefix="RAM")
Multi-port memories can be constructed by passing the `mem` object of the first instance to the other instances. For example, here is how to create a four-port RAM:
@@ -255,8 +243,8 @@ Note: _byte_size_, _byte_lanes_, `len(tdata)`, and `len(tkeep)` are all related,
* _pause_: stall the interface (deassert `tready` or `tvalid`) (source/sink only)
* _queue_occupancy_bytes_: number of bytes in queue (all)
* _queue_occupancy_frames_: number of frames in queue (all)
* _queue_occupancy_limit_bytes_: max number of bytes in queue allowed before tready deassert (sink only)
* _queue_occupancy_limit_frames_: max number of frames in queue allowed before tready deassert (sink only)
* _queue_occupancy_limit_bytes_: max number of bytes in queue allowed before backpressure is applied (source/sink only)
* _queue_occupancy_limit_frames_: max number of frames in queue allowed before backpressure is applied (source/sink only)
#### Methods
@@ -270,7 +258,7 @@ Note: _byte_size_, _byte_lanes_, `len(tdata)`, and `len(tkeep)` are all related,
* `read_nowait(count)`: read _count_ bytes from buffer (non-blocking) (sink/monitor)
* `count()`: returns the number of items in the queue (all)
* `empty()`: returns _True_ if the queue is empty (all)
* `full()`: returns _True_ if the queue occupancy limits are met (sink)
* `full()`: returns _True_ if the queue occupancy limits are met (source/sink)
* `idle()`: returns _True_ if no transfer is in progress (all) or if the queue is not empty (source)
* `clear()`: drop all data in queue (all)
* `wait()`: wait for idle (source)

View File

@@ -26,8 +26,8 @@ from .stream import define_stream
# Write address channel
AxiAWBus, AxiAWTransaction, AxiAWSource, AxiAWSink, AxiAWMonitor = define_stream("AxiAW",
signals=["awid", "awaddr", "awlen", "awsize", "awburst", "awprot", "awvalid", "awready"],
optional_signals=["awlock", "awcache", "awqos", "awregion", "awuser"],
signals=["awid", "awaddr", "awlen", "awsize", "awburst", "awvalid", "awready"],
optional_signals=["awlock", "awcache", "awprot", "awqos", "awregion", "awuser"],
signal_widths={"awlen": 8, "awsize": 3, "awburst": 2, "awlock": 1,
"awcache": 4, "awprot": 3, "awqos": 4, "awregion": 4}
)
@@ -41,23 +41,23 @@ AxiWBus, AxiWTransaction, AxiWSource, AxiWSink, AxiWMonitor = define_stream("Axi
# Write response channel
AxiBBus, AxiBTransaction, AxiBSource, AxiBSink, AxiBMonitor = define_stream("AxiB",
signals=["bid", "bresp", "bvalid", "bready"],
optional_signals=["buser"],
signals=["bid", "bvalid", "bready"],
optional_signals=["bresp", "buser"],
signal_widths={"bresp": 2}
)
# Read address channel
AxiARBus, AxiARTransaction, AxiARSource, AxiARSink, AxiARMonitor = define_stream("AxiAR",
signals=["arid", "araddr", "arlen", "arsize", "arburst", "arprot", "arvalid", "arready"],
optional_signals=["arlock", "arcache", "arqos", "arregion", "aruser"],
signals=["arid", "araddr", "arlen", "arsize", "arburst", "arvalid", "arready"],
optional_signals=["arlock", "arcache", "arprot", "arqos", "arregion", "aruser"],
signal_widths={"arlen": 8, "arsize": 3, "arburst": 2, "arlock": 1,
"arcache": 4, "arprot": 3, "arqos": 4, "arregion": 4}
)
# Read data channel
AxiRBus, AxiRTransaction, AxiRSource, AxiRSink, AxiRMonitor = define_stream("AxiR",
signals=["rid", "rdata", "rresp", "rlast", "rvalid", "rready"],
optional_signals=["ruser"],
signals=["rid", "rdata", "rlast", "rvalid", "rready"],
optional_signals=["rresp", "ruser"],
signal_widths={"rresp": 2, "rlast": 1}
)

View File

@@ -23,9 +23,10 @@ THE SOFTWARE.
"""
import logging
from collections import deque, namedtuple, Counter
from collections import namedtuple, Counter
import cocotb
from cocotb.queue import Queue
from cocotb.triggers import Event
from .version import __version__
@@ -48,8 +49,97 @@ AxiReadRespCmd = namedtuple("AxiReadRespCmd", ["address", "length", "size", "cyc
AxiReadResp = namedtuple("AxiReadResp", ["address", "data", "resp", "user"])
class TagContext:
def __init__(self, manager):
self.current_tag = 0
self._cmd_queue = Queue()
self._current_cmd = None
self._resp_queue = Queue()
self._cr = None
self._manager = manager
async def get_resp(self):
return await self._resp_queue.get()
def get_resp_nowait(self):
return self._resp_queue.get_nowait()
def _start(self):
if self._cr is None:
self._cr = cocotb.fork(self._process_queue())
def _flush(self):
flushed_cmds = []
if self._cr is not None:
self._cr.kill()
self._cr = None
self._manager._set_idle(self)
if self._current_cmd is not None:
flushed_cmds.append(self._current_cmd)
self._current_cmd = None
while not self._cmd_queue.empty():
flushed_cmds.append(self._cmd_queue.get_nowait())
while not self._resp_queue.empty():
self._resp_queue.get_nowait()
return flushed_cmds
async def _process_queue(self):
while True:
cmd = await self._cmd_queue.get()
self._current_cmd = cmd
await self._manager._process(self, cmd)
self._current_cmd = None
if self._cmd_queue.empty() and self._resp_queue.empty():
self._manager._set_idle(self)
class TagContextManager:
def __init__(self, process):
self._context_list = []
self._context_idle_list = []
self._context_mapping = {}
self._process = process
def _get_context(self, tag):
if tag in self._context_mapping:
return self._context_mapping[tag]
elif self._context_idle_list:
context = self._context_idle_list.pop()
else:
context = TagContext(self)
self._context_list.append(context)
context._start()
context.current_tag = tag
self._context_mapping[tag] = context
return context
def start_cmd(self, tag, cmd):
context = self._get_context(tag)
context._cmd_queue.put_nowait(cmd)
def put_resp(self, tag, resp):
context = self._get_context(tag)
context._resp_queue.put_nowait(resp)
def _set_idle(self, context):
if context.current_tag in self._context_mapping:
del self._context_mapping[context.current_tag]
self._context_idle_list.append(context)
context.current_tag = None
def flush(self):
flushed_cmds = []
for c in self._context_list:
flushed_cmds.extend(c._flush())
return flushed_cmds
class AxiMasterWrite(Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256):
self.bus = bus
self.clock = clock
self.reset = reset
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
self.log.info("AXI master (write)")
@@ -58,21 +148,20 @@ class AxiMasterWrite(Reset):
self.log.info("https://github.com/alexforencich/cocotbext-axi")
self.aw_channel = AxiAWSource(bus.aw, clock, reset, reset_active_level)
self.aw_channel.queue_occupancy_limit = 2
self.w_channel = AxiWSource(bus.w, clock, reset, reset_active_level)
self.w_channel.queue_occupancy_limit = 2
self.b_channel = AxiBSink(bus.b, clock, reset, reset_active_level)
self.b_channel.queue_occupancy_limit = 2
self.write_command_queue = deque()
self.write_command_sync = Event()
self.write_resp_queue = deque()
self.write_resp_sync = Event()
self.write_command_queue = Queue()
self.current_write_command = None
self.id_count = 2**len(self.aw_channel.bus.awid)
self.cur_id = 0
self.active_id = Counter()
self.int_write_resp_command_queue = deque()
self.int_write_resp_command_sync = Event()
self.int_write_resp_queue_list = [deque() for k in range(self.id_count)]
self.tag_context_manager = TagContextManager(self._process_write_resp_id)
self.in_flight_operations = 0
self._idle = Event()
@@ -80,23 +169,40 @@ class AxiMasterWrite(Reset):
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
self.byte_width = self.width // self.byte_size
self.strb_mask = 2**self.byte_width-1
self.byte_lanes = self.width // self.byte_size
self.strb_mask = 2**self.byte_lanes-1
self.max_burst_len = max(min(max_burst_len, 256), 1)
self.max_burst_size = (self.byte_width-1).bit_length()
self.max_burst_size = (self.byte_lanes-1).bit_length()
self.awlock_present = hasattr(self.bus.aw, "awlock")
self.awcache_present = hasattr(self.bus.aw, "awcache")
self.awprot_present = hasattr(self.bus.aw, "awprot")
self.awqos_present = hasattr(self.bus.aw, "awqos")
self.awregion_present = hasattr(self.bus.aw, "awregion")
self.awuser_present = hasattr(self.bus.aw, "awuser")
self.wuser_present = hasattr(self.bus.w, "wuser")
self.buser_present = hasattr(self.bus.b, "buser")
self.log.info("AXI master configuration:")
self.log.info(" Address width: %d bits", len(self.aw_channel.bus.awaddr))
self.log.info(" ID width: %d bits", len(self.aw_channel.bus.awid))
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_width)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info(" Max burst size: %d (%d bytes)", self.max_burst_size, 2**self.max_burst_size)
self.log.info(" Max burst length: %d cycles (%d bytes)",
self.max_burst_len, self.max_burst_len*self.byte_width)
self.max_burst_len, self.max_burst_len*self.byte_lanes)
assert self.byte_width == len(self.w_channel.bus.wstrb)
assert self.byte_width * self.byte_size == self.width
self.log.info("AXI master signals:")
for bus in (self.bus.aw, self.bus.w, self.bus.b):
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
assert self.byte_lanes == len(self.w_channel.bus.wstrb)
assert self.byte_lanes * self.byte_size == self.width
assert len(self.b_channel.bus.bid) == len(self.aw_channel.bus.awid)
@@ -108,7 +214,10 @@ class AxiMasterWrite(Reset):
def init_write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL,
cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0, event=None):
if event is not None and not isinstance(event, Event):
if event is None:
event = Event()
if not isinstance(event, Event):
raise ValueError("Expected event object")
if awid is None or awid < 0:
@@ -126,6 +235,27 @@ class AxiMasterWrite(Reset):
lock = AxiLockType(lock)
prot = AxiProt(prot)
if not self.awlock_present and lock != AxiLockType.NORMAL:
raise ValueError("awlock sideband signal value specified, but signal is not connected")
if not self.awcache_present and cache != 0b0011:
raise ValueError("awcache sideband signal value specified, but signal is not connected")
if not self.awprot_present and prot != AxiProt.NONSECURE:
raise ValueError("awprot sideband signal value specified, but signal is not connected")
if not self.awqos_present and qos != 0:
raise ValueError("awqos sideband signal value specified, but signal is not connected")
if not self.awregion_present and region != 0:
raise ValueError("awregion sideband signal value specified, but signal is not connected")
if not self.awuser_present and user != 0:
raise ValueError("awuser sideband signal value specified, but signal is not connected")
if not self.wuser_present and wuser != 0:
raise ValueError("wuser sideband signal value specified, but signal is not connected")
if wuser is None:
wuser = 0
elif isinstance(wuser, int):
@@ -138,8 +268,9 @@ class AxiMasterWrite(Reset):
cmd = AxiWriteCmd(address, bytearray(data), awid, burst, size, lock,
cache, prot, qos, region, user, wuser, event)
self.write_command_queue.append(cmd)
self.write_command_sync.set()
self.write_command_queue.put_nowait(cmd)
return event
def idle(self):
return not self.in_flight_operations
@@ -148,18 +279,9 @@ class AxiMasterWrite(Reset):
while not self.idle():
await self._idle.wait()
def write_resp_ready(self):
return bool(self.write_resp_queue)
def get_write_resp(self):
if self.write_resp_queue:
return self.write_resp_queue.popleft()
return None
async def write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
event = Event()
self.init_write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser, event)
event = self.init_write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser)
await event.wait()
return event.data
@@ -209,6 +331,33 @@ class AxiMasterWrite(Reset):
if self._process_write_resp_cr is not None:
self._process_write_resp_cr.kill()
self._process_write_resp_cr = None
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
def flush_cmd(cmd):
self.log.warning("Flushed write operation during reset: %s", cmd)
if cmd.event:
cmd.event.set(None)
while not self.write_command_queue.empty():
cmd = self.write_command_queue.get_nowait()
flush_cmd(cmd)
if self.current_write_command:
cmd = self.current_write_command
self.current_write_command = None
flush_cmd(cmd)
for cmd in self.tag_context_manager.flush():
flush_cmd(cmd)
self.cur_id = 0
self.active_id = Counter()
self.in_flight_operations = 0
self._idle.set()
else:
self.log.info("Reset de-asserted")
if self._process_write_cr is None:
@@ -216,40 +365,18 @@ class AxiMasterWrite(Reset):
if self._process_write_resp_cr is None:
self._process_write_resp_cr = cocotb.fork(self._process_write_resp())
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
while self.write_command_queue:
cmd = self.write_command_queue.popleft()
if cmd.event:
cmd.event.set(None)
while self.int_write_resp_command_queue:
cmd = self.int_write_resp_command_queue.popleft()
if cmd.event:
cmd.event.set(None)
self.write_resp_queue.clear()
self.in_flight_operations = 0
self._idle.set()
async def _process_write(self):
while True:
if not self.write_command_queue:
self.write_command_sync.clear()
await self.write_command_sync.wait()
cmd = self.write_command_queue.popleft()
cmd = await self.write_command_queue.get()
self.current_write_command = cmd
num_bytes = 2**cmd.size
aligned_addr = (cmd.address // num_bytes) * num_bytes
word_addr = (cmd.address // self.byte_width) * self.byte_width
word_addr = (cmd.address // self.byte_lanes) * self.byte_lanes
start_offset = cmd.address % self.byte_width
end_offset = ((cmd.address + len(cmd.data) - 1) % self.byte_width) + 1
start_offset = cmd.address % self.byte_lanes
end_offset = ((cmd.address + len(cmd.data) - 1) % self.byte_lanes) + 1
cycles = (len(cmd.data) + (cmd.address % num_bytes) + num_bytes-1) // num_bytes
@@ -282,7 +409,7 @@ class AxiMasterWrite(Reset):
if k == cycles-1:
stop = end_offset
strb = (self.strb_mask << start) & self.strb_mask & (self.strb_mask >> (self.byte_width - stop))
strb = (self.strb_mask << start) & self.strb_mask & (self.strb_mask >> (self.byte_lanes - stop))
val = 0
for j in range(start, stop):
@@ -298,7 +425,7 @@ class AxiMasterWrite(Reset):
# split on 4k address boundary
burst_length = (min(burst_length*num_bytes, 0x1000-(cur_addr & 0xfff))+num_bytes-1)//num_bytes
burst_list.append((awid, burst_length))
burst_list.append(burst_length)
aw = self.aw_channel._transaction_obj()
aw.awid = awid
@@ -329,45 +456,41 @@ class AxiMasterWrite(Reset):
if isinstance(wuser, int):
w.wuser = wuser
else:
if wuser:
w.wuser = wuser.pop(0)
if wuser and k < len(wuser):
w.wuser = wuser[k]
else:
w.wuser = 0
await self.w_channel.send(w)
cur_addr += num_bytes
cycle_offset = (cycle_offset + num_bytes) % self.byte_width
cycle_offset = (cycle_offset + num_bytes) % self.byte_lanes
resp_cmd = AxiWriteRespCmd(cmd.address, len(cmd.data), cmd.size, cycles, cmd.prot, burst_list, cmd.event)
self.int_write_resp_command_queue.append(resp_cmd)
self.int_write_resp_command_sync.set()
self.tag_context_manager.start_cmd(awid, resp_cmd)
self.current_write_command = None
async def _process_write_resp(self):
while True:
if not self.int_write_resp_command_queue:
self.int_write_resp_command_sync.clear()
await self.int_write_resp_command_sync.wait()
b = await self.b_channel.recv()
cmd = self.int_write_resp_command_queue.popleft()
bid = int(b.bid)
if self.active_id[bid] <= 0:
raise Exception(f"Unexpected burst ID {bid}")
self.tag_context_manager.put_resp(bid, b)
async def _process_write_resp_id(self, context, cmd):
bid = context.current_tag
resp = AxiResp.OKAY
user = []
for bid, burst_length in cmd.burst_list:
while not self.int_write_resp_queue_list[bid]:
b = await self.b_channel.recv()
for burst_length in cmd.burst_list:
b = await context.get_resp()
i = int(b.bid)
if self.active_id[i] <= 0:
raise Exception(f"Unexpected burst ID {bid}")
self.int_write_resp_queue_list[i].append(b)
b = self.int_write_resp_queue_list[bid].popleft()
burst_id = int(b.bid)
burst_resp = AxiResp(b.bresp)
burst_user = int(b.buser)
@@ -382,18 +505,17 @@ class AxiMasterWrite(Reset):
self.active_id[bid] -= 1
self.log.info("Write burst complete bid: 0x%x bresp: %s", burst_id, burst_resp)
self.log.info("Write burst complete bid: 0x%x bresp: %s", bid, burst_resp)
if not self.buser_present:
user = None
self.log.info("Write complete addr: 0x%08x prot: %s resp: %s length: %d",
cmd.address, cmd.prot, resp, cmd.length)
write_resp = AxiWriteResp(cmd.address, cmd.length, resp, user)
if cmd.event is not None:
cmd.event.set(write_resp)
else:
self.write_resp_queue.append(write_resp)
self.write_resp_sync.set()
self.in_flight_operations -= 1
@@ -403,6 +525,9 @@ class AxiMasterWrite(Reset):
class AxiMasterRead(Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256):
self.bus = bus
self.clock = clock
self.reset = reset
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
self.log.info("AXI master (read)")
@@ -411,20 +536,18 @@ class AxiMasterRead(Reset):
self.log.info("https://github.com/alexforencich/cocotbext-axi")
self.ar_channel = AxiARSource(bus.ar, clock, reset, reset_active_level)
self.ar_channel.queue_occupancy_limit = 2
self.r_channel = AxiRSink(bus.r, clock, reset, reset_active_level)
self.r_channel.queue_occupancy_limit = 2
self.read_command_queue = deque()
self.read_command_sync = Event()
self.read_data_queue = deque()
self.read_data_sync = Event()
self.read_command_queue = Queue()
self.current_read_command = None
self.id_count = 2**len(self.ar_channel.bus.arid)
self.cur_id = 0
self.active_id = Counter()
self.int_read_resp_command_queue = deque()
self.int_read_resp_command_sync = Event()
self.int_read_resp_queue_list = [deque() for k in range(self.id_count)]
self.tag_context_manager = TagContextManager(self._process_read_resp_id)
self.in_flight_operations = 0
self._idle = Event()
@@ -432,21 +555,37 @@ class AxiMasterRead(Reset):
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
self.byte_width = self.width // self.byte_size
self.byte_lanes = self.width // self.byte_size
self.max_burst_len = max(min(max_burst_len, 256), 1)
self.max_burst_size = (self.byte_width-1).bit_length()
self.max_burst_size = (self.byte_lanes-1).bit_length()
self.arlock_present = hasattr(self.bus.ar, "arlock")
self.arcache_present = hasattr(self.bus.ar, "arcache")
self.arprot_present = hasattr(self.bus.ar, "arprot")
self.arqos_present = hasattr(self.bus.ar, "arqos")
self.arregion_present = hasattr(self.bus.ar, "arregion")
self.aruser_present = hasattr(self.bus.ar, "aruser")
self.ruser_present = hasattr(self.bus.r, "ruser")
self.log.info("AXI master configuration:")
self.log.info(" Address width: %d bits", len(self.ar_channel.bus.araddr))
self.log.info(" ID width: %d bits", len(self.ar_channel.bus.arid))
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_width)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info(" Max burst size: %d (%d bytes)", self.max_burst_size, 2**self.max_burst_size)
self.log.info(" Max burst length: %d cycles (%d bytes)",
self.max_burst_len, self.max_burst_len*self.byte_width)
self.max_burst_len, self.max_burst_len*self.byte_lanes)
assert self.byte_width * self.byte_size == self.width
self.log.info("AXI master signals:")
for bus in (self.bus.ar, self.bus.r):
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
assert self.byte_lanes * self.byte_size == self.width
assert len(self.r_channel.bus.rid) == len(self.ar_channel.bus.arid)
@@ -458,7 +597,10 @@ class AxiMasterRead(Reset):
def init_read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, event=None):
if event is not None and not isinstance(event, Event):
if event is None:
event = Event()
if not isinstance(event, Event):
raise ValueError("Expected event object")
if length < 0:
@@ -479,12 +621,31 @@ class AxiMasterRead(Reset):
lock = AxiLockType(lock)
prot = AxiProt(prot)
if not self.arlock_present and lock != AxiLockType.NORMAL:
raise ValueError("arlock sideband signal value specified, but signal is not connected")
if not self.arcache_present and cache != 0b0011:
raise ValueError("arcache sideband signal value specified, but signal is not connected")
if not self.arprot_present and prot != AxiProt.NONSECURE:
raise ValueError("arprot sideband signal value specified, but signal is not connected")
if not self.arqos_present and qos != 0:
raise ValueError("arqos sideband signal value specified, but signal is not connected")
if not self.arregion_present and region != 0:
raise ValueError("arregion sideband signal value specified, but signal is not connected")
if not self.aruser_present and user != 0:
raise ValueError("aruser sideband signal value specified, but signal is not connected")
self.in_flight_operations += 1
self._idle.clear()
cmd = AxiReadCmd(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event)
self.read_command_queue.append(cmd)
self.read_command_sync.set()
self.read_command_queue.put_nowait(cmd)
return event
def idle(self):
return not self.in_flight_operations
@@ -493,18 +654,9 @@ class AxiMasterRead(Reset):
while not self.idle():
await self._idle.wait()
def read_data_ready(self):
return bool(self.read_data_queue)
def get_read_data(self):
if self.read_data_queue:
return self.read_data_queue.popleft()
return None
async def read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
event = Event()
self.init_read(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event)
event = self.init_read(address, length, arid, burst, size, lock, cache, prot, qos, region, user)
await event.wait()
return event.data
@@ -554,6 +706,32 @@ class AxiMasterRead(Reset):
if self._process_read_resp_cr is not None:
self._process_read_resp_cr.kill()
self._process_read_resp_cr = None
self.ar_channel.clear()
self.r_channel.clear()
def flush_cmd(cmd):
self.log.warning("Flushed read operation during reset: %s", cmd)
if cmd.event:
cmd.event.set(None)
while not self.read_command_queue.empty():
cmd = self.read_command_queue.get_nowait()
flush_cmd(cmd)
if self.current_read_command:
cmd = self.current_read_command
self.current_read_command = None
flush_cmd(cmd)
for cmd in self.tag_context_manager.flush():
flush_cmd(cmd)
self.cur_id = 0
self.active_id = Counter()
self.in_flight_operations = 0
self._idle.set()
else:
self.log.info("Reset de-asserted")
if self._process_read_cr is None:
@@ -561,31 +739,10 @@ class AxiMasterRead(Reset):
if self._process_read_resp_cr is None:
self._process_read_resp_cr = cocotb.fork(self._process_read_resp())
self.ar_channel.clear()
self.r_channel.clear()
while self.read_command_queue:
cmd = self.read_command_queue.popleft()
if cmd.event:
cmd.event.set(None)
while self.int_read_resp_command_queue:
cmd = self.int_read_resp_command_queue.popleft()
if cmd.event:
cmd.event.set(None)
self.read_data_queue.clear()
self.in_flight_operations = 0
self._idle.set()
async def _process_read(self):
while True:
if not self.read_command_queue:
self.read_command_sync.clear()
await self.read_command_sync.wait()
cmd = self.read_command_queue.popleft()
cmd = await self.read_command_queue.get()
self.current_read_command = cmd
num_bytes = 2**cmd.size
@@ -619,7 +776,7 @@ class AxiMasterRead(Reset):
# split on 4k address boundary
burst_length = (min(burst_length*num_bytes, 0x1000-(cur_addr & 0xfff))+num_bytes-1)//num_bytes
burst_list.append((arid, burst_length))
burst_list.append(burst_length)
ar = self.r_channel._transaction_obj()
ar.arid = arid
@@ -643,23 +800,42 @@ class AxiMasterRead(Reset):
cur_addr += num_bytes
resp_cmd = AxiReadRespCmd(cmd.address, cmd.length, cmd.size, cycles, cmd.prot, burst_list, cmd.event)
self.int_read_resp_command_queue.append(resp_cmd)
self.int_read_resp_command_sync.set()
self.tag_context_manager.start_cmd(arid, resp_cmd)
self.current_read_command = None
async def _process_read_resp(self):
while True:
if not self.int_read_resp_command_queue:
self.int_read_resp_command_sync.clear()
await self.int_read_resp_command_sync.wait()
burst = []
cur_rid = None
cmd = self.int_read_resp_command_queue.popleft()
while True:
r = await self.r_channel.recv()
rid = int(r.rid)
if cur_rid is not None and cur_rid != rid:
raise Exception(f"ID not constant within burst (expected {cur_rid}, got {rid})")
if self.active_id[rid] <= 0:
raise Exception(f"Unexpected burst ID {rid}")
burst.append(r)
cur_rid = rid
if int(r.rlast):
self.tag_context_manager.put_resp(rid, burst)
burst = []
cur_rid = None
async def _process_read_resp_id(self, context, cmd):
rid = context.current_tag
num_bytes = 2**cmd.size
aligned_addr = (cmd.address // num_bytes) * num_bytes
word_addr = (cmd.address // self.byte_width) * self.byte_width
word_addr = (cmd.address // self.byte_lanes) * self.byte_lanes
start_offset = cmd.address % self.byte_width
start_offset = cmd.address % self.byte_lanes
cycle_offset = aligned_addr - word_addr
data = bytearray()
@@ -669,24 +845,15 @@ class AxiMasterRead(Reset):
first = True
for rid, burst_length in cmd.burst_list:
for k in range(burst_length):
while not self.int_read_resp_queue_list[rid]:
r = await self.r_channel.recv()
for burst_length in cmd.burst_list:
burst = await context.get_resp()
i = int(r.rid)
if len(burst) != burst_length:
raise Exception(f"Burst length incorrect (ID {rid}, expected {burst_length}, got {len(burst)}")
if self.active_id[i] <= 0:
raise Exception(f"Unexpected burst ID {rid}")
self.int_read_resp_queue_list[i].append(r)
r = self.int_read_resp_queue_list[rid].popleft()
cycle_id = int(r.rid)
for r in burst:
cycle_data = int(r.rdata)
cycle_resp = AxiResp(r.rresp)
cycle_last = int(r.rlast)
cycle_user = int(r.ruser)
if cycle_resp != AxiResp.OKAY:
@@ -701,34 +868,28 @@ class AxiMasterRead(Reset):
if first:
start = start_offset
assert cycle_last == (k == burst_length - 1)
for j in range(start, stop):
data.append((cycle_data >> j*8) & 0xff)
cycle_offset = (cycle_offset + num_bytes) % self.byte_width
cycle_offset = (cycle_offset + num_bytes) % self.byte_lanes
first = False
if self.active_id[rid] <= 0:
raise Exception(f"Unexpected burst ID {rid}")
self.active_id[rid] -= 1
self.log.info("Read burst complete rid: 0x%x rresp: %s", cycle_id, resp)
self.log.info("Read burst complete rid: 0x%x rresp: %s", rid, resp)
data = data[:cmd.length]
if not self.ruser_present:
user = None
self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s",
cmd.address, cmd.prot, resp, ' '.join((f'{c:02x}' for c in data)))
read_resp = AxiReadResp(cmd.address, data, resp, user)
if cmd.event is not None:
cmd.event.set(read_resp)
else:
self.read_data_queue.append(read_resp)
self.read_data_sync.set()
self.in_flight_operations -= 1
@@ -746,11 +907,11 @@ class AxiMaster:
def init_read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, event=None):
self.read_if.init_read(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event)
return self.read_if.init_read(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event)
def init_write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL,
cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0, event=None):
self.write_if.init_write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser, event)
return self.write_if.init_write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser, event)
def idle(self):
return (not self.read_if or self.read_if.idle()) and (not self.write_if or self.write_if.idle())
@@ -766,18 +927,6 @@ class AxiMaster:
async def wait_write(self):
await self.write_if.wait()
def read_data_ready(self):
return self.read_if.read_data_ready()
def get_read_data(self):
return self.read_if.get_read_data()
def write_resp_ready(self):
return self.write_if.write_resp_ready()
def get_write_resp(self):
return self.write_if.get_write_resp()
async def read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
return await self.read_if.read(address, length, arid,

View File

@@ -35,6 +35,9 @@ from .reset import Reset
class AxiRamWrite(Memory, Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
self.log.info("AXI RAM model (write)")
@@ -45,23 +48,34 @@ class AxiRamWrite(Memory, Reset):
super().__init__(size, mem, *args, **kwargs)
self.aw_channel = AxiAWSink(bus.aw, clock, reset, reset_active_level)
self.aw_channel.queue_occupancy_limit = 2
self.w_channel = AxiWSink(bus.w, clock, reset, reset_active_level)
self.w_channel.queue_occupancy_limit = 2
self.b_channel = AxiBSource(bus.b, clock, reset, reset_active_level)
self.b_channel.queue_occupancy_limit = 2
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
self.byte_width = self.width // self.byte_size
self.strb_mask = 2**self.byte_width-1
self.byte_lanes = self.width // self.byte_size
self.strb_mask = 2**self.byte_lanes-1
self.log.info("AXI RAM model configuration:")
self.log.info(" Memory size: %d bytes", len(self.mem))
self.log.info(" Address width: %d bits", len(self.aw_channel.bus.awaddr))
self.log.info(" ID width: %d bits", len(self.aw_channel.bus.awid))
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_width)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
assert self.byte_width == len(self.w_channel.bus.wstrb)
assert self.byte_width * self.byte_size == self.width
self.log.info("AXI RAM model signals:")
for bus in (self.bus.aw, self.bus.w, self.bus.b):
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
assert self.byte_lanes == len(self.w_channel.bus.wstrb)
assert self.byte_lanes * self.byte_size == self.width
assert len(self.b_channel.bus.bid) == len(self.aw_channel.bus.awid)
@@ -75,14 +89,14 @@ class AxiRamWrite(Memory, Reset):
if self._process_write_cr is not None:
self._process_write_cr.kill()
self._process_write_cr = None
else:
self.log.info("Reset de-asserted")
if self._process_write_cr is None:
self._process_write_cr = cocotb.fork(self._process_write())
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
else:
self.log.info("Reset de-asserted")
if self._process_write_cr is None:
self._process_write_cr = cocotb.fork(self._process_write())
async def _process_write(self):
while True:
@@ -99,7 +113,7 @@ class AxiRamWrite(Memory, Reset):
awid, addr, length, size, prot)
num_bytes = 2**size
assert 0 < num_bytes <= self.byte_width
assert 0 < num_bytes <= self.byte_lanes
aligned_addr = (addr // num_bytes) * num_bytes
length += 1
@@ -117,7 +131,7 @@ class AxiRamWrite(Memory, Reset):
cur_addr = aligned_addr
for n in range(length):
cur_word_addr = (cur_addr // self.byte_width) * self.byte_width
cur_word_addr = (cur_addr // self.byte_lanes) * self.byte_lanes
w = await self.w_channel.recv()
@@ -129,12 +143,12 @@ class AxiRamWrite(Memory, Reset):
self.mem.seek(cur_word_addr % self.size)
data = data.to_bytes(self.byte_width, 'little')
data = data.to_bytes(self.byte_lanes, 'little')
self.log.debug("Write word awid: 0x%x addr: 0x%08x wstrb: 0x%02x data: %s",
awid, cur_addr, strb, ' '.join((f'{c:02x}' for c in data)))
for i in range(self.byte_width):
for i in range(self.byte_lanes):
if strb & (1 << i):
self.mem.write(data[i:i+1])
else:
@@ -158,6 +172,9 @@ class AxiRamWrite(Memory, Reset):
class AxiRamRead(Memory, Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
self.log.info("AXI RAM model (read)")
@@ -168,20 +185,30 @@ class AxiRamRead(Memory, Reset):
super().__init__(size, mem, *args, **kwargs)
self.ar_channel = AxiARSink(bus.ar, clock, reset, reset_active_level)
self.ar_channel.queue_occupancy_limit = 2
self.r_channel = AxiRSource(bus.r, clock, reset, reset_active_level)
self.r_channel.queue_occupancy_limit = 2
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
self.byte_width = self.width // self.byte_size
self.byte_lanes = self.width // self.byte_size
self.log.info("AXI RAM model configuration:")
self.log.info(" Memory size: %d bytes", len(self.mem))
self.log.info(" Address width: %d bits", len(self.ar_channel.bus.araddr))
self.log.info(" ID width: %d bits", len(self.ar_channel.bus.arid))
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_width)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
assert self.byte_width * self.byte_size == self.width
self.log.info("AXI RAM model signals:")
for bus in (self.bus.ar, self.bus.r):
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
assert self.byte_lanes * self.byte_size == self.width
assert len(self.r_channel.bus.rid) == len(self.ar_channel.bus.arid)
@@ -195,14 +222,14 @@ class AxiRamRead(Memory, Reset):
if self._process_read_cr is not None:
self._process_read_cr.kill()
self._process_read_cr = None
self.ar_channel.clear()
self.r_channel.clear()
else:
self.log.info("Reset de-asserted")
if self._process_read_cr is None:
self._process_read_cr = cocotb.fork(self._process_read())
self.ar_channel.clear()
self.r_channel.clear()
async def _process_read(self):
while True:
ar = await self.ar_channel.recv()
@@ -218,7 +245,7 @@ class AxiRamRead(Memory, Reset):
arid, addr, length, size, prot)
num_bytes = 2**size
assert 0 < num_bytes <= self.byte_width
assert 0 < num_bytes <= self.byte_lanes
aligned_addr = (addr // num_bytes) * num_bytes
length += 1
@@ -236,11 +263,11 @@ class AxiRamRead(Memory, Reset):
cur_addr = aligned_addr
for n in range(length):
cur_word_addr = (cur_addr // self.byte_width) * self.byte_width
cur_word_addr = (cur_addr // self.byte_lanes) * self.byte_lanes
self.mem.seek(cur_word_addr % self.size)
data = self.mem.read(self.byte_width)
data = self.mem.read(self.byte_lanes)
r = self.r_channel._transaction_obj()
r.rid = arid

View File

@@ -26,7 +26,8 @@ from .stream import define_stream
# Write address channel
AxiLiteAWBus, AxiLiteAWTransaction, AxiLiteAWSource, AxiLiteAWSink, AxiLiteAWMonitor = define_stream("AxiLiteAW",
signals=["awaddr", "awprot", "awvalid", "awready"],
signals=["awaddr", "awvalid", "awready"],
optional_signals=["awprot"],
signal_widths={"awprot": 3}
)
@@ -37,19 +38,22 @@ AxiLiteWBus, AxiLiteWTransaction, AxiLiteWSource, AxiLiteWSink, AxiLiteWMonitor
# Write response channel
AxiLiteBBus, AxiLiteBTransaction, AxiLiteBSource, AxiLiteBSink, AxiLiteBMonitor = define_stream("AxiLiteB",
signals=["bresp", "bvalid", "bready"],
signals=["bvalid", "bready"],
optional_signals=["bresp"],
signal_widths={"bresp": 2}
)
# Read address channel
AxiLiteARBus, AxiLiteARTransaction, AxiLiteARSource, AxiLiteARSink, AxiLiteARMonitor = define_stream("AxiLiteAR",
signals=["araddr", "arprot", "arvalid", "arready"],
signals=["araddr", "arvalid", "arready"],
optional_signals=["arprot"],
signal_widths={"arprot": 3}
)
# Read data channel
AxiLiteRBus, AxiLiteRTransaction, AxiLiteRSource, AxiLiteRSink, AxiLiteRMonitor = define_stream("AxiLiteR",
signals=["rdata", "rresp", "rvalid", "rready"],
signals=["rdata", "rvalid", "rready"],
optional_signals=["rresp"],
signal_widths={"rresp": 2}
)

View File

@@ -23,9 +23,10 @@ THE SOFTWARE.
"""
import logging
from collections import deque, namedtuple
from collections import namedtuple
import cocotb
from cocotb.queue import Queue
from cocotb.triggers import Event
from .version import __version__
@@ -46,6 +47,9 @@ AxiLiteReadResp = namedtuple("AxiLiteReadResp", ["address", "data", "resp"])
class AxiLiteMasterWrite(Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True):
self.bus = bus
self.clock = clock
self.reset = reset
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
self.log.info("AXI lite master (write)")
@@ -54,16 +58,17 @@ class AxiLiteMasterWrite(Reset):
self.log.info("https://github.com/alexforencich/cocotbext-axi")
self.aw_channel = AxiLiteAWSource(bus.aw, clock, reset, reset_active_level)
self.aw_channel.queue_occupancy_limit = 2
self.w_channel = AxiLiteWSource(bus.w, clock, reset, reset_active_level)
self.w_channel.queue_occupancy_limit = 2
self.b_channel = AxiLiteBSink(bus.b, clock, reset, reset_active_level)
self.b_channel.queue_occupancy_limit = 2
self.write_command_queue = deque()
self.write_command_sync = Event()
self.write_resp_queue = deque()
self.write_resp_sync = Event()
self.write_command_queue = Queue()
self.current_write_command = None
self.int_write_resp_command_queue = deque()
self.int_write_resp_command_sync = Event()
self.int_write_resp_command_queue = Queue()
self.current_write_resp_command = None
self.in_flight_operations = 0
self._idle = Event()
@@ -71,16 +76,26 @@ class AxiLiteMasterWrite(Reset):
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
self.byte_width = self.width // self.byte_size
self.strb_mask = 2**self.byte_width-1
self.byte_lanes = self.width // self.byte_size
self.strb_mask = 2**self.byte_lanes-1
self.awprot_present = hasattr(self.bus.aw, "awprot")
self.log.info("AXI lite master configuration:")
self.log.info(" Address width: %d bits", len(self.aw_channel.bus.awaddr))
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_width)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
assert self.byte_width == len(self.w_channel.bus.wstrb)
assert self.byte_width * self.byte_size == self.width
self.log.info("AXI lite master signals:")
for bus in (self.bus.aw, self.bus.w, self.bus.b):
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
assert self.byte_lanes == len(self.w_channel.bus.wstrb)
assert self.byte_lanes * self.byte_size == self.width
self._process_write_cr = None
self._process_write_resp_cr = None
@@ -88,14 +103,21 @@ class AxiLiteMasterWrite(Reset):
self._init_reset(reset, reset_active_level)
def init_write(self, address, data, prot=AxiProt.NONSECURE, event=None):
if event is not None and not isinstance(event, Event):
if event is None:
event = Event()
if not isinstance(event, Event):
raise ValueError("Expected event object")
if not self.awprot_present and prot != AxiProt.NONSECURE:
raise ValueError("awprot sideband signal value specified, but signal is not connected")
self.in_flight_operations += 1
self._idle.clear()
self.write_command_queue.append(AxiLiteWriteCmd(address, bytearray(data), prot, event))
self.write_command_sync.set()
self.write_command_queue.put_nowait(AxiLiteWriteCmd(address, bytearray(data), prot, event))
return event
def idle(self):
return not self.in_flight_operations
@@ -104,17 +126,8 @@ class AxiLiteMasterWrite(Reset):
while not self.idle():
await self._idle.wait()
def write_resp_ready(self):
return bool(self.write_resp_queue)
def get_write_resp(self):
if self.write_resp_queue:
return self.write_resp_queue.popleft()
return None
async def write(self, address, data, prot=AxiProt.NONSECURE):
event = Event()
self.init_write(address, data, prot, event)
event = self.init_write(address, data, prot)
await event.wait()
return event.data
@@ -152,6 +165,36 @@ class AxiLiteMasterWrite(Reset):
if self._process_write_resp_cr is not None:
self._process_write_resp_cr.kill()
self._process_write_resp_cr = None
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
def flush_cmd(cmd):
self.log.warning("Flushed write operation during reset: %s", cmd)
if cmd.event:
cmd.event.set(None)
while not self.write_command_queue.empty():
cmd = self.write_command_queue.get_nowait()
flush_cmd(cmd)
if self.current_write_command:
cmd = self.current_write_command
self.current_write_command = None
flush_cmd(cmd)
while not self.int_write_resp_command_queue.empty():
cmd = self.int_write_resp_command_queue.get_nowait()
flush_cmd(cmd)
if self.current_write_resp_command:
cmd = self.current_write_resp_command
self.current_write_resp_command = None
flush_cmd(cmd)
self.in_flight_operations = 0
self._idle.set()
else:
self.log.info("Reset de-asserted")
if self._process_write_cr is None:
@@ -159,46 +202,23 @@ class AxiLiteMasterWrite(Reset):
if self._process_write_resp_cr is None:
self._process_write_resp_cr = cocotb.fork(self._process_write_resp())
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
while self.write_command_queue:
cmd = self.write_command_queue.popleft()
if cmd.event:
cmd.event.set(None)
while self.int_write_resp_command_queue:
cmd = self.int_write_resp_command_queue.popleft()
if cmd.event:
cmd.event.set(None)
self.write_resp_queue.clear()
self.in_flight_operations = 0
self._idle.set()
async def _process_write(self):
while True:
if not self.write_command_queue:
self.write_command_sync.clear()
await self.write_command_sync.wait()
cmd = await self.write_command_queue.get()
self.current_write_command = cmd
cmd = self.write_command_queue.popleft()
word_addr = (cmd.address // self.byte_lanes) * self.byte_lanes
word_addr = (cmd.address // self.byte_width) * self.byte_width
start_offset = cmd.address % self.byte_width
end_offset = ((cmd.address + len(cmd.data) - 1) % self.byte_width) + 1
start_offset = cmd.address % self.byte_lanes
end_offset = ((cmd.address + len(cmd.data) - 1) % self.byte_lanes) + 1
strb_start = (self.strb_mask << start_offset) & self.strb_mask
strb_end = self.strb_mask >> (self.byte_width - end_offset)
strb_end = self.strb_mask >> (self.byte_lanes - end_offset)
cycles = (len(cmd.data) + (cmd.address % self.byte_width) + self.byte_width-1) // self.byte_width
cycles = (len(cmd.data) + (cmd.address % self.byte_lanes) + self.byte_lanes-1) // self.byte_lanes
resp_cmd = AxiLiteWriteRespCmd(cmd.address, len(cmd.data), cycles, cmd.prot, cmd.event)
self.int_write_resp_command_queue.append(resp_cmd)
self.int_write_resp_command_sync.set()
await self.int_write_resp_command_queue.put(resp_cmd)
offset = 0
@@ -207,7 +227,7 @@ class AxiLiteMasterWrite(Reset):
for k in range(cycles):
start = 0
stop = self.byte_width
stop = self.byte_lanes
strb = self.strb_mask
if k == 0:
@@ -223,7 +243,7 @@ class AxiLiteMasterWrite(Reset):
offset += 1
aw = self.aw_channel._transaction_obj()
aw.awaddr = word_addr + k*self.byte_width
aw.awaddr = word_addr + k*self.byte_lanes
aw.awprot = cmd.prot
w = self.w_channel._transaction_obj()
@@ -233,13 +253,12 @@ class AxiLiteMasterWrite(Reset):
await self.aw_channel.send(aw)
await self.w_channel.send(w)
self.current_write_command = None
async def _process_write_resp(self):
while True:
if not self.int_write_resp_command_queue:
self.int_write_resp_command_sync.clear()
await self.int_write_resp_command_sync.wait()
cmd = self.int_write_resp_command_queue.popleft()
cmd = await self.int_write_resp_command_queue.get()
self.current_write_resp_command = cmd
resp = AxiResp.OKAY
@@ -256,11 +275,9 @@ class AxiLiteMasterWrite(Reset):
write_resp = AxiLiteWriteResp(cmd.address, cmd.length, resp)
if cmd.event is not None:
cmd.event.set(write_resp)
else:
self.write_resp_queue.append(write_resp)
self.write_resp_sync.set()
self.current_write_resp_command = None
self.in_flight_operations -= 1
@@ -270,6 +287,9 @@ class AxiLiteMasterWrite(Reset):
class AxiLiteMasterRead(Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True):
self.bus = bus
self.clock = clock
self.reset = reset
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
self.log.info("AXI lite master (read)")
@@ -278,15 +298,15 @@ class AxiLiteMasterRead(Reset):
self.log.info("https://github.com/alexforencich/cocotbext-axi")
self.ar_channel = AxiLiteARSource(bus.ar, clock, reset, reset_active_level)
self.ar_channel.queue_occupancy_limit = 2
self.r_channel = AxiLiteRSink(bus.r, clock, reset, reset_active_level)
self.r_channel.queue_occupancy_limit = 2
self.read_command_queue = deque()
self.read_command_sync = Event()
self.read_data_queue = deque()
self.read_data_sync = Event()
self.read_command_queue = Queue()
self.current_read_command = None
self.int_read_resp_command_queue = deque()
self.int_read_resp_command_sync = Event()
self.int_read_resp_command_queue = Queue()
self.current_read_resp_command = None
self.in_flight_operations = 0
self._idle = Event()
@@ -294,14 +314,24 @@ class AxiLiteMasterRead(Reset):
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
self.byte_width = self.width // self.byte_size
self.byte_lanes = self.width // self.byte_size
self.arprot_present = hasattr(self.bus.ar, "arprot")
self.log.info("AXI lite master configuration:")
self.log.info(" Address width: %d bits", len(self.ar_channel.bus.araddr))
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_width)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
assert self.byte_width * self.byte_size == self.width
self.log.info("AXI lite master signals:")
for bus in (self.bus.ar, self.bus.r):
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
assert self.byte_lanes * self.byte_size == self.width
self._process_read_cr = None
self._process_read_resp_cr = None
@@ -309,14 +339,21 @@ class AxiLiteMasterRead(Reset):
self._init_reset(reset, reset_active_level)
def init_read(self, address, length, prot=AxiProt.NONSECURE, event=None):
if event is not None and not isinstance(event, Event):
if event is None:
event = Event()
if not isinstance(event, Event):
raise ValueError("Expected event object")
if not self.arprot_present and prot != AxiProt.NONSECURE:
raise ValueError("arprot sideband signal value specified, but signal is not connected")
self.in_flight_operations += 1
self._idle.clear()
self.read_command_queue.append(AxiLiteReadCmd(address, length, prot, event))
self.read_command_sync.set()
self.read_command_queue.put_nowait(AxiLiteReadCmd(address, length, prot, event))
return event
def idle(self):
return not self.in_flight_operations
@@ -325,17 +362,8 @@ class AxiLiteMasterRead(Reset):
while not self.idle():
await self._idle.wait()
def read_data_ready(self):
return bool(self.read_data_queue)
def get_read_data(self):
if self.read_data_queue:
return self.read_data_queue.popleft()
return None
async def read(self, address, length, prot=AxiProt.NONSECURE):
event = Event()
self.init_read(address, length, prot, event)
event = self.init_read(address, length, prot)
await event.wait()
return event.data
@@ -373,6 +401,35 @@ class AxiLiteMasterRead(Reset):
if self._process_read_resp_cr is not None:
self._process_read_resp_cr.kill()
self._process_read_resp_cr = None
self.ar_channel.clear()
self.r_channel.clear()
def flush_cmd(cmd):
self.log.warning("Flushed read operation during reset: %s", cmd)
if cmd.event:
cmd.event.set(None)
while not self.read_command_queue.empty():
cmd = self.read_command_queue.get_nowait()
flush_cmd(cmd)
if self.current_read_command:
cmd = self.current_read_command
self.current_read_command = None
flush_cmd(cmd)
while not self.int_read_resp_command_queue.empty():
cmd = self.int_read_resp_command_queue.get_nowait()
flush_cmd(cmd)
if self.current_read_resp_command:
cmd = self.current_read_resp_command
self.current_read_resp_command = None
flush_cmd(cmd)
self.in_flight_operations = 0
self._idle.set()
else:
self.log.info("Reset de-asserted")
if self._process_read_cr is None:
@@ -380,60 +437,37 @@ class AxiLiteMasterRead(Reset):
if self._process_read_resp_cr is None:
self._process_read_resp_cr = cocotb.fork(self._process_read_resp())
self.ar_channel.clear()
self.r_channel.clear()
while self.read_command_queue:
cmd = self.read_command_queue.popleft()
if cmd.event:
cmd.event.set(None)
while self.int_read_resp_command_queue:
cmd = self.int_read_resp_command_queue.popleft()
if cmd.event:
cmd.event.set(None)
self.read_data_queue.clear()
self.in_flight_operations = 0
self._idle.set()
async def _process_read(self):
while True:
if not self.read_command_queue:
self.read_command_sync.clear()
await self.read_command_sync.wait()
cmd = await self.read_command_queue.get()
self.current_read_command = cmd
cmd = self.read_command_queue.popleft()
word_addr = (cmd.address // self.byte_lanes) * self.byte_lanes
word_addr = (cmd.address // self.byte_width) * self.byte_width
cycles = (cmd.length + self.byte_width-1 + (cmd.address % self.byte_width)) // self.byte_width
cycles = (cmd.length + self.byte_lanes-1 + (cmd.address % self.byte_lanes)) // self.byte_lanes
resp_cmd = AxiLiteReadRespCmd(cmd.address, cmd.length, cycles, cmd.prot, cmd.event)
self.int_read_resp_command_queue.append(resp_cmd)
self.int_read_resp_command_sync.set()
await self.int_read_resp_command_queue.put(resp_cmd)
self.log.info("Read start addr: 0x%08x prot: %s length: %d",
cmd.address, cmd.prot, cmd.length)
for k in range(cycles):
ar = self.ar_channel._transaction_obj()
ar.araddr = word_addr + k*self.byte_width
ar.araddr = word_addr + k*self.byte_lanes
ar.arprot = cmd.prot
await self.ar_channel.send(ar)
self.current_read_command = None
async def _process_read_resp(self):
while True:
if not self.int_read_resp_command_queue:
self.int_read_resp_command_sync.clear()
await self.int_read_resp_command_sync.wait()
cmd = await self.int_read_resp_command_queue.get()
self.current_read_resp_command = cmd
cmd = self.int_read_resp_command_queue.popleft()
start_offset = cmd.address % self.byte_width
end_offset = ((cmd.address + cmd.length - 1) % self.byte_width) + 1
start_offset = cmd.address % self.byte_lanes
end_offset = ((cmd.address + cmd.length - 1) % self.byte_lanes) + 1
data = bytearray()
@@ -449,7 +483,7 @@ class AxiLiteMasterRead(Reset):
resp = cycle_resp
start = 0
stop = self.byte_width
stop = self.byte_lanes
if k == 0:
start = start_offset
@@ -464,11 +498,9 @@ class AxiLiteMasterRead(Reset):
read_resp = AxiLiteReadResp(cmd.address, data, resp)
if cmd.event is not None:
cmd.event.set(read_resp)
else:
self.read_data_queue.append(read_resp)
self.read_data_sync.set()
self.current_read_resp_command = None
self.in_flight_operations -= 1
@@ -485,10 +517,10 @@ class AxiLiteMaster:
self.read_if = AxiLiteMasterRead(bus.read, clock, reset, reset_active_level)
def init_read(self, address, length, prot=AxiProt.NONSECURE, event=None):
self.read_if.init_read(address, length, prot, event)
return self.read_if.init_read(address, length, prot, event)
def init_write(self, address, data, prot=AxiProt.NONSECURE, event=None):
self.write_if.init_write(address, data, prot, event)
return self.write_if.init_write(address, data, prot, event)
def idle(self):
return (not self.read_if or self.read_if.idle()) and (not self.write_if or self.write_if.idle())
@@ -504,18 +536,6 @@ class AxiLiteMaster:
async def wait_write(self):
await self.write_if.wait()
def read_data_ready(self):
return self.read_if.read_data_ready()
def get_read_data(self):
return self.read_if.get_read_data()
def write_resp_ready(self):
return self.write_if.write_resp_ready()
def get_write_resp(self):
return self.write_if.get_write_resp()
async def read(self, address, length, prot=AxiProt.NONSECURE):
return await self.read_if.read(address, length, prot)

View File

@@ -35,6 +35,9 @@ from .reset import Reset
class AxiLiteRamWrite(Memory, Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
self.log.info("AXI lite RAM model (write)")
@@ -45,22 +48,33 @@ class AxiLiteRamWrite(Memory, Reset):
super().__init__(size, mem, *args, **kwargs)
self.aw_channel = AxiLiteAWSink(bus.aw, clock, reset, reset_active_level)
self.aw_channel.queue_occupancy_limit = 2
self.w_channel = AxiLiteWSink(bus.w, clock, reset, reset_active_level)
self.w_channel.queue_occupancy_limit = 2
self.b_channel = AxiLiteBSource(bus.b, clock, reset, reset_active_level)
self.b_channel.queue_occupancy_limit = 2
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
self.byte_width = self.width // self.byte_size
self.strb_mask = 2**self.byte_width-1
self.byte_lanes = self.width // self.byte_size
self.strb_mask = 2**self.byte_lanes-1
self.log.info("AXI lite RAM model configuration:")
self.log.info(" Memory size: %d bytes", len(self.mem))
self.log.info(" Address width: %d bits", len(self.aw_channel.bus.awaddr))
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_width)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
assert self.byte_width == len(self.w_channel.bus.wstrb)
assert self.byte_width * self.byte_size == self.width
self.log.info("AXI lite RAM model signals:")
for bus in (self.bus.aw, self.bus.w, self.bus.b):
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
assert self.byte_lanes == len(self.w_channel.bus.wstrb)
assert self.byte_lanes * self.byte_size == self.width
self._process_write_cr = None
@@ -72,20 +86,20 @@ class AxiLiteRamWrite(Memory, Reset):
if self._process_write_cr is not None:
self._process_write_cr.kill()
self._process_write_cr = None
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
else:
self.log.info("Reset de-asserted")
if self._process_write_cr is None:
self._process_write_cr = cocotb.fork(self._process_write())
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
async def _process_write(self):
while True:
aw = await self.aw_channel.recv()
addr = (int(aw.awaddr) // self.byte_width) * self.byte_width
addr = (int(aw.awaddr) // self.byte_lanes) * self.byte_lanes
prot = AxiProt(aw.awprot)
w = await self.w_channel.recv()
@@ -97,12 +111,12 @@ class AxiLiteRamWrite(Memory, Reset):
self.mem.seek(addr % self.size)
data = data.to_bytes(self.byte_width, 'little')
data = data.to_bytes(self.byte_lanes, 'little')
self.log.info("Write data awaddr: 0x%08x awprot: %s wstrb: 0x%02x data: %s",
addr, prot, strb, ' '.join((f'{c:02x}' for c in data)))
for i in range(self.byte_width):
for i in range(self.byte_lanes):
if strb & (1 << i):
self.mem.write(data[i:i+1])
else:
@@ -116,6 +130,9 @@ class AxiLiteRamWrite(Memory, Reset):
class AxiLiteRamRead(Memory, Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
self.log.info("AXI lite RAM model (read)")
@@ -126,19 +143,29 @@ class AxiLiteRamRead(Memory, Reset):
super().__init__(size, mem, *args, **kwargs)
self.ar_channel = AxiLiteARSink(bus.ar, clock, reset, reset_active_level)
self.ar_channel.queue_occupancy_limit = 2
self.r_channel = AxiLiteRSource(bus.r, clock, reset, reset_active_level)
self.r_channel.queue_occupancy_limit = 2
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
self.byte_width = self.width // self.byte_size
self.byte_lanes = self.width // self.byte_size
self.log.info("AXI lite RAM model configuration:")
self.log.info(" Memory size: %d bytes", len(self.mem))
self.log.info(" Address width: %d bits", len(self.ar_channel.bus.araddr))
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_width)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
assert self.byte_width * self.byte_size == self.width
self.log.info("AXI lite RAM model signals:")
for bus in (self.bus.ar, self.bus.r):
for sig in sorted(list(set().union(bus._signals, bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
assert self.byte_lanes * self.byte_size == self.width
self._process_read_cr = None
@@ -150,26 +177,26 @@ class AxiLiteRamRead(Memory, Reset):
if self._process_read_cr is not None:
self._process_read_cr.kill()
self._process_read_cr = None
self.ar_channel.clear()
self.r_channel.clear()
else:
self.log.info("Reset de-asserted")
if self._process_read_cr is None:
self._process_read_cr = cocotb.fork(self._process_read())
self.ar_channel.clear()
self.r_channel.clear()
async def _process_read(self):
while True:
ar = await self.ar_channel.recv()
addr = (int(ar.araddr) // self.byte_width) * self.byte_width
addr = (int(ar.araddr) // self.byte_lanes) * self.byte_lanes
prot = AxiProt(ar.arprot)
# todo latency
self.mem.seek(addr % self.size)
data = self.mem.read(self.byte_width)
data = self.mem.read(self.byte_lanes)
r = self.r_channel._transaction_obj()
r.rdata = int.from_bytes(data, 'little')

View File

@@ -23,12 +23,12 @@ THE SOFTWARE.
"""
import logging
from collections import deque
import cocotb
from cocotb.queue import Queue, QueueFull
from cocotb.triggers import RisingEdge, Timer, First, Event
from cocotb.utils import get_sim_time
from cocotb.bus import Bus
from cocotb_bus.bus import Bus
from .version import __version__
from .reset import Reset
@@ -276,8 +276,12 @@ class AxiStreamBase(Reset):
super().__init__(*args, **kwargs)
self.active = False
self.queue = deque()
self.queue_sync = Event()
self.queue = Queue()
self.dequeue_event = Event()
self.current_frame = None
self.idle_event = Event()
self.idle_event.set()
self.active_event = Event()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
@@ -315,25 +319,13 @@ class AxiStreamBase(Reset):
self.log.info("AXI stream %s configuration:", self._type)
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present")
self.log.info(" tready: %s", "present" if hasattr(self.bus, "tready") else "not present")
self.log.info(" tlast: %s", "present" if hasattr(self.bus, "tlast") else "not present")
if hasattr(self.bus, "tkeep"):
self.log.info(" tkeep width: %d bits", len(self.bus.tkeep))
self.log.info("AXI stream %s signals:", self._type)
for sig in sorted(list(set().union(self.bus._signals, self.bus._optional_signals))):
if hasattr(self.bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(self.bus, sig)))
else:
self.log.info(" tkeep: not present")
if hasattr(self.bus, "tid"):
self.log.info(" tid width: %d bits", len(self.bus.tid))
else:
self.log.info(" tid: not present")
if hasattr(self.bus, "tdest"):
self.log.info(" tdest width: %d bits", len(self.bus.tdest))
else:
self.log.info(" tdest: not present")
if hasattr(self.bus, "tuser"):
self.log.info(" tuser width: %d bits", len(self.bus.tuser))
else:
self.log.info(" tuser: not present")
self.log.info(" %s: not present", sig)
if self.byte_lanes * self.byte_size != self.width:
raise ValueError(f"Bus does not evenly divide into byte lanes "
@@ -344,13 +336,19 @@ class AxiStreamBase(Reset):
self._init_reset(reset, reset_active_level)
def count(self):
return len(self.queue)
return self.queue.qsize()
def empty(self):
return not self.queue
return self.queue.empty()
def clear(self):
self.queue.clear()
while not self.queue.empty():
frame = self.queue.get_nowait()
frame.sim_time_end = None
frame.handle_tx_complete()
self.dequeue_event.set()
self.idle_event.set()
self.active_event.clear()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
@@ -360,13 +358,16 @@ class AxiStreamBase(Reset):
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
self.active = False
if self.queue.empty():
self.idle_event.set()
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
self.active = False
async def _run(self):
raise NotImplementedError()
@@ -407,14 +408,32 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
_valid_init = 0
_ready_init = None
async def send(self, frame):
self.send_nowait(frame)
def __init__(self, bus, clock, reset=None, reset_active_level=True,
byte_size=None, byte_lanes=None, *args, **kwargs):
def send_nowait(self, frame):
super().__init__(bus, clock, reset, reset_active_level, byte_size, byte_lanes, *args, **kwargs)
self.queue_occupancy_limit_bytes = -1
self.queue_occupancy_limit_frames = -1
async def send(self, frame):
while self.full():
self.dequeue_event.clear()
await self.dequeue_event.wait()
frame = AxiStreamFrame(frame)
await self.queue.put(frame)
self.idle_event.clear()
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
def send_nowait(self, frame):
if self.full():
raise QueueFull()
frame = AxiStreamFrame(frame)
self.queue.put_nowait(frame)
self.idle_event.clear()
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
self.queue.append(frame)
async def write(self, data):
await self.send(data)
@@ -422,32 +441,46 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
def write_nowait(self, data):
self.send_nowait(data)
def full(self):
if self.queue_occupancy_limit_bytes > 0 and self.queue_occupancy_bytes > self.queue_occupancy_limit_bytes:
return True
elif self.queue_occupancy_limit_frames > 0 and self.queue_occupancy_frames > self.queue_occupancy_limit_frames:
return True
else:
return False
def idle(self):
return self.empty() and not self.active
async def wait(self):
while not self.idle():
await RisingEdge(self.clock)
await self.idle_event.wait()
def _handle_reset(self, state):
super()._handle_reset(state)
self.bus.tdata <= 0
if state:
self.bus.tdata.value = 0
if hasattr(self.bus, "tvalid"):
self.bus.tvalid <= 0
self.bus.tvalid.value = 0
if hasattr(self.bus, "tlast"):
self.bus.tlast <= 0
self.bus.tlast.value = 0
if hasattr(self.bus, "tkeep"):
self.bus.tkeep <= 0
self.bus.tkeep.value = 0
if hasattr(self.bus, "tid"):
self.bus.tid <= 0
self.bus.tid.value = 0
if hasattr(self.bus, "tdest"):
self.bus.tdest <= 0
self.bus.tdest.value = 0
if hasattr(self.bus, "tuser"):
self.bus.tuser <= 0
self.bus.tuser.value = 0
if self.current_frame:
self.log.warning("Flushed transmit frame during reset: %s", self.current_frame)
self.current_frame.handle_tx_complete()
self.current_frame = None
async def _run(self):
frame = None
frame_offset = 0
self.active = False
while True:
@@ -458,15 +491,18 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
tvalid_sample = (not hasattr(self.bus, "tvalid")) or self.bus.tvalid.value
if (tready_sample and tvalid_sample) or not tvalid_sample:
if frame is None and self.queue:
frame = self.queue.popleft()
if frame is None and not self.queue.empty():
frame = self.queue.get_nowait()
self.dequeue_event.set()
self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1
self.current_frame = frame
frame.sim_time_start = get_sim_time()
frame.sim_time_end = None
self.log.info("TX frame: %s", frame)
frame.normalize()
self.active = True
frame_offset = 0
if frame and not self.pause:
tdata_val = 0
@@ -477,38 +513,42 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
tuser_val = 0
for offset in range(self.byte_lanes):
tdata_val |= (frame.tdata.pop(0) & self.byte_mask) << (offset * self.byte_size)
tkeep_val |= (frame.tkeep.pop(0) & 1) << offset
tid_val = frame.tid.pop(0)
tdest_val = frame.tdest.pop(0)
tuser_val = frame.tuser.pop(0)
tdata_val |= (frame.tdata[frame_offset] & self.byte_mask) << (offset * self.byte_size)
tkeep_val |= (frame.tkeep[frame_offset] & 1) << offset
tid_val = frame.tid[frame_offset]
tdest_val = frame.tdest[frame_offset]
tuser_val = frame.tuser[frame_offset]
frame_offset += 1
if len(frame.tdata) == 0:
if frame_offset >= len(frame.tdata):
tlast_val = 1
frame.sim_time_end = get_sim_time()
frame.handle_tx_complete()
frame = None
self.current_frame = None
break
self.bus.tdata <= tdata_val
self.bus.tdata.value = tdata_val
if hasattr(self.bus, "tvalid"):
self.bus.tvalid <= 1
self.bus.tvalid.value = 1
if hasattr(self.bus, "tlast"):
self.bus.tlast <= tlast_val
self.bus.tlast.value = tlast_val
if hasattr(self.bus, "tkeep"):
self.bus.tkeep <= tkeep_val
self.bus.tkeep.value = tkeep_val
if hasattr(self.bus, "tid"):
self.bus.tid <= tid_val
self.bus.tid.value = tid_val
if hasattr(self.bus, "tdest"):
self.bus.tdest <= tdest_val
self.bus.tdest.value = tdest_val
if hasattr(self.bus, "tuser"):
self.bus.tuser <= tuser_val
self.bus.tuser.value = tuser_val
else:
if hasattr(self.bus, "tvalid"):
self.bus.tvalid <= 0
self.bus.tvalid.value = 0
if hasattr(self.bus, "tlast"):
self.bus.tlast <= 0
self.bus.tlast.value = 0
self.active = bool(frame)
if not frame and self.queue.empty():
self.idle_event.set()
class AxiStreamMonitor(AxiStreamBase):
@@ -527,21 +567,22 @@ class AxiStreamMonitor(AxiStreamBase):
self.read_queue = []
async def recv(self, compact=True):
while self.empty():
self.queue_sync.clear()
await self.queue_sync.wait()
return self.recv_nowait(compact)
def recv_nowait(self, compact=True):
if self.queue:
frame = self.queue.popleft()
def _recv(self, frame, compact=True):
if self.queue.empty():
self.active_event.clear()
self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1
if compact:
frame.compact()
return frame
return None
async def recv(self, compact=True):
frame = await self.queue.get()
return self._recv(frame, compact)
def recv_nowait(self, compact=True):
frame = self.queue.get_nowait()
return self._recv(frame, compact)
async def read(self, count=-1):
while not self.read_queue:
@@ -565,11 +606,10 @@ class AxiStreamMonitor(AxiStreamBase):
async def wait(self, timeout=0, timeout_unit='ns'):
if not self.empty():
return
self.queue_sync.clear()
if timeout:
await First(self.queue_sync.wait(), Timer(timeout, timeout_unit))
await First(self.active_event.wait(), Timer(timeout, timeout_unit))
else:
await self.queue_sync.wait()
await self.active_event.wait()
async def _run(self):
frame = None
@@ -589,9 +629,9 @@ class AxiStreamMonitor(AxiStreamBase):
else:
frame = AxiStreamFrame([], [], [], [], [])
frame.sim_time_start = get_sim_time()
self.active = True
for offset in range(self.byte_lanes):
frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask)
if hasattr(self.bus, "tkeep"):
frame.tkeep.append((self.bus.tkeep.value.integer >> offset) & 1)
@@ -609,10 +649,12 @@ class AxiStreamMonitor(AxiStreamBase):
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
self.queue.append(frame)
self.queue_sync.set()
self.queue.put_nowait(frame)
self.active_event.set()
frame = None
else:
self.active = bool(frame)
class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
@@ -643,8 +685,9 @@ class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
def _handle_reset(self, state):
super()._handle_reset(state)
if state:
if hasattr(self.bus, "tready"):
self.bus.tready <= 0
self.bus.tready.value = 0
async def _run(self):
frame = None
@@ -664,9 +707,9 @@ class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
else:
frame = AxiStreamFrame([], [], [], [], [])
frame.sim_time_start = get_sim_time()
self.active = True
for offset in range(self.byte_lanes):
frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask)
if hasattr(self.bus, "tkeep"):
frame.tkeep.append((self.bus.tkeep.value.integer >> offset) & 1)
@@ -684,10 +727,12 @@ class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
self.queue.append(frame)
self.queue_sync.set()
self.queue.put_nowait(frame)
self.active_event.set()
frame = None
else:
self.active = bool(frame)
if hasattr(self.bus, "tready"):
self.bus.tready <= (not self.full() and not self.pause)
self.bus.tready.value = (not self.full() and not self.pause)

View File

@@ -23,11 +23,11 @@ THE SOFTWARE.
"""
import logging
from collections import deque
import cocotb
from cocotb.queue import Queue, QueueFull
from cocotb.triggers import RisingEdge, Event, First, Timer
from cocotb.bus import Bus
from cocotb_bus.bus import Bus
from .reset import Reset
@@ -94,8 +94,11 @@ class StreamBase(Reset):
self.active = False
self.queue = deque()
self.queue_sync = Event()
self.queue = Queue()
self.dequeue_event = Event()
self.idle_event = Event()
self.idle_event.set()
self.active_event = Event()
self.ready = None
self.valid = None
@@ -124,13 +127,17 @@ class StreamBase(Reset):
self._init_reset(reset, reset_active_level)
def count(self):
return len(self.queue)
return self.queue.qsize()
def empty(self):
return not self.queue
return self.queue.empty()
def clear(self):
self.queue.clear()
while not self.queue.empty():
self.queue.get_nowait()
self.dequeue_event.set()
self.idle_event.set()
self.active_event.clear()
def _handle_reset(self, state):
if state:
@@ -138,13 +145,16 @@ class StreamBase(Reset):
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
self.active = False
if self.queue.empty():
self.idle_event.set()
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
self.active = False
async def _run(self):
raise NotImplementedError()
@@ -183,24 +193,42 @@ class StreamSource(StreamBase, StreamPause):
_valid_init = 0
_ready_init = None
def __init__(self, bus, clock, reset=None, reset_active_level=True, *args, **kwargs):
super().__init__(bus, clock, reset, reset_active_level, *args, **kwargs)
self.queue_occupancy_limit = -1
async def send(self, obj):
self.send_nowait(obj)
while self.full():
self.dequeue_event.clear()
await self.dequeue_event.wait()
await self.queue.put(obj)
self.idle_event.clear()
def send_nowait(self, obj):
self.queue.append(obj)
if self.full():
raise QueueFull()
self.queue.put_nowait(obj)
self.idle_event.clear()
def full(self):
if self.queue_occupancy_limit > 0 and self.count() >= self.queue_occupancy_limit:
return True
else:
return False
def idle(self):
return self.empty() and not self.active
async def wait(self):
while not self.idle():
await RisingEdge(self.clock)
await self.idle_event.wait()
def _handle_reset(self, state):
super()._handle_reset(state)
if state:
if self.valid is not None:
self.valid <= 0
self.valid.value = 0
async def _run(self):
while True:
@@ -211,15 +239,18 @@ class StreamSource(StreamBase, StreamPause):
valid_sample = self.valid is None or self.valid.value
if (ready_sample and valid_sample) or (not valid_sample):
if self.queue and not self.pause:
self.bus.drive(self.queue.popleft())
if not self.queue.empty() and not self.pause:
self.bus.drive(self.queue.get_nowait())
self.dequeue_event.set()
if self.valid is not None:
self.valid <= 1
self.valid.value = 1
self.active = True
else:
if self.valid is not None:
self.valid <= 0
self.active = bool(self.queue)
self.valid.value = 0
self.active = not self.queue.empty()
if self.queue.empty():
self.idle_event.set()
class StreamMonitor(StreamBase):
@@ -229,25 +260,26 @@ class StreamMonitor(StreamBase):
_valid_init = None
_ready_init = None
def _recv(self, item):
if self.queue.empty():
self.active_event.clear()
return item
async def recv(self):
while self.empty():
self.queue_sync.clear()
await self.queue_sync.wait()
return self.recv_nowait()
item = await self.queue.get()
return self._recv(item)
def recv_nowait(self):
if self.queue:
return self.queue.popleft()
return None
item = self.queue.get_nowait()
return self._recv(item)
async def wait(self, timeout=0, timeout_unit=None):
if not self.empty():
return
self.queue_sync.clear()
if timeout:
await First(self.queue_sync.wait(), Timer(timeout, timeout_unit))
await First(self.active_event.wait(), Timer(timeout, timeout_unit))
else:
await self.queue_sync.wait()
await self.active_event.wait()
async def _run(self):
while True:
@@ -260,8 +292,8 @@ class StreamMonitor(StreamBase):
if ready_sample and valid_sample:
obj = self._transaction_obj()
self.bus.sample(obj)
self.queue.append(obj)
self.queue_sync.set()
self.queue.put_nowait(obj)
self.active_event.set()
class StreamSink(StreamMonitor, StreamPause):
@@ -277,7 +309,7 @@ class StreamSink(StreamMonitor, StreamPause):
self.queue_occupancy_limit = -1
def full(self):
if self.queue_occupancy_limit > 0 and len(self.queue) >= self.queue_occupancy_limit:
if self.queue_occupancy_limit > 0 and self.count() >= self.queue_occupancy_limit:
return True
else:
return False
@@ -285,8 +317,9 @@ class StreamSink(StreamMonitor, StreamPause):
def _handle_reset(self, state):
super()._handle_reset(state)
if state:
if self.ready is not None:
self.ready <= 0
self.ready.value = 0
async def _run(self):
while True:
@@ -299,11 +332,11 @@ class StreamSink(StreamMonitor, StreamPause):
if ready_sample and valid_sample:
obj = self._transaction_obj()
self.bus.sample(obj)
self.queue.append(obj)
self.queue_sync.set()
self.queue.put_nowait(obj)
self.active_event.set()
if self.ready is not None:
self.ready <= (not self.full() and not self.pause)
self.ready.value = (not self.full() and not self.pause)
def define_stream(name, signals, optional_signals=None, valid_signal=None, ready_signal=None, signal_widths=None):

View File

@@ -1 +1 @@
__version__ = "0.1.6"
__version__ = "0.1.14"

View File

@@ -17,9 +17,10 @@ long-description-content-type = text/markdown
platforms = any
classifiers =
Development Status :: 3 - Alpha
Programming Language :: Python :: 3
Framework :: cocotb
License :: OSI Approved :: MIT License
Operating System :: OS Independent
Programming Language :: Python :: 3
Topic :: Scientific/Engineering :: Electronic Design Automation (EDA)
[options]
@@ -27,6 +28,7 @@ packages = find_namespace:
python_requires = >=3.6
install_requires =
cocotb
cocotb-bus
[options.extras_require]
test =

View File

@@ -73,10 +73,10 @@ class TB:
self.dut.rst.setimmediatevalue(0)
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
self.dut.rst <= 1
self.dut.rst.value = 1
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
self.dut.rst <= 0
self.dut.rst.value = 0
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
@@ -85,7 +85,7 @@ async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, si
tb = TB(dut)
byte_width = tb.axi_master.write_if.byte_width
byte_lanes = tb.axi_master.write_if.byte_lanes
max_burst_size = tb.axi_master.write_if.max_burst_size
if size is None:
@@ -96,8 +96,8 @@ async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, si
tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter)
for length in list(range(1, byte_width*2))+[1024]:
for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)):
for length in list(range(1, byte_lanes*2))+[1024]:
for offset in list(range(byte_lanes))+list(range(4096-byte_lanes, 4096)):
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
@@ -120,7 +120,7 @@ async def run_test_read(dut, idle_inserter=None, backpressure_inserter=None, siz
tb = TB(dut)
byte_width = tb.axi_master.write_if.byte_width
byte_lanes = tb.axi_master.write_if.byte_lanes
max_burst_size = tb.axi_master.write_if.max_burst_size
if size is None:
@@ -131,8 +131,8 @@ async def run_test_read(dut, idle_inserter=None, backpressure_inserter=None, siz
tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter)
for length in list(range(1, byte_width*2))+[1024]:
for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)):
for length in list(range(1, byte_lanes*2))+[1024]:
for offset in list(range(byte_lanes))+list(range(4096-byte_lanes, 4096)):
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
@@ -151,15 +151,20 @@ async def run_test_write_words(dut):
tb = TB(dut)
byte_width = tb.axi_master.write_if.byte_width
byte_lanes = tb.axi_master.write_if.byte_lanes
await tb.cycle_reset()
for length in list(range(1, 4)):
for offset in list(range(byte_width)):
for offset in list(range(byte_lanes)):
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
event = tb.axi_master.init_write(addr, test_data)
await event.wait()
assert tb.axi_ram.read(addr, length) == test_data
test_data = bytearray([x % 256 for x in range(length)])
await tb.axi_master.write(addr, test_data)
assert tb.axi_ram.read(addr, length) == test_data
@@ -200,15 +205,21 @@ async def run_test_read_words(dut):
tb = TB(dut)
byte_width = tb.axi_master.write_if.byte_width
byte_lanes = tb.axi_master.write_if.byte_lanes
await tb.cycle_reset()
for length in list(range(1, 4)):
for offset in list(range(byte_width)):
for offset in list(range(byte_lanes)):
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
tb.axi_ram.write(addr, test_data)
event = tb.axi_master.init_read(addr, length)
await event.wait()
assert event.data.data == test_data
test_data = bytearray([x % 256 for x in range(length)])
tb.axi_ram.write(addr, test_data)
assert (await tb.axi_master.read(addr, length)).data == test_data
@@ -287,9 +298,9 @@ def cycle_pause():
if cocotb.SIM_NAME:
data_width = int(os.getenv("PARAM_DATA_WIDTH"))
byte_width = data_width // 8
max_burst_size = (byte_width-1).bit_length()
data_width = len(cocotb.top.axi_wdata)
byte_lanes = data_width // 8
max_burst_size = (byte_lanes-1).bit_length()
for test in [run_test_write, run_test_read]:

View File

@@ -70,10 +70,10 @@ class TB:
self.dut.rst.setimmediatevalue(0)
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
self.dut.rst <= 1
self.dut.rst.value = 1
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
self.dut.rst <= 0
self.dut.rst.value = 0
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
@@ -82,15 +82,15 @@ async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_ins
tb = TB(dut)
byte_width = tb.axil_master.write_if.byte_width
byte_lanes = tb.axil_master.write_if.byte_lanes
await tb.cycle_reset()
tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter)
for length in range(1, byte_width*2):
for offset in range(byte_width):
for length in range(1, byte_lanes*2):
for offset in range(byte_lanes):
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
@@ -113,15 +113,15 @@ async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inse
tb = TB(dut)
byte_width = tb.axil_master.write_if.byte_width
byte_lanes = tb.axil_master.write_if.byte_lanes
await tb.cycle_reset()
tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter)
for length in range(1, byte_width*2):
for offset in range(byte_width):
for length in range(1, byte_lanes*2):
for offset in range(byte_lanes):
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
@@ -140,15 +140,20 @@ async def run_test_write_words(dut):
tb = TB(dut)
byte_width = tb.axil_master.write_if.byte_width
byte_lanes = tb.axil_master.write_if.byte_lanes
await tb.cycle_reset()
for length in list(range(1, 4)):
for offset in list(range(byte_width)):
for offset in list(range(byte_lanes)):
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
event = tb.axil_master.init_write(addr, test_data)
await event.wait()
assert tb.axil_ram.read(addr, length) == test_data
test_data = bytearray([x % 256 for x in range(length)])
await tb.axil_master.write(addr, test_data)
assert tb.axil_ram.read(addr, length) == test_data
@@ -189,15 +194,21 @@ async def run_test_read_words(dut):
tb = TB(dut)
byte_width = tb.axil_master.write_if.byte_width
byte_lanes = tb.axil_master.write_if.byte_lanes
await tb.cycle_reset()
for length in list(range(1, 4)):
for offset in list(range(byte_width)):
for offset in list(range(byte_lanes)):
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
tb.axil_ram.write(addr, test_data)
event = tb.axil_master.init_read(addr, length)
await event.wait()
assert event.data.data == test_data
test_data = bytearray([x % 256 for x in range(length)])
tb.axil_ram.write(addr, test_data)
assert (await tb.axil_master.read(addr, length)).data == test_data

View File

@@ -63,10 +63,10 @@ class TB:
self.dut.rst.setimmediatevalue(0)
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
self.dut.rst <= 1
self.dut.rst.value = 1
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
self.dut.rst <= 0
self.dut.rst.value = 0
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
@@ -126,7 +126,7 @@ def cycle_pause():
def size_list():
data_width = int(os.getenv("PARAM_DATA_WIDTH"))
data_width = len(cocotb.top.axis_tdata)
byte_width = data_width // 8
return list(range(1, byte_width*4+1)) + [512] + [1]*64