Add AXI stream monitor

This commit is contained in:
Alex Forencich
2020-11-29 14:15:17 -08:00
parent ea2ee23e0f
commit 5c4072cb16
3 changed files with 158 additions and 2 deletions

View File

@@ -26,7 +26,7 @@ from .version import __version__
from .constants import AxiBurstType, AxiBurstSize, AxiLockType, AxiCacheBit, AxiProt, AxiResp
from .axis import AxiStreamFrame, AxiStreamSource, AxiStreamSink
from .axis import AxiStreamFrame, AxiStreamSource, AxiStreamSink, AxiStreamMonitor
from .axil_master import AxiLiteMasterWrite, AxiLiteMasterRead, AxiLiteMaster
from .axil_ram import AxiLiteRamWrite, AxiLiteRamRead, AxiLiteRam

View File

@@ -596,3 +596,149 @@ class AxiStreamSink(object):
self.pause = val
await RisingEdge(self.clock)
class AxiStreamMonitor(object):
_signals = ["tdata"]
_optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"]
def __init__(self, entity, name, clock, reset=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.entity = entity
self.clock = clock
self.reset = reset
self.bus = Bus(self.entity, name, self._signals, optional_signals=self._optional_signals, **kwargs)
self.log.info("AXI stream monitor")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2020 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
super().__init__(*args, **kwargs)
self.active = False
self.queue = deque()
self.sync = Event()
self.read_queue = []
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
self.width = len(self.bus.tdata)
self.byte_width = 1
self.reset = reset
if hasattr(self.bus, "tkeep"):
self.byte_width = len(self.bus.tkeep)
self.byte_size = self.width // self.byte_width
self.byte_mask = 2**self.byte_size-1
self.log.info("AXI stream monitor configuration:")
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_width)
self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present")
self.log.info(" tready: %s", "present" if hasattr(self.bus, "tready") else "not present")
self.log.info(" tlast: %s", "present" if hasattr(self.bus, "tlast") else "not present")
if hasattr(self.bus, "tkeep"):
self.log.info(" tkeep width: %d bits", len(self.bus.tkeep))
else:
self.log.info(" tkeep: not present")
if hasattr(self.bus, "tid"):
self.log.info(" tid width: %d bits", len(self.bus.tid))
else:
self.log.info(" tid: not present")
if hasattr(self.bus, "tdest"):
self.log.info(" tdest width: %d bits", len(self.bus.tdest))
else:
self.log.info(" tdest: not present")
if hasattr(self.bus, "tuser"):
self.log.info(" tuser width: %d bits", len(self.bus.tuser))
else:
self.log.info(" tuser: not present")
cocotb.fork(self._run())
def recv(self, compact=True):
if self.queue:
frame = self.queue.popleft()
self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1
if compact:
frame.compact()
return frame
return None
def read(self, count=-1):
while True:
frame = self.recv(compact=True)
if frame is None:
break
self.read_queue.extend(frame.tdata)
if count < 0:
count = len(self.read_queue)
data = self.read_queue[:count]
del self.read_queue[:count]
return data
def count(self):
return len(self.queue)
def empty(self):
return not self.queue
def idle(self):
return not self.active
async def wait(self, timeout=0, timeout_unit='ns'):
if not self.empty():
return
self.sync.clear()
if timeout:
await First(self.sync.wait(), Timer(timeout, timeout_unit))
else:
await self.sync.wait()
async def _run(self):
frame = AxiStreamFrame([], [], [], [], [])
self.active = False
while True:
await ReadOnly()
# read handshake signals
tready_sample = (not hasattr(self.bus, "tready")) or self.bus.tready.value
tvalid_sample = (not hasattr(self.bus, "tvalid")) or self.bus.tvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
frame = AxiStreamFrame([], [], [], [], [])
self.active = False
continue
if tready_sample and tvalid_sample:
for offset in range(self.byte_width):
frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask)
if hasattr(self.bus, "tkeep"):
frame.tkeep.append((self.bus.tkeep.value.integer >> offset) & 1)
if hasattr(self.bus, "tid"):
frame.tid.append(self.bus.tid.value.integer)
if hasattr(self.bus, "tdest"):
frame.tdest.append(self.bus.tdest.value.integer)
if hasattr(self.bus, "tuser"):
frame.tuser.append(self.bus.tuser.value.integer)
if not hasattr(self.bus, "tlast") or self.bus.tlast.value:
if self.byte_size == 8:
frame.tdata = bytearray(frame.tdata)
self.log.info("RX frame: %s", frame)
self.queue.append(frame)
self.sync.set()
frame = AxiStreamFrame([], [], [], [], [])
await RisingEdge(self.clock)

View File

@@ -35,7 +35,7 @@ from cocotb.clock import Clock
from cocotb.triggers import RisingEdge
from cocotb.regression import TestFactory
from cocotbext.axi import AxiStreamFrame, AxiStreamSource, AxiStreamSink
from cocotbext.axi import AxiStreamFrame, AxiStreamSource, AxiStreamSink, AxiStreamMonitor
class TB(object):
@@ -49,6 +49,7 @@ class TB(object):
self.source = AxiStreamSource(dut, "axis", dut.clk, dut.rst)
self.sink = AxiStreamSink(dut, "axis", dut.clk, dut.rst)
self.monitor = AxiStreamMonitor(dut, "axis", dut.clk, dut.rst)
def set_idle_generator(self, generator=None):
if generator:
@@ -104,7 +105,16 @@ async def run_test(dut, payload_lengths=None, payload_data=None, idle_inserter=N
assert rx_frame.tdest == test_frame.tdest
assert not rx_frame.tuser
await tb.monitor.wait()
rx_frame = tb.monitor.recv()
assert rx_frame.tdata == test_frame.tdata
assert rx_frame.tid == test_frame.tid
assert rx_frame.tdest == test_frame.tdest
assert not rx_frame.tuser
assert tb.sink.empty()
assert tb.monitor.empty()
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)