23 Commits

Author SHA1 Message Date
Alex Forencich
8668a6bb4f Release v0.1.4 2020-12-18 16:12:13 -08:00
Alex Forencich
bdaeaad66b Update readme 2020-12-18 16:10:26 -08:00
Alex Forencich
cd272b2a59 Convert send/recv to blocking, add nonblocking send_nowait/recv_nowait 2020-12-18 15:33:23 -08:00
Alex Forencich
01212e37cd Minor refactoring, remove extra conversion 2020-12-17 00:27:17 -08:00
Alex Forencich
40a9bb9eac Remove unnecessary __init__.py files 2020-12-15 18:42:23 -08:00
Alex Forencich
1103783b08 Return tdata when AxiStreamFrame is cast to bytes 2020-12-11 23:45:23 -08:00
Alex Forencich
2451921923 Replace SimLog with logger 2020-12-10 02:20:14 -08:00
Alex Forencich
e6e8a06dfe Accept byte_size or byte_lanes arguments in AXI stream constructors if tkeep is not used 2020-12-07 01:58:49 -08:00
Alex Forencich
3dd8114c05 Rename byte_width to byte_lanes 2020-12-07 01:50:48 -08:00
Alex Forencich
a84b52077b Update readme 2020-12-06 00:50:28 -08:00
Alex Forencich
200c8c0b26 Update readme 2020-12-06 00:02:07 -08:00
Alex Forencich
0ff3d64540 Update readme 2020-12-05 01:36:09 -08:00
Alex Forencich
237745792c Bump to dev version v0.1.3 2020-12-04 13:40:22 -08:00
Alex Forencich
5f032b3c9b Release v0.1.2 2020-12-04 13:39:22 -08:00
Alex Forencich
31081c579b Point badge at correct project 2020-12-04 13:17:53 -08:00
Tomasz Hemperek
595e0b1b49 Add status badges 2020-12-04 13:16:10 -08:00
Alex Forencich
22b10f2dd8 Preload ID queue list 2020-12-04 12:39:25 -08:00
Alex Forencich
db3841dd99 Improve alignment bit masks 2020-12-04 00:51:32 -08:00
Alex Forencich
1057058cb3 Shorten stress test worker name 2020-12-03 21:03:05 -08:00
Alex Forencich
e5f2be4468 Use logger formatting instead of F-strings 2020-12-03 21:02:17 -08:00
Alex Forencich
4b5b62419b Add missing parameters 2020-12-03 19:34:03 -08:00
Alex Forencich
03f8fe0fd3 Use pytest importlib mode 2020-12-03 19:33:40 -08:00
Tomasz Hemperek
435b9c9282 Add coverge reporting in CI and upload to codecov 2020-12-03 12:52:51 -08:00
17 changed files with 271 additions and 157 deletions

View File

@@ -30,3 +30,8 @@ jobs:
- name: Test with tox - name: Test with tox
run: tox run: tox
- name: Upload coverage to codecov
run: |
pip install codecov
codecov

1
.gitignore vendored
View File

@@ -11,3 +11,4 @@ __pycache__/
.tox .tox
sim_build_* sim_build_*
.coverage

View File

@@ -1,10 +1,14 @@
# AXI interface modules for Cocotb # AXI interface modules for Cocotb
[![Build Status](https://github.com/alexforencich/cocotbext-axi/workflows/Regression%20Tests/badge.svg?branch=master)](https://github.com/alexforencich/cocotbext-axi/actions/)
[![codecov](https://codecov.io/gh/alexforencich/cocotbext-axi/branch/master/graph/badge.svg)](https://codecov.io/gh/alexforencich/cocotbext-axi)
[![PyPI version](https://badge.fury.io/py/cocotbext-axi.svg)](https://pypi.org/project/cocotbext-axi)
GitHub repository: https://github.com/alexforencich/cocotbext-axi GitHub repository: https://github.com/alexforencich/cocotbext-axi
## Introduction ## Introduction
AXI, AXI lite, and AXI stream simulation models for cocotb. AXI, AXI lite, and AXI stream simulation models for [cocotb](https://github.com/cocotb/cocotb).
## Installation ## Installation
@@ -92,21 +96,21 @@ Second, blocking operations can be carried out with `read()` and `write()` and t
* `write_resp_ready()`: determine if any write response is available * `write_resp_ready()`: determine if any write response is available
* `get_write_resp()`: fetch first available write response * `get_write_resp()`: fetch first available write response
* `read(address, length, ...)`: read _length_ bytes, starting at _address_ * `read(address, length, ...)`: read _length_ bytes, starting at _address_
* `read_words(address, count, byteorder, ws, ...)`: read _count_ _ws_-byte words, starting at _address_, default word size of `2`, default _byteorder_ `"little"` * `read_words(address, count, byteorder='little', ws=2, ...)`: read _count_ _ws_-byte words, starting at _address_
* `read_dwords(address, count, byteorder, ...)`: read _count_ 4-byte dwords, starting at _address_, default _byteorder_ `"little"` * `read_dwords(address, count, byteorder='little', ...)`: read _count_ 4-byte dwords, starting at _address_
* `read_qwords(address, count, byteorder, ...)`: read _count_ 8-byte qwords, starting at _address_, default _byteorder_ `"little"` * `read_qwords(address, count, byteorder='little', ...)`: read _count_ 8-byte qwords, starting at _address_
* `read_byte(address, ...)`: read single byte at _address_ * `read_byte(address, ...)`: read single byte at _address_
* `read_word(address, byteorder, ws, ...)`: read single _ws_-byte word at _address_, default word size of `2`, default _byteorder_ `"little"` * `read_word(address, byteorder='little', ws=2, ...)`: read single _ws_-byte word at _address_
* `read_dword(address, byteorder, ...)`: read single 4-byte dword at _address_, default _byteorder_ `"little"` * `read_dword(address, byteorder='little', ...)`: read single 4-byte dword at _address_
* `read_qword(address, byteorder, ...)`: read single 8-byte qword at _address_, default _byteorder_ `"little"` * `read_qword(address, byteorder='little', ...)`: read single 8-byte qword at _address_
* `write(address, data, ...)`: write _data_ (bytes), starting at _address_ * `write(address, data, ...)`: write _data_ (bytes), starting at _address_
* `write_words(address, data, byteorder, ws, ...)`: write _data_ (_ws_-byte words), starting at _address_, default word size of `2`, default _byteorder_ `"little"` * `write_words(address, data, byteorder='little', ws=2, ...)`: write _data_ (_ws_-byte words), starting at _address_
* `write_dwords(address, data, byteorder, ...)`: write _data_ (4-byte dwords), starting at _address_, default _byteorder_ `"little"` * `write_dwords(address, data, byteorder='little', ...)`: write _data_ (4-byte dwords), starting at _address_
* `write_qwords(address, data, byteorder, ...)`: write _data_ (8-byte qwords), starting at _address_, default _byteorder_ `"little"` * `write_qwords(address, data, byteorder='little', ...)`: write _data_ (8-byte qwords), starting at _address_
* `write_byte(address, data, ...)`: write single byte at _address_ * `write_byte(address, data, ...)`: write single byte at _address_
* `write_word(address, data, byteorder, ws, ...)`: write single _ws_-byte word at _address_, default word size of `2`, default _byteorder_ `"little"` * `write_word(address, data, byteorder='little', ws=2, ...)`: write single _ws_-byte word at _address_
* `write_dword(address, data, byteorder, ...)`: write single 4-byte dword at _address_, default _byteorder_ `"little"` * `write_dword(address, data, byteorder='little', ...)`: write single 4-byte dword at _address_
* `write_qword(address, data, byteorder, ...)`: write single 8-byte qword at _address_, default _byteorder_ `"little"` * `write_qword(address, data, byteorder='little', ...)`: write single 8-byte qword at _address_
#### Additional optional arguments for `AxiMaster` #### Additional optional arguments for `AxiMaster`
@@ -169,24 +173,24 @@ Multi-port memories can be constructed by passing the `mem` object of the first
#### Methods #### Methods
* `read(address, length)`: read _length_ bytes, starting at _address_ * `read(address, length)`: read _length_ bytes, starting at _address_
* `read_words(address, count, byteorder, ws)`: read _count_ _ws_-byte words, starting at _address_, default word size of `2`, default _byteorder_ `"little"` * `read_words(address, count, byteorder='little', ws=2)`: read _count_ _ws_-byte words, starting at _address_
* `read_dwords(address, count, byteorder)`: read _count_ 4-byte dwords, starting at _address_, default _byteorder_ `"little"` * `read_dwords(address, count, byteorder='little')`: read _count_ 4-byte dwords, starting at _address_
* `read_qwords(address, count, byteorder)`: read _count_ 8-byte qwords, starting at _address_, default _byteorder_ `"little"` * `read_qwords(address, count, byteorder='little')`: read _count_ 8-byte qwords, starting at _address_
* `read_byte(address)`: read single byte at _address_ * `read_byte(address)`: read single byte at _address_
* `read_word(address, byteorder, ws)`: read single _ws_-byte word at _address_, default word size of `2`, default _byteorder_ `"little"` * `read_word(address, byteorder='little', ws=2)`: read single _ws_-byte word at _address_
* `read_dword(address, byteorder)`: read single 4-byte dword at _address_, default _byteorder_ `"little"` * `read_dword(address, byteorder='little')`: read single 4-byte dword at _address_
* `read_qword(address, byteorder)`: read single 8-byte qword at _address_, default _byteorder_ `"little"` * `read_qword(address, byteorder='little')`: read single 8-byte qword at _address_
* `write(address, data)`: write _data_ (bytes), starting at _address_ * `write(address, data)`: write _data_ (bytes), starting at _address_
* `write_words(address, data, byteorder, ws)`: write _data_ (_ws_-byte words), starting at _address_, default word size of `2`, default _byteorder_ `"little"` * `write_words(address, data, byteorder='little', ws=2)`: write _data_ (_ws_-byte words), starting at _address_
* `write_dwords(address, data, byteorder)`: write _data_ (4-byte dwords), starting at _address_, default _byteorder_ `"little"` * `write_dwords(address, data, byteorder='little')`: write _data_ (4-byte dwords), starting at _address_
* `write_qwords(address, data, byteorder)`: write _data_ (8-byte qwords), starting at _address_, default _byteorder_ `"little"` * `write_qwords(address, data, byteorder='little')`: write _data_ (8-byte qwords), starting at _address_
* `write_byte(address, data)`: write single byte at _address_ * `write_byte(address, data)`: write single byte at _address_
* `write_word(address, data, byteorder, ws)`: write single _ws_-byte word at _address_, default word size of `2`, default _byteorder_ `"little"` * `write_word(address, data, byteorder='little', ws=2)`: write single _ws_-byte word at _address_
* `write_dword(address, data, byteorder)`: write single 4-byte dword at _address_, default _byteorder_ `"little"` * `write_dword(address, data, byteorder='little')`: write single 4-byte dword at _address_
* `write_qword(address, data, byteorder)`: write single 8-byte qword at _address_, default _byteorder_ `"little"` * `write_qword(address, data, byteorder='little')`: write single 8-byte qword at _address_
* `hexdump(address, length, prefix)`: print hex dump of _length_ bytes starting from `address`, prefix lines with optional `prefix` * `hexdump(address, length, prefix='')`: print hex dump of _length_ bytes starting from `address`, prefix lines with optional `prefix`
* `hexdump_line(address, length, prefix)`: return hex dump (list of str) of _length_ bytes starting from `address`, prefix lines with optional `prefix` * `hexdump_line(address, length, prefix='')`: return hex dump (list of str) of _length_ bytes starting from `address`, prefix lines with optional `prefix`
* `hexdump_str(address, length, prefix)`: return hex dump (str) of _length_ bytes starting from `address`, prefix lines with optional `prefix` * `hexdump_str(address, length, prefix='')`: return hex dump (str) of _length_ bytes starting from `address`, prefix lines with optional `prefix`
### AXI stream ### AXI stream
@@ -194,7 +198,7 @@ The `AxiStreamSource`, `AxiStreamSink`, and `AxiStreamMonitor` classes can be us
To use these modules, import the one you need and connect it to the DUT: To use these modules, import the one you need and connect it to the DUT:
from cocotbext.axi import AxiStreamSource, AxiStreamSink from cocotbext.axi import AxiStreamSource, AxiStreamSink, AxiStreamMonitor
axis_source = AxiStreamSource(dut, "s_axis", dut.clk, dut.rst) axis_source = AxiStreamSource(dut, "s_axis", dut.clk, dut.rst)
axis_sink = AxiStreamSink(dut, "m_axis", dut.clk, dut.rst) axis_sink = AxiStreamSink(dut, "m_axis", dut.clk, dut.rst)
@@ -229,6 +233,10 @@ To receive data with an `AxiStreamSink` or `AxiStreamMonitor`, call `recv()` or
* _name_: signal name prefix (e.g. for `m_axis_tdata`, the prefix is `m_axis`) * _name_: signal name prefix (e.g. for `m_axis_tdata`, the prefix is `m_axis`)
* _clock_: clock signal * _clock_: clock signal
* _reset_: reset signal (optional) * _reset_: reset signal (optional)
* _byte_size_: byte size (optional)
* _byte_lanes_: byte lane count (optional)
Note: _byte_size_, _byte_lanes_, `len(tdata)`, and `len(tkeep)` are all related, in that _byte_lanes_ is set from `tkeep` if it is connected, and `byte_size*byte_lanes == len(tdata)`. So, if `tkeep` is connected, both _byte_size_ and _byte_lanes_ will be computed internally and cannot be overridden. If `tkeep` is not connected, then either _byte_size_ or _byte_lanes_ can be specified, and the other will be computed such that `byte_size*byte_lanes == len(tdata)`.
#### Attributes: #### Attributes:
@@ -240,21 +248,27 @@ To receive data with an `AxiStreamSink` or `AxiStreamMonitor`, call `recv()` or
#### Methods #### Methods
* `send(frame)`: send _frame_ (source) * `send(frame)`: send _frame_ (blocking) (source)
* `write(data)`: send _data_ (alias of send) (source) * `send_nowait(frame)`: send _frame_ (non-blocking) (source)
* `recv(compact)`: receive a frame, optionally compact frame (sink/monitor) * `write(data)`: send _data_ (alias of send) (blocking) (source)
* `write_nowait(data)`: send _data_ (alias of send_nowait) (non-blocking) (source)
* `recv(compact=True)`: receive a frame as a `GmiiFrame` (blocking) (sink)
* `recv_nowait(compact=True)`: receive a frame as a `GmiiFrame` (non-blocking) (sink)
* `read(count)`: read _count_ bytes from buffer (blocking) (sink/monitor)
* `read_nowait(count)`: read _count_ bytes from buffer (non-blocking) (sink/monitor)
* `read(count)`: read _count_ bytes from buffer (sink/monitor) * `read(count)`: read _count_ bytes from buffer (sink/monitor)
* `count()`: returns the number of items in the queue (all) * `count()`: returns the number of items in the queue (all)
* `empty()`: returns _True_ if the queue is empty (all) * `empty()`: returns _True_ if the queue is empty (all)
* `full()`: returns _True_ if the queue occupancy limits are met (sink) * `full()`: returns _True_ if the queue occupancy limits are met (sink)
* `idle()`: returns _True_ if no transfer is in progress (all) or if the queue is not empty (source) * `idle()`: returns _True_ if no transfer is in progress (all) or if the queue is not empty (source)
* `wait(timeout=0, timeout_unit='ns')`: wait for idle (source) or frame received (sink/monitor) * `wait()`: wait for idle (source)
* `wait(timeout=0, timeout_unit='ns')`: wait for frame received (sink)
* `set_pause_generator(generator)`: set generator for pause signal, generator will be advanced on every clock cycle (source/sink) * `set_pause_generator(generator)`: set generator for pause signal, generator will be advanced on every clock cycle (source/sink)
* `clear_pause_generator()`: remove generator for pause signal * `clear_pause_generator()`: remove generator for pause signal (source/sink)
#### AxiStreamFrame object #### AxiStreamFrame object
The `AxiStreamFrame` object is a container for a frame to be transferred via AXI stream. The `tdata` field contains the packet data in the form of a list of bytes, a `bytearray` if the byte size is 8 bits or a `list` of `int`s otherwise. `tkeep`, `tid`, `tdest`, and `tuser` can either be `None`, an `int`, or a `list` of `int`s. The `AxiStreamFrame` object is a container for a frame to be transferred via AXI stream. The `tdata` field contains the packet data in the form of a list of bytes, which is either a `bytearray` if the byte size is 8 bits or a `list` of `int`s otherwise. `tkeep`, `tid`, `tdest`, and `tuser` can either be `None`, an `int`, or a `list` of `int`s.
Attributes: Attributes:

View File

@@ -22,11 +22,11 @@ THE SOFTWARE.
""" """
import logging
from collections import deque, namedtuple, Counter from collections import deque, namedtuple, Counter
import cocotb import cocotb
from cocotb.triggers import Event from cocotb.triggers import Event
from cocotb.log import SimLog
from .version import __version__ from .version import __version__
from .constants import AxiBurstType, AxiLockType, AxiProt, AxiResp from .constants import AxiBurstType, AxiLockType, AxiProt, AxiResp
@@ -49,7 +49,7 @@ AxiReadResp = namedtuple("AxiReadResp", ["address", "data", "resp", "user"])
class AxiMasterWrite(object): class AxiMasterWrite(object):
def __init__(self, entity, name, clock, reset=None, max_burst_len=256): def __init__(self, entity, name, clock, reset=None, max_burst_len=256):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.log.info("AXI master (write)") self.log.info("AXI master (write)")
self.log.info("cocotbext-axi version %s", __version__) self.log.info("cocotbext-axi version %s", __version__)
@@ -74,7 +74,7 @@ class AxiMasterWrite(object):
self.int_write_resp_command_queue = deque() self.int_write_resp_command_queue = deque()
self.int_write_resp_command_sync = Event() self.int_write_resp_command_sync = Event()
self.int_write_resp_queue_list = {} self.int_write_resp_queue_list = [deque() for k in range(self.id_count)]
self.in_flight_operations = 0 self.in_flight_operations = 0
@@ -297,7 +297,7 @@ class AxiMasterWrite(object):
else: else:
w.wuser = 0 w.wuser = 0
self.w_channel.send(w) await self.w_channel.send(w)
cur_addr += num_bytes cur_addr += num_bytes
cycle_offset = (cycle_offset + num_bytes) % self.byte_width cycle_offset = (cycle_offset + num_bytes) % self.byte_width
@@ -318,18 +318,15 @@ class AxiMasterWrite(object):
user = [] user = []
for bid, burst_length in cmd.burst_list: for bid, burst_length in cmd.burst_list:
self.int_write_resp_queue_list.setdefault(bid, deque()) while not self.int_write_resp_queue_list[bid]:
while True: b = await self.b_channel.recv()
if self.int_write_resp_queue_list[bid]:
break
await self.b_channel.wait() i = int(b.bid)
b = self.b_channel.recv()
if self.active_id[int(b.bid)] <= 0: if self.active_id[i] <= 0:
raise Exception(f"Unexpected burst ID {bid}") raise Exception(f"Unexpected burst ID {bid}")
self.int_write_resp_queue_list[int(b.bid)].append(b) self.int_write_resp_queue_list[i].append(b)
b = self.int_write_resp_queue_list[bid].popleft() b = self.int_write_resp_queue_list[bid].popleft()
@@ -366,7 +363,7 @@ class AxiMasterWrite(object):
class AxiMasterRead(object): class AxiMasterRead(object):
def __init__(self, entity, name, clock, reset=None, max_burst_len=256): def __init__(self, entity, name, clock, reset=None, max_burst_len=256):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.log.info("AXI master (read)") self.log.info("AXI master (read)")
self.log.info("cocotbext-axi version %s", __version__) self.log.info("cocotbext-axi version %s", __version__)
@@ -390,7 +387,7 @@ class AxiMasterRead(object):
self.int_read_resp_command_queue = deque() self.int_read_resp_command_queue = deque()
self.int_read_resp_command_sync = Event() self.int_read_resp_command_sync = Event()
self.int_read_resp_queue_list = {} self.int_read_resp_queue_list = [deque() for k in range(self.id_count)]
self.in_flight_operations = 0 self.in_flight_operations = 0
@@ -599,18 +596,15 @@ class AxiMasterRead(object):
for rid, burst_length in cmd.burst_list: for rid, burst_length in cmd.burst_list:
for k in range(burst_length): for k in range(burst_length):
self.int_read_resp_queue_list.setdefault(rid, deque()) while not self.int_read_resp_queue_list[rid]:
while True: r = await self.r_channel.recv()
if self.int_read_resp_queue_list[rid]:
break
await self.r_channel.wait() i = int(r.rid)
r = self.r_channel.recv()
if self.active_id[int(r.rid)] <= 0: if self.active_id[i] <= 0:
raise Exception(f"Unexpected burst ID {rid}") raise Exception(f"Unexpected burst ID {rid}")
self.int_read_resp_queue_list[int(r.rid)].append(r) self.int_read_resp_queue_list[i].append(r)
r = self.int_read_resp_queue_list[rid].popleft() r = self.int_read_resp_queue_list[rid].popleft()
@@ -672,13 +666,13 @@ class AxiMaster(object):
self.write_if = AxiMasterWrite(entity, name, clock, reset, max_burst_len) self.write_if = AxiMasterWrite(entity, name, clock, reset, max_burst_len)
self.read_if = AxiMasterRead(entity, name, clock, reset, max_burst_len) self.read_if = AxiMasterRead(entity, name, clock, reset, max_burst_len)
def init_read(self, address, length, burst=AxiBurstType.INCR, size=None, def init_read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, event=None): lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, event=None):
self.read_if.init_read(address, length, burst, size, lock, cache, prot, qos, region, user, event) self.read_if.init_read(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event)
def init_write(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, def init_write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL,
cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0, event=None): cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0, event=None):
self.write_if.init_write(address, data, burst, size, lock, cache, prot, qos, region, user, wuser, event) self.write_if.init_write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser, event)
def idle(self): def idle(self):
return (not self.read_if or self.read_if.idle()) and (not self.write_if or self.write_if.idle()) return (not self.read_if or self.read_if.idle()) and (not self.write_if or self.write_if.idle())

View File

@@ -22,8 +22,9 @@ THE SOFTWARE.
""" """
import logging
import cocotb import cocotb
from cocotb.log import SimLog
from .version import __version__ from .version import __version__
from .constants import AxiBurstType, AxiProt, AxiResp from .constants import AxiBurstType, AxiProt, AxiResp
@@ -33,7 +34,7 @@ from .memory import Memory
class AxiRamWrite(Memory): class AxiRamWrite(Memory):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs): def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.log.info("AXI RAM model (write)") self.log.info("AXI RAM model (write)")
self.log.info("cocotbext-axi version %s", __version__) self.log.info("cocotbext-axi version %s", __version__)
@@ -71,8 +72,7 @@ class AxiRamWrite(Memory):
async def _process_write(self): async def _process_write(self):
while True: while True:
await self.aw_channel.wait() aw = await self.aw_channel.recv()
aw = self.aw_channel.recv()
awid = int(aw.awid) awid = int(aw.awid)
addr = int(aw.awaddr) addr = int(aw.awaddr)
@@ -105,8 +105,7 @@ class AxiRamWrite(Memory):
for n in range(length): for n in range(length):
cur_word_addr = (cur_addr // self.byte_width) * self.byte_width cur_word_addr = (cur_addr // self.byte_width) * self.byte_width
await self.w_channel.wait() w = await self.w_channel.recv()
w = self.w_channel.recv()
data = int(w.wdata) data = int(w.wdata)
strb = int(w.wstrb) strb = int(w.wstrb)
@@ -140,12 +139,12 @@ class AxiRamWrite(Memory):
b.bid = awid b.bid = awid
b.bresp = AxiResp.OKAY b.bresp = AxiResp.OKAY
self.b_channel.send(b) await self.b_channel.send(b)
class AxiRamRead(Memory): class AxiRamRead(Memory):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs): def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.log.info("AXI RAM model (read)") self.log.info("AXI RAM model (read)")
self.log.info("cocotbext-axi version %s", __version__) self.log.info("cocotbext-axi version %s", __version__)
@@ -180,8 +179,7 @@ class AxiRamRead(Memory):
async def _process_read(self): async def _process_read(self):
while True: while True:
await self.ar_channel.wait() ar = await self.ar_channel.recv()
ar = self.ar_channel.recv()
arid = int(ar.arid) arid = int(ar.arid)
addr = int(ar.araddr) addr = int(ar.araddr)
@@ -224,7 +222,7 @@ class AxiRamRead(Memory):
r.rlast = n == length-1 r.rlast = n == length-1
r.rresp = AxiResp.OKAY r.rresp = AxiResp.OKAY
self.r_channel.send(r) await self.r_channel.send(r)
self.log.debug("Read word awid: 0x%x addr: 0x%08x data: %s", self.log.debug("Read word awid: 0x%x addr: 0x%08x data: %s",
arid, cur_addr, ' '.join((f'{c:02x}' for c in data))) arid, cur_addr, ' '.join((f'{c:02x}' for c in data)))

View File

@@ -22,11 +22,11 @@ THE SOFTWARE.
""" """
import logging
from collections import deque, namedtuple from collections import deque, namedtuple
import cocotb import cocotb
from cocotb.triggers import Event from cocotb.triggers import Event
from cocotb.log import SimLog
from .version import __version__ from .version import __version__
from .constants import AxiProt, AxiResp from .constants import AxiProt, AxiResp
@@ -45,7 +45,7 @@ AxiLiteReadResp = namedtuple("AxiLiteReadResp", ["address", "data", "resp"])
class AxiLiteMasterWrite(object): class AxiLiteMasterWrite(object):
def __init__(self, entity, name, clock, reset=None): def __init__(self, entity, name, clock, reset=None):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.log.info("AXI lite master (write)") self.log.info("AXI lite master (write)")
self.log.info("cocotbext-axi version %s", __version__) self.log.info("cocotbext-axi version %s", __version__)
@@ -194,7 +194,7 @@ class AxiLiteMasterWrite(object):
w.wstrb = strb w.wstrb = strb
await self.aw_channel.drive(aw) await self.aw_channel.drive(aw)
self.w_channel.send(w) await self.w_channel.send(w)
async def _process_write_resp(self): async def _process_write_resp(self):
while True: while True:
@@ -207,8 +207,7 @@ class AxiLiteMasterWrite(object):
resp = AxiResp.OKAY resp = AxiResp.OKAY
for k in range(cmd.cycles): for k in range(cmd.cycles):
await self.b_channel.wait() b = await self.b_channel.recv()
b = self.b_channel.recv()
cycle_resp = AxiResp(b.bresp) cycle_resp = AxiResp(b.bresp)
@@ -231,7 +230,7 @@ class AxiLiteMasterWrite(object):
class AxiLiteMasterRead(object): class AxiLiteMasterRead(object):
def __init__(self, entity, name, clock, reset=None): def __init__(self, entity, name, clock, reset=None):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.log.info("AXI lite master (read)") self.log.info("AXI lite master (read)")
self.log.info("cocotbext-axi version %s", __version__) self.log.info("cocotbext-axi version %s", __version__)
@@ -366,8 +365,7 @@ class AxiLiteMasterRead(object):
resp = AxiResp.OKAY resp = AxiResp.OKAY
for k in range(cmd.cycles): for k in range(cmd.cycles):
await self.r_channel.wait() r = await self.r_channel.recv()
r = self.r_channel.recv()
cycle_data = int(r.rdata) cycle_data = int(r.rdata)
cycle_resp = AxiResp(r.rresp) cycle_resp = AxiResp(r.rresp)

View File

@@ -22,8 +22,9 @@ THE SOFTWARE.
""" """
import logging
import cocotb import cocotb
from cocotb.log import SimLog
from .version import __version__ from .version import __version__
from .constants import AxiProt, AxiResp from .constants import AxiProt, AxiResp
@@ -33,7 +34,7 @@ from .memory import Memory
class AxiLiteRamWrite(Memory): class AxiLiteRamWrite(Memory):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs): def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.log.info("AXI lite RAM model (write)") self.log.info("AXI lite RAM model (write)")
self.log.info("cocotbext-axi version %s", __version__) self.log.info("cocotbext-axi version %s", __version__)
@@ -68,14 +69,12 @@ class AxiLiteRamWrite(Memory):
async def _process_write(self): async def _process_write(self):
while True: while True:
await self.aw_channel.wait() aw = await self.aw_channel.recv()
aw = self.aw_channel.recv()
addr = (int(aw.awaddr) // self.byte_width) * self.byte_width addr = (int(aw.awaddr) // self.byte_width) * self.byte_width
prot = AxiProt(aw.awprot) prot = AxiProt(aw.awprot)
await self.w_channel.wait() w = await self.w_channel.recv()
w = self.w_channel.recv()
data = int(w.wdata) data = int(w.wdata)
strb = int(w.wstrb) strb = int(w.wstrb)
@@ -98,12 +97,12 @@ class AxiLiteRamWrite(Memory):
b = self.b_channel._transaction_obj() b = self.b_channel._transaction_obj()
b.bresp = AxiResp.OKAY b.bresp = AxiResp.OKAY
self.b_channel.send(b) await self.b_channel.send(b)
class AxiLiteRamRead(Memory): class AxiLiteRamRead(Memory):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs): def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.log.info("AXI lite RAM model (read)") self.log.info("AXI lite RAM model (read)")
self.log.info("cocotbext-axi version %s", __version__) self.log.info("cocotbext-axi version %s", __version__)
@@ -135,8 +134,7 @@ class AxiLiteRamRead(Memory):
async def _process_read(self): async def _process_read(self):
while True: while True:
await self.ar_channel.wait() ar = await self.ar_channel.recv()
ar = self.ar_channel.recv()
addr = (int(ar.araddr) // self.byte_width) * self.byte_width addr = (int(ar.araddr) // self.byte_width) * self.byte_width
prot = AxiProt(ar.arprot) prot = AxiProt(ar.arprot)
@@ -151,7 +149,7 @@ class AxiLiteRamRead(Memory):
r.rdata = int.from_bytes(data, 'little') r.rdata = int.from_bytes(data, 'little')
r.rresp = AxiResp.OKAY r.rresp = AxiResp.OKAY
self.r_channel.send(r) await self.r_channel.send(r)
self.log.info("Read data araddr: 0x%08x arprot: %s data: %s", self.log.info("Read data araddr: 0x%08x arprot: %s data: %s",
addr, prot, ' '.join((f'{c:02x}' for c in data))) addr, prot, ' '.join((f'{c:02x}' for c in data)))

View File

@@ -22,12 +22,12 @@ THE SOFTWARE.
""" """
import logging
from collections import deque from collections import deque
import cocotb import cocotb
from cocotb.triggers import RisingEdge, ReadOnly, Timer, First, Event from cocotb.triggers import RisingEdge, ReadOnly, Timer, First, Event
from cocotb.bus import Bus from cocotb.bus import Bus
from cocotb.log import SimLog
from .version import __version__ from .version import __version__
@@ -208,14 +208,17 @@ class AxiStreamFrame(object):
def __iter__(self): def __iter__(self):
return self.tdata.__iter__() return self.tdata.__iter__()
def __bytes__(self):
return bytes(self.tdata)
class AxiStreamSource(object): class AxiStreamSource(object):
_signals = ["tdata"] _signals = ["tdata"]
_optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"] _optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"]
def __init__(self, entity, name, clock, reset=None, *args, **kwargs): def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.entity = entity self.entity = entity
self.clock = clock self.clock = clock
self.reset = reset self.reset = reset
@@ -239,7 +242,7 @@ class AxiStreamSource(object):
self.queue_occupancy_frames = 0 self.queue_occupancy_frames = 0
self.width = len(self.bus.tdata) self.width = len(self.bus.tdata)
self.byte_width = 1 self.byte_lanes = 1
self.reset = reset self.reset = reset
@@ -249,7 +252,6 @@ class AxiStreamSource(object):
if hasattr(self.bus, "tlast"): if hasattr(self.bus, "tlast"):
self.bus.tlast.setimmediatevalue(0) self.bus.tlast.setimmediatevalue(0)
if hasattr(self.bus, "tkeep"): if hasattr(self.bus, "tkeep"):
self.byte_width = len(self.bus.tkeep)
self.bus.tkeep.setimmediatevalue(0) self.bus.tkeep.setimmediatevalue(0)
if hasattr(self.bus, "tid"): if hasattr(self.bus, "tid"):
self.bus.tid.setimmediatevalue(0) self.bus.tid.setimmediatevalue(0)
@@ -258,12 +260,24 @@ class AxiStreamSource(object):
if hasattr(self.bus, "tuser"): if hasattr(self.bus, "tuser"):
self.bus.tuser.setimmediatevalue(0) self.bus.tuser.setimmediatevalue(0)
self.byte_size = self.width // self.byte_width if hasattr(self.bus, "tkeep"):
self.byte_lanes = len(self.bus.tkeep)
if byte_size is not None or byte_lanes is not None:
raise ValueError("Cannot specify byte_size or byte_lanes if tkeep is connected")
else:
if byte_lanes is not None:
self.byte_lanes = byte_lanes
if byte_size is not None:
raise ValueError("Cannot specify both byte_size and byte_lanes")
elif byte_size is not None:
self.byte_lanes = self.width // byte_size
self.byte_size = self.width // self.byte_lanes
self.byte_mask = 2**self.byte_size-1 self.byte_mask = 2**self.byte_size-1
self.log.info("AXI stream source configuration:") self.log.info("AXI stream source configuration:")
self.log.info(" Byte size: %d bits", self.byte_size) self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_width) self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present") self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present")
self.log.info(" tready: %s", "present" if hasattr(self.bus, "tready") else "not present") self.log.info(" tready: %s", "present" if hasattr(self.bus, "tready") else "not present")
self.log.info(" tlast: %s", "present" if hasattr(self.bus, "tlast") else "not present") self.log.info(" tlast: %s", "present" if hasattr(self.bus, "tlast") else "not present")
@@ -284,16 +298,26 @@ class AxiStreamSource(object):
else: else:
self.log.info(" tuser: not present") self.log.info(" tuser: not present")
if self.byte_lanes * self.byte_size != self.width:
raise ValueError(f"Bus does not evenly divide into byte lanes "
f"({self.byte_lanes} * {self.byte_size} != {self.width})")
cocotb.fork(self._run()) cocotb.fork(self._run())
def send(self, frame): async def send(self, frame):
self.send_nowait(frame)
def send_nowait(self, frame):
frame = AxiStreamFrame(frame) frame = AxiStreamFrame(frame)
self.queue_occupancy_bytes += len(frame) self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1 self.queue_occupancy_frames += 1
self.queue.append(frame) self.queue.append(frame)
def write(self, data): async def write(self, data):
self.send(data) await self.send(data)
def write_nowait(self, data):
self.send_nowait(data)
def count(self): def count(self):
return len(self.queue) return len(self.queue)
@@ -370,7 +394,7 @@ class AxiStreamSource(object):
tdest_val = 0 tdest_val = 0
tuser_val = 0 tuser_val = 0
for offset in range(self.byte_width): for offset in range(self.byte_lanes):
tdata_val |= (frame.tdata.pop(0) & self.byte_mask) << (offset * self.byte_size) tdata_val |= (frame.tdata.pop(0) & self.byte_mask) << (offset * self.byte_size)
tkeep_val |= (frame.tkeep.pop(0) & 1) << offset tkeep_val |= (frame.tkeep.pop(0) & 1) << offset
tid_val = frame.tid.pop(0) tid_val = frame.tid.pop(0)
@@ -413,8 +437,8 @@ class AxiStreamSink(object):
_signals = ["tdata"] _signals = ["tdata"]
_optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"] _optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"]
def __init__(self, entity, name, clock, reset=None, *args, **kwargs): def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.entity = entity self.entity = entity
self.clock = clock self.clock = clock
self.reset = reset self.reset = reset
@@ -442,21 +466,31 @@ class AxiStreamSink(object):
self.queue_occupancy_limit_frames = None self.queue_occupancy_limit_frames = None
self.width = len(self.bus.tdata) self.width = len(self.bus.tdata)
self.byte_width = 1 self.byte_lanes = 1
self.reset = reset self.reset = reset
if hasattr(self.bus, "tready"): if hasattr(self.bus, "tready"):
self.bus.tready.setimmediatevalue(0) self.bus.tready.setimmediatevalue(0)
if hasattr(self.bus, "tkeep"):
self.byte_width = len(self.bus.tkeep)
self.byte_size = self.width // self.byte_width if hasattr(self.bus, "tkeep"):
self.byte_lanes = len(self.bus.tkeep)
if byte_size is not None or byte_lanes is not None:
raise ValueError("Cannot specify byte_size or byte_lanes if tkeep is connected")
else:
if byte_lanes is not None:
self.byte_lanes = byte_lanes
if byte_size is not None:
raise ValueError("Cannot specify both byte_size and byte_lanes")
elif byte_size is not None:
self.byte_lanes = self.width // byte_size
self.byte_size = self.width // self.byte_lanes
self.byte_mask = 2**self.byte_size-1 self.byte_mask = 2**self.byte_size-1
self.log.info("AXI stream sink configuration:") self.log.info("AXI stream sink configuration:")
self.log.info(" Byte size: %d bits", self.byte_size) self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_width) self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present") self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present")
self.log.info(" tready: %s", "present" if hasattr(self.bus, "tready") else "not present") self.log.info(" tready: %s", "present" if hasattr(self.bus, "tready") else "not present")
self.log.info(" tlast: %s", "present" if hasattr(self.bus, "tlast") else "not present") self.log.info(" tlast: %s", "present" if hasattr(self.bus, "tlast") else "not present")
@@ -477,9 +511,19 @@ class AxiStreamSink(object):
else: else:
self.log.info(" tuser: not present") self.log.info(" tuser: not present")
if self.byte_lanes * self.byte_size != self.width:
raise ValueError(f"Bus does not evenly divide into byte lanes "
f"({self.byte_lanes} * {self.byte_size} != {self.width})")
cocotb.fork(self._run()) cocotb.fork(self._run())
def recv(self, compact=True): async def recv(self, compact=True):
while self.empty():
self.sync.clear()
await self.sync.wait()
return self.recv_nowait(compact)
def recv_nowait(self, compact=True):
if self.queue: if self.queue:
frame = self.queue.popleft() frame = self.queue.popleft()
self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_bytes -= len(frame)
@@ -489,11 +533,15 @@ class AxiStreamSink(object):
return frame return frame
return None return None
def read(self, count=-1): async def read(self, count=-1):
while True: while not self.read_queue:
frame = self.recv(compact=True) frame = await self.recv(compact=True)
if frame is None: self.read_queue.extend(frame.tdata)
break return self.read_nowait(count)
def read_nowait(self, count=-1):
while not self.empty():
frame = self.recv_nowait(compact=True)
self.read_queue.extend(frame.tdata) self.read_queue.extend(frame.tdata)
if count < 0: if count < 0:
count = len(self.read_queue) count = len(self.read_queue)
@@ -560,7 +608,7 @@ class AxiStreamSink(object):
continue continue
if tready_sample and tvalid_sample: if tready_sample and tvalid_sample:
for offset in range(self.byte_width): for offset in range(self.byte_lanes):
frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask) frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask)
if hasattr(self.bus, "tkeep"): if hasattr(self.bus, "tkeep"):
@@ -602,8 +650,8 @@ class AxiStreamMonitor(object):
_signals = ["tdata"] _signals = ["tdata"]
_optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"] _optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"]
def __init__(self, entity, name, clock, reset=None, *args, **kwargs): def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.entity = entity self.entity = entity
self.clock = clock self.clock = clock
self.reset = reset self.reset = reset
@@ -625,19 +673,28 @@ class AxiStreamMonitor(object):
self.queue_occupancy_frames = 0 self.queue_occupancy_frames = 0
self.width = len(self.bus.tdata) self.width = len(self.bus.tdata)
self.byte_width = 1 self.byte_lanes = 1
self.reset = reset self.reset = reset
if hasattr(self.bus, "tkeep"): if hasattr(self.bus, "tkeep"):
self.byte_width = len(self.bus.tkeep) self.byte_lanes = len(self.bus.tkeep)
if byte_size is not None or byte_lanes is not None:
raise ValueError("Cannot specify byte_size or byte_lanes if tkeep is connected")
else:
if byte_lanes is not None:
self.byte_lanes = byte_lanes
if byte_size is not None:
raise ValueError("Cannot specify both byte_size and byte_lanes")
elif byte_size is not None:
self.byte_lanes = self.width // byte_size
self.byte_size = self.width // self.byte_width self.byte_size = self.width // self.byte_lanes
self.byte_mask = 2**self.byte_size-1 self.byte_mask = 2**self.byte_size-1
self.log.info("AXI stream monitor configuration:") self.log.info("AXI stream monitor configuration:")
self.log.info(" Byte size: %d bits", self.byte_size) self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_width) self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present") self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present")
self.log.info(" tready: %s", "present" if hasattr(self.bus, "tready") else "not present") self.log.info(" tready: %s", "present" if hasattr(self.bus, "tready") else "not present")
self.log.info(" tlast: %s", "present" if hasattr(self.bus, "tlast") else "not present") self.log.info(" tlast: %s", "present" if hasattr(self.bus, "tlast") else "not present")
@@ -658,9 +715,19 @@ class AxiStreamMonitor(object):
else: else:
self.log.info(" tuser: not present") self.log.info(" tuser: not present")
if self.byte_lanes * self.byte_size != self.width:
raise ValueError(f"Bus does not evenly divide into byte lanes "
f"({self.byte_lanes} * {self.byte_size} != {self.width})")
cocotb.fork(self._run()) cocotb.fork(self._run())
def recv(self, compact=True): async def recv(self, compact=True):
while self.empty():
self.sync.clear()
await self.sync.wait()
return self.recv_nowait(compact)
def recv_nowait(self, compact=True):
if self.queue: if self.queue:
frame = self.queue.popleft() frame = self.queue.popleft()
self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_bytes -= len(frame)
@@ -670,11 +737,15 @@ class AxiStreamMonitor(object):
return frame return frame
return None return None
def read(self, count=-1): async def read(self, count=-1):
while True: while not self.read_queue:
frame = self.recv(compact=True) frame = await self.recv(compact=True)
if frame is None: self.read_queue.extend(frame.tdata)
break return self.read_nowait(count)
def read_nowait(self, count=-1):
while not self.empty():
frame = self.recv_nowait(compact=True)
self.read_queue.extend(frame.tdata) self.read_queue.extend(frame.tdata)
if count < 0: if count < 0:
count = len(self.read_queue) count = len(self.read_queue)
@@ -718,7 +789,7 @@ class AxiStreamMonitor(object):
continue continue
if tready_sample and tvalid_sample: if tready_sample and tvalid_sample:
for offset in range(self.byte_width): for offset in range(self.byte_lanes):
frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask) frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask)
if hasattr(self.bus, "tkeep"): if hasattr(self.bus, "tkeep"):

View File

@@ -22,12 +22,12 @@ THE SOFTWARE.
""" """
import logging
from collections import deque
import cocotb import cocotb
from cocotb.triggers import RisingEdge, ReadOnly, Event, First, Timer from cocotb.triggers import RisingEdge, ReadOnly, Event, First, Timer
from cocotb.bus import Bus from cocotb.bus import Bus
from cocotb.log import SimLog
from collections import deque
class StreamTransaction(object): class StreamTransaction(object):
@@ -65,7 +65,7 @@ class StreamBase(object):
_transaction_obj = StreamTransaction _transaction_obj = StreamTransaction
def __init__(self, entity, name, clock, reset=None, *args, **kwargs): def __init__(self, entity, name, clock, reset=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.entity = entity self.entity = entity
self.clock = clock self.clock = clock
self.reset = reset self.reset = reset
@@ -169,7 +169,10 @@ class StreamSource(StreamBase, StreamPause):
self.drive_obj = obj self.drive_obj = obj
def send(self, obj): async def send(self, obj):
self.send_nowait(obj)
def send_nowait(self, obj):
self.queue.append(obj) self.queue.append(obj)
self.queue_sync.set() self.queue_sync.set()
@@ -246,7 +249,13 @@ class StreamSink(StreamBase, StreamPause):
cocotb.fork(self._run_sink()) cocotb.fork(self._run_sink())
def recv(self): async def recv(self):
while self.empty():
self.queue_sync.clear()
await self.queue_sync.wait()
return self.recv_nowait()
def recv_nowait(self):
if self.queue: if self.queue:
return self.queue.popleft() return self.queue.popleft()
return None return None
@@ -310,7 +319,13 @@ class StreamMonitor(StreamBase):
cocotb.fork(self._run_monitor()) cocotb.fork(self._run_monitor())
def recv(self): async def recv(self):
while self.empty():
self.queue_sync.clear()
await self.queue_sync.wait()
return self.recv_nowait()
def recv_nowait(self):
if self.queue: if self.queue:
return self.queue.popleft() return self.queue.popleft()
return None return None

View File

@@ -1 +1 @@
__version__ = "0.1.0" __version__ = "0.1.4"

View File

@@ -40,6 +40,8 @@ include = cocotbext.*
[tool:pytest] [tool:pytest]
testpaths = testpaths =
tests tests
addopts =
--import-mode importlib
# tox configuration # tox configuration
[tox:tox] [tox:tox]
@@ -53,10 +55,30 @@ python =
3.9: py39 3.9: py39
[testenv] [testenv]
setenv =
COVERAGE=1
deps = deps =
pytest pytest
pytest-xdist pytest-xdist
cocotb-test cocotb-test
coverage
pytest-cov
commands = commands =
pytest -n auto pytest --cov=cocotbext --cov=tests --cov-branch -n auto
bash -c 'find . -type f -name "\.coverage" | xargs coverage combine --append'
whitelist_externals =
bash
# combine if paths are different
[coverage:paths]
source =
cocotbext/
/*/cocotbext
# do not report dependencies
[coverage:report]
omit =
.tox/*

View File

View File

@@ -98,7 +98,7 @@ async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, si
for length in list(range(1, byte_width*2))+[1024]: for length in list(range(1, byte_width*2))+[1024]:
for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)): for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)):
tb.log.info(f"length {length}, offset {offset}") tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000 addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)]) test_data = bytearray([x % 256 for x in range(length)])
@@ -106,7 +106,7 @@ async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, si
await tb.axi_master.write(addr, test_data, size=size) await tb.axi_master.write(addr, test_data, size=size)
tb.log.debug("%s", tb.axi_ram.hexdump_str((addr & 0xfffffff0)-16, (((addr & 0xf)+length-1) & 0xfffffff0)+48)) tb.log.debug("%s", tb.axi_ram.hexdump_str((addr & ~0xf)-16, (((addr & 0xf)+length-1) & ~0xf)+48))
assert tb.axi_ram.read(addr, length) == test_data assert tb.axi_ram.read(addr, length) == test_data
assert tb.axi_ram.read(addr-1, 1) == b'\xaa' assert tb.axi_ram.read(addr-1, 1) == b'\xaa'
@@ -133,7 +133,7 @@ async def run_test_read(dut, idle_inserter=None, backpressure_inserter=None, siz
for length in list(range(1, byte_width*2))+[1024]: for length in list(range(1, byte_width*2))+[1024]:
for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)): for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)):
tb.log.info(f"length {length}, offset {offset}") tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000 addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)]) test_data = bytearray([x % 256 for x in range(length)])
@@ -157,7 +157,7 @@ async def run_test_write_words(dut):
for length in list(range(1, 4)): for length in list(range(1, 4)):
for offset in list(range(byte_width)): for offset in list(range(byte_width)):
tb.log.info(f"length {length}, offset {offset}") tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000 addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)]) test_data = bytearray([x % 256 for x in range(length)])
@@ -206,7 +206,7 @@ async def run_test_read_words(dut):
for length in list(range(1, 4)): for length in list(range(1, 4)):
for offset in list(range(byte_width)): for offset in list(range(byte_width)):
tb.log.info(f"length {length}, offset {offset}") tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000 addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)]) test_data = bytearray([x % 256 for x in range(length)])
@@ -254,7 +254,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
tb.set_idle_generator(idle_inserter) tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter) tb.set_backpressure_generator(backpressure_inserter)
async def stress_test_worker(master, offset, aperture, count=16): async def worker(master, offset, aperture, count=16):
for k in range(count): for k in range(count):
length = random.randint(1, min(512, aperture)) length = random.randint(1, min(512, aperture))
addr = offset+random.randint(0, aperture-length) addr = offset+random.randint(0, aperture-length)
@@ -272,7 +272,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
workers = [] workers = []
for k in range(16): for k in range(16):
workers.append(cocotb.fork(stress_test_worker(tb.axi_master, k*0x1000, 0x1000, count=16))) workers.append(cocotb.fork(worker(tb.axi_master, k*0x1000, 0x1000, count=16)))
while workers: while workers:
await workers.pop(0).join() await workers.pop(0).join()

View File

View File

@@ -91,7 +91,7 @@ async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_ins
for length in range(1, byte_width*2): for length in range(1, byte_width*2):
for offset in range(byte_width): for offset in range(byte_width):
tb.log.info(f"length {length}, offset {offset}") tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000 addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)]) test_data = bytearray([x % 256 for x in range(length)])
@@ -99,7 +99,7 @@ async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_ins
await tb.axil_master.write(addr, test_data) await tb.axil_master.write(addr, test_data)
tb.log.debug("%s", tb.axil_ram.hexdump_str((addr & 0xfffffff0)-16, (((addr & 0xf)+length-1) & 0xfffffff0)+48)) tb.log.debug("%s", tb.axil_ram.hexdump_str((addr & ~0xf)-16, (((addr & 0xf)+length-1) & ~0xf)+48))
assert tb.axil_ram.read(addr, length) == test_data assert tb.axil_ram.read(addr, length) == test_data
assert tb.axil_ram.read(addr-1, 1) == b'\xaa' assert tb.axil_ram.read(addr-1, 1) == b'\xaa'
@@ -122,7 +122,7 @@ async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inse
for length in range(1, byte_width*2): for length in range(1, byte_width*2):
for offset in range(byte_width): for offset in range(byte_width):
tb.log.info(f"length {length}, offset {offset}") tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000 addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)]) test_data = bytearray([x % 256 for x in range(length)])
@@ -146,7 +146,7 @@ async def run_test_write_words(dut):
for length in list(range(1, 4)): for length in list(range(1, 4)):
for offset in list(range(byte_width)): for offset in list(range(byte_width)):
tb.log.info(f"length {length}, offset {offset}") tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000 addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)]) test_data = bytearray([x % 256 for x in range(length)])
@@ -195,7 +195,7 @@ async def run_test_read_words(dut):
for length in list(range(1, 4)): for length in list(range(1, 4)):
for offset in list(range(byte_width)): for offset in list(range(byte_width)):
tb.log.info(f"length {length}, offset {offset}") tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000 addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)]) test_data = bytearray([x % 256 for x in range(length)])
@@ -243,7 +243,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
tb.set_idle_generator(idle_inserter) tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter) tb.set_backpressure_generator(backpressure_inserter)
async def stress_test_worker(master, offset, aperture, count=16): async def worker(master, offset, aperture, count=16):
for k in range(count): for k in range(count):
length = random.randint(1, min(32, aperture)) length = random.randint(1, min(32, aperture))
addr = offset+random.randint(0, aperture-length) addr = offset+random.randint(0, aperture-length)
@@ -261,7 +261,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
workers = [] workers = []
for k in range(16): for k in range(16):
workers.append(cocotb.fork(stress_test_worker(tb.axil_master, k*0x1000, 0x1000, count=16))) workers.append(cocotb.fork(worker(tb.axil_master, k*0x1000, 0x1000, count=16)))
while workers: while workers:
await workers.pop(0).join() await workers.pop(0).join()

View File

View File

@@ -90,23 +90,21 @@ async def run_test(dut, payload_lengths=None, payload_data=None, idle_inserter=N
test_frame = AxiStreamFrame(test_data) test_frame = AxiStreamFrame(test_data)
test_frame.tid = cur_id test_frame.tid = cur_id
test_frame.tdest = cur_id test_frame.tdest = cur_id
tb.source.send(test_frame) await tb.source.send(test_frame)
test_frames.append(test_frame) test_frames.append(test_frame)
cur_id = (cur_id + 1) % id_count cur_id = (cur_id + 1) % id_count
for test_frame in test_frames: for test_frame in test_frames:
await tb.sink.wait() rx_frame = await tb.sink.recv()
rx_frame = tb.sink.recv()
assert rx_frame.tdata == test_frame.tdata assert rx_frame.tdata == test_frame.tdata
assert rx_frame.tid == test_frame.tid assert rx_frame.tid == test_frame.tid
assert rx_frame.tdest == test_frame.tdest assert rx_frame.tdest == test_frame.tdest
assert not rx_frame.tuser assert not rx_frame.tuser
await tb.monitor.wait() rx_frame = await tb.monitor.recv()
rx_frame = tb.monitor.recv()
assert rx_frame.tdata == test_frame.tdata assert rx_frame.tdata == test_frame.tdata
assert rx_frame.tid == test_frame.tid assert rx_frame.tid == test_frame.tid