diff --git a/cocotbext/axi/__init__.py b/cocotbext/axi/__init__.py index dd591cb..e00ff14 100644 --- a/cocotbext/axi/__init__.py +++ b/cocotbext/axi/__init__.py @@ -26,7 +26,7 @@ from .version import __version__ from .constants import AxiBurstType, AxiBurstSize, AxiLockType, AxiCacheBit, AxiProt, AxiResp -from .axis import AxiStreamFrame, AxiStreamSource, AxiStreamSink +from .axis import AxiStreamFrame, AxiStreamSource, AxiStreamSink, AxiStreamMonitor from .axil_master import AxiLiteMasterWrite, AxiLiteMasterRead, AxiLiteMaster from .axil_ram import AxiLiteRamWrite, AxiLiteRamRead, AxiLiteRam diff --git a/cocotbext/axi/axis.py b/cocotbext/axi/axis.py index e4b8396..3ff66e2 100644 --- a/cocotbext/axi/axis.py +++ b/cocotbext/axi/axis.py @@ -596,3 +596,149 @@ class AxiStreamSink(object): self.pause = val await RisingEdge(self.clock) + +class AxiStreamMonitor(object): + + _signals = ["tdata"] + _optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"] + + def __init__(self, entity, name, clock, reset=None, *args, **kwargs): + self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) + self.entity = entity + self.clock = clock + self.reset = reset + self.bus = Bus(self.entity, name, self._signals, optional_signals=self._optional_signals, **kwargs) + + self.log.info("AXI stream monitor") + self.log.info("cocotbext-axi version %s", __version__) + self.log.info("Copyright (c) 2020 Alex Forencich") + self.log.info("https://github.com/alexforencich/cocotbext-axi") + + super().__init__(*args, **kwargs) + + self.active = False + self.queue = deque() + self.sync = Event() + self.read_queue = [] + + self.queue_occupancy_bytes = 0 + self.queue_occupancy_frames = 0 + + self.width = len(self.bus.tdata) + self.byte_width = 1 + + self.reset = reset + + if hasattr(self.bus, "tkeep"): + self.byte_width = len(self.bus.tkeep) + + self.byte_size = self.width // self.byte_width + self.byte_mask = 2**self.byte_size-1 + + self.log.info("AXI stream monitor configuration:") + self.log.info(" Byte size: %d bits", self.byte_size) + self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_width) + self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present") + self.log.info(" tready: %s", "present" if hasattr(self.bus, "tready") else "not present") + self.log.info(" tlast: %s", "present" if hasattr(self.bus, "tlast") else "not present") + if hasattr(self.bus, "tkeep"): + self.log.info(" tkeep width: %d bits", len(self.bus.tkeep)) + else: + self.log.info(" tkeep: not present") + if hasattr(self.bus, "tid"): + self.log.info(" tid width: %d bits", len(self.bus.tid)) + else: + self.log.info(" tid: not present") + if hasattr(self.bus, "tdest"): + self.log.info(" tdest width: %d bits", len(self.bus.tdest)) + else: + self.log.info(" tdest: not present") + if hasattr(self.bus, "tuser"): + self.log.info(" tuser width: %d bits", len(self.bus.tuser)) + else: + self.log.info(" tuser: not present") + + cocotb.fork(self._run()) + + def recv(self, compact=True): + if self.queue: + frame = self.queue.popleft() + self.queue_occupancy_bytes -= len(frame) + self.queue_occupancy_frames -= 1 + if compact: + frame.compact() + return frame + return None + + def read(self, count=-1): + while True: + frame = self.recv(compact=True) + if frame is None: + break + self.read_queue.extend(frame.tdata) + if count < 0: + count = len(self.read_queue) + data = self.read_queue[:count] + del self.read_queue[:count] + return data + + def count(self): + return len(self.queue) + + def empty(self): + return not self.queue + + def idle(self): + return not self.active + + async def wait(self, timeout=0, timeout_unit='ns'): + if not self.empty(): + return + self.sync.clear() + if timeout: + await First(self.sync.wait(), Timer(timeout, timeout_unit)) + else: + await self.sync.wait() + + async def _run(self): + frame = AxiStreamFrame([], [], [], [], []) + self.active = False + + while True: + await ReadOnly() + + # read handshake signals + tready_sample = (not hasattr(self.bus, "tready")) or self.bus.tready.value + tvalid_sample = (not hasattr(self.bus, "tvalid")) or self.bus.tvalid.value + + if self.reset is not None and self.reset.value: + await RisingEdge(self.clock) + frame = AxiStreamFrame([], [], [], [], []) + self.active = False + continue + + if tready_sample and tvalid_sample: + for offset in range(self.byte_width): + + frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask) + if hasattr(self.bus, "tkeep"): + frame.tkeep.append((self.bus.tkeep.value.integer >> offset) & 1) + if hasattr(self.bus, "tid"): + frame.tid.append(self.bus.tid.value.integer) + if hasattr(self.bus, "tdest"): + frame.tdest.append(self.bus.tdest.value.integer) + if hasattr(self.bus, "tuser"): + frame.tuser.append(self.bus.tuser.value.integer) + + if not hasattr(self.bus, "tlast") or self.bus.tlast.value: + if self.byte_size == 8: + frame.tdata = bytearray(frame.tdata) + + self.log.info("RX frame: %s", frame) + + self.queue.append(frame) + self.sync.set() + + frame = AxiStreamFrame([], [], [], [], []) + + await RisingEdge(self.clock) diff --git a/tests/axis/test_axis.py b/tests/axis/test_axis.py index 0b5747b..fc2b4d1 100644 --- a/tests/axis/test_axis.py +++ b/tests/axis/test_axis.py @@ -35,7 +35,7 @@ from cocotb.clock import Clock from cocotb.triggers import RisingEdge from cocotb.regression import TestFactory -from cocotbext.axi import AxiStreamFrame, AxiStreamSource, AxiStreamSink +from cocotbext.axi import AxiStreamFrame, AxiStreamSource, AxiStreamSink, AxiStreamMonitor class TB(object): @@ -49,6 +49,7 @@ class TB(object): self.source = AxiStreamSource(dut, "axis", dut.clk, dut.rst) self.sink = AxiStreamSink(dut, "axis", dut.clk, dut.rst) + self.monitor = AxiStreamMonitor(dut, "axis", dut.clk, dut.rst) def set_idle_generator(self, generator=None): if generator: @@ -104,7 +105,16 @@ async def run_test(dut, payload_lengths=None, payload_data=None, idle_inserter=N assert rx_frame.tdest == test_frame.tdest assert not rx_frame.tuser + await tb.monitor.wait() + rx_frame = tb.monitor.recv() + + assert rx_frame.tdata == test_frame.tdata + assert rx_frame.tid == test_frame.tid + assert rx_frame.tdest == test_frame.tdest + assert not rx_frame.tuser + assert tb.sink.empty() + assert tb.monitor.empty() await RisingEdge(dut.clk) await RisingEdge(dut.clk)