23 Commits

Author SHA1 Message Date
Alex Forencich
8668a6bb4f Release v0.1.4 2020-12-18 16:12:13 -08:00
Alex Forencich
bdaeaad66b Update readme 2020-12-18 16:10:26 -08:00
Alex Forencich
cd272b2a59 Convert send/recv to blocking, add nonblocking send_nowait/recv_nowait 2020-12-18 15:33:23 -08:00
Alex Forencich
01212e37cd Minor refactoring, remove extra conversion 2020-12-17 00:27:17 -08:00
Alex Forencich
40a9bb9eac Remove unnecessary __init__.py files 2020-12-15 18:42:23 -08:00
Alex Forencich
1103783b08 Return tdata when AxiStreamFrame is cast to bytes 2020-12-11 23:45:23 -08:00
Alex Forencich
2451921923 Replace SimLog with logger 2020-12-10 02:20:14 -08:00
Alex Forencich
e6e8a06dfe Accept byte_size or byte_lanes arguments in AXI stream constructors if tkeep is not used 2020-12-07 01:58:49 -08:00
Alex Forencich
3dd8114c05 Rename byte_width to byte_lanes 2020-12-07 01:50:48 -08:00
Alex Forencich
a84b52077b Update readme 2020-12-06 00:50:28 -08:00
Alex Forencich
200c8c0b26 Update readme 2020-12-06 00:02:07 -08:00
Alex Forencich
0ff3d64540 Update readme 2020-12-05 01:36:09 -08:00
Alex Forencich
237745792c Bump to dev version v0.1.3 2020-12-04 13:40:22 -08:00
Alex Forencich
5f032b3c9b Release v0.1.2 2020-12-04 13:39:22 -08:00
Alex Forencich
31081c579b Point badge at correct project 2020-12-04 13:17:53 -08:00
Tomasz Hemperek
595e0b1b49 Add status badges 2020-12-04 13:16:10 -08:00
Alex Forencich
22b10f2dd8 Preload ID queue list 2020-12-04 12:39:25 -08:00
Alex Forencich
db3841dd99 Improve alignment bit masks 2020-12-04 00:51:32 -08:00
Alex Forencich
1057058cb3 Shorten stress test worker name 2020-12-03 21:03:05 -08:00
Alex Forencich
e5f2be4468 Use logger formatting instead of F-strings 2020-12-03 21:02:17 -08:00
Alex Forencich
4b5b62419b Add missing parameters 2020-12-03 19:34:03 -08:00
Alex Forencich
03f8fe0fd3 Use pytest importlib mode 2020-12-03 19:33:40 -08:00
Tomasz Hemperek
435b9c9282 Add coverge reporting in CI and upload to codecov 2020-12-03 12:52:51 -08:00
17 changed files with 271 additions and 157 deletions

View File

@@ -30,3 +30,8 @@ jobs:
- name: Test with tox
run: tox
- name: Upload coverage to codecov
run: |
pip install codecov
codecov

1
.gitignore vendored
View File

@@ -11,3 +11,4 @@ __pycache__/
.tox
sim_build_*
.coverage

View File

@@ -1,10 +1,14 @@
# AXI interface modules for Cocotb
[![Build Status](https://github.com/alexforencich/cocotbext-axi/workflows/Regression%20Tests/badge.svg?branch=master)](https://github.com/alexforencich/cocotbext-axi/actions/)
[![codecov](https://codecov.io/gh/alexforencich/cocotbext-axi/branch/master/graph/badge.svg)](https://codecov.io/gh/alexforencich/cocotbext-axi)
[![PyPI version](https://badge.fury.io/py/cocotbext-axi.svg)](https://pypi.org/project/cocotbext-axi)
GitHub repository: https://github.com/alexforencich/cocotbext-axi
## Introduction
AXI, AXI lite, and AXI stream simulation models for cocotb.
AXI, AXI lite, and AXI stream simulation models for [cocotb](https://github.com/cocotb/cocotb).
## Installation
@@ -92,21 +96,21 @@ Second, blocking operations can be carried out with `read()` and `write()` and t
* `write_resp_ready()`: determine if any write response is available
* `get_write_resp()`: fetch first available write response
* `read(address, length, ...)`: read _length_ bytes, starting at _address_
* `read_words(address, count, byteorder, ws, ...)`: read _count_ _ws_-byte words, starting at _address_, default word size of `2`, default _byteorder_ `"little"`
* `read_dwords(address, count, byteorder, ...)`: read _count_ 4-byte dwords, starting at _address_, default _byteorder_ `"little"`
* `read_qwords(address, count, byteorder, ...)`: read _count_ 8-byte qwords, starting at _address_, default _byteorder_ `"little"`
* `read_words(address, count, byteorder='little', ws=2, ...)`: read _count_ _ws_-byte words, starting at _address_
* `read_dwords(address, count, byteorder='little', ...)`: read _count_ 4-byte dwords, starting at _address_
* `read_qwords(address, count, byteorder='little', ...)`: read _count_ 8-byte qwords, starting at _address_
* `read_byte(address, ...)`: read single byte at _address_
* `read_word(address, byteorder, ws, ...)`: read single _ws_-byte word at _address_, default word size of `2`, default _byteorder_ `"little"`
* `read_dword(address, byteorder, ...)`: read single 4-byte dword at _address_, default _byteorder_ `"little"`
* `read_qword(address, byteorder, ...)`: read single 8-byte qword at _address_, default _byteorder_ `"little"`
* `read_word(address, byteorder='little', ws=2, ...)`: read single _ws_-byte word at _address_
* `read_dword(address, byteorder='little', ...)`: read single 4-byte dword at _address_
* `read_qword(address, byteorder='little', ...)`: read single 8-byte qword at _address_
* `write(address, data, ...)`: write _data_ (bytes), starting at _address_
* `write_words(address, data, byteorder, ws, ...)`: write _data_ (_ws_-byte words), starting at _address_, default word size of `2`, default _byteorder_ `"little"`
* `write_dwords(address, data, byteorder, ...)`: write _data_ (4-byte dwords), starting at _address_, default _byteorder_ `"little"`
* `write_qwords(address, data, byteorder, ...)`: write _data_ (8-byte qwords), starting at _address_, default _byteorder_ `"little"`
* `write_words(address, data, byteorder='little', ws=2, ...)`: write _data_ (_ws_-byte words), starting at _address_
* `write_dwords(address, data, byteorder='little', ...)`: write _data_ (4-byte dwords), starting at _address_
* `write_qwords(address, data, byteorder='little', ...)`: write _data_ (8-byte qwords), starting at _address_
* `write_byte(address, data, ...)`: write single byte at _address_
* `write_word(address, data, byteorder, ws, ...)`: write single _ws_-byte word at _address_, default word size of `2`, default _byteorder_ `"little"`
* `write_dword(address, data, byteorder, ...)`: write single 4-byte dword at _address_, default _byteorder_ `"little"`
* `write_qword(address, data, byteorder, ...)`: write single 8-byte qword at _address_, default _byteorder_ `"little"`
* `write_word(address, data, byteorder='little', ws=2, ...)`: write single _ws_-byte word at _address_
* `write_dword(address, data, byteorder='little', ...)`: write single 4-byte dword at _address_
* `write_qword(address, data, byteorder='little', ...)`: write single 8-byte qword at _address_
#### Additional optional arguments for `AxiMaster`
@@ -169,24 +173,24 @@ Multi-port memories can be constructed by passing the `mem` object of the first
#### Methods
* `read(address, length)`: read _length_ bytes, starting at _address_
* `read_words(address, count, byteorder, ws)`: read _count_ _ws_-byte words, starting at _address_, default word size of `2`, default _byteorder_ `"little"`
* `read_dwords(address, count, byteorder)`: read _count_ 4-byte dwords, starting at _address_, default _byteorder_ `"little"`
* `read_qwords(address, count, byteorder)`: read _count_ 8-byte qwords, starting at _address_, default _byteorder_ `"little"`
* `read_words(address, count, byteorder='little', ws=2)`: read _count_ _ws_-byte words, starting at _address_
* `read_dwords(address, count, byteorder='little')`: read _count_ 4-byte dwords, starting at _address_
* `read_qwords(address, count, byteorder='little')`: read _count_ 8-byte qwords, starting at _address_
* `read_byte(address)`: read single byte at _address_
* `read_word(address, byteorder, ws)`: read single _ws_-byte word at _address_, default word size of `2`, default _byteorder_ `"little"`
* `read_dword(address, byteorder)`: read single 4-byte dword at _address_, default _byteorder_ `"little"`
* `read_qword(address, byteorder)`: read single 8-byte qword at _address_, default _byteorder_ `"little"`
* `read_word(address, byteorder='little', ws=2)`: read single _ws_-byte word at _address_
* `read_dword(address, byteorder='little')`: read single 4-byte dword at _address_
* `read_qword(address, byteorder='little')`: read single 8-byte qword at _address_
* `write(address, data)`: write _data_ (bytes), starting at _address_
* `write_words(address, data, byteorder, ws)`: write _data_ (_ws_-byte words), starting at _address_, default word size of `2`, default _byteorder_ `"little"`
* `write_dwords(address, data, byteorder)`: write _data_ (4-byte dwords), starting at _address_, default _byteorder_ `"little"`
* `write_qwords(address, data, byteorder)`: write _data_ (8-byte qwords), starting at _address_, default _byteorder_ `"little"`
* `write_words(address, data, byteorder='little', ws=2)`: write _data_ (_ws_-byte words), starting at _address_
* `write_dwords(address, data, byteorder='little')`: write _data_ (4-byte dwords), starting at _address_
* `write_qwords(address, data, byteorder='little')`: write _data_ (8-byte qwords), starting at _address_
* `write_byte(address, data)`: write single byte at _address_
* `write_word(address, data, byteorder, ws)`: write single _ws_-byte word at _address_, default word size of `2`, default _byteorder_ `"little"`
* `write_dword(address, data, byteorder)`: write single 4-byte dword at _address_, default _byteorder_ `"little"`
* `write_qword(address, data, byteorder)`: write single 8-byte qword at _address_, default _byteorder_ `"little"`
* `hexdump(address, length, prefix)`: print hex dump of _length_ bytes starting from `address`, prefix lines with optional `prefix`
* `hexdump_line(address, length, prefix)`: return hex dump (list of str) of _length_ bytes starting from `address`, prefix lines with optional `prefix`
* `hexdump_str(address, length, prefix)`: return hex dump (str) of _length_ bytes starting from `address`, prefix lines with optional `prefix`
* `write_word(address, data, byteorder='little', ws=2)`: write single _ws_-byte word at _address_
* `write_dword(address, data, byteorder='little')`: write single 4-byte dword at _address_
* `write_qword(address, data, byteorder='little')`: write single 8-byte qword at _address_
* `hexdump(address, length, prefix='')`: print hex dump of _length_ bytes starting from `address`, prefix lines with optional `prefix`
* `hexdump_line(address, length, prefix='')`: return hex dump (list of str) of _length_ bytes starting from `address`, prefix lines with optional `prefix`
* `hexdump_str(address, length, prefix='')`: return hex dump (str) of _length_ bytes starting from `address`, prefix lines with optional `prefix`
### AXI stream
@@ -194,7 +198,7 @@ The `AxiStreamSource`, `AxiStreamSink`, and `AxiStreamMonitor` classes can be us
To use these modules, import the one you need and connect it to the DUT:
from cocotbext.axi import AxiStreamSource, AxiStreamSink
from cocotbext.axi import AxiStreamSource, AxiStreamSink, AxiStreamMonitor
axis_source = AxiStreamSource(dut, "s_axis", dut.clk, dut.rst)
axis_sink = AxiStreamSink(dut, "m_axis", dut.clk, dut.rst)
@@ -229,6 +233,10 @@ To receive data with an `AxiStreamSink` or `AxiStreamMonitor`, call `recv()` or
* _name_: signal name prefix (e.g. for `m_axis_tdata`, the prefix is `m_axis`)
* _clock_: clock signal
* _reset_: reset signal (optional)
* _byte_size_: byte size (optional)
* _byte_lanes_: byte lane count (optional)
Note: _byte_size_, _byte_lanes_, `len(tdata)`, and `len(tkeep)` are all related, in that _byte_lanes_ is set from `tkeep` if it is connected, and `byte_size*byte_lanes == len(tdata)`. So, if `tkeep` is connected, both _byte_size_ and _byte_lanes_ will be computed internally and cannot be overridden. If `tkeep` is not connected, then either _byte_size_ or _byte_lanes_ can be specified, and the other will be computed such that `byte_size*byte_lanes == len(tdata)`.
#### Attributes:
@@ -240,21 +248,27 @@ To receive data with an `AxiStreamSink` or `AxiStreamMonitor`, call `recv()` or
#### Methods
* `send(frame)`: send _frame_ (source)
* `write(data)`: send _data_ (alias of send) (source)
* `recv(compact)`: receive a frame, optionally compact frame (sink/monitor)
* `send(frame)`: send _frame_ (blocking) (source)
* `send_nowait(frame)`: send _frame_ (non-blocking) (source)
* `write(data)`: send _data_ (alias of send) (blocking) (source)
* `write_nowait(data)`: send _data_ (alias of send_nowait) (non-blocking) (source)
* `recv(compact=True)`: receive a frame as a `GmiiFrame` (blocking) (sink)
* `recv_nowait(compact=True)`: receive a frame as a `GmiiFrame` (non-blocking) (sink)
* `read(count)`: read _count_ bytes from buffer (blocking) (sink/monitor)
* `read_nowait(count)`: read _count_ bytes from buffer (non-blocking) (sink/monitor)
* `read(count)`: read _count_ bytes from buffer (sink/monitor)
* `count()`: returns the number of items in the queue (all)
* `empty()`: returns _True_ if the queue is empty (all)
* `full()`: returns _True_ if the queue occupancy limits are met (sink)
* `idle()`: returns _True_ if no transfer is in progress (all) or if the queue is not empty (source)
* `wait(timeout=0, timeout_unit='ns')`: wait for idle (source) or frame received (sink/monitor)
* `wait()`: wait for idle (source)
* `wait(timeout=0, timeout_unit='ns')`: wait for frame received (sink)
* `set_pause_generator(generator)`: set generator for pause signal, generator will be advanced on every clock cycle (source/sink)
* `clear_pause_generator()`: remove generator for pause signal
* `clear_pause_generator()`: remove generator for pause signal (source/sink)
#### AxiStreamFrame object
The `AxiStreamFrame` object is a container for a frame to be transferred via AXI stream. The `tdata` field contains the packet data in the form of a list of bytes, a `bytearray` if the byte size is 8 bits or a `list` of `int`s otherwise. `tkeep`, `tid`, `tdest`, and `tuser` can either be `None`, an `int`, or a `list` of `int`s.
The `AxiStreamFrame` object is a container for a frame to be transferred via AXI stream. The `tdata` field contains the packet data in the form of a list of bytes, which is either a `bytearray` if the byte size is 8 bits or a `list` of `int`s otherwise. `tkeep`, `tid`, `tdest`, and `tuser` can either be `None`, an `int`, or a `list` of `int`s.
Attributes:

View File

@@ -22,11 +22,11 @@ THE SOFTWARE.
"""
import logging
from collections import deque, namedtuple, Counter
import cocotb
from cocotb.triggers import Event
from cocotb.log import SimLog
from .version import __version__
from .constants import AxiBurstType, AxiLockType, AxiProt, AxiResp
@@ -49,7 +49,7 @@ AxiReadResp = namedtuple("AxiReadResp", ["address", "data", "resp", "user"])
class AxiMasterWrite(object):
def __init__(self, entity, name, clock, reset=None, max_burst_len=256):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.log.info("AXI master (write)")
self.log.info("cocotbext-axi version %s", __version__)
@@ -74,7 +74,7 @@ class AxiMasterWrite(object):
self.int_write_resp_command_queue = deque()
self.int_write_resp_command_sync = Event()
self.int_write_resp_queue_list = {}
self.int_write_resp_queue_list = [deque() for k in range(self.id_count)]
self.in_flight_operations = 0
@@ -297,7 +297,7 @@ class AxiMasterWrite(object):
else:
w.wuser = 0
self.w_channel.send(w)
await self.w_channel.send(w)
cur_addr += num_bytes
cycle_offset = (cycle_offset + num_bytes) % self.byte_width
@@ -318,18 +318,15 @@ class AxiMasterWrite(object):
user = []
for bid, burst_length in cmd.burst_list:
self.int_write_resp_queue_list.setdefault(bid, deque())
while True:
if self.int_write_resp_queue_list[bid]:
break
while not self.int_write_resp_queue_list[bid]:
b = await self.b_channel.recv()
await self.b_channel.wait()
b = self.b_channel.recv()
i = int(b.bid)
if self.active_id[int(b.bid)] <= 0:
if self.active_id[i] <= 0:
raise Exception(f"Unexpected burst ID {bid}")
self.int_write_resp_queue_list[int(b.bid)].append(b)
self.int_write_resp_queue_list[i].append(b)
b = self.int_write_resp_queue_list[bid].popleft()
@@ -366,7 +363,7 @@ class AxiMasterWrite(object):
class AxiMasterRead(object):
def __init__(self, entity, name, clock, reset=None, max_burst_len=256):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.log.info("AXI master (read)")
self.log.info("cocotbext-axi version %s", __version__)
@@ -390,7 +387,7 @@ class AxiMasterRead(object):
self.int_read_resp_command_queue = deque()
self.int_read_resp_command_sync = Event()
self.int_read_resp_queue_list = {}
self.int_read_resp_queue_list = [deque() for k in range(self.id_count)]
self.in_flight_operations = 0
@@ -599,18 +596,15 @@ class AxiMasterRead(object):
for rid, burst_length in cmd.burst_list:
for k in range(burst_length):
self.int_read_resp_queue_list.setdefault(rid, deque())
while True:
if self.int_read_resp_queue_list[rid]:
break
while not self.int_read_resp_queue_list[rid]:
r = await self.r_channel.recv()
await self.r_channel.wait()
r = self.r_channel.recv()
i = int(r.rid)
if self.active_id[int(r.rid)] <= 0:
if self.active_id[i] <= 0:
raise Exception(f"Unexpected burst ID {rid}")
self.int_read_resp_queue_list[int(r.rid)].append(r)
self.int_read_resp_queue_list[i].append(r)
r = self.int_read_resp_queue_list[rid].popleft()
@@ -672,13 +666,13 @@ class AxiMaster(object):
self.write_if = AxiMasterWrite(entity, name, clock, reset, max_burst_len)
self.read_if = AxiMasterRead(entity, name, clock, reset, max_burst_len)
def init_read(self, address, length, burst=AxiBurstType.INCR, size=None,
def init_read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, event=None):
self.read_if.init_read(address, length, burst, size, lock, cache, prot, qos, region, user, event)
self.read_if.init_read(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event)
def init_write(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL,
def init_write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL,
cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0, event=None):
self.write_if.init_write(address, data, burst, size, lock, cache, prot, qos, region, user, wuser, event)
self.write_if.init_write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser, event)
def idle(self):
return (not self.read_if or self.read_if.idle()) and (not self.write_if or self.write_if.idle())

View File

@@ -22,8 +22,9 @@ THE SOFTWARE.
"""
import logging
import cocotb
from cocotb.log import SimLog
from .version import __version__
from .constants import AxiBurstType, AxiProt, AxiResp
@@ -33,7 +34,7 @@ from .memory import Memory
class AxiRamWrite(Memory):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.log.info("AXI RAM model (write)")
self.log.info("cocotbext-axi version %s", __version__)
@@ -71,8 +72,7 @@ class AxiRamWrite(Memory):
async def _process_write(self):
while True:
await self.aw_channel.wait()
aw = self.aw_channel.recv()
aw = await self.aw_channel.recv()
awid = int(aw.awid)
addr = int(aw.awaddr)
@@ -105,8 +105,7 @@ class AxiRamWrite(Memory):
for n in range(length):
cur_word_addr = (cur_addr // self.byte_width) * self.byte_width
await self.w_channel.wait()
w = self.w_channel.recv()
w = await self.w_channel.recv()
data = int(w.wdata)
strb = int(w.wstrb)
@@ -140,12 +139,12 @@ class AxiRamWrite(Memory):
b.bid = awid
b.bresp = AxiResp.OKAY
self.b_channel.send(b)
await self.b_channel.send(b)
class AxiRamRead(Memory):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.log.info("AXI RAM model (read)")
self.log.info("cocotbext-axi version %s", __version__)
@@ -180,8 +179,7 @@ class AxiRamRead(Memory):
async def _process_read(self):
while True:
await self.ar_channel.wait()
ar = self.ar_channel.recv()
ar = await self.ar_channel.recv()
arid = int(ar.arid)
addr = int(ar.araddr)
@@ -224,7 +222,7 @@ class AxiRamRead(Memory):
r.rlast = n == length-1
r.rresp = AxiResp.OKAY
self.r_channel.send(r)
await self.r_channel.send(r)
self.log.debug("Read word awid: 0x%x addr: 0x%08x data: %s",
arid, cur_addr, ' '.join((f'{c:02x}' for c in data)))

View File

@@ -22,11 +22,11 @@ THE SOFTWARE.
"""
import logging
from collections import deque, namedtuple
import cocotb
from cocotb.triggers import Event
from cocotb.log import SimLog
from .version import __version__
from .constants import AxiProt, AxiResp
@@ -45,7 +45,7 @@ AxiLiteReadResp = namedtuple("AxiLiteReadResp", ["address", "data", "resp"])
class AxiLiteMasterWrite(object):
def __init__(self, entity, name, clock, reset=None):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.log.info("AXI lite master (write)")
self.log.info("cocotbext-axi version %s", __version__)
@@ -194,7 +194,7 @@ class AxiLiteMasterWrite(object):
w.wstrb = strb
await self.aw_channel.drive(aw)
self.w_channel.send(w)
await self.w_channel.send(w)
async def _process_write_resp(self):
while True:
@@ -207,8 +207,7 @@ class AxiLiteMasterWrite(object):
resp = AxiResp.OKAY
for k in range(cmd.cycles):
await self.b_channel.wait()
b = self.b_channel.recv()
b = await self.b_channel.recv()
cycle_resp = AxiResp(b.bresp)
@@ -231,7 +230,7 @@ class AxiLiteMasterWrite(object):
class AxiLiteMasterRead(object):
def __init__(self, entity, name, clock, reset=None):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.log.info("AXI lite master (read)")
self.log.info("cocotbext-axi version %s", __version__)
@@ -366,8 +365,7 @@ class AxiLiteMasterRead(object):
resp = AxiResp.OKAY
for k in range(cmd.cycles):
await self.r_channel.wait()
r = self.r_channel.recv()
r = await self.r_channel.recv()
cycle_data = int(r.rdata)
cycle_resp = AxiResp(r.rresp)

View File

@@ -22,8 +22,9 @@ THE SOFTWARE.
"""
import logging
import cocotb
from cocotb.log import SimLog
from .version import __version__
from .constants import AxiProt, AxiResp
@@ -33,7 +34,7 @@ from .memory import Memory
class AxiLiteRamWrite(Memory):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.log.info("AXI lite RAM model (write)")
self.log.info("cocotbext-axi version %s", __version__)
@@ -68,14 +69,12 @@ class AxiLiteRamWrite(Memory):
async def _process_write(self):
while True:
await self.aw_channel.wait()
aw = self.aw_channel.recv()
aw = await self.aw_channel.recv()
addr = (int(aw.awaddr) // self.byte_width) * self.byte_width
prot = AxiProt(aw.awprot)
await self.w_channel.wait()
w = self.w_channel.recv()
w = await self.w_channel.recv()
data = int(w.wdata)
strb = int(w.wstrb)
@@ -98,12 +97,12 @@ class AxiLiteRamWrite(Memory):
b = self.b_channel._transaction_obj()
b.bresp = AxiResp.OKAY
self.b_channel.send(b)
await self.b_channel.send(b)
class AxiLiteRamRead(Memory):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.log.info("AXI lite RAM model (read)")
self.log.info("cocotbext-axi version %s", __version__)
@@ -135,8 +134,7 @@ class AxiLiteRamRead(Memory):
async def _process_read(self):
while True:
await self.ar_channel.wait()
ar = self.ar_channel.recv()
ar = await self.ar_channel.recv()
addr = (int(ar.araddr) // self.byte_width) * self.byte_width
prot = AxiProt(ar.arprot)
@@ -151,7 +149,7 @@ class AxiLiteRamRead(Memory):
r.rdata = int.from_bytes(data, 'little')
r.rresp = AxiResp.OKAY
self.r_channel.send(r)
await self.r_channel.send(r)
self.log.info("Read data araddr: 0x%08x arprot: %s data: %s",
addr, prot, ' '.join((f'{c:02x}' for c in data)))

View File

@@ -22,12 +22,12 @@ THE SOFTWARE.
"""
import logging
from collections import deque
import cocotb
from cocotb.triggers import RisingEdge, ReadOnly, Timer, First, Event
from cocotb.bus import Bus
from cocotb.log import SimLog
from .version import __version__
@@ -208,14 +208,17 @@ class AxiStreamFrame(object):
def __iter__(self):
return self.tdata.__iter__()
def __bytes__(self):
return bytes(self.tdata)
class AxiStreamSource(object):
_signals = ["tdata"]
_optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"]
def __init__(self, entity, name, clock, reset=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.entity = entity
self.clock = clock
self.reset = reset
@@ -239,7 +242,7 @@ class AxiStreamSource(object):
self.queue_occupancy_frames = 0
self.width = len(self.bus.tdata)
self.byte_width = 1
self.byte_lanes = 1
self.reset = reset
@@ -249,7 +252,6 @@ class AxiStreamSource(object):
if hasattr(self.bus, "tlast"):
self.bus.tlast.setimmediatevalue(0)
if hasattr(self.bus, "tkeep"):
self.byte_width = len(self.bus.tkeep)
self.bus.tkeep.setimmediatevalue(0)
if hasattr(self.bus, "tid"):
self.bus.tid.setimmediatevalue(0)
@@ -258,12 +260,24 @@ class AxiStreamSource(object):
if hasattr(self.bus, "tuser"):
self.bus.tuser.setimmediatevalue(0)
self.byte_size = self.width // self.byte_width
if hasattr(self.bus, "tkeep"):
self.byte_lanes = len(self.bus.tkeep)
if byte_size is not None or byte_lanes is not None:
raise ValueError("Cannot specify byte_size or byte_lanes if tkeep is connected")
else:
if byte_lanes is not None:
self.byte_lanes = byte_lanes
if byte_size is not None:
raise ValueError("Cannot specify both byte_size and byte_lanes")
elif byte_size is not None:
self.byte_lanes = self.width // byte_size
self.byte_size = self.width // self.byte_lanes
self.byte_mask = 2**self.byte_size-1
self.log.info("AXI stream source configuration:")
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_width)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present")
self.log.info(" tready: %s", "present" if hasattr(self.bus, "tready") else "not present")
self.log.info(" tlast: %s", "present" if hasattr(self.bus, "tlast") else "not present")
@@ -284,16 +298,26 @@ class AxiStreamSource(object):
else:
self.log.info(" tuser: not present")
if self.byte_lanes * self.byte_size != self.width:
raise ValueError(f"Bus does not evenly divide into byte lanes "
f"({self.byte_lanes} * {self.byte_size} != {self.width})")
cocotb.fork(self._run())
def send(self, frame):
async def send(self, frame):
self.send_nowait(frame)
def send_nowait(self, frame):
frame = AxiStreamFrame(frame)
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
self.queue.append(frame)
def write(self, data):
self.send(data)
async def write(self, data):
await self.send(data)
def write_nowait(self, data):
self.send_nowait(data)
def count(self):
return len(self.queue)
@@ -370,7 +394,7 @@ class AxiStreamSource(object):
tdest_val = 0
tuser_val = 0
for offset in range(self.byte_width):
for offset in range(self.byte_lanes):
tdata_val |= (frame.tdata.pop(0) & self.byte_mask) << (offset * self.byte_size)
tkeep_val |= (frame.tkeep.pop(0) & 1) << offset
tid_val = frame.tid.pop(0)
@@ -413,8 +437,8 @@ class AxiStreamSink(object):
_signals = ["tdata"]
_optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"]
def __init__(self, entity, name, clock, reset=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.entity = entity
self.clock = clock
self.reset = reset
@@ -442,21 +466,31 @@ class AxiStreamSink(object):
self.queue_occupancy_limit_frames = None
self.width = len(self.bus.tdata)
self.byte_width = 1
self.byte_lanes = 1
self.reset = reset
if hasattr(self.bus, "tready"):
self.bus.tready.setimmediatevalue(0)
if hasattr(self.bus, "tkeep"):
self.byte_width = len(self.bus.tkeep)
self.byte_size = self.width // self.byte_width
if hasattr(self.bus, "tkeep"):
self.byte_lanes = len(self.bus.tkeep)
if byte_size is not None or byte_lanes is not None:
raise ValueError("Cannot specify byte_size or byte_lanes if tkeep is connected")
else:
if byte_lanes is not None:
self.byte_lanes = byte_lanes
if byte_size is not None:
raise ValueError("Cannot specify both byte_size and byte_lanes")
elif byte_size is not None:
self.byte_lanes = self.width // byte_size
self.byte_size = self.width // self.byte_lanes
self.byte_mask = 2**self.byte_size-1
self.log.info("AXI stream sink configuration:")
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_width)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present")
self.log.info(" tready: %s", "present" if hasattr(self.bus, "tready") else "not present")
self.log.info(" tlast: %s", "present" if hasattr(self.bus, "tlast") else "not present")
@@ -477,9 +511,19 @@ class AxiStreamSink(object):
else:
self.log.info(" tuser: not present")
if self.byte_lanes * self.byte_size != self.width:
raise ValueError(f"Bus does not evenly divide into byte lanes "
f"({self.byte_lanes} * {self.byte_size} != {self.width})")
cocotb.fork(self._run())
def recv(self, compact=True):
async def recv(self, compact=True):
while self.empty():
self.sync.clear()
await self.sync.wait()
return self.recv_nowait(compact)
def recv_nowait(self, compact=True):
if self.queue:
frame = self.queue.popleft()
self.queue_occupancy_bytes -= len(frame)
@@ -489,11 +533,15 @@ class AxiStreamSink(object):
return frame
return None
def read(self, count=-1):
while True:
frame = self.recv(compact=True)
if frame is None:
break
async def read(self, count=-1):
while not self.read_queue:
frame = await self.recv(compact=True)
self.read_queue.extend(frame.tdata)
return self.read_nowait(count)
def read_nowait(self, count=-1):
while not self.empty():
frame = self.recv_nowait(compact=True)
self.read_queue.extend(frame.tdata)
if count < 0:
count = len(self.read_queue)
@@ -560,7 +608,7 @@ class AxiStreamSink(object):
continue
if tready_sample and tvalid_sample:
for offset in range(self.byte_width):
for offset in range(self.byte_lanes):
frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask)
if hasattr(self.bus, "tkeep"):
@@ -602,8 +650,8 @@ class AxiStreamMonitor(object):
_signals = ["tdata"]
_optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"]
def __init__(self, entity, name, clock, reset=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.entity = entity
self.clock = clock
self.reset = reset
@@ -625,19 +673,28 @@ class AxiStreamMonitor(object):
self.queue_occupancy_frames = 0
self.width = len(self.bus.tdata)
self.byte_width = 1
self.byte_lanes = 1
self.reset = reset
if hasattr(self.bus, "tkeep"):
self.byte_width = len(self.bus.tkeep)
self.byte_lanes = len(self.bus.tkeep)
if byte_size is not None or byte_lanes is not None:
raise ValueError("Cannot specify byte_size or byte_lanes if tkeep is connected")
else:
if byte_lanes is not None:
self.byte_lanes = byte_lanes
if byte_size is not None:
raise ValueError("Cannot specify both byte_size and byte_lanes")
elif byte_size is not None:
self.byte_lanes = self.width // byte_size
self.byte_size = self.width // self.byte_width
self.byte_size = self.width // self.byte_lanes
self.byte_mask = 2**self.byte_size-1
self.log.info("AXI stream monitor configuration:")
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_width)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present")
self.log.info(" tready: %s", "present" if hasattr(self.bus, "tready") else "not present")
self.log.info(" tlast: %s", "present" if hasattr(self.bus, "tlast") else "not present")
@@ -658,9 +715,19 @@ class AxiStreamMonitor(object):
else:
self.log.info(" tuser: not present")
if self.byte_lanes * self.byte_size != self.width:
raise ValueError(f"Bus does not evenly divide into byte lanes "
f"({self.byte_lanes} * {self.byte_size} != {self.width})")
cocotb.fork(self._run())
def recv(self, compact=True):
async def recv(self, compact=True):
while self.empty():
self.sync.clear()
await self.sync.wait()
return self.recv_nowait(compact)
def recv_nowait(self, compact=True):
if self.queue:
frame = self.queue.popleft()
self.queue_occupancy_bytes -= len(frame)
@@ -670,11 +737,15 @@ class AxiStreamMonitor(object):
return frame
return None
def read(self, count=-1):
while True:
frame = self.recv(compact=True)
if frame is None:
break
async def read(self, count=-1):
while not self.read_queue:
frame = await self.recv(compact=True)
self.read_queue.extend(frame.tdata)
return self.read_nowait(count)
def read_nowait(self, count=-1):
while not self.empty():
frame = self.recv_nowait(compact=True)
self.read_queue.extend(frame.tdata)
if count < 0:
count = len(self.read_queue)
@@ -718,7 +789,7 @@ class AxiStreamMonitor(object):
continue
if tready_sample and tvalid_sample:
for offset in range(self.byte_width):
for offset in range(self.byte_lanes):
frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask)
if hasattr(self.bus, "tkeep"):

View File

@@ -22,12 +22,12 @@ THE SOFTWARE.
"""
import logging
from collections import deque
import cocotb
from cocotb.triggers import RisingEdge, ReadOnly, Event, First, Timer
from cocotb.bus import Bus
from cocotb.log import SimLog
from collections import deque
class StreamTransaction(object):
@@ -65,7 +65,7 @@ class StreamBase(object):
_transaction_obj = StreamTransaction
def __init__(self, entity, name, clock, reset=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.entity = entity
self.clock = clock
self.reset = reset
@@ -169,7 +169,10 @@ class StreamSource(StreamBase, StreamPause):
self.drive_obj = obj
def send(self, obj):
async def send(self, obj):
self.send_nowait(obj)
def send_nowait(self, obj):
self.queue.append(obj)
self.queue_sync.set()
@@ -246,7 +249,13 @@ class StreamSink(StreamBase, StreamPause):
cocotb.fork(self._run_sink())
def recv(self):
async def recv(self):
while self.empty():
self.queue_sync.clear()
await self.queue_sync.wait()
return self.recv_nowait()
def recv_nowait(self):
if self.queue:
return self.queue.popleft()
return None
@@ -310,7 +319,13 @@ class StreamMonitor(StreamBase):
cocotb.fork(self._run_monitor())
def recv(self):
async def recv(self):
while self.empty():
self.queue_sync.clear()
await self.queue_sync.wait()
return self.recv_nowait()
def recv_nowait(self):
if self.queue:
return self.queue.popleft()
return None

View File

@@ -1 +1 @@
__version__ = "0.1.0"
__version__ = "0.1.4"

View File

@@ -40,6 +40,8 @@ include = cocotbext.*
[tool:pytest]
testpaths =
tests
addopts =
--import-mode importlib
# tox configuration
[tox:tox]
@@ -53,10 +55,30 @@ python =
3.9: py39
[testenv]
setenv =
COVERAGE=1
deps =
pytest
pytest-xdist
cocotb-test
coverage
pytest-cov
commands =
pytest -n auto
pytest --cov=cocotbext --cov=tests --cov-branch -n auto
bash -c 'find . -type f -name "\.coverage" | xargs coverage combine --append'
whitelist_externals =
bash
# combine if paths are different
[coverage:paths]
source =
cocotbext/
/*/cocotbext
# do not report dependencies
[coverage:report]
omit =
.tox/*

View File

View File

@@ -98,7 +98,7 @@ async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, si
for length in list(range(1, byte_width*2))+[1024]:
for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)):
tb.log.info(f"length {length}, offset {offset}")
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
@@ -106,7 +106,7 @@ async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, si
await tb.axi_master.write(addr, test_data, size=size)
tb.log.debug("%s", tb.axi_ram.hexdump_str((addr & 0xfffffff0)-16, (((addr & 0xf)+length-1) & 0xfffffff0)+48))
tb.log.debug("%s", tb.axi_ram.hexdump_str((addr & ~0xf)-16, (((addr & 0xf)+length-1) & ~0xf)+48))
assert tb.axi_ram.read(addr, length) == test_data
assert tb.axi_ram.read(addr-1, 1) == b'\xaa'
@@ -133,7 +133,7 @@ async def run_test_read(dut, idle_inserter=None, backpressure_inserter=None, siz
for length in list(range(1, byte_width*2))+[1024]:
for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)):
tb.log.info(f"length {length}, offset {offset}")
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
@@ -157,7 +157,7 @@ async def run_test_write_words(dut):
for length in list(range(1, 4)):
for offset in list(range(byte_width)):
tb.log.info(f"length {length}, offset {offset}")
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
@@ -206,7 +206,7 @@ async def run_test_read_words(dut):
for length in list(range(1, 4)):
for offset in list(range(byte_width)):
tb.log.info(f"length {length}, offset {offset}")
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
@@ -254,7 +254,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter)
async def stress_test_worker(master, offset, aperture, count=16):
async def worker(master, offset, aperture, count=16):
for k in range(count):
length = random.randint(1, min(512, aperture))
addr = offset+random.randint(0, aperture-length)
@@ -272,7 +272,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
workers = []
for k in range(16):
workers.append(cocotb.fork(stress_test_worker(tb.axi_master, k*0x1000, 0x1000, count=16)))
workers.append(cocotb.fork(worker(tb.axi_master, k*0x1000, 0x1000, count=16)))
while workers:
await workers.pop(0).join()

View File

View File

@@ -91,7 +91,7 @@ async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_ins
for length in range(1, byte_width*2):
for offset in range(byte_width):
tb.log.info(f"length {length}, offset {offset}")
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
@@ -99,7 +99,7 @@ async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_ins
await tb.axil_master.write(addr, test_data)
tb.log.debug("%s", tb.axil_ram.hexdump_str((addr & 0xfffffff0)-16, (((addr & 0xf)+length-1) & 0xfffffff0)+48))
tb.log.debug("%s", tb.axil_ram.hexdump_str((addr & ~0xf)-16, (((addr & 0xf)+length-1) & ~0xf)+48))
assert tb.axil_ram.read(addr, length) == test_data
assert tb.axil_ram.read(addr-1, 1) == b'\xaa'
@@ -122,7 +122,7 @@ async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inse
for length in range(1, byte_width*2):
for offset in range(byte_width):
tb.log.info(f"length {length}, offset {offset}")
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
@@ -146,7 +146,7 @@ async def run_test_write_words(dut):
for length in list(range(1, 4)):
for offset in list(range(byte_width)):
tb.log.info(f"length {length}, offset {offset}")
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
@@ -195,7 +195,7 @@ async def run_test_read_words(dut):
for length in list(range(1, 4)):
for offset in list(range(byte_width)):
tb.log.info(f"length {length}, offset {offset}")
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
@@ -243,7 +243,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter)
async def stress_test_worker(master, offset, aperture, count=16):
async def worker(master, offset, aperture, count=16):
for k in range(count):
length = random.randint(1, min(32, aperture))
addr = offset+random.randint(0, aperture-length)
@@ -261,7 +261,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
workers = []
for k in range(16):
workers.append(cocotb.fork(stress_test_worker(tb.axil_master, k*0x1000, 0x1000, count=16)))
workers.append(cocotb.fork(worker(tb.axil_master, k*0x1000, 0x1000, count=16)))
while workers:
await workers.pop(0).join()

View File

View File

@@ -90,23 +90,21 @@ async def run_test(dut, payload_lengths=None, payload_data=None, idle_inserter=N
test_frame = AxiStreamFrame(test_data)
test_frame.tid = cur_id
test_frame.tdest = cur_id
tb.source.send(test_frame)
await tb.source.send(test_frame)
test_frames.append(test_frame)
cur_id = (cur_id + 1) % id_count
for test_frame in test_frames:
await tb.sink.wait()
rx_frame = tb.sink.recv()
rx_frame = await tb.sink.recv()
assert rx_frame.tdata == test_frame.tdata
assert rx_frame.tid == test_frame.tid
assert rx_frame.tdest == test_frame.tdest
assert not rx_frame.tuser
await tb.monitor.wait()
rx_frame = tb.monitor.recv()
rx_frame = await tb.monitor.recv()
assert rx_frame.tdata == test_frame.tdata
assert rx_frame.tid == test_frame.tid