10 Commits

Author SHA1 Message Date
Alex Forencich
5f032b3c9b Release v0.1.2 2020-12-04 13:39:22 -08:00
Alex Forencich
31081c579b Point badge at correct project 2020-12-04 13:17:53 -08:00
Tomasz Hemperek
595e0b1b49 Add status badges 2020-12-04 13:16:10 -08:00
Alex Forencich
22b10f2dd8 Preload ID queue list 2020-12-04 12:39:25 -08:00
Alex Forencich
db3841dd99 Improve alignment bit masks 2020-12-04 00:51:32 -08:00
Alex Forencich
1057058cb3 Shorten stress test worker name 2020-12-03 21:03:05 -08:00
Alex Forencich
e5f2be4468 Use logger formatting instead of F-strings 2020-12-03 21:02:17 -08:00
Alex Forencich
4b5b62419b Add missing parameters 2020-12-03 19:34:03 -08:00
Alex Forencich
03f8fe0fd3 Use pytest importlib mode 2020-12-03 19:33:40 -08:00
Tomasz Hemperek
435b9c9282 Add coverge reporting in CI and upload to codecov 2020-12-03 12:52:51 -08:00
8 changed files with 54 additions and 24 deletions

View File

@@ -30,3 +30,8 @@ jobs:
- name: Test with tox - name: Test with tox
run: tox run: tox
- name: Upload coverage to codecov
run: |
pip install codecov
codecov

1
.gitignore vendored
View File

@@ -11,3 +11,4 @@ __pycache__/
.tox .tox
sim_build_* sim_build_*
.coverage

View File

@@ -1,5 +1,9 @@
# AXI interface modules for Cocotb # AXI interface modules for Cocotb
[![Build Status](https://github.com/alexforencich/cocotbext-axi/workflows/Regression%20Tests/badge.svg?branch=master)](https://github.com/alexforencich/cocotbext-axi/actions/)
[![codecov](https://codecov.io/gh/alexforencich/cocotbext-axi/branch/master/graph/badge.svg)](https://codecov.io/gh/alexforencich/cocotbext-axi)
[![PyPI version](https://badge.fury.io/py/cocotbext-axi.svg)](https://pypi.org/project/cocotbext-axi)
GitHub repository: https://github.com/alexforencich/cocotbext-axi GitHub repository: https://github.com/alexforencich/cocotbext-axi
## Introduction ## Introduction

View File

@@ -74,7 +74,7 @@ class AxiMasterWrite(object):
self.int_write_resp_command_queue = deque() self.int_write_resp_command_queue = deque()
self.int_write_resp_command_sync = Event() self.int_write_resp_command_sync = Event()
self.int_write_resp_queue_list = {} self.int_write_resp_queue_list = [deque() for k in range(self.id_count)]
self.in_flight_operations = 0 self.in_flight_operations = 0
@@ -318,7 +318,6 @@ class AxiMasterWrite(object):
user = [] user = []
for bid, burst_length in cmd.burst_list: for bid, burst_length in cmd.burst_list:
self.int_write_resp_queue_list.setdefault(bid, deque())
while True: while True:
if self.int_write_resp_queue_list[bid]: if self.int_write_resp_queue_list[bid]:
break break
@@ -390,7 +389,7 @@ class AxiMasterRead(object):
self.int_read_resp_command_queue = deque() self.int_read_resp_command_queue = deque()
self.int_read_resp_command_sync = Event() self.int_read_resp_command_sync = Event()
self.int_read_resp_queue_list = {} self.int_read_resp_queue_list = [deque() for k in range(self.id_count)]
self.in_flight_operations = 0 self.in_flight_operations = 0
@@ -599,7 +598,6 @@ class AxiMasterRead(object):
for rid, burst_length in cmd.burst_list: for rid, burst_length in cmd.burst_list:
for k in range(burst_length): for k in range(burst_length):
self.int_read_resp_queue_list.setdefault(rid, deque())
while True: while True:
if self.int_read_resp_queue_list[rid]: if self.int_read_resp_queue_list[rid]:
break break
@@ -672,13 +670,13 @@ class AxiMaster(object):
self.write_if = AxiMasterWrite(entity, name, clock, reset, max_burst_len) self.write_if = AxiMasterWrite(entity, name, clock, reset, max_burst_len)
self.read_if = AxiMasterRead(entity, name, clock, reset, max_burst_len) self.read_if = AxiMasterRead(entity, name, clock, reset, max_burst_len)
def init_read(self, address, length, burst=AxiBurstType.INCR, size=None, def init_read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, event=None): lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, event=None):
self.read_if.init_read(address, length, burst, size, lock, cache, prot, qos, region, user, event) self.read_if.init_read(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event)
def init_write(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, def init_write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL,
cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0, event=None): cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0, event=None):
self.write_if.init_write(address, data, burst, size, lock, cache, prot, qos, region, user, wuser, event) self.write_if.init_write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser, event)
def idle(self): def idle(self):
return (not self.read_if or self.read_if.idle()) and (not self.write_if or self.write_if.idle()) return (not self.read_if or self.read_if.idle()) and (not self.write_if or self.write_if.idle())

View File

@@ -1 +1 @@
__version__ = "0.1.0" __version__ = "0.1.2"

View File

@@ -40,6 +40,8 @@ include = cocotbext.*
[tool:pytest] [tool:pytest]
testpaths = testpaths =
tests tests
addopts =
--import-mode importlib
# tox configuration # tox configuration
[tox:tox] [tox:tox]
@@ -53,10 +55,30 @@ python =
3.9: py39 3.9: py39
[testenv] [testenv]
setenv =
COVERAGE=1
deps = deps =
pytest pytest
pytest-xdist pytest-xdist
cocotb-test cocotb-test
coverage
pytest-cov
commands = commands =
pytest -n auto pytest --cov=cocotbext --cov=tests --cov-branch -n auto
bash -c 'find . -type f -name "\.coverage" | xargs coverage combine --append'
whitelist_externals =
bash
# combine if paths are different
[coverage:paths]
source =
cocotbext/
/*/cocotbext
# do not report dependencies
[coverage:report]
omit =
.tox/*

View File

@@ -98,7 +98,7 @@ async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, si
for length in list(range(1, byte_width*2))+[1024]: for length in list(range(1, byte_width*2))+[1024]:
for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)): for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)):
tb.log.info(f"length {length}, offset {offset}") tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000 addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)]) test_data = bytearray([x % 256 for x in range(length)])
@@ -106,7 +106,7 @@ async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, si
await tb.axi_master.write(addr, test_data, size=size) await tb.axi_master.write(addr, test_data, size=size)
tb.log.debug("%s", tb.axi_ram.hexdump_str((addr & 0xfffffff0)-16, (((addr & 0xf)+length-1) & 0xfffffff0)+48)) tb.log.debug("%s", tb.axi_ram.hexdump_str((addr & ~0xf)-16, (((addr & 0xf)+length-1) & ~0xf)+48))
assert tb.axi_ram.read(addr, length) == test_data assert tb.axi_ram.read(addr, length) == test_data
assert tb.axi_ram.read(addr-1, 1) == b'\xaa' assert tb.axi_ram.read(addr-1, 1) == b'\xaa'
@@ -133,7 +133,7 @@ async def run_test_read(dut, idle_inserter=None, backpressure_inserter=None, siz
for length in list(range(1, byte_width*2))+[1024]: for length in list(range(1, byte_width*2))+[1024]:
for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)): for offset in list(range(byte_width))+list(range(4096-byte_width, 4096)):
tb.log.info(f"length {length}, offset {offset}") tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000 addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)]) test_data = bytearray([x % 256 for x in range(length)])
@@ -157,7 +157,7 @@ async def run_test_write_words(dut):
for length in list(range(1, 4)): for length in list(range(1, 4)):
for offset in list(range(byte_width)): for offset in list(range(byte_width)):
tb.log.info(f"length {length}, offset {offset}") tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000 addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)]) test_data = bytearray([x % 256 for x in range(length)])
@@ -206,7 +206,7 @@ async def run_test_read_words(dut):
for length in list(range(1, 4)): for length in list(range(1, 4)):
for offset in list(range(byte_width)): for offset in list(range(byte_width)):
tb.log.info(f"length {length}, offset {offset}") tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000 addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)]) test_data = bytearray([x % 256 for x in range(length)])
@@ -254,7 +254,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
tb.set_idle_generator(idle_inserter) tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter) tb.set_backpressure_generator(backpressure_inserter)
async def stress_test_worker(master, offset, aperture, count=16): async def worker(master, offset, aperture, count=16):
for k in range(count): for k in range(count):
length = random.randint(1, min(512, aperture)) length = random.randint(1, min(512, aperture))
addr = offset+random.randint(0, aperture-length) addr = offset+random.randint(0, aperture-length)
@@ -272,7 +272,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
workers = [] workers = []
for k in range(16): for k in range(16):
workers.append(cocotb.fork(stress_test_worker(tb.axi_master, k*0x1000, 0x1000, count=16))) workers.append(cocotb.fork(worker(tb.axi_master, k*0x1000, 0x1000, count=16)))
while workers: while workers:
await workers.pop(0).join() await workers.pop(0).join()

View File

@@ -91,7 +91,7 @@ async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_ins
for length in range(1, byte_width*2): for length in range(1, byte_width*2):
for offset in range(byte_width): for offset in range(byte_width):
tb.log.info(f"length {length}, offset {offset}") tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000 addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)]) test_data = bytearray([x % 256 for x in range(length)])
@@ -99,7 +99,7 @@ async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_ins
await tb.axil_master.write(addr, test_data) await tb.axil_master.write(addr, test_data)
tb.log.debug("%s", tb.axil_ram.hexdump_str((addr & 0xfffffff0)-16, (((addr & 0xf)+length-1) & 0xfffffff0)+48)) tb.log.debug("%s", tb.axil_ram.hexdump_str((addr & ~0xf)-16, (((addr & 0xf)+length-1) & ~0xf)+48))
assert tb.axil_ram.read(addr, length) == test_data assert tb.axil_ram.read(addr, length) == test_data
assert tb.axil_ram.read(addr-1, 1) == b'\xaa' assert tb.axil_ram.read(addr-1, 1) == b'\xaa'
@@ -122,7 +122,7 @@ async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inse
for length in range(1, byte_width*2): for length in range(1, byte_width*2):
for offset in range(byte_width): for offset in range(byte_width):
tb.log.info(f"length {length}, offset {offset}") tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000 addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)]) test_data = bytearray([x % 256 for x in range(length)])
@@ -146,7 +146,7 @@ async def run_test_write_words(dut):
for length in list(range(1, 4)): for length in list(range(1, 4)):
for offset in list(range(byte_width)): for offset in list(range(byte_width)):
tb.log.info(f"length {length}, offset {offset}") tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000 addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)]) test_data = bytearray([x % 256 for x in range(length)])
@@ -195,7 +195,7 @@ async def run_test_read_words(dut):
for length in list(range(1, 4)): for length in list(range(1, 4)):
for offset in list(range(byte_width)): for offset in list(range(byte_width)):
tb.log.info(f"length {length}, offset {offset}") tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000 addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)]) test_data = bytearray([x % 256 for x in range(length)])
@@ -243,7 +243,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
tb.set_idle_generator(idle_inserter) tb.set_idle_generator(idle_inserter)
tb.set_backpressure_generator(backpressure_inserter) tb.set_backpressure_generator(backpressure_inserter)
async def stress_test_worker(master, offset, aperture, count=16): async def worker(master, offset, aperture, count=16):
for k in range(count): for k in range(count):
length = random.randint(1, min(32, aperture)) length = random.randint(1, min(32, aperture))
addr = offset+random.randint(0, aperture-length) addr = offset+random.randint(0, aperture-length)
@@ -261,7 +261,7 @@ async def run_stress_test(dut, idle_inserter=None, backpressure_inserter=None):
workers = [] workers = []
for k in range(16): for k in range(16):
workers.append(cocotb.fork(stress_test_worker(tb.axil_master, k*0x1000, 0x1000, count=16))) workers.append(cocotb.fork(worker(tb.axil_master, k*0x1000, 0x1000, count=16)))
while workers: while workers:
await workers.pop(0).join() await workers.pop(0).join()