Clean up and lint removal
This commit is contained in:
@@ -24,7 +24,7 @@ THE SOFTWARE.
|
||||
|
||||
from .version import __version__
|
||||
|
||||
from .constants import *
|
||||
from .constants import AxiBurstType, AxiBurstSize, AxiLockType, AxiCacheBit, AxiProt, AxiResp
|
||||
|
||||
from .axis import AxiStreamFrame, AxiStreamSource, AxiStreamSink
|
||||
|
||||
@@ -33,4 +33,3 @@ from .axil_ram import AxiLiteRamWrite, AxiLiteRamRead, AxiLiteRam
|
||||
|
||||
from .axi_master import AxiMasterWrite, AxiMasterRead, AxiMaster
|
||||
from .axi_ram import AxiRamWrite, AxiRamRead, AxiRam
|
||||
|
||||
|
||||
@@ -26,36 +26,37 @@ from .stream import define_stream
|
||||
|
||||
# Write address channel
|
||||
AxiAWTransaction, AxiAWSource, AxiAWSink, AxiAWMonitor = define_stream("AxiAW",
|
||||
signals = ["awid", "awaddr", "awlen", "awsize", "awburst", "awprot", "awvalid", "awready"],
|
||||
optional_signals = ["awlock", "awcache", "awqos", "awregion", "awuser"],
|
||||
signal_widths = {"awlen": 8, "awsize": 3, "awburst": 2, "awlock": 1, "awcache": 4, "awprot": 3, "awqos": 4, "awregion": 4}
|
||||
signals=["awid", "awaddr", "awlen", "awsize", "awburst", "awprot", "awvalid", "awready"],
|
||||
optional_signals=["awlock", "awcache", "awqos", "awregion", "awuser"],
|
||||
signal_widths={"awlen": 8, "awsize": 3, "awburst": 2, "awlock": 1,
|
||||
"awcache": 4, "awprot": 3, "awqos": 4, "awregion": 4}
|
||||
)
|
||||
|
||||
# Write data channel
|
||||
AxiWTransaction, AxiWSource, AxiWSink, AxiWMonitor = define_stream("AxiW",
|
||||
signals = ["wdata", "wstrb", "wlast", "wvalid", "wready"],
|
||||
optional_signals = ["wuser"],
|
||||
signal_widths = {"wlast": 1}
|
||||
signals=["wdata", "wstrb", "wlast", "wvalid", "wready"],
|
||||
optional_signals=["wuser"],
|
||||
signal_widths={"wlast": 1}
|
||||
)
|
||||
|
||||
# Write response channel
|
||||
AxiBTransaction, AxiBSource, AxiBSink, AxiBMonitor = define_stream("AxiB",
|
||||
signals = ["bid", "bresp", "bvalid", "bready"],
|
||||
optional_signals = ["buser"],
|
||||
signal_widths = {"bresp": 2}
|
||||
signals=["bid", "bresp", "bvalid", "bready"],
|
||||
optional_signals=["buser"],
|
||||
signal_widths={"bresp": 2}
|
||||
)
|
||||
|
||||
# Read address channel
|
||||
AxiARTransaction, AxiARSource, AxiARSink, AxiARMonitor = define_stream("AxiAR",
|
||||
signals = ["arid", "araddr", "arlen", "arsize", "arburst", "arprot", "arvalid", "arready"],
|
||||
optional_signals = ["arlock", "arcache", "arqos", "arregion", "aruser"],
|
||||
signal_widths = {"arlen": 8, "arsize": 3, "arburst": 2, "arlock": 1, "arcache": 4, "arprot": 3, "arqos": 4, "arregion": 4}
|
||||
signals=["arid", "araddr", "arlen", "arsize", "arburst", "arprot", "arvalid", "arready"],
|
||||
optional_signals=["arlock", "arcache", "arqos", "arregion", "aruser"],
|
||||
signal_widths={"arlen": 8, "arsize": 3, "arburst": 2, "arlock": 1,
|
||||
"arcache": 4, "arprot": 3, "arqos": 4, "arregion": 4}
|
||||
)
|
||||
|
||||
# Read data channel
|
||||
AxiRTransaction, AxiRSource, AxiRSink, AxiRMonitor = define_stream("AxiR",
|
||||
signals = ["rid", "rdata", "rresp", "rlast", "rvalid", "rready"],
|
||||
optional_signals = ["ruser"],
|
||||
signal_widths = {"rresp": 2, "rlast": 1}
|
||||
signals=["rid", "rdata", "rresp", "rlast", "rvalid", "rready"],
|
||||
optional_signals=["ruser"],
|
||||
signal_widths={"rresp": 2, "rlast": 1}
|
||||
)
|
||||
|
||||
|
||||
@@ -22,15 +22,15 @@ THE SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
import cocotb
|
||||
from cocotb.triggers import RisingEdge, Event
|
||||
from cocotb.log import SimLog
|
||||
|
||||
from collections import deque
|
||||
|
||||
import cocotb
|
||||
from cocotb.triggers import Event
|
||||
from cocotb.log import SimLog
|
||||
|
||||
from .version import __version__
|
||||
from .constants import *
|
||||
from .axi_channels import *
|
||||
from .constants import AxiBurstType, AxiLockType, AxiProt, AxiResp
|
||||
from .axi_channels import AxiAWSource, AxiWSource, AxiBSink, AxiARSource, AxiRSink
|
||||
|
||||
|
||||
class AxiMasterWrite(object):
|
||||
@@ -81,7 +81,8 @@ class AxiMasterWrite(object):
|
||||
cocotb.fork(self._process_write())
|
||||
cocotb.fork(self._process_write_resp())
|
||||
|
||||
def init_write(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None):
|
||||
def init_write(self, address, data, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None):
|
||||
if token is not None:
|
||||
if token in self.active_tokens:
|
||||
raise Exception("Token is not unique")
|
||||
@@ -130,35 +131,43 @@ class AxiMasterWrite(object):
|
||||
return resp
|
||||
return None
|
||||
|
||||
async def write(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def write(self, address, data, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
token = object()
|
||||
self.init_write(address, data, burst, size, lock, cache, prot, qos, region, user, token)
|
||||
await self.wait_for_token(token)
|
||||
return self.get_write_resp(token)[1:3]
|
||||
|
||||
async def write_words(self, address, data, ws=2, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def write_words(self, address, data, ws=2, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
words = data
|
||||
data = bytearray()
|
||||
for w in words:
|
||||
data.extend(w.to_bytes(ws, 'little'))
|
||||
await self.write(address, data, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def write_dwords(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def write_dwords(self, address, data, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
await self.write_words(address, data, 4, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def write_qwords(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def write_qwords(self, address, data, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
await self.write_words(address, data, 8, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def write_byte(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def write_byte(self, address, data, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
await self.write(address, [data], burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def write_word(self, address, data, ws=2, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def write_word(self, address, data, ws=2, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
await self.write_words(address, [data], ws, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def write_dword(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def write_dword(self, address, data, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
await self.write_dwords(address, [data], burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def write_qword(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def write_qword(self, address, data, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
await self.write_qwords(address, [data], burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def _process_write(self):
|
||||
@@ -167,7 +176,7 @@ class AxiMasterWrite(object):
|
||||
self.write_command_sync.clear()
|
||||
await self.write_command_sync.wait()
|
||||
|
||||
address, data, burst, size, lock, cache, prot, qos, region, user, token = self.write_command_queue.popleft()
|
||||
addr, data, burst, size, lock, cache, prot, qos, region, user, token = self.write_command_queue.popleft()
|
||||
|
||||
num_bytes = self.byte_width
|
||||
|
||||
@@ -177,13 +186,13 @@ class AxiMasterWrite(object):
|
||||
num_bytes = 2**size
|
||||
assert 0 < num_bytes <= self.byte_width
|
||||
|
||||
aligned_addr = (address // num_bytes) * num_bytes
|
||||
word_addr = (address // self.byte_width) * self.byte_width
|
||||
aligned_addr = (addr // num_bytes) * num_bytes
|
||||
word_addr = (addr // self.byte_width) * self.byte_width
|
||||
|
||||
start_offset = address % self.byte_width
|
||||
end_offset = ((address + len(data) - 1) % self.byte_width) + 1
|
||||
start_offset = addr % self.byte_width
|
||||
end_offset = ((addr + len(data) - 1) % self.byte_width) + 1
|
||||
|
||||
cycles = (len(data) + (address % num_bytes) + num_bytes-1) // num_bytes
|
||||
cycles = (len(data) + (addr % num_bytes) + num_bytes-1) // num_bytes
|
||||
|
||||
cur_addr = aligned_addr
|
||||
offset = 0
|
||||
@@ -194,7 +203,8 @@ class AxiMasterWrite(object):
|
||||
burst_list = []
|
||||
burst_length = 0
|
||||
|
||||
self.log.info(f"Write start addr: {address:#010x} prot: {prot} data: {' '.join((f'{c:02x}' for c in data))}")
|
||||
self.log.info("Write start addr: 0x%08x prot: %s data: %s",
|
||||
addr, prot, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
for k in range(cycles):
|
||||
start = cycle_offset
|
||||
@@ -222,8 +232,10 @@ class AxiMasterWrite(object):
|
||||
transfer_count += 1
|
||||
n = 0
|
||||
|
||||
burst_length = min(cycles-k, min(max(self.max_burst_len, 1), 256)) # max len
|
||||
burst_length = (min(burst_length*num_bytes, 0x1000-(cur_addr&0xfff))+num_bytes-1)//num_bytes # 4k align
|
||||
# split on burst length
|
||||
burst_length = min(cycles-k, min(max(self.max_burst_len, 1), 256))
|
||||
# split on 4k address boundary
|
||||
burst_length = (min(burst_length*num_bytes, 0x1000-(cur_addr & 0xfff))+num_bytes-1)//num_bytes
|
||||
|
||||
burst_list.append((awid, burst_length))
|
||||
|
||||
@@ -242,7 +254,8 @@ class AxiMasterWrite(object):
|
||||
|
||||
await self.aw_channel.drive(aw)
|
||||
|
||||
self.log.info(f"Write burst start awid {awid:#x} awaddr: {cur_addr:#010x} awlen: {burst_length-1} awsize: {size}")
|
||||
self.log.info("Write burst start awid: 0x%x awaddr: 0x%08x awlen: %d awsize: %d awprot: %s",
|
||||
awid, cur_addr, burst_length-1, size, prot)
|
||||
|
||||
n += 1
|
||||
|
||||
@@ -256,7 +269,7 @@ class AxiMasterWrite(object):
|
||||
cur_addr += num_bytes
|
||||
cycle_offset = (cycle_offset + num_bytes) % self.byte_width
|
||||
|
||||
self.int_write_resp_command_queue.append((address, len(data), size, cycles, prot, burst_list, token))
|
||||
self.int_write_resp_command_queue.append((addr, len(data), size, cycles, prot, burst_list, token))
|
||||
self.int_write_resp_command_sync.set()
|
||||
|
||||
async def _process_write_resp(self):
|
||||
@@ -298,9 +311,10 @@ class AxiMasterWrite(object):
|
||||
self.id_queue.append(bid)
|
||||
self.id_sync.set()
|
||||
|
||||
self.log.info(f"Write burst complete bid {burst_id:#x} bresp: {burst_resp!s}")
|
||||
self.log.info("Write burst complete bid: 0x%x bresp: %s", burst_id, burst_resp)
|
||||
|
||||
self.log.info(f"Write complete addr: {addr:#010x} prot: {prot} resp: {resp!s} length: {length}")
|
||||
self.log.info("Write complete addr: 0x%08x prot: %s resp: %s length: %d",
|
||||
addr, prot, resp, length)
|
||||
|
||||
self.write_resp_queue.append((addr, length, resp, user, token))
|
||||
self.write_resp_sync.set()
|
||||
@@ -349,7 +363,8 @@ class AxiMasterRead(object):
|
||||
cocotb.fork(self._process_read())
|
||||
cocotb.fork(self._process_read_resp())
|
||||
|
||||
def init_read(self, address, length, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None):
|
||||
def init_read(self, address, length, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None):
|
||||
if token is not None:
|
||||
if token in self.active_tokens:
|
||||
raise Exception("Token is not unique")
|
||||
@@ -398,35 +413,43 @@ class AxiMasterRead(object):
|
||||
return resp
|
||||
return None
|
||||
|
||||
async def read(self, address, length, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def read(self, address, length, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
token = object()
|
||||
self.init_read(address, length, burst, size, lock, cache, prot, qos, region, user, token)
|
||||
await self.wait_for_token(token)
|
||||
return self.get_read_data(token)[1:3]
|
||||
|
||||
async def read_words(self, address, count, ws=2, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def read_words(self, address, count, ws=2, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
data = await self.read(address, count*ws, burst, size, lock, cache, prot, qos, region, user)
|
||||
words = []
|
||||
for k in range(count):
|
||||
words.append(int.from_bytes(data[0][ws*k:ws*(k+1)], 'little'))
|
||||
return words
|
||||
|
||||
async def read_dwords(self, address, count, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def read_dwords(self, address, count, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_words(address, count, 4, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_qwords(self, address, count, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def read_qwords(self, address, count, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_words(address, count, 8, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_byte(self, address, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def read_byte(self, address, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return (await self.read(address, 1, burst, size, lock, cache, prot, qos, region, user))[0]
|
||||
|
||||
async def read_word(self, address, ws=2, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def read_word(self, address, ws=2, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return (await self.read_words(address, 1, ws, burst, size, lock, cache, prot, qos, region, user))[0]
|
||||
|
||||
async def read_dword(self, address, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def read_dword(self, address, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return (await self.read_dwords(address, 1, burst, size, lock, cache, prot, qos, region, user))[0]
|
||||
|
||||
async def read_qword(self, address, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def read_qword(self, address, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return (await self.read_qwords(address, 1, burst, size, lock, cache, prot, qos, region, user))[0]
|
||||
|
||||
async def _process_read(self):
|
||||
@@ -435,7 +458,7 @@ class AxiMasterRead(object):
|
||||
self.read_command_sync.clear()
|
||||
await self.read_command_sync.wait()
|
||||
|
||||
address, length, burst, size, lock, cache, prot, qos, region, user, token = self.read_command_queue.popleft()
|
||||
addr, length, burst, size, lock, cache, prot, qos, region, user, token = self.read_command_queue.popleft()
|
||||
|
||||
num_bytes = self.byte_width
|
||||
|
||||
@@ -445,10 +468,9 @@ class AxiMasterRead(object):
|
||||
num_bytes = 2**size
|
||||
assert 0 < num_bytes <= self.byte_width
|
||||
|
||||
aligned_addr = (address // num_bytes) * num_bytes
|
||||
word_addr = (address // self.byte_width) * self.byte_width
|
||||
aligned_addr = (addr // num_bytes) * num_bytes
|
||||
|
||||
cycles = (length + num_bytes-1 + (address % num_bytes)) // num_bytes
|
||||
cycles = (length + num_bytes-1 + (addr % num_bytes)) // num_bytes
|
||||
|
||||
burst_list = []
|
||||
|
||||
@@ -457,6 +479,8 @@ class AxiMasterRead(object):
|
||||
|
||||
burst_length = 0
|
||||
|
||||
self.log.info("Read start addr: 0x%08x prot: %s", addr, prot)
|
||||
|
||||
for k in range(cycles):
|
||||
|
||||
n += 1
|
||||
@@ -469,8 +493,10 @@ class AxiMasterRead(object):
|
||||
|
||||
n = 0
|
||||
|
||||
burst_length = min(cycles-k, min(max(self.max_burst_len, 1), 256)) # max len
|
||||
burst_length = (min(burst_length*num_bytes, 0x1000-(cur_addr&0xfff))+num_bytes-1)//num_bytes # 4k align
|
||||
# split on burst length
|
||||
burst_length = min(cycles-k, min(max(self.max_burst_len, 1), 256))
|
||||
# split on 4k address boundary
|
||||
burst_length = (min(burst_length*num_bytes, 0x1000-(cur_addr & 0xfff))+num_bytes-1)//num_bytes
|
||||
|
||||
burst_list.append((arid, burst_length))
|
||||
|
||||
@@ -489,11 +515,12 @@ class AxiMasterRead(object):
|
||||
|
||||
await self.ar_channel.drive(ar)
|
||||
|
||||
self.log.info(f"Read burst start arid {arid:#x} araddr: {cur_addr:#010x} arlen: {burst_length-1} arsize: {size}")
|
||||
self.log.info("Read burst start arid: 0x%x araddr: 0x%08x arlen: %d arsize: %d arprot: %s",
|
||||
arid, cur_addr, burst_length-1, size, prot)
|
||||
|
||||
cur_addr += num_bytes
|
||||
|
||||
self.int_read_resp_command_queue.append((address, length, size, cycles, prot, burst_list, token))
|
||||
self.int_read_resp_command_queue.append((addr, length, size, cycles, prot, burst_list, token))
|
||||
self.int_read_resp_command_sync.set()
|
||||
|
||||
async def _process_read_resp(self):
|
||||
@@ -510,7 +537,6 @@ class AxiMasterRead(object):
|
||||
word_addr = (addr // self.byte_width) * self.byte_width
|
||||
|
||||
start_offset = addr % self.byte_width
|
||||
end_offset = ((addr + length - 1) % self.byte_width) + 1
|
||||
|
||||
cycle_offset = aligned_addr - word_addr
|
||||
data = bytearray()
|
||||
@@ -566,11 +592,12 @@ class AxiMasterRead(object):
|
||||
self.id_queue.append(rid)
|
||||
self.id_sync.set()
|
||||
|
||||
self.log.info(f"Read burst complete rid {cycle_id:#x} rresp: {resp!s}")
|
||||
self.log.info("Read burst complete rid: 0x%x rresp: %s", cycle_id, resp)
|
||||
|
||||
data = data[:length]
|
||||
|
||||
self.log.info(f"Read complete addr: {addr:#010x} prot: {prot} resp: {resp!s} data: {' '.join((f'{c:02x}' for c in data))}")
|
||||
self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s",
|
||||
addr, prot, resp, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
self.read_data_queue.append((addr, data, resp, user, token))
|
||||
self.read_data_sync.set()
|
||||
@@ -587,10 +614,12 @@ class AxiMaster(object):
|
||||
self.write_if = AxiMasterWrite(entity, name, clock, reset)
|
||||
self.read_if = AxiMasterRead(entity, name, clock, reset)
|
||||
|
||||
def init_read(self, address, length, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None):
|
||||
def init_read(self, address, length, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None):
|
||||
self.read_if.init_read(address, length, burst, size, lock, cache, prot, qos, region, user, token)
|
||||
|
||||
def init_write(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None):
|
||||
def init_write(self, address, data, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None):
|
||||
self.write_if.init_write(address, data, burst, size, lock, cache, prot, qos, region, user, token)
|
||||
|
||||
def idle(self):
|
||||
@@ -619,52 +648,66 @@ class AxiMaster(object):
|
||||
def get_write_resp(self, token=None):
|
||||
return self.write_if.get_write_resp(token)
|
||||
|
||||
async def read(self, address, length, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def read(self, address, length, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_if.read(address, length, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_words(self, address, count, ws=2, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def read_words(self, address, count, ws=2, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_if.read_words(address, count, ws, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_dwords(self, address, count, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def read_dwords(self, address, count, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_if.read_dwords(address, count, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_qwords(self, address, count, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def read_qwords(self, address, count, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_if.read_qwords(address, count, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_byte(self, address, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def read_byte(self, address, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_if.read_byte(address, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_word(self, address, ws=2, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def read_word(self, address, ws=2, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_if.read_word(address, ws, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_dword(self, address, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def read_dword(self, address, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_if.read_dword(address, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def read_qword(self, address, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def read_qword(self, address, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.read_if.read_qword(address, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def write(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def write(self, address, data, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.write_if.write(address, data, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def write_words(self, address, data, ws=2, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def write_words(self, address, data, ws=2, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.write_if.write_words(address, data, ws, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def write_dwords(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def write_dwords(self, address, data, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.write_if.write_dwords(address, data, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def write_qwords(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def write_qwords(self, address, data, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.write_if.write_qwords(address, data, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def write_byte(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def write_byte(self, address, data, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.write_if.write_byte(address, data, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def write_word(self, address, data, ws=2, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def write_word(self, address, data, ws=2, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.write_if.write_word(address, data, ws, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def write_dword(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def write_dword(self, address, data, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.write_if.write_dword(address, data, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
async def write_qword(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
async def write_qword(self, address, data, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
|
||||
return await self.write_if.write_qword(address, data, burst, size, lock, cache, prot, qos, region, user)
|
||||
|
||||
|
||||
|
||||
@@ -22,16 +22,14 @@ THE SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
import mmap
|
||||
|
||||
import cocotb
|
||||
from cocotb.triggers import Event
|
||||
from cocotb.log import SimLog
|
||||
|
||||
import mmap
|
||||
from collections import deque
|
||||
|
||||
from .version import __version__
|
||||
from .constants import *
|
||||
from .axi_channels import *
|
||||
from .constants import AxiBurstType, AxiProt, AxiResp
|
||||
from .axi_channels import AxiAWSink, AxiWSink, AxiBSource, AxiARSink, AxiRSource
|
||||
from .utils import hexdump, hexdump_str
|
||||
|
||||
|
||||
@@ -96,7 +94,8 @@ class AxiRamWrite(object):
|
||||
burst = int(aw.awburst)
|
||||
prot = AxiProt(int(aw.awprot))
|
||||
|
||||
self.log.info(f"Write burst awid: {awid:#x} awaddr: {addr:#010x} awlen: {length} awsize: {size} awprot: {prot}")
|
||||
self.log.info("Write burst awid: 0x%x awaddr: 0x%08x awlen: %d awsize: %d awprot: %s",
|
||||
awid, addr, length, size, prot)
|
||||
|
||||
num_bytes = 2**size
|
||||
assert 0 < num_bytes <= self.byte_width
|
||||
@@ -112,7 +111,7 @@ class AxiRamWrite(object):
|
||||
|
||||
if burst == AxiBurstType.INCR:
|
||||
# check 4k boundary crossing
|
||||
assert 0x1000-(aligned_addr&0xfff) >= transfer_size
|
||||
assert 0x1000-(aligned_addr & 0xfff) >= transfer_size
|
||||
|
||||
cur_addr = aligned_addr
|
||||
|
||||
@@ -132,7 +131,8 @@ class AxiRamWrite(object):
|
||||
|
||||
data = data.to_bytes(self.byte_width, 'little')
|
||||
|
||||
self.log.debug(f"Write word awid: {awid:#x} addr: {cur_addr:#010x} wstrb: {strb:#04x} data: {' '.join((f'{c:02x}' for c in data))}")
|
||||
self.log.debug("Write word awid: 0x%x addr: 0x%08x wstrb: 0x%02x data: %s",
|
||||
awid, cur_addr, strb, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
for i in range(self.byte_width):
|
||||
if strb & (1 << i):
|
||||
@@ -171,9 +171,6 @@ class AxiRamRead(object):
|
||||
self.ar_channel = AxiARSink(entity, name, clock, reset)
|
||||
self.r_channel = AxiRSource(entity, name, clock, reset)
|
||||
|
||||
self.int_read_resp_command_queue = deque()
|
||||
self.int_read_resp_command_sync = Event()
|
||||
|
||||
self.in_flight_operations = 0
|
||||
|
||||
self.width = len(self.r_channel.bus.rdata)
|
||||
@@ -212,7 +209,8 @@ class AxiRamRead(object):
|
||||
burst = int(ar.arburst)
|
||||
prot = AxiProt(ar.arprot)
|
||||
|
||||
self.log.info(f"Read burst arid: {arid:#x} araddr: {addr:#010x} arlen: {length} arsize: {size} arprot: {prot}")
|
||||
self.log.info("Read burst arid: 0x%x araddr: 0x%08x arlen: %d arsize: %d arprot: %s",
|
||||
arid, addr, length, size, prot)
|
||||
|
||||
num_bytes = 2**size
|
||||
assert 0 < num_bytes <= self.byte_width
|
||||
@@ -228,7 +226,7 @@ class AxiRamRead(object):
|
||||
|
||||
if burst == AxiBurstType.INCR:
|
||||
# check 4k boundary crossing
|
||||
assert 0x1000-(aligned_addr&0xfff) >= transfer_size
|
||||
assert 0x1000-(aligned_addr & 0xfff) >= transfer_size
|
||||
|
||||
cur_addr = aligned_addr
|
||||
|
||||
@@ -247,7 +245,8 @@ class AxiRamRead(object):
|
||||
|
||||
self.r_channel.send(r)
|
||||
|
||||
self.log.debug(f"Read word arid: {arid:#x} addr: {cur_addr:#010x} data: {' '.join((f'{c:02x}' for c in data))}")
|
||||
self.log.debug("Read word awid: 0x%x addr: 0x%08x data: %s",
|
||||
arid, cur_addr, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
if burst != AxiBurstType.FIXED:
|
||||
cur_addr += num_bytes
|
||||
@@ -284,4 +283,3 @@ class AxiRam(object):
|
||||
|
||||
def hexdump_str(self, address, length, prefix=""):
|
||||
return hexdump_str(self.mem, address, length, prefix=prefix)
|
||||
|
||||
|
||||
@@ -26,30 +26,29 @@ from .stream import define_stream
|
||||
|
||||
# Write address channel
|
||||
AxiLiteAWTransaction, AxiLiteAWSource, AxiLiteAWSink, AxiLiteAWMonitor = define_stream("AxiLiteAW",
|
||||
signals = ["awaddr", "awprot", "awvalid", "awready"],
|
||||
signal_widths = {"awprot": 3}
|
||||
signals=["awaddr", "awprot", "awvalid", "awready"],
|
||||
signal_widths={"awprot": 3}
|
||||
)
|
||||
|
||||
# Write data channel
|
||||
AxiLiteWTransaction, AxiLiteWSource, AxiLiteWSink, AxiLiteWMonitor = define_stream("AxiLiteW",
|
||||
signals = ["wdata", "wstrb", "wvalid", "wready"]
|
||||
signals=["wdata", "wstrb", "wvalid", "wready"]
|
||||
)
|
||||
|
||||
# Write response channel
|
||||
AxiLiteBTransaction, AxiLiteBSource, AxiLiteBSink, AxiLiteBMonitor = define_stream("AxiLiteB",
|
||||
signals = ["bresp", "bvalid", "bready"],
|
||||
signal_widths = {"bresp": 2}
|
||||
signals=["bresp", "bvalid", "bready"],
|
||||
signal_widths={"bresp": 2}
|
||||
)
|
||||
|
||||
# Read address channel
|
||||
AxiLiteARTransaction, AxiLiteARSource, AxiLiteARSink, AxiLiteARMonitor = define_stream("AxiLiteAR",
|
||||
signals = ["araddr", "arprot", "arvalid", "arready"],
|
||||
signal_widths = {"arprot": 3}
|
||||
signals=["araddr", "arprot", "arvalid", "arready"],
|
||||
signal_widths={"arprot": 3}
|
||||
)
|
||||
|
||||
# Read data channel
|
||||
AxiLiteRTransaction, AxiLiteRSource, AxiLiteRSink, AxiLiteRMonitor = define_stream("AxiLiteR",
|
||||
signals = ["rdata", "rresp", "rvalid", "rready"],
|
||||
signal_widths = {"rresp": 2}
|
||||
signals=["rdata", "rresp", "rvalid", "rready"],
|
||||
signal_widths={"rresp": 2}
|
||||
)
|
||||
|
||||
|
||||
@@ -22,15 +22,15 @@ THE SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
import cocotb
|
||||
from cocotb.triggers import RisingEdge, Event
|
||||
from cocotb.log import SimLog
|
||||
|
||||
from collections import deque
|
||||
|
||||
import cocotb
|
||||
from cocotb.triggers import Event
|
||||
from cocotb.log import SimLog
|
||||
|
||||
from .version import __version__
|
||||
from .constants import *
|
||||
from .axil_channels import *
|
||||
from .constants import AxiProt, AxiResp
|
||||
from .axil_channels import AxiLiteAWSource, AxiLiteWSource, AxiLiteBSink, AxiLiteARSource, AxiLiteRSink
|
||||
|
||||
|
||||
class AxiLiteMasterWrite(object):
|
||||
@@ -175,7 +175,8 @@ class AxiLiteMasterWrite(object):
|
||||
|
||||
offset = 0
|
||||
|
||||
self.log.info(f"Write start addr: {address:#010x} prot: {prot} data: {' '.join((f'{c:02x}' for c in data))}")
|
||||
self.log.info("Write start addr: 0x%08x prot: %s data: %s",
|
||||
address, prot, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
for k in range(cycles):
|
||||
start = 0
|
||||
@@ -224,7 +225,8 @@ class AxiLiteMasterWrite(object):
|
||||
if cycle_resp != AxiResp.OKAY:
|
||||
resp = cycle_resp
|
||||
|
||||
self.log.info(f"Write complete addr: {addr:#010x} prot: {prot} resp: {resp!s} length: {length}")
|
||||
self.log.info("Write complete addr: 0x%08x prot: %s resp: %s length: %d",
|
||||
addr, prot, resp, length)
|
||||
|
||||
self.write_resp_queue.append((addr, length, resp, token))
|
||||
self.write_resp_sync.set()
|
||||
@@ -359,7 +361,8 @@ class AxiLiteMasterRead(object):
|
||||
self.int_read_resp_command_queue.append((address, length, cycles, prot, token))
|
||||
self.int_read_resp_command_sync.set()
|
||||
|
||||
self.log.info(f"Read start addr: {address:#010x} prot: {prot} length: {length}")
|
||||
self.log.info("Read start addr: 0x%08x prot: %s length: %d",
|
||||
address, prot, length)
|
||||
|
||||
for k in range(cycles):
|
||||
ar = self.ar_channel._transaction_obj()
|
||||
@@ -376,8 +379,6 @@ class AxiLiteMasterRead(object):
|
||||
|
||||
addr, length, cycles, prot, token = self.int_read_resp_command_queue.popleft()
|
||||
|
||||
word_addr = (addr // self.byte_width) * self.byte_width
|
||||
|
||||
start_offset = addr % self.byte_width
|
||||
end_offset = ((addr + length - 1) % self.byte_width) + 1
|
||||
|
||||
@@ -406,7 +407,8 @@ class AxiLiteMasterRead(object):
|
||||
for j in range(start, stop):
|
||||
data.extend(bytearray([(cycle_data >> j*8) & 0xff]))
|
||||
|
||||
self.log.info(f"Read complete addr: {addr:#010x} prot: {prot} resp: {resp!s} data: {' '.join((f'{c:02x}' for c in data))}")
|
||||
self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s",
|
||||
addr, prot, resp, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
self.read_data_queue.append((addr, data, resp, token))
|
||||
self.read_data_sync.set()
|
||||
@@ -502,4 +504,3 @@ class AxiLiteMaster(object):
|
||||
|
||||
async def write_qword(self, address, data, prot=AxiProt.NONSECURE):
|
||||
return await self.write_if.write_qword(address, data, prot)
|
||||
|
||||
|
||||
@@ -22,17 +22,14 @@ THE SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
import mmap
|
||||
|
||||
import cocotb
|
||||
from cocotb.triggers import Event
|
||||
from cocotb.log import SimLog
|
||||
|
||||
import mmap
|
||||
import queue
|
||||
from collections import deque
|
||||
|
||||
from .version import __version__
|
||||
from .constants import *
|
||||
from .axil_channels import *
|
||||
from .constants import AxiProt, AxiResp
|
||||
from .axil_channels import AxiLiteAWSink, AxiLiteWSink, AxiLiteBSource, AxiLiteARSink, AxiLiteRSource
|
||||
from .utils import hexdump, hexdump_str
|
||||
|
||||
|
||||
@@ -103,7 +100,8 @@ class AxiLiteRamWrite(object):
|
||||
|
||||
data = data.to_bytes(self.byte_width, 'little')
|
||||
|
||||
self.log.info(f"Write data addr: {addr:#010x} prot: {prot} wstrb: {strb:#04x} data: {' '.join((f'{c:02x}' for c in data))}")
|
||||
self.log.info("Write data awaddr: 0x%08x awprot: %s wstrb: 0x%02x data: %s",
|
||||
addr, prot, strb, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
for i in range(self.byte_width):
|
||||
if strb & (1 << i):
|
||||
@@ -132,9 +130,6 @@ class AxiLiteRamRead(object):
|
||||
self.mem = mmap.mmap(-1, size)
|
||||
self.size = len(self.mem)
|
||||
|
||||
self.int_read_resp_command_queue = deque()
|
||||
self.int_read_resp_command_sync = Event()
|
||||
|
||||
self.in_flight_operations = 0
|
||||
|
||||
self.width = len(self.r_channel.bus.rdata)
|
||||
@@ -179,7 +174,8 @@ class AxiLiteRamRead(object):
|
||||
|
||||
self.r_channel.send(r)
|
||||
|
||||
self.log.info(f"Read data addr: {addr:#010x} prot: {prot} data: {' '.join((f'{c:02x}' for c in data))}")
|
||||
self.log.info("Read data araddr: 0x%08x arprot: %s data: %s",
|
||||
addr, prot, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
|
||||
class AxiLiteRam(object):
|
||||
@@ -209,4 +205,3 @@ class AxiLiteRam(object):
|
||||
|
||||
def hexdump_str(self, address, length, prefix=""):
|
||||
return hexdump_str(self.mem, address, length, prefix=prefix)
|
||||
|
||||
|
||||
@@ -22,15 +22,16 @@ THE SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
from collections import deque
|
||||
|
||||
import cocotb
|
||||
from cocotb.triggers import RisingEdge, ReadOnly, Timer, First, Event
|
||||
from cocotb.bus import Bus
|
||||
from cocotb.log import SimLog
|
||||
|
||||
from collections import deque
|
||||
|
||||
from .version import __version__
|
||||
|
||||
|
||||
class AxiStreamFrame(object):
|
||||
def __init__(self, tdata=b'', tkeep=None, tid=None, tdest=None, tuser=None):
|
||||
self.tdata = bytearray()
|
||||
@@ -194,12 +195,12 @@ class AxiStreamFrame(object):
|
||||
|
||||
def __repr__(self):
|
||||
return (
|
||||
f"{type(self).__name__}(tdata={repr(self.tdata)}, " +
|
||||
f"tkeep={repr(self.tkeep)}, " +
|
||||
f"tid={repr(self.tid)}, " +
|
||||
f"tdest={repr(self.tdest)}, " +
|
||||
f"tuser={repr(self.tuser)})"
|
||||
)
|
||||
f"{type(self).__name__}(tdata={repr(self.tdata)}, "
|
||||
f"tkeep={repr(self.tkeep)}, "
|
||||
f"tid={repr(self.tid)}, "
|
||||
f"tdest={repr(self.tdest)}, "
|
||||
f"tuser={repr(self.tuser)})"
|
||||
)
|
||||
|
||||
def __len__(self):
|
||||
return len(self.tdata)
|
||||
@@ -334,7 +335,7 @@ class AxiStreamSource(object):
|
||||
frame = self.queue.popleft()
|
||||
self.queue_occupancy_bytes -= len(frame)
|
||||
self.queue_occupancy_frames -= 1
|
||||
self.log.info(f"TX frame: {frame}")
|
||||
self.log.info("TX frame: %s", frame)
|
||||
frame.normalize()
|
||||
self.active = True
|
||||
|
||||
@@ -529,7 +530,7 @@ class AxiStreamSink(object):
|
||||
if self.byte_size == 8:
|
||||
frame.tdata = bytearray(frame.tdata)
|
||||
|
||||
self.log.info(f"RX frame: {frame}")
|
||||
self.log.info("RX frame: %s", frame)
|
||||
|
||||
self.queue_occupancy_bytes += len(frame)
|
||||
self.queue_occupancy_frames += 1
|
||||
|
||||
@@ -24,6 +24,7 @@ THE SOFTWARE.
|
||||
|
||||
import enum
|
||||
|
||||
|
||||
# Burst types
|
||||
# AWBURST/ARBURST
|
||||
class AxiBurstType(enum.IntEnum):
|
||||
@@ -31,6 +32,7 @@ class AxiBurstType(enum.IntEnum):
|
||||
INCR = 0b01
|
||||
WRAP = 0b10
|
||||
|
||||
|
||||
# Burst sizes (per beat)
|
||||
# AWSIZE/ARSIZE
|
||||
class AxiBurstSize(enum.IntEnum):
|
||||
@@ -43,12 +45,14 @@ class AxiBurstSize(enum.IntEnum):
|
||||
SIZE_64 = 0b110
|
||||
SIZE_128 = 0b111
|
||||
|
||||
|
||||
# Lock types
|
||||
# AWLOCK/ARLOCK
|
||||
class AxiLockType(enum.IntEnum):
|
||||
NORMAL = 0b0
|
||||
EXCLUSIVE = 0b1
|
||||
|
||||
|
||||
# Cache control
|
||||
# AWCACHE/ARCACHE
|
||||
class AxiCacheBit(enum.IntFlag):
|
||||
@@ -57,6 +61,7 @@ class AxiCacheBit(enum.IntFlag):
|
||||
RA = 0b0100
|
||||
WA = 0b1000
|
||||
|
||||
|
||||
# ARCACHE
|
||||
ARCACHE_DEVICE_NON_BUFFERABLE = 0b0000
|
||||
ARCACHE_DEVICE_BUFFERABLE = 0b0001
|
||||
@@ -85,6 +90,7 @@ AWCACHE_WRITE_BACK_READ_ALLOC = 0b0111
|
||||
AWCACHE_WRITE_BACK_WRITE_ALLOC = 0b1111
|
||||
AWCACHE_WRITE_BACK_READ_AND_WRITE_ALLOC = 0b1111
|
||||
|
||||
|
||||
# Protection bits
|
||||
# AWPROT/ARPROT
|
||||
class AxiProt(enum.IntFlag):
|
||||
@@ -92,6 +98,7 @@ class AxiProt(enum.IntFlag):
|
||||
NONSECURE = 0b010
|
||||
INSTRUCTION = 0b100
|
||||
|
||||
|
||||
# Operation status responses
|
||||
# BRESP/RRESP
|
||||
class AxiResp(enum.IntEnum):
|
||||
@@ -99,4 +106,3 @@ class AxiResp(enum.IntEnum):
|
||||
EXOKAY = 0b01
|
||||
SLVERR = 0b10
|
||||
DECERR = 0b11
|
||||
|
||||
|
||||
@@ -411,4 +411,3 @@ def define_stream(name, signals, optional_signals=None, valid_signal=None, ready
|
||||
monitor = type(name+"Monitor", (StreamMonitor,), attrib)
|
||||
|
||||
return transaction, source, sink, monitor
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ THE SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
def hexdump_line(data, offset):
|
||||
h = ""
|
||||
c = ""
|
||||
@@ -30,18 +31,20 @@ def hexdump_line(data, offset):
|
||||
c += chr(ch) if 32 < ch < 127 else "."
|
||||
return f"{offset:08x}: {h:48} {c}"
|
||||
|
||||
|
||||
def hexdump(data, start=0, length=None, prefix="", offset=0):
|
||||
stop = min(start+length, len(data)) if length else len(data)
|
||||
for k in range(start, stop, 16):
|
||||
print(prefix+hexdump_line(data[k:min(k+16,stop)], k+offset))
|
||||
print(prefix+hexdump_line(data[k:min(k+16, stop)], k+offset))
|
||||
|
||||
|
||||
def hexdump_lines(data, start=0, length=None, prefix="", offset=0):
|
||||
lines = []
|
||||
stop = min(start+length, len(data)) if length else len(data)
|
||||
for k in range(start, stop, 16):
|
||||
lines.append(prefix+hexdump_line(data[k:min(k+16,stop)], k+offset))
|
||||
lines.append(prefix+hexdump_line(data[k:min(k+16, stop)], k+offset))
|
||||
return lines
|
||||
|
||||
|
||||
def hexdump_str(data, start=0, length=None, prefix="", offset=0):
|
||||
return "\n".join(hexdump_lines(data, start, length, prefix, offset))
|
||||
|
||||
|
||||
14
setup.py
14
setup.py
@@ -11,19 +11,19 @@ with open("README.md", "r") as f:
|
||||
long_description = f.read()
|
||||
|
||||
setup(
|
||||
name = "cocotbext-axi",
|
||||
name="cocotbext-axi",
|
||||
author="Alex Forencich",
|
||||
author_email="alex@alexforencich.com",
|
||||
description="AXI modules for Cocotb",
|
||||
long_description=long_description,
|
||||
long_description_content_type='text/markdown',
|
||||
url="https://github.com/alexforencich/cocotbext-axi",
|
||||
download_url = 'http://github.com/alexforencich/cocotbext-axi/tarball/master',
|
||||
version = version,
|
||||
packages = find_namespace_packages(include=['cocotbext.*']),
|
||||
install_requires = ['cocotb'],
|
||||
python_requires = '>=3.6',
|
||||
classifiers = [
|
||||
download_url='http://github.com/alexforencich/cocotbext-axi/tarball/master',
|
||||
version=version,
|
||||
packages=find_namespace_packages(include=['cocotbext.*']),
|
||||
install_requires=['cocotb'],
|
||||
python_requires='>=3.6',
|
||||
classifiers=[
|
||||
"Programming Language :: Python :: 3",
|
||||
"License :: OSI Approved :: MIT License",
|
||||
"Operating System :: OS Independent",
|
||||
|
||||
@@ -31,18 +31,18 @@ import cocotb_test.simulator
|
||||
import pytest
|
||||
|
||||
import cocotb
|
||||
from cocotb.log import SimLog
|
||||
from cocotb.clock import Clock
|
||||
from cocotb.triggers import RisingEdge, Timer
|
||||
from cocotb.regression import TestFactory
|
||||
|
||||
from cocotbext.axi import AxiMaster, AxiRam
|
||||
|
||||
|
||||
class TB(object):
|
||||
def __init__(self, dut):
|
||||
self.dut = dut
|
||||
|
||||
self.log = SimLog(f"cocotb.tb")
|
||||
self.log = logging.getLogger("cocotb.tb")
|
||||
self.log.setLevel(logging.DEBUG)
|
||||
|
||||
cocotb.fork(Clock(dut.clk, 2, units="ns").start())
|
||||
@@ -80,6 +80,7 @@ class TB(object):
|
||||
await RisingEdge(self.dut.clk)
|
||||
await RisingEdge(self.dut.clk)
|
||||
|
||||
|
||||
async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, size=None):
|
||||
|
||||
tb = TB(dut)
|
||||
@@ -95,17 +96,17 @@ async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, si
|
||||
tb.set_idle_generator(idle_inserter)
|
||||
tb.set_backpressure_generator(backpressure_inserter)
|
||||
|
||||
for length in list(range(1,byte_width*2))+[1024]:
|
||||
for offset in list(range(byte_width))+list(range(4096-byte_width,4096)):
|
||||
for length in list(range(1, byte_width*2))+[1024]:
|
||||
for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)):
|
||||
tb.log.info(f"length {length}, offset {offset}")
|
||||
addr = offset+0x1000
|
||||
test_data = bytearray([x%256 for x in range(length)])
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
|
||||
tb.axi_ram.write_mem(addr-128, b'\xaa'*(length+256))
|
||||
|
||||
await tb.axi_master.write(addr, test_data, size=size)
|
||||
|
||||
tb.log.debug(tb.axi_ram.hexdump_str((addr&0xfffffff0)-16, (((addr&0xf)+length-1)&0xfffffff0)+48))
|
||||
tb.log.debug("%s", tb.axi_ram.hexdump_str((addr & 0xfffffff0)-16, (((addr & 0xf)+length-1) & 0xfffffff0)+48))
|
||||
|
||||
assert tb.axi_ram.read_mem(addr, length) == test_data
|
||||
assert tb.axi_ram.read_mem(addr-1, 1) == b'\xaa'
|
||||
@@ -114,6 +115,7 @@ async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, si
|
||||
await RisingEdge(dut.clk)
|
||||
await RisingEdge(dut.clk)
|
||||
|
||||
|
||||
async def run_test_read(dut, idle_inserter=None, backpressure_inserter=None, size=None):
|
||||
|
||||
tb = TB(dut)
|
||||
@@ -129,11 +131,11 @@ async def run_test_read(dut, idle_inserter=None, backpressure_inserter=None, siz
|
||||
tb.set_idle_generator(idle_inserter)
|
||||
tb.set_backpressure_generator(backpressure_inserter)
|
||||
|
||||
for length in list(range(1,byte_width*2))+[1024]:
|
||||
for offset in list(range(byte_width))+list(range(4096-byte_width,4096)):
|
||||
for length in list(range(1, byte_width*2))+[1024]:
|
||||
for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)):
|
||||
tb.log.info(f"length {length}, offset {offset}")
|
||||
addr = offset+0x1000
|
||||
test_data = bytearray([x%256 for x in range(length)])
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
|
||||
tb.axi_ram.write_mem(addr, test_data)
|
||||
|
||||
@@ -144,6 +146,7 @@ async def run_test_read(dut, idle_inserter=None, backpressure_inserter=None, siz
|
||||
await RisingEdge(dut.clk)
|
||||
await RisingEdge(dut.clk)
|
||||
|
||||
|
||||
async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
|
||||
|
||||
tb = TB(dut)
|
||||
@@ -157,7 +160,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
|
||||
for k in range(count):
|
||||
length = random.randint(1, min(512, aperture))
|
||||
addr = offset+random.randint(0, aperture-length)
|
||||
test_data = bytearray([x%256 for x in range(length)])
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
|
||||
await Timer(random.randint(1, 100), 'ns')
|
||||
|
||||
@@ -179,9 +182,11 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
|
||||
await RisingEdge(dut.clk)
|
||||
await RisingEdge(dut.clk)
|
||||
|
||||
|
||||
def cycle_pause():
|
||||
return itertools.cycle([1, 1, 1, 0])
|
||||
|
||||
|
||||
if cocotb.SIM_NAME:
|
||||
|
||||
data_width = int(os.getenv("PARAM_DATA_WIDTH"))
|
||||
@@ -200,9 +205,12 @@ if cocotb.SIM_NAME:
|
||||
factory.generate_tests()
|
||||
|
||||
|
||||
# cocotb-test
|
||||
|
||||
tests_dir = os.path.dirname(__file__)
|
||||
rtl_dir = os.path.abspath(os.path.join(tests_dir, '..', '..', 'rtl'))
|
||||
|
||||
|
||||
@pytest.mark.parametrize("data_width", [8, 16, 32])
|
||||
def test_axi(request, data_width):
|
||||
dut = "test_axi"
|
||||
@@ -225,7 +233,7 @@ def test_axi(request, data_width):
|
||||
parameters['ARUSER_WIDTH'] = 1
|
||||
parameters['RUSER_WIDTH'] = 1
|
||||
|
||||
extra_env = {f'PARAM_{k}' : str(v) for k, v in parameters.items()}
|
||||
extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}
|
||||
|
||||
sim_build = os.path.join(tests_dir,
|
||||
"sim_build_"+request.node.name.replace('[', '-').replace(']', ''))
|
||||
@@ -239,4 +247,3 @@ def test_axi(request, data_width):
|
||||
sim_build=sim_build,
|
||||
extra_env=extra_env,
|
||||
)
|
||||
|
||||
|
||||
@@ -31,18 +31,18 @@ import cocotb_test.simulator
|
||||
import pytest
|
||||
|
||||
import cocotb
|
||||
from cocotb.log import SimLog
|
||||
from cocotb.clock import Clock
|
||||
from cocotb.triggers import RisingEdge, Timer
|
||||
from cocotb.regression import TestFactory
|
||||
|
||||
from cocotbext.axi import AxiLiteMaster, AxiLiteRam
|
||||
|
||||
|
||||
class TB(object):
|
||||
def __init__(self, dut):
|
||||
self.dut = dut
|
||||
|
||||
self.log = SimLog(f"cocotb.tb")
|
||||
self.log = logging.getLogger("cocotb.tb")
|
||||
self.log.setLevel(logging.DEBUG)
|
||||
|
||||
cocotb.fork(Clock(dut.clk, 2, units="ns").start())
|
||||
@@ -77,6 +77,7 @@ class TB(object):
|
||||
await RisingEdge(self.dut.clk)
|
||||
await RisingEdge(self.dut.clk)
|
||||
|
||||
|
||||
async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_inserter=None):
|
||||
|
||||
tb = TB(dut)
|
||||
@@ -88,17 +89,17 @@ async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_ins
|
||||
tb.set_idle_generator(idle_inserter)
|
||||
tb.set_backpressure_generator(backpressure_inserter)
|
||||
|
||||
for length in range(1,byte_width*2):
|
||||
for length in range(1, byte_width*2):
|
||||
for offset in range(byte_width):
|
||||
tb.log.info(f"length {length}, offset {offset}")
|
||||
addr = offset+0x1000
|
||||
test_data = bytearray([x%256 for x in range(length)])
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
|
||||
tb.axil_ram.write_mem(addr-128, b'\xaa'*(length+256))
|
||||
|
||||
await tb.axil_master.write(addr, test_data)
|
||||
|
||||
tb.log.debug(tb.axil_ram.hexdump_str((addr&0xfffffff0)-16, (((addr&0xf)+length-1)&0xfffffff0)+48))
|
||||
tb.log.debug("%s", tb.axil_ram.hexdump_str((addr & 0xfffffff0)-16, (((addr & 0xf)+length-1) & 0xfffffff0)+48))
|
||||
|
||||
assert tb.axil_ram.read_mem(addr, length) == test_data
|
||||
assert tb.axil_ram.read_mem(addr-1, 1) == b'\xaa'
|
||||
@@ -107,6 +108,7 @@ async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_ins
|
||||
await RisingEdge(dut.clk)
|
||||
await RisingEdge(dut.clk)
|
||||
|
||||
|
||||
async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inserter=None):
|
||||
|
||||
tb = TB(dut)
|
||||
@@ -118,11 +120,11 @@ async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inse
|
||||
tb.set_idle_generator(idle_inserter)
|
||||
tb.set_backpressure_generator(backpressure_inserter)
|
||||
|
||||
for length in range(1,byte_width*2):
|
||||
for length in range(1, byte_width*2):
|
||||
for offset in range(byte_width):
|
||||
tb.log.info(f"length {length}, offset {offset}")
|
||||
addr = offset+0x1000
|
||||
test_data = bytearray([x%256 for x in range(length)])
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
|
||||
tb.axil_ram.write_mem(addr, test_data)
|
||||
|
||||
@@ -133,6 +135,7 @@ async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inse
|
||||
await RisingEdge(dut.clk)
|
||||
await RisingEdge(dut.clk)
|
||||
|
||||
|
||||
async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
|
||||
|
||||
tb = TB(dut)
|
||||
@@ -146,7 +149,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
|
||||
for k in range(count):
|
||||
length = random.randint(1, min(32, aperture))
|
||||
addr = offset+random.randint(0, aperture-length)
|
||||
test_data = bytearray([x%256 for x in range(length)])
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
|
||||
await Timer(random.randint(1, 100), 'ns')
|
||||
|
||||
@@ -168,9 +171,11 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
|
||||
await RisingEdge(dut.clk)
|
||||
await RisingEdge(dut.clk)
|
||||
|
||||
|
||||
def cycle_pause():
|
||||
return itertools.cycle([1, 1, 1, 0])
|
||||
|
||||
|
||||
if cocotb.SIM_NAME:
|
||||
|
||||
for test in [run_test_write, run_test_read]:
|
||||
@@ -184,9 +189,12 @@ if cocotb.SIM_NAME:
|
||||
factory.generate_tests()
|
||||
|
||||
|
||||
# cocotb-test
|
||||
|
||||
tests_dir = os.path.dirname(__file__)
|
||||
rtl_dir = os.path.abspath(os.path.join(tests_dir, '..', '..', 'rtl'))
|
||||
|
||||
|
||||
@pytest.mark.parametrize("data_width", [8, 16, 32])
|
||||
def test_axil(request, data_width):
|
||||
dut = "test_axil"
|
||||
@@ -203,7 +211,7 @@ def test_axil(request, data_width):
|
||||
parameters['ADDR_WIDTH'] = 32
|
||||
parameters['STRB_WIDTH'] = parameters['DATA_WIDTH'] // 8
|
||||
|
||||
extra_env = {f'PARAM_{k}' : str(v) for k, v in parameters.items()}
|
||||
extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}
|
||||
|
||||
sim_build = os.path.join(tests_dir,
|
||||
"sim_build_"+request.node.name.replace('[', '-').replace(']', ''))
|
||||
@@ -217,4 +225,3 @@ def test_axil(request, data_width):
|
||||
sim_build=sim_build,
|
||||
extra_env=extra_env,
|
||||
)
|
||||
|
||||
|
||||
@@ -31,18 +31,18 @@ import cocotb_test.simulator
|
||||
import pytest
|
||||
|
||||
import cocotb
|
||||
from cocotb.log import SimLog
|
||||
from cocotb.clock import Clock
|
||||
from cocotb.triggers import RisingEdge
|
||||
from cocotb.regression import TestFactory
|
||||
|
||||
from cocotbext.axi import AxiStreamFrame, AxiStreamSource, AxiStreamSink
|
||||
|
||||
|
||||
class TB(object):
|
||||
def __init__(self, dut):
|
||||
self.dut = dut
|
||||
|
||||
self.log = SimLog(f"cocotb.tb")
|
||||
self.log = logging.getLogger("cocotb.tb")
|
||||
self.log.setLevel(logging.DEBUG)
|
||||
|
||||
cocotb.fork(Clock(dut.clk, 2, units="ns").start())
|
||||
@@ -69,6 +69,7 @@ class TB(object):
|
||||
await RisingEdge(self.dut.clk)
|
||||
await RisingEdge(self.dut.clk)
|
||||
|
||||
|
||||
async def run_test(dut, payload_lengths=None, payload_data=None, idle_inserter=None, backpressure_inserter=None):
|
||||
|
||||
tb = TB(dut)
|
||||
@@ -84,7 +85,7 @@ async def run_test(dut, payload_lengths=None, payload_data=None, idle_inserter=N
|
||||
|
||||
test_frames = []
|
||||
|
||||
for test_data in [payload_data(l) for l in payload_lengths()]:
|
||||
for test_data in [payload_data(x) for x in payload_lengths()]:
|
||||
test_frame = AxiStreamFrame(test_data)
|
||||
test_frame.tid = cur_id
|
||||
test_frame.tdest = cur_id
|
||||
@@ -108,17 +109,21 @@ async def run_test(dut, payload_lengths=None, payload_data=None, idle_inserter=N
|
||||
await RisingEdge(dut.clk)
|
||||
await RisingEdge(dut.clk)
|
||||
|
||||
|
||||
def cycle_pause():
|
||||
return itertools.cycle([1, 1, 1, 0])
|
||||
|
||||
|
||||
def size_list():
|
||||
data_width = int(os.getenv("PARAM_DATA_WIDTH"))
|
||||
byte_width = data_width // 8
|
||||
return list(range(1,data_width*4+1))+[512]+[1]*64
|
||||
return list(range(1, byte_width*4+1)) + [512] + [1]*64
|
||||
|
||||
|
||||
def incrementing_payload(length):
|
||||
return bytearray(itertools.islice(itertools.cycle(range(256)), length))
|
||||
|
||||
|
||||
if cocotb.SIM_NAME:
|
||||
|
||||
factory = TestFactory(run_test)
|
||||
@@ -129,9 +134,12 @@ if cocotb.SIM_NAME:
|
||||
factory.generate_tests()
|
||||
|
||||
|
||||
# cocotb-test
|
||||
|
||||
tests_dir = os.path.dirname(__file__)
|
||||
rtl_dir = os.path.abspath(os.path.join(tests_dir, '..', '..', 'rtl'))
|
||||
|
||||
|
||||
@pytest.mark.parametrize("data_width", [8, 16, 32])
|
||||
def test_axis(request, data_width):
|
||||
dut = "test_axis"
|
||||
@@ -150,7 +158,7 @@ def test_axis(request, data_width):
|
||||
parameters['DEST_WIDTH'] = 8
|
||||
parameters['USER_WIDTH'] = 1
|
||||
|
||||
extra_env = {f'PARAM_{k}' : str(v) for k, v in parameters.items()}
|
||||
extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}
|
||||
|
||||
sim_build = os.path.join(tests_dir,
|
||||
"sim_build_"+request.node.name.replace('[', '-').replace(']', ''))
|
||||
@@ -164,4 +172,3 @@ def test_axis(request, data_width):
|
||||
sim_build=sim_build,
|
||||
extra_env=extra_env,
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user