Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5f032b3c9b | ||
|
|
31081c579b | ||
|
|
595e0b1b49 | ||
|
|
22b10f2dd8 | ||
|
|
db3841dd99 | ||
|
|
1057058cb3 | ||
|
|
e5f2be4468 | ||
|
|
4b5b62419b | ||
|
|
03f8fe0fd3 | ||
|
|
435b9c9282 |
5
.github/workflows/regression-tests.yml
vendored
5
.github/workflows/regression-tests.yml
vendored
@@ -30,3 +30,8 @@ jobs:
|
||||
|
||||
- name: Test with tox
|
||||
run: tox
|
||||
|
||||
- name: Upload coverage to codecov
|
||||
run: |
|
||||
pip install codecov
|
||||
codecov
|
||||
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -11,3 +11,4 @@ __pycache__/
|
||||
|
||||
.tox
|
||||
sim_build_*
|
||||
.coverage
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
# AXI interface modules for Cocotb
|
||||
|
||||
[](https://github.com/alexforencich/cocotbext-axi/actions/)
|
||||
[](https://codecov.io/gh/alexforencich/cocotbext-axi)
|
||||
[](https://pypi.org/project/cocotbext-axi)
|
||||
|
||||
GitHub repository: https://github.com/alexforencich/cocotbext-axi
|
||||
|
||||
## Introduction
|
||||
|
||||
@@ -74,7 +74,7 @@ class AxiMasterWrite(object):
|
||||
|
||||
self.int_write_resp_command_queue = deque()
|
||||
self.int_write_resp_command_sync = Event()
|
||||
self.int_write_resp_queue_list = {}
|
||||
self.int_write_resp_queue_list = [deque() for k in range(self.id_count)]
|
||||
|
||||
self.in_flight_operations = 0
|
||||
|
||||
@@ -318,7 +318,6 @@ class AxiMasterWrite(object):
|
||||
user = []
|
||||
|
||||
for bid, burst_length in cmd.burst_list:
|
||||
self.int_write_resp_queue_list.setdefault(bid, deque())
|
||||
while True:
|
||||
if self.int_write_resp_queue_list[bid]:
|
||||
break
|
||||
@@ -390,7 +389,7 @@ class AxiMasterRead(object):
|
||||
|
||||
self.int_read_resp_command_queue = deque()
|
||||
self.int_read_resp_command_sync = Event()
|
||||
self.int_read_resp_queue_list = {}
|
||||
self.int_read_resp_queue_list = [deque() for k in range(self.id_count)]
|
||||
|
||||
self.in_flight_operations = 0
|
||||
|
||||
@@ -599,7 +598,6 @@ class AxiMasterRead(object):
|
||||
|
||||
for rid, burst_length in cmd.burst_list:
|
||||
for k in range(burst_length):
|
||||
self.int_read_resp_queue_list.setdefault(rid, deque())
|
||||
while True:
|
||||
if self.int_read_resp_queue_list[rid]:
|
||||
break
|
||||
@@ -672,13 +670,13 @@ class AxiMaster(object):
|
||||
self.write_if = AxiMasterWrite(entity, name, clock, reset, max_burst_len)
|
||||
self.read_if = AxiMasterRead(entity, name, clock, reset, max_burst_len)
|
||||
|
||||
def init_read(self, address, length, burst=AxiBurstType.INCR, size=None,
|
||||
def init_read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
|
||||
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, event=None):
|
||||
self.read_if.init_read(address, length, burst, size, lock, cache, prot, qos, region, user, event)
|
||||
self.read_if.init_read(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event)
|
||||
|
||||
def init_write(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL,
|
||||
def init_write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL,
|
||||
cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0, event=None):
|
||||
self.write_if.init_write(address, data, burst, size, lock, cache, prot, qos, region, user, wuser, event)
|
||||
self.write_if.init_write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser, event)
|
||||
|
||||
def idle(self):
|
||||
return (not self.read_if or self.read_if.idle()) and (not self.write_if or self.write_if.idle())
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "0.1.0"
|
||||
__version__ = "0.1.2"
|
||||
|
||||
24
setup.cfg
24
setup.cfg
@@ -40,6 +40,8 @@ include = cocotbext.*
|
||||
[tool:pytest]
|
||||
testpaths =
|
||||
tests
|
||||
addopts =
|
||||
--import-mode importlib
|
||||
|
||||
# tox configuration
|
||||
[tox:tox]
|
||||
@@ -53,10 +55,30 @@ python =
|
||||
3.9: py39
|
||||
|
||||
[testenv]
|
||||
setenv =
|
||||
COVERAGE=1
|
||||
|
||||
deps =
|
||||
pytest
|
||||
pytest-xdist
|
||||
cocotb-test
|
||||
coverage
|
||||
pytest-cov
|
||||
|
||||
commands =
|
||||
pytest -n auto
|
||||
pytest --cov=cocotbext --cov=tests --cov-branch -n auto
|
||||
bash -c 'find . -type f -name "\.coverage" | xargs coverage combine --append'
|
||||
|
||||
whitelist_externals =
|
||||
bash
|
||||
|
||||
# combine if paths are different
|
||||
[coverage:paths]
|
||||
source =
|
||||
cocotbext/
|
||||
/*/cocotbext
|
||||
|
||||
# do not report dependencies
|
||||
[coverage:report]
|
||||
omit =
|
||||
.tox/*
|
||||
|
||||
@@ -98,7 +98,7 @@ async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, si
|
||||
|
||||
for length in list(range(1, byte_width*2))+[1024]:
|
||||
for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)):
|
||||
tb.log.info(f"length {length}, offset {offset}")
|
||||
tb.log.info("length %d, offset %d", length, offset)
|
||||
addr = offset+0x1000
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
|
||||
@@ -106,7 +106,7 @@ async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, si
|
||||
|
||||
await tb.axi_master.write(addr, test_data, size=size)
|
||||
|
||||
tb.log.debug("%s", tb.axi_ram.hexdump_str((addr & 0xfffffff0)-16, (((addr & 0xf)+length-1) & 0xfffffff0)+48))
|
||||
tb.log.debug("%s", tb.axi_ram.hexdump_str((addr & ~0xf)-16, (((addr & 0xf)+length-1) & ~0xf)+48))
|
||||
|
||||
assert tb.axi_ram.read(addr, length) == test_data
|
||||
assert tb.axi_ram.read(addr-1, 1) == b'\xaa'
|
||||
@@ -133,7 +133,7 @@ async def run_test_read(dut, idle_inserter=None, backpressure_inserter=None, siz
|
||||
|
||||
for length in list(range(1, byte_width*2))+[1024]:
|
||||
for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)):
|
||||
tb.log.info(f"length {length}, offset {offset}")
|
||||
tb.log.info("length %d, offset %d", length, offset)
|
||||
addr = offset+0x1000
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
|
||||
@@ -157,7 +157,7 @@ async def run_test_write_words(dut):
|
||||
|
||||
for length in list(range(1, 4)):
|
||||
for offset in list(range(byte_width)):
|
||||
tb.log.info(f"length {length}, offset {offset}")
|
||||
tb.log.info("length %d, offset %d", length, offset)
|
||||
addr = offset+0x1000
|
||||
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
@@ -206,7 +206,7 @@ async def run_test_read_words(dut):
|
||||
|
||||
for length in list(range(1, 4)):
|
||||
for offset in list(range(byte_width)):
|
||||
tb.log.info(f"length {length}, offset {offset}")
|
||||
tb.log.info("length %d, offset %d", length, offset)
|
||||
addr = offset+0x1000
|
||||
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
@@ -254,7 +254,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
|
||||
tb.set_idle_generator(idle_inserter)
|
||||
tb.set_backpressure_generator(backpressure_inserter)
|
||||
|
||||
async def stress_test_worker(master, offset, aperture, count=16):
|
||||
async def worker(master, offset, aperture, count=16):
|
||||
for k in range(count):
|
||||
length = random.randint(1, min(512, aperture))
|
||||
addr = offset+random.randint(0, aperture-length)
|
||||
@@ -272,7 +272,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
|
||||
workers = []
|
||||
|
||||
for k in range(16):
|
||||
workers.append(cocotb.fork(stress_test_worker(tb.axi_master, k*0x1000, 0x1000, count=16)))
|
||||
workers.append(cocotb.fork(worker(tb.axi_master, k*0x1000, 0x1000, count=16)))
|
||||
|
||||
while workers:
|
||||
await workers.pop(0).join()
|
||||
|
||||
@@ -91,7 +91,7 @@ async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_ins
|
||||
|
||||
for length in range(1, byte_width*2):
|
||||
for offset in range(byte_width):
|
||||
tb.log.info(f"length {length}, offset {offset}")
|
||||
tb.log.info("length %d, offset %d", length, offset)
|
||||
addr = offset+0x1000
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
|
||||
@@ -99,7 +99,7 @@ async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_ins
|
||||
|
||||
await tb.axil_master.write(addr, test_data)
|
||||
|
||||
tb.log.debug("%s", tb.axil_ram.hexdump_str((addr & 0xfffffff0)-16, (((addr & 0xf)+length-1) & 0xfffffff0)+48))
|
||||
tb.log.debug("%s", tb.axil_ram.hexdump_str((addr & ~0xf)-16, (((addr & 0xf)+length-1) & ~0xf)+48))
|
||||
|
||||
assert tb.axil_ram.read(addr, length) == test_data
|
||||
assert tb.axil_ram.read(addr-1, 1) == b'\xaa'
|
||||
@@ -122,7 +122,7 @@ async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inse
|
||||
|
||||
for length in range(1, byte_width*2):
|
||||
for offset in range(byte_width):
|
||||
tb.log.info(f"length {length}, offset {offset}")
|
||||
tb.log.info("length %d, offset %d", length, offset)
|
||||
addr = offset+0x1000
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
|
||||
@@ -146,7 +146,7 @@ async def run_test_write_words(dut):
|
||||
|
||||
for length in list(range(1, 4)):
|
||||
for offset in list(range(byte_width)):
|
||||
tb.log.info(f"length {length}, offset {offset}")
|
||||
tb.log.info("length %d, offset %d", length, offset)
|
||||
addr = offset+0x1000
|
||||
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
@@ -195,7 +195,7 @@ async def run_test_read_words(dut):
|
||||
|
||||
for length in list(range(1, 4)):
|
||||
for offset in list(range(byte_width)):
|
||||
tb.log.info(f"length {length}, offset {offset}")
|
||||
tb.log.info("length %d, offset %d", length, offset)
|
||||
addr = offset+0x1000
|
||||
|
||||
test_data = bytearray([x % 256 for x in range(length)])
|
||||
@@ -243,7 +243,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
|
||||
tb.set_idle_generator(idle_inserter)
|
||||
tb.set_backpressure_generator(backpressure_inserter)
|
||||
|
||||
async def stress_test_worker(master, offset, aperture, count=16):
|
||||
async def worker(master, offset, aperture, count=16):
|
||||
for k in range(count):
|
||||
length = random.randint(1, min(32, aperture))
|
||||
addr = offset+random.randint(0, aperture-length)
|
||||
@@ -261,7 +261,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
|
||||
workers = []
|
||||
|
||||
for k in range(16):
|
||||
workers.append(cocotb.fork(stress_test_worker(tb.axil_master, k*0x1000, 0x1000, count=16)))
|
||||
workers.append(cocotb.fork(worker(tb.axil_master, k*0x1000, 0x1000, count=16)))
|
||||
|
||||
while workers:
|
||||
await workers.pop(0).join()
|
||||
|
||||
Reference in New Issue
Block a user