From 81f8906548c52f0f2992512ba2ac4291590abf7c Mon Sep 17 00:00:00 2001 From: Alex Forencich Date: Thu, 26 Nov 2020 19:51:24 -0800 Subject: [PATCH] Clean up and lint removal --- cocotbext/axi/__init__.py | 3 +- cocotbext/axi/axi_channels.py | 33 +++--- cocotbext/axi/axi_master.py | 179 ++++++++++++++++++++------------- cocotbext/axi/axi_ram.py | 30 +++--- cocotbext/axi/axil_channels.py | 19 ++-- cocotbext/axi/axil_master.py | 27 ++--- cocotbext/axi/axil_ram.py | 23 ++--- cocotbext/axi/axis.py | 21 ++-- cocotbext/axi/constants.py | 8 +- cocotbext/axi/stream.py | 1 - cocotbext/axi/utils.py | 9 +- setup.py | 14 +-- tests/axi/test_axi.py | 31 +++--- tests/axil/test_axil.py | 27 +++-- tests/axis/test_axis.py | 19 ++-- 15 files changed, 255 insertions(+), 189 deletions(-) diff --git a/cocotbext/axi/__init__.py b/cocotbext/axi/__init__.py index 9c247cf..dd591cb 100644 --- a/cocotbext/axi/__init__.py +++ b/cocotbext/axi/__init__.py @@ -24,7 +24,7 @@ THE SOFTWARE. from .version import __version__ -from .constants import * +from .constants import AxiBurstType, AxiBurstSize, AxiLockType, AxiCacheBit, AxiProt, AxiResp from .axis import AxiStreamFrame, AxiStreamSource, AxiStreamSink @@ -33,4 +33,3 @@ from .axil_ram import AxiLiteRamWrite, AxiLiteRamRead, AxiLiteRam from .axi_master import AxiMasterWrite, AxiMasterRead, AxiMaster from .axi_ram import AxiRamWrite, AxiRamRead, AxiRam - diff --git a/cocotbext/axi/axi_channels.py b/cocotbext/axi/axi_channels.py index d2c094e..e922403 100644 --- a/cocotbext/axi/axi_channels.py +++ b/cocotbext/axi/axi_channels.py @@ -26,36 +26,37 @@ from .stream import define_stream # Write address channel AxiAWTransaction, AxiAWSource, AxiAWSink, AxiAWMonitor = define_stream("AxiAW", - signals = ["awid", "awaddr", "awlen", "awsize", "awburst", "awprot", "awvalid", "awready"], - optional_signals = ["awlock", "awcache", "awqos", "awregion", "awuser"], - signal_widths = {"awlen": 8, "awsize": 3, "awburst": 2, "awlock": 1, "awcache": 4, "awprot": 3, "awqos": 4, "awregion": 4} + signals=["awid", "awaddr", "awlen", "awsize", "awburst", "awprot", "awvalid", "awready"], + optional_signals=["awlock", "awcache", "awqos", "awregion", "awuser"], + signal_widths={"awlen": 8, "awsize": 3, "awburst": 2, "awlock": 1, + "awcache": 4, "awprot": 3, "awqos": 4, "awregion": 4} ) # Write data channel AxiWTransaction, AxiWSource, AxiWSink, AxiWMonitor = define_stream("AxiW", - signals = ["wdata", "wstrb", "wlast", "wvalid", "wready"], - optional_signals = ["wuser"], - signal_widths = {"wlast": 1} + signals=["wdata", "wstrb", "wlast", "wvalid", "wready"], + optional_signals=["wuser"], + signal_widths={"wlast": 1} ) # Write response channel AxiBTransaction, AxiBSource, AxiBSink, AxiBMonitor = define_stream("AxiB", - signals = ["bid", "bresp", "bvalid", "bready"], - optional_signals = ["buser"], - signal_widths = {"bresp": 2} + signals=["bid", "bresp", "bvalid", "bready"], + optional_signals=["buser"], + signal_widths={"bresp": 2} ) # Read address channel AxiARTransaction, AxiARSource, AxiARSink, AxiARMonitor = define_stream("AxiAR", - signals = ["arid", "araddr", "arlen", "arsize", "arburst", "arprot", "arvalid", "arready"], - optional_signals = ["arlock", "arcache", "arqos", "arregion", "aruser"], - signal_widths = {"arlen": 8, "arsize": 3, "arburst": 2, "arlock": 1, "arcache": 4, "arprot": 3, "arqos": 4, "arregion": 4} + signals=["arid", "araddr", "arlen", "arsize", "arburst", "arprot", "arvalid", "arready"], + optional_signals=["arlock", "arcache", "arqos", "arregion", "aruser"], + signal_widths={"arlen": 8, "arsize": 3, "arburst": 2, "arlock": 1, + "arcache": 4, "arprot": 3, "arqos": 4, "arregion": 4} ) # Read data channel AxiRTransaction, AxiRSource, AxiRSink, AxiRMonitor = define_stream("AxiR", - signals = ["rid", "rdata", "rresp", "rlast", "rvalid", "rready"], - optional_signals = ["ruser"], - signal_widths = {"rresp": 2, "rlast": 1} + signals=["rid", "rdata", "rresp", "rlast", "rvalid", "rready"], + optional_signals=["ruser"], + signal_widths={"rresp": 2, "rlast": 1} ) - diff --git a/cocotbext/axi/axi_master.py b/cocotbext/axi/axi_master.py index 9a6f77c..c3b6d0f 100644 --- a/cocotbext/axi/axi_master.py +++ b/cocotbext/axi/axi_master.py @@ -22,15 +22,15 @@ THE SOFTWARE. """ -import cocotb -from cocotb.triggers import RisingEdge, Event -from cocotb.log import SimLog - from collections import deque +import cocotb +from cocotb.triggers import Event +from cocotb.log import SimLog + from .version import __version__ -from .constants import * -from .axi_channels import * +from .constants import AxiBurstType, AxiLockType, AxiProt, AxiResp +from .axi_channels import AxiAWSource, AxiWSource, AxiBSink, AxiARSource, AxiRSink class AxiMasterWrite(object): @@ -81,7 +81,8 @@ class AxiMasterWrite(object): cocotb.fork(self._process_write()) cocotb.fork(self._process_write_resp()) - def init_write(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None): + def init_write(self, address, data, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None): if token is not None: if token in self.active_tokens: raise Exception("Token is not unique") @@ -130,35 +131,43 @@ class AxiMasterWrite(object): return resp return None - async def write(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def write(self, address, data, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): token = object() self.init_write(address, data, burst, size, lock, cache, prot, qos, region, user, token) await self.wait_for_token(token) return self.get_write_resp(token)[1:3] - async def write_words(self, address, data, ws=2, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def write_words(self, address, data, ws=2, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): words = data data = bytearray() for w in words: data.extend(w.to_bytes(ws, 'little')) await self.write(address, data, burst, size, lock, cache, prot, qos, region, user) - async def write_dwords(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def write_dwords(self, address, data, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): await self.write_words(address, data, 4, burst, size, lock, cache, prot, qos, region, user) - async def write_qwords(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def write_qwords(self, address, data, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): await self.write_words(address, data, 8, burst, size, lock, cache, prot, qos, region, user) - async def write_byte(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def write_byte(self, address, data, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): await self.write(address, [data], burst, size, lock, cache, prot, qos, region, user) - async def write_word(self, address, data, ws=2, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def write_word(self, address, data, ws=2, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): await self.write_words(address, [data], ws, burst, size, lock, cache, prot, qos, region, user) - async def write_dword(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def write_dword(self, address, data, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): await self.write_dwords(address, [data], burst, size, lock, cache, prot, qos, region, user) - async def write_qword(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def write_qword(self, address, data, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): await self.write_qwords(address, [data], burst, size, lock, cache, prot, qos, region, user) async def _process_write(self): @@ -167,7 +176,7 @@ class AxiMasterWrite(object): self.write_command_sync.clear() await self.write_command_sync.wait() - address, data, burst, size, lock, cache, prot, qos, region, user, token = self.write_command_queue.popleft() + addr, data, burst, size, lock, cache, prot, qos, region, user, token = self.write_command_queue.popleft() num_bytes = self.byte_width @@ -177,13 +186,13 @@ class AxiMasterWrite(object): num_bytes = 2**size assert 0 < num_bytes <= self.byte_width - aligned_addr = (address // num_bytes) * num_bytes - word_addr = (address // self.byte_width) * self.byte_width + aligned_addr = (addr // num_bytes) * num_bytes + word_addr = (addr // self.byte_width) * self.byte_width - start_offset = address % self.byte_width - end_offset = ((address + len(data) - 1) % self.byte_width) + 1 + start_offset = addr % self.byte_width + end_offset = ((addr + len(data) - 1) % self.byte_width) + 1 - cycles = (len(data) + (address % num_bytes) + num_bytes-1) // num_bytes + cycles = (len(data) + (addr % num_bytes) + num_bytes-1) // num_bytes cur_addr = aligned_addr offset = 0 @@ -194,7 +203,8 @@ class AxiMasterWrite(object): burst_list = [] burst_length = 0 - self.log.info(f"Write start addr: {address:#010x} prot: {prot} data: {' '.join((f'{c:02x}' for c in data))}") + self.log.info("Write start addr: 0x%08x prot: %s data: %s", + addr, prot, ' '.join((f'{c:02x}' for c in data))) for k in range(cycles): start = cycle_offset @@ -222,8 +232,10 @@ class AxiMasterWrite(object): transfer_count += 1 n = 0 - burst_length = min(cycles-k, min(max(self.max_burst_len, 1), 256)) # max len - burst_length = (min(burst_length*num_bytes, 0x1000-(cur_addr&0xfff))+num_bytes-1)//num_bytes # 4k align + # split on burst length + burst_length = min(cycles-k, min(max(self.max_burst_len, 1), 256)) + # split on 4k address boundary + burst_length = (min(burst_length*num_bytes, 0x1000-(cur_addr & 0xfff))+num_bytes-1)//num_bytes burst_list.append((awid, burst_length)) @@ -242,7 +254,8 @@ class AxiMasterWrite(object): await self.aw_channel.drive(aw) - self.log.info(f"Write burst start awid {awid:#x} awaddr: {cur_addr:#010x} awlen: {burst_length-1} awsize: {size}") + self.log.info("Write burst start awid: 0x%x awaddr: 0x%08x awlen: %d awsize: %d awprot: %s", + awid, cur_addr, burst_length-1, size, prot) n += 1 @@ -256,7 +269,7 @@ class AxiMasterWrite(object): cur_addr += num_bytes cycle_offset = (cycle_offset + num_bytes) % self.byte_width - self.int_write_resp_command_queue.append((address, len(data), size, cycles, prot, burst_list, token)) + self.int_write_resp_command_queue.append((addr, len(data), size, cycles, prot, burst_list, token)) self.int_write_resp_command_sync.set() async def _process_write_resp(self): @@ -298,9 +311,10 @@ class AxiMasterWrite(object): self.id_queue.append(bid) self.id_sync.set() - self.log.info(f"Write burst complete bid {burst_id:#x} bresp: {burst_resp!s}") + self.log.info("Write burst complete bid: 0x%x bresp: %s", burst_id, burst_resp) - self.log.info(f"Write complete addr: {addr:#010x} prot: {prot} resp: {resp!s} length: {length}") + self.log.info("Write complete addr: 0x%08x prot: %s resp: %s length: %d", + addr, prot, resp, length) self.write_resp_queue.append((addr, length, resp, user, token)) self.write_resp_sync.set() @@ -349,7 +363,8 @@ class AxiMasterRead(object): cocotb.fork(self._process_read()) cocotb.fork(self._process_read_resp()) - def init_read(self, address, length, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None): + def init_read(self, address, length, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None): if token is not None: if token in self.active_tokens: raise Exception("Token is not unique") @@ -398,35 +413,43 @@ class AxiMasterRead(object): return resp return None - async def read(self, address, length, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def read(self, address, length, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): token = object() self.init_read(address, length, burst, size, lock, cache, prot, qos, region, user, token) await self.wait_for_token(token) return self.get_read_data(token)[1:3] - async def read_words(self, address, count, ws=2, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def read_words(self, address, count, ws=2, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): data = await self.read(address, count*ws, burst, size, lock, cache, prot, qos, region, user) words = [] for k in range(count): words.append(int.from_bytes(data[0][ws*k:ws*(k+1)], 'little')) return words - async def read_dwords(self, address, count, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def read_dwords(self, address, count, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.read_words(address, count, 4, burst, size, lock, cache, prot, qos, region, user) - async def read_qwords(self, address, count, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def read_qwords(self, address, count, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.read_words(address, count, 8, burst, size, lock, cache, prot, qos, region, user) - async def read_byte(self, address, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def read_byte(self, address, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return (await self.read(address, 1, burst, size, lock, cache, prot, qos, region, user))[0] - async def read_word(self, address, ws=2, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def read_word(self, address, ws=2, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return (await self.read_words(address, 1, ws, burst, size, lock, cache, prot, qos, region, user))[0] - async def read_dword(self, address, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def read_dword(self, address, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return (await self.read_dwords(address, 1, burst, size, lock, cache, prot, qos, region, user))[0] - async def read_qword(self, address, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def read_qword(self, address, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return (await self.read_qwords(address, 1, burst, size, lock, cache, prot, qos, region, user))[0] async def _process_read(self): @@ -435,7 +458,7 @@ class AxiMasterRead(object): self.read_command_sync.clear() await self.read_command_sync.wait() - address, length, burst, size, lock, cache, prot, qos, region, user, token = self.read_command_queue.popleft() + addr, length, burst, size, lock, cache, prot, qos, region, user, token = self.read_command_queue.popleft() num_bytes = self.byte_width @@ -445,10 +468,9 @@ class AxiMasterRead(object): num_bytes = 2**size assert 0 < num_bytes <= self.byte_width - aligned_addr = (address // num_bytes) * num_bytes - word_addr = (address // self.byte_width) * self.byte_width + aligned_addr = (addr // num_bytes) * num_bytes - cycles = (length + num_bytes-1 + (address % num_bytes)) // num_bytes + cycles = (length + num_bytes-1 + (addr % num_bytes)) // num_bytes burst_list = [] @@ -457,6 +479,8 @@ class AxiMasterRead(object): burst_length = 0 + self.log.info("Read start addr: 0x%08x prot: %s", addr, prot) + for k in range(cycles): n += 1 @@ -469,8 +493,10 @@ class AxiMasterRead(object): n = 0 - burst_length = min(cycles-k, min(max(self.max_burst_len, 1), 256)) # max len - burst_length = (min(burst_length*num_bytes, 0x1000-(cur_addr&0xfff))+num_bytes-1)//num_bytes # 4k align + # split on burst length + burst_length = min(cycles-k, min(max(self.max_burst_len, 1), 256)) + # split on 4k address boundary + burst_length = (min(burst_length*num_bytes, 0x1000-(cur_addr & 0xfff))+num_bytes-1)//num_bytes burst_list.append((arid, burst_length)) @@ -489,11 +515,12 @@ class AxiMasterRead(object): await self.ar_channel.drive(ar) - self.log.info(f"Read burst start arid {arid:#x} araddr: {cur_addr:#010x} arlen: {burst_length-1} arsize: {size}") + self.log.info("Read burst start arid: 0x%x araddr: 0x%08x arlen: %d arsize: %d arprot: %s", + arid, cur_addr, burst_length-1, size, prot) cur_addr += num_bytes - self.int_read_resp_command_queue.append((address, length, size, cycles, prot, burst_list, token)) + self.int_read_resp_command_queue.append((addr, length, size, cycles, prot, burst_list, token)) self.int_read_resp_command_sync.set() async def _process_read_resp(self): @@ -510,7 +537,6 @@ class AxiMasterRead(object): word_addr = (addr // self.byte_width) * self.byte_width start_offset = addr % self.byte_width - end_offset = ((addr + length - 1) % self.byte_width) + 1 cycle_offset = aligned_addr - word_addr data = bytearray() @@ -566,11 +592,12 @@ class AxiMasterRead(object): self.id_queue.append(rid) self.id_sync.set() - self.log.info(f"Read burst complete rid {cycle_id:#x} rresp: {resp!s}") + self.log.info("Read burst complete rid: 0x%x rresp: %s", cycle_id, resp) data = data[:length] - self.log.info(f"Read complete addr: {addr:#010x} prot: {prot} resp: {resp!s} data: {' '.join((f'{c:02x}' for c in data))}") + self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s", + addr, prot, resp, ' '.join((f'{c:02x}' for c in data))) self.read_data_queue.append((addr, data, resp, user, token)) self.read_data_sync.set() @@ -587,10 +614,12 @@ class AxiMaster(object): self.write_if = AxiMasterWrite(entity, name, clock, reset) self.read_if = AxiMasterRead(entity, name, clock, reset) - def init_read(self, address, length, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None): + def init_read(self, address, length, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None): self.read_if.init_read(address, length, burst, size, lock, cache, prot, qos, region, user, token) - def init_write(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None): + def init_write(self, address, data, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None): self.write_if.init_write(address, data, burst, size, lock, cache, prot, qos, region, user, token) def idle(self): @@ -619,52 +648,66 @@ class AxiMaster(object): def get_write_resp(self, token=None): return self.write_if.get_write_resp(token) - async def read(self, address, length, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def read(self, address, length, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.read_if.read(address, length, burst, size, lock, cache, prot, qos, region, user) - async def read_words(self, address, count, ws=2, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def read_words(self, address, count, ws=2, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.read_if.read_words(address, count, ws, burst, size, lock, cache, prot, qos, region, user) - async def read_dwords(self, address, count, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def read_dwords(self, address, count, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.read_if.read_dwords(address, count, burst, size, lock, cache, prot, qos, region, user) - async def read_qwords(self, address, count, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def read_qwords(self, address, count, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.read_if.read_qwords(address, count, burst, size, lock, cache, prot, qos, region, user) - async def read_byte(self, address, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def read_byte(self, address, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.read_if.read_byte(address, burst, size, lock, cache, prot, qos, region, user) - async def read_word(self, address, ws=2, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def read_word(self, address, ws=2, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.read_if.read_word(address, ws, burst, size, lock, cache, prot, qos, region, user) - async def read_dword(self, address, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def read_dword(self, address, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.read_if.read_dword(address, burst, size, lock, cache, prot, qos, region, user) - async def read_qword(self, address, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def read_qword(self, address, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.read_if.read_qword(address, burst, size, lock, cache, prot, qos, region, user) - async def write(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def write(self, address, data, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.write_if.write(address, data, burst, size, lock, cache, prot, qos, region, user) - async def write_words(self, address, data, ws=2, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def write_words(self, address, data, ws=2, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.write_if.write_words(address, data, ws, burst, size, lock, cache, prot, qos, region, user) - async def write_dwords(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def write_dwords(self, address, data, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.write_if.write_dwords(address, data, burst, size, lock, cache, prot, qos, region, user) - async def write_qwords(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def write_qwords(self, address, data, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.write_if.write_qwords(address, data, burst, size, lock, cache, prot, qos, region, user) - async def write_byte(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def write_byte(self, address, data, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.write_if.write_byte(address, data, burst, size, lock, cache, prot, qos, region, user) - async def write_word(self, address, data, ws=2, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def write_word(self, address, data, ws=2, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.write_if.write_word(address, data, ws, burst, size, lock, cache, prot, qos, region, user) - async def write_dword(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def write_dword(self, address, data, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.write_if.write_dword(address, data, burst, size, lock, cache, prot, qos, region, user) - async def write_qword(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): + async def write_qword(self, address, data, burst=AxiBurstType.INCR, size=None, + lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): return await self.write_if.write_qword(address, data, burst, size, lock, cache, prot, qos, region, user) - - diff --git a/cocotbext/axi/axi_ram.py b/cocotbext/axi/axi_ram.py index 04dd08f..d9dfe02 100644 --- a/cocotbext/axi/axi_ram.py +++ b/cocotbext/axi/axi_ram.py @@ -22,16 +22,14 @@ THE SOFTWARE. """ +import mmap + import cocotb -from cocotb.triggers import Event from cocotb.log import SimLog -import mmap -from collections import deque - from .version import __version__ -from .constants import * -from .axi_channels import * +from .constants import AxiBurstType, AxiProt, AxiResp +from .axi_channels import AxiAWSink, AxiWSink, AxiBSource, AxiARSink, AxiRSource from .utils import hexdump, hexdump_str @@ -96,7 +94,8 @@ class AxiRamWrite(object): burst = int(aw.awburst) prot = AxiProt(int(aw.awprot)) - self.log.info(f"Write burst awid: {awid:#x} awaddr: {addr:#010x} awlen: {length} awsize: {size} awprot: {prot}") + self.log.info("Write burst awid: 0x%x awaddr: 0x%08x awlen: %d awsize: %d awprot: %s", + awid, addr, length, size, prot) num_bytes = 2**size assert 0 < num_bytes <= self.byte_width @@ -112,7 +111,7 @@ class AxiRamWrite(object): if burst == AxiBurstType.INCR: # check 4k boundary crossing - assert 0x1000-(aligned_addr&0xfff) >= transfer_size + assert 0x1000-(aligned_addr & 0xfff) >= transfer_size cur_addr = aligned_addr @@ -132,7 +131,8 @@ class AxiRamWrite(object): data = data.to_bytes(self.byte_width, 'little') - self.log.debug(f"Write word awid: {awid:#x} addr: {cur_addr:#010x} wstrb: {strb:#04x} data: {' '.join((f'{c:02x}' for c in data))}") + self.log.debug("Write word awid: 0x%x addr: 0x%08x wstrb: 0x%02x data: %s", + awid, cur_addr, strb, ' '.join((f'{c:02x}' for c in data))) for i in range(self.byte_width): if strb & (1 << i): @@ -171,9 +171,6 @@ class AxiRamRead(object): self.ar_channel = AxiARSink(entity, name, clock, reset) self.r_channel = AxiRSource(entity, name, clock, reset) - self.int_read_resp_command_queue = deque() - self.int_read_resp_command_sync = Event() - self.in_flight_operations = 0 self.width = len(self.r_channel.bus.rdata) @@ -212,7 +209,8 @@ class AxiRamRead(object): burst = int(ar.arburst) prot = AxiProt(ar.arprot) - self.log.info(f"Read burst arid: {arid:#x} araddr: {addr:#010x} arlen: {length} arsize: {size} arprot: {prot}") + self.log.info("Read burst arid: 0x%x araddr: 0x%08x arlen: %d arsize: %d arprot: %s", + arid, addr, length, size, prot) num_bytes = 2**size assert 0 < num_bytes <= self.byte_width @@ -228,7 +226,7 @@ class AxiRamRead(object): if burst == AxiBurstType.INCR: # check 4k boundary crossing - assert 0x1000-(aligned_addr&0xfff) >= transfer_size + assert 0x1000-(aligned_addr & 0xfff) >= transfer_size cur_addr = aligned_addr @@ -247,7 +245,8 @@ class AxiRamRead(object): self.r_channel.send(r) - self.log.debug(f"Read word arid: {arid:#x} addr: {cur_addr:#010x} data: {' '.join((f'{c:02x}' for c in data))}") + self.log.debug("Read word awid: 0x%x addr: 0x%08x data: %s", + arid, cur_addr, ' '.join((f'{c:02x}' for c in data))) if burst != AxiBurstType.FIXED: cur_addr += num_bytes @@ -284,4 +283,3 @@ class AxiRam(object): def hexdump_str(self, address, length, prefix=""): return hexdump_str(self.mem, address, length, prefix=prefix) - diff --git a/cocotbext/axi/axil_channels.py b/cocotbext/axi/axil_channels.py index 704b7cb..7fc3c4b 100644 --- a/cocotbext/axi/axil_channels.py +++ b/cocotbext/axi/axil_channels.py @@ -26,30 +26,29 @@ from .stream import define_stream # Write address channel AxiLiteAWTransaction, AxiLiteAWSource, AxiLiteAWSink, AxiLiteAWMonitor = define_stream("AxiLiteAW", - signals = ["awaddr", "awprot", "awvalid", "awready"], - signal_widths = {"awprot": 3} + signals=["awaddr", "awprot", "awvalid", "awready"], + signal_widths={"awprot": 3} ) # Write data channel AxiLiteWTransaction, AxiLiteWSource, AxiLiteWSink, AxiLiteWMonitor = define_stream("AxiLiteW", - signals = ["wdata", "wstrb", "wvalid", "wready"] + signals=["wdata", "wstrb", "wvalid", "wready"] ) # Write response channel AxiLiteBTransaction, AxiLiteBSource, AxiLiteBSink, AxiLiteBMonitor = define_stream("AxiLiteB", - signals = ["bresp", "bvalid", "bready"], - signal_widths = {"bresp": 2} + signals=["bresp", "bvalid", "bready"], + signal_widths={"bresp": 2} ) # Read address channel AxiLiteARTransaction, AxiLiteARSource, AxiLiteARSink, AxiLiteARMonitor = define_stream("AxiLiteAR", - signals = ["araddr", "arprot", "arvalid", "arready"], - signal_widths = {"arprot": 3} + signals=["araddr", "arprot", "arvalid", "arready"], + signal_widths={"arprot": 3} ) # Read data channel AxiLiteRTransaction, AxiLiteRSource, AxiLiteRSink, AxiLiteRMonitor = define_stream("AxiLiteR", - signals = ["rdata", "rresp", "rvalid", "rready"], - signal_widths = {"rresp": 2} + signals=["rdata", "rresp", "rvalid", "rready"], + signal_widths={"rresp": 2} ) - diff --git a/cocotbext/axi/axil_master.py b/cocotbext/axi/axil_master.py index 13a1f71..10d138f 100644 --- a/cocotbext/axi/axil_master.py +++ b/cocotbext/axi/axil_master.py @@ -22,15 +22,15 @@ THE SOFTWARE. """ -import cocotb -from cocotb.triggers import RisingEdge, Event -from cocotb.log import SimLog - from collections import deque +import cocotb +from cocotb.triggers import Event +from cocotb.log import SimLog + from .version import __version__ -from .constants import * -from .axil_channels import * +from .constants import AxiProt, AxiResp +from .axil_channels import AxiLiteAWSource, AxiLiteWSource, AxiLiteBSink, AxiLiteARSource, AxiLiteRSink class AxiLiteMasterWrite(object): @@ -175,7 +175,8 @@ class AxiLiteMasterWrite(object): offset = 0 - self.log.info(f"Write start addr: {address:#010x} prot: {prot} data: {' '.join((f'{c:02x}' for c in data))}") + self.log.info("Write start addr: 0x%08x prot: %s data: %s", + address, prot, ' '.join((f'{c:02x}' for c in data))) for k in range(cycles): start = 0 @@ -224,7 +225,8 @@ class AxiLiteMasterWrite(object): if cycle_resp != AxiResp.OKAY: resp = cycle_resp - self.log.info(f"Write complete addr: {addr:#010x} prot: {prot} resp: {resp!s} length: {length}") + self.log.info("Write complete addr: 0x%08x prot: %s resp: %s length: %d", + addr, prot, resp, length) self.write_resp_queue.append((addr, length, resp, token)) self.write_resp_sync.set() @@ -359,7 +361,8 @@ class AxiLiteMasterRead(object): self.int_read_resp_command_queue.append((address, length, cycles, prot, token)) self.int_read_resp_command_sync.set() - self.log.info(f"Read start addr: {address:#010x} prot: {prot} length: {length}") + self.log.info("Read start addr: 0x%08x prot: %s length: %d", + address, prot, length) for k in range(cycles): ar = self.ar_channel._transaction_obj() @@ -376,8 +379,6 @@ class AxiLiteMasterRead(object): addr, length, cycles, prot, token = self.int_read_resp_command_queue.popleft() - word_addr = (addr // self.byte_width) * self.byte_width - start_offset = addr % self.byte_width end_offset = ((addr + length - 1) % self.byte_width) + 1 @@ -406,7 +407,8 @@ class AxiLiteMasterRead(object): for j in range(start, stop): data.extend(bytearray([(cycle_data >> j*8) & 0xff])) - self.log.info(f"Read complete addr: {addr:#010x} prot: {prot} resp: {resp!s} data: {' '.join((f'{c:02x}' for c in data))}") + self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s", + addr, prot, resp, ' '.join((f'{c:02x}' for c in data))) self.read_data_queue.append((addr, data, resp, token)) self.read_data_sync.set() @@ -502,4 +504,3 @@ class AxiLiteMaster(object): async def write_qword(self, address, data, prot=AxiProt.NONSECURE): return await self.write_if.write_qword(address, data, prot) - diff --git a/cocotbext/axi/axil_ram.py b/cocotbext/axi/axil_ram.py index e74077d..0455833 100644 --- a/cocotbext/axi/axil_ram.py +++ b/cocotbext/axi/axil_ram.py @@ -22,17 +22,14 @@ THE SOFTWARE. """ +import mmap + import cocotb -from cocotb.triggers import Event from cocotb.log import SimLog -import mmap -import queue -from collections import deque - from .version import __version__ -from .constants import * -from .axil_channels import * +from .constants import AxiProt, AxiResp +from .axil_channels import AxiLiteAWSink, AxiLiteWSink, AxiLiteBSource, AxiLiteARSink, AxiLiteRSource from .utils import hexdump, hexdump_str @@ -103,7 +100,8 @@ class AxiLiteRamWrite(object): data = data.to_bytes(self.byte_width, 'little') - self.log.info(f"Write data addr: {addr:#010x} prot: {prot} wstrb: {strb:#04x} data: {' '.join((f'{c:02x}' for c in data))}") + self.log.info("Write data awaddr: 0x%08x awprot: %s wstrb: 0x%02x data: %s", + addr, prot, strb, ' '.join((f'{c:02x}' for c in data))) for i in range(self.byte_width): if strb & (1 << i): @@ -120,7 +118,7 @@ class AxiLiteRamWrite(object): class AxiLiteRamRead(object): def __init__(self, entity, name, clock, reset=None, size=1024, mem=None): self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) - + self.reset = reset self.ar_channel = AxiLiteARSink(entity, name, clock, reset) @@ -132,9 +130,6 @@ class AxiLiteRamRead(object): self.mem = mmap.mmap(-1, size) self.size = len(self.mem) - self.int_read_resp_command_queue = deque() - self.int_read_resp_command_sync = Event() - self.in_flight_operations = 0 self.width = len(self.r_channel.bus.rdata) @@ -179,7 +174,8 @@ class AxiLiteRamRead(object): self.r_channel.send(r) - self.log.info(f"Read data addr: {addr:#010x} prot: {prot} data: {' '.join((f'{c:02x}' for c in data))}") + self.log.info("Read data araddr: 0x%08x arprot: %s data: %s", + addr, prot, ' '.join((f'{c:02x}' for c in data))) class AxiLiteRam(object): @@ -209,4 +205,3 @@ class AxiLiteRam(object): def hexdump_str(self, address, length, prefix=""): return hexdump_str(self.mem, address, length, prefix=prefix) - diff --git a/cocotbext/axi/axis.py b/cocotbext/axi/axis.py index 5115c5e..c1b4fcf 100644 --- a/cocotbext/axi/axis.py +++ b/cocotbext/axi/axis.py @@ -22,15 +22,16 @@ THE SOFTWARE. """ +from collections import deque + import cocotb from cocotb.triggers import RisingEdge, ReadOnly, Timer, First, Event from cocotb.bus import Bus from cocotb.log import SimLog -from collections import deque - from .version import __version__ + class AxiStreamFrame(object): def __init__(self, tdata=b'', tkeep=None, tid=None, tdest=None, tuser=None): self.tdata = bytearray() @@ -194,12 +195,12 @@ class AxiStreamFrame(object): def __repr__(self): return ( - f"{type(self).__name__}(tdata={repr(self.tdata)}, " + - f"tkeep={repr(self.tkeep)}, " + - f"tid={repr(self.tid)}, " + - f"tdest={repr(self.tdest)}, " + - f"tuser={repr(self.tuser)})" - ) + f"{type(self).__name__}(tdata={repr(self.tdata)}, " + f"tkeep={repr(self.tkeep)}, " + f"tid={repr(self.tid)}, " + f"tdest={repr(self.tdest)}, " + f"tuser={repr(self.tuser)})" + ) def __len__(self): return len(self.tdata) @@ -334,7 +335,7 @@ class AxiStreamSource(object): frame = self.queue.popleft() self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_frames -= 1 - self.log.info(f"TX frame: {frame}") + self.log.info("TX frame: %s", frame) frame.normalize() self.active = True @@ -529,7 +530,7 @@ class AxiStreamSink(object): if self.byte_size == 8: frame.tdata = bytearray(frame.tdata) - self.log.info(f"RX frame: {frame}") + self.log.info("RX frame: %s", frame) self.queue_occupancy_bytes += len(frame) self.queue_occupancy_frames += 1 diff --git a/cocotbext/axi/constants.py b/cocotbext/axi/constants.py index 341a6e5..0781e56 100644 --- a/cocotbext/axi/constants.py +++ b/cocotbext/axi/constants.py @@ -24,6 +24,7 @@ THE SOFTWARE. import enum + # Burst types # AWBURST/ARBURST class AxiBurstType(enum.IntEnum): @@ -31,6 +32,7 @@ class AxiBurstType(enum.IntEnum): INCR = 0b01 WRAP = 0b10 + # Burst sizes (per beat) # AWSIZE/ARSIZE class AxiBurstSize(enum.IntEnum): @@ -43,12 +45,14 @@ class AxiBurstSize(enum.IntEnum): SIZE_64 = 0b110 SIZE_128 = 0b111 + # Lock types # AWLOCK/ARLOCK class AxiLockType(enum.IntEnum): NORMAL = 0b0 EXCLUSIVE = 0b1 + # Cache control # AWCACHE/ARCACHE class AxiCacheBit(enum.IntFlag): @@ -57,6 +61,7 @@ class AxiCacheBit(enum.IntFlag): RA = 0b0100 WA = 0b1000 + # ARCACHE ARCACHE_DEVICE_NON_BUFFERABLE = 0b0000 ARCACHE_DEVICE_BUFFERABLE = 0b0001 @@ -85,6 +90,7 @@ AWCACHE_WRITE_BACK_READ_ALLOC = 0b0111 AWCACHE_WRITE_BACK_WRITE_ALLOC = 0b1111 AWCACHE_WRITE_BACK_READ_AND_WRITE_ALLOC = 0b1111 + # Protection bits # AWPROT/ARPROT class AxiProt(enum.IntFlag): @@ -92,6 +98,7 @@ class AxiProt(enum.IntFlag): NONSECURE = 0b010 INSTRUCTION = 0b100 + # Operation status responses # BRESP/RRESP class AxiResp(enum.IntEnum): @@ -99,4 +106,3 @@ class AxiResp(enum.IntEnum): EXOKAY = 0b01 SLVERR = 0b10 DECERR = 0b11 - diff --git a/cocotbext/axi/stream.py b/cocotbext/axi/stream.py index ac68375..9eabca5 100644 --- a/cocotbext/axi/stream.py +++ b/cocotbext/axi/stream.py @@ -411,4 +411,3 @@ def define_stream(name, signals, optional_signals=None, valid_signal=None, ready monitor = type(name+"Monitor", (StreamMonitor,), attrib) return transaction, source, sink, monitor - diff --git a/cocotbext/axi/utils.py b/cocotbext/axi/utils.py index 4a8c39c..1983e17 100644 --- a/cocotbext/axi/utils.py +++ b/cocotbext/axi/utils.py @@ -22,6 +22,7 @@ THE SOFTWARE. """ + def hexdump_line(data, offset): h = "" c = "" @@ -30,18 +31,20 @@ def hexdump_line(data, offset): c += chr(ch) if 32 < ch < 127 else "." return f"{offset:08x}: {h:48} {c}" + def hexdump(data, start=0, length=None, prefix="", offset=0): stop = min(start+length, len(data)) if length else len(data) for k in range(start, stop, 16): - print(prefix+hexdump_line(data[k:min(k+16,stop)], k+offset)) + print(prefix+hexdump_line(data[k:min(k+16, stop)], k+offset)) + def hexdump_lines(data, start=0, length=None, prefix="", offset=0): lines = [] stop = min(start+length, len(data)) if length else len(data) for k in range(start, stop, 16): - lines.append(prefix+hexdump_line(data[k:min(k+16,stop)], k+offset)) + lines.append(prefix+hexdump_line(data[k:min(k+16, stop)], k+offset)) return lines + def hexdump_str(data, start=0, length=None, prefix="", offset=0): return "\n".join(hexdump_lines(data, start, length, prefix, offset)) - diff --git a/setup.py b/setup.py index 8c2d031..f92c9cb 100644 --- a/setup.py +++ b/setup.py @@ -11,19 +11,19 @@ with open("README.md", "r") as f: long_description = f.read() setup( - name = "cocotbext-axi", + name="cocotbext-axi", author="Alex Forencich", author_email="alex@alexforencich.com", description="AXI modules for Cocotb", long_description=long_description, long_description_content_type='text/markdown', url="https://github.com/alexforencich/cocotbext-axi", - download_url = 'http://github.com/alexforencich/cocotbext-axi/tarball/master', - version = version, - packages = find_namespace_packages(include=['cocotbext.*']), - install_requires = ['cocotb'], - python_requires = '>=3.6', - classifiers = [ + download_url='http://github.com/alexforencich/cocotbext-axi/tarball/master', + version=version, + packages=find_namespace_packages(include=['cocotbext.*']), + install_requires=['cocotb'], + python_requires='>=3.6', + classifiers=[ "Programming Language :: Python :: 3", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", diff --git a/tests/axi/test_axi.py b/tests/axi/test_axi.py index 11fe3d8..5aad517 100644 --- a/tests/axi/test_axi.py +++ b/tests/axi/test_axi.py @@ -31,18 +31,18 @@ import cocotb_test.simulator import pytest import cocotb -from cocotb.log import SimLog from cocotb.clock import Clock from cocotb.triggers import RisingEdge, Timer from cocotb.regression import TestFactory from cocotbext.axi import AxiMaster, AxiRam + class TB(object): def __init__(self, dut): self.dut = dut - self.log = SimLog(f"cocotb.tb") + self.log = logging.getLogger("cocotb.tb") self.log.setLevel(logging.DEBUG) cocotb.fork(Clock(dut.clk, 2, units="ns").start()) @@ -80,6 +80,7 @@ class TB(object): await RisingEdge(self.dut.clk) await RisingEdge(self.dut.clk) + async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, size=None): tb = TB(dut) @@ -95,17 +96,17 @@ async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, si tb.set_idle_generator(idle_inserter) tb.set_backpressure_generator(backpressure_inserter) - for length in list(range(1,byte_width*2))+[1024]: - for offset in list(range(byte_width))+list(range(4096-byte_width,4096)): + for length in list(range(1, byte_width*2))+[1024]: + for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)): tb.log.info(f"length {length}, offset {offset}") addr = offset+0x1000 - test_data = bytearray([x%256 for x in range(length)]) + test_data = bytearray([x % 256 for x in range(length)]) tb.axi_ram.write_mem(addr-128, b'\xaa'*(length+256)) await tb.axi_master.write(addr, test_data, size=size) - tb.log.debug(tb.axi_ram.hexdump_str((addr&0xfffffff0)-16, (((addr&0xf)+length-1)&0xfffffff0)+48)) + tb.log.debug("%s", tb.axi_ram.hexdump_str((addr & 0xfffffff0)-16, (((addr & 0xf)+length-1) & 0xfffffff0)+48)) assert tb.axi_ram.read_mem(addr, length) == test_data assert tb.axi_ram.read_mem(addr-1, 1) == b'\xaa' @@ -114,6 +115,7 @@ async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, si await RisingEdge(dut.clk) await RisingEdge(dut.clk) + async def run_test_read(dut, idle_inserter=None, backpressure_inserter=None, size=None): tb = TB(dut) @@ -129,11 +131,11 @@ async def run_test_read(dut, idle_inserter=None, backpressure_inserter=None, siz tb.set_idle_generator(idle_inserter) tb.set_backpressure_generator(backpressure_inserter) - for length in list(range(1,byte_width*2))+[1024]: - for offset in list(range(byte_width))+list(range(4096-byte_width,4096)): + for length in list(range(1, byte_width*2))+[1024]: + for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)): tb.log.info(f"length {length}, offset {offset}") addr = offset+0x1000 - test_data = bytearray([x%256 for x in range(length)]) + test_data = bytearray([x % 256 for x in range(length)]) tb.axi_ram.write_mem(addr, test_data) @@ -144,6 +146,7 @@ async def run_test_read(dut, idle_inserter=None, backpressure_inserter=None, siz await RisingEdge(dut.clk) await RisingEdge(dut.clk) + async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None): tb = TB(dut) @@ -157,7 +160,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None): for k in range(count): length = random.randint(1, min(512, aperture)) addr = offset+random.randint(0, aperture-length) - test_data = bytearray([x%256 for x in range(length)]) + test_data = bytearray([x % 256 for x in range(length)]) await Timer(random.randint(1, 100), 'ns') @@ -179,9 +182,11 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None): await RisingEdge(dut.clk) await RisingEdge(dut.clk) + def cycle_pause(): return itertools.cycle([1, 1, 1, 0]) + if cocotb.SIM_NAME: data_width = int(os.getenv("PARAM_DATA_WIDTH")) @@ -200,9 +205,12 @@ if cocotb.SIM_NAME: factory.generate_tests() +# cocotb-test + tests_dir = os.path.dirname(__file__) rtl_dir = os.path.abspath(os.path.join(tests_dir, '..', '..', 'rtl')) + @pytest.mark.parametrize("data_width", [8, 16, 32]) def test_axi(request, data_width): dut = "test_axi" @@ -225,7 +233,7 @@ def test_axi(request, data_width): parameters['ARUSER_WIDTH'] = 1 parameters['RUSER_WIDTH'] = 1 - extra_env = {f'PARAM_{k}' : str(v) for k, v in parameters.items()} + extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()} sim_build = os.path.join(tests_dir, "sim_build_"+request.node.name.replace('[', '-').replace(']', '')) @@ -239,4 +247,3 @@ def test_axi(request, data_width): sim_build=sim_build, extra_env=extra_env, ) - diff --git a/tests/axil/test_axil.py b/tests/axil/test_axil.py index 55d958a..eaf8385 100644 --- a/tests/axil/test_axil.py +++ b/tests/axil/test_axil.py @@ -31,18 +31,18 @@ import cocotb_test.simulator import pytest import cocotb -from cocotb.log import SimLog from cocotb.clock import Clock from cocotb.triggers import RisingEdge, Timer from cocotb.regression import TestFactory from cocotbext.axi import AxiLiteMaster, AxiLiteRam + class TB(object): def __init__(self, dut): self.dut = dut - self.log = SimLog(f"cocotb.tb") + self.log = logging.getLogger("cocotb.tb") self.log.setLevel(logging.DEBUG) cocotb.fork(Clock(dut.clk, 2, units="ns").start()) @@ -77,6 +77,7 @@ class TB(object): await RisingEdge(self.dut.clk) await RisingEdge(self.dut.clk) + async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_inserter=None): tb = TB(dut) @@ -88,17 +89,17 @@ async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_ins tb.set_idle_generator(idle_inserter) tb.set_backpressure_generator(backpressure_inserter) - for length in range(1,byte_width*2): + for length in range(1, byte_width*2): for offset in range(byte_width): tb.log.info(f"length {length}, offset {offset}") addr = offset+0x1000 - test_data = bytearray([x%256 for x in range(length)]) + test_data = bytearray([x % 256 for x in range(length)]) tb.axil_ram.write_mem(addr-128, b'\xaa'*(length+256)) await tb.axil_master.write(addr, test_data) - tb.log.debug(tb.axil_ram.hexdump_str((addr&0xfffffff0)-16, (((addr&0xf)+length-1)&0xfffffff0)+48)) + tb.log.debug("%s", tb.axil_ram.hexdump_str((addr & 0xfffffff0)-16, (((addr & 0xf)+length-1) & 0xfffffff0)+48)) assert tb.axil_ram.read_mem(addr, length) == test_data assert tb.axil_ram.read_mem(addr-1, 1) == b'\xaa' @@ -107,6 +108,7 @@ async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_ins await RisingEdge(dut.clk) await RisingEdge(dut.clk) + async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inserter=None): tb = TB(dut) @@ -118,11 +120,11 @@ async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inse tb.set_idle_generator(idle_inserter) tb.set_backpressure_generator(backpressure_inserter) - for length in range(1,byte_width*2): + for length in range(1, byte_width*2): for offset in range(byte_width): tb.log.info(f"length {length}, offset {offset}") addr = offset+0x1000 - test_data = bytearray([x%256 for x in range(length)]) + test_data = bytearray([x % 256 for x in range(length)]) tb.axil_ram.write_mem(addr, test_data) @@ -133,6 +135,7 @@ async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inse await RisingEdge(dut.clk) await RisingEdge(dut.clk) + async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None): tb = TB(dut) @@ -146,7 +149,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None): for k in range(count): length = random.randint(1, min(32, aperture)) addr = offset+random.randint(0, aperture-length) - test_data = bytearray([x%256 for x in range(length)]) + test_data = bytearray([x % 256 for x in range(length)]) await Timer(random.randint(1, 100), 'ns') @@ -168,9 +171,11 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None): await RisingEdge(dut.clk) await RisingEdge(dut.clk) + def cycle_pause(): return itertools.cycle([1, 1, 1, 0]) + if cocotb.SIM_NAME: for test in [run_test_write, run_test_read]: @@ -184,9 +189,12 @@ if cocotb.SIM_NAME: factory.generate_tests() +# cocotb-test + tests_dir = os.path.dirname(__file__) rtl_dir = os.path.abspath(os.path.join(tests_dir, '..', '..', 'rtl')) + @pytest.mark.parametrize("data_width", [8, 16, 32]) def test_axil(request, data_width): dut = "test_axil" @@ -203,7 +211,7 @@ def test_axil(request, data_width): parameters['ADDR_WIDTH'] = 32 parameters['STRB_WIDTH'] = parameters['DATA_WIDTH'] // 8 - extra_env = {f'PARAM_{k}' : str(v) for k, v in parameters.items()} + extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()} sim_build = os.path.join(tests_dir, "sim_build_"+request.node.name.replace('[', '-').replace(']', '')) @@ -217,4 +225,3 @@ def test_axil(request, data_width): sim_build=sim_build, extra_env=extra_env, ) - diff --git a/tests/axis/test_axis.py b/tests/axis/test_axis.py index 67f1d4d..0b5747b 100644 --- a/tests/axis/test_axis.py +++ b/tests/axis/test_axis.py @@ -31,18 +31,18 @@ import cocotb_test.simulator import pytest import cocotb -from cocotb.log import SimLog from cocotb.clock import Clock from cocotb.triggers import RisingEdge from cocotb.regression import TestFactory from cocotbext.axi import AxiStreamFrame, AxiStreamSource, AxiStreamSink + class TB(object): def __init__(self, dut): self.dut = dut - self.log = SimLog(f"cocotb.tb") + self.log = logging.getLogger("cocotb.tb") self.log.setLevel(logging.DEBUG) cocotb.fork(Clock(dut.clk, 2, units="ns").start()) @@ -69,6 +69,7 @@ class TB(object): await RisingEdge(self.dut.clk) await RisingEdge(self.dut.clk) + async def run_test(dut, payload_lengths=None, payload_data=None, idle_inserter=None, backpressure_inserter=None): tb = TB(dut) @@ -84,7 +85,7 @@ async def run_test(dut, payload_lengths=None, payload_data=None, idle_inserter=N test_frames = [] - for test_data in [payload_data(l) for l in payload_lengths()]: + for test_data in [payload_data(x) for x in payload_lengths()]: test_frame = AxiStreamFrame(test_data) test_frame.tid = cur_id test_frame.tdest = cur_id @@ -108,17 +109,21 @@ async def run_test(dut, payload_lengths=None, payload_data=None, idle_inserter=N await RisingEdge(dut.clk) await RisingEdge(dut.clk) + def cycle_pause(): return itertools.cycle([1, 1, 1, 0]) + def size_list(): data_width = int(os.getenv("PARAM_DATA_WIDTH")) byte_width = data_width // 8 - return list(range(1,data_width*4+1))+[512]+[1]*64 + return list(range(1, byte_width*4+1)) + [512] + [1]*64 + def incrementing_payload(length): return bytearray(itertools.islice(itertools.cycle(range(256)), length)) + if cocotb.SIM_NAME: factory = TestFactory(run_test) @@ -129,9 +134,12 @@ if cocotb.SIM_NAME: factory.generate_tests() +# cocotb-test + tests_dir = os.path.dirname(__file__) rtl_dir = os.path.abspath(os.path.join(tests_dir, '..', '..', 'rtl')) + @pytest.mark.parametrize("data_width", [8, 16, 32]) def test_axis(request, data_width): dut = "test_axis" @@ -150,7 +158,7 @@ def test_axis(request, data_width): parameters['DEST_WIDTH'] = 8 parameters['USER_WIDTH'] = 1 - extra_env = {f'PARAM_{k}' : str(v) for k, v in parameters.items()} + extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()} sim_build = os.path.join(tests_dir, "sim_build_"+request.node.name.replace('[', '-').replace(']', '')) @@ -164,4 +172,3 @@ def test_axis(request, data_width): sim_build=sim_build, extra_env=extra_env, ) -