eth: Check stats outputs in AXI stream GMII RX module testbench

Signed-off-by: Alex Forencich <alex@alexforencich.com>
This commit is contained in:
Alex Forencich
2025-04-07 13:32:03 -07:00
parent 2e05b1eff2
commit 1b28dc4b9a

View File

@@ -48,6 +48,26 @@ class TB:
dut.cfg_rx_max_pkt_len.setimmediatevalue(0)
dut.cfg_rx_enable.setimmediatevalue(0)
self.stats = {}
self.stats["rx_start_packet"] = 0
self.stats["stat_rx_byte"] = 0
self.stats["stat_rx_pkt_len"] = 0
self.stats["stat_rx_pkt_fragment"] = 0
self.stats["stat_rx_pkt_jabber"] = 0
self.stats["stat_rx_pkt_ucast"] = 0
self.stats["stat_rx_pkt_mcast"] = 0
self.stats["stat_rx_pkt_bcast"] = 0
self.stats["stat_rx_pkt_vlan"] = 0
self.stats["stat_rx_pkt_good"] = 0
self.stats["stat_rx_pkt_bad"] = 0
self.stats["stat_rx_err_oversize"] = 0
self.stats["stat_rx_err_bad_fcs"] = 0
self.stats["stat_rx_err_bad_block"] = 0
self.stats["stat_rx_err_framing"] = 0
self.stats["stat_rx_err_preamble"] = 0
cocotb.start_soon(self._run_stats_counters())
async def reset(self):
self.dut.rst.setimmediatevalue(0)
await RisingEdge(self.dut.clk)
@@ -59,6 +79,15 @@ class TB:
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
for stat in self.stats:
self.stats[stat] = 0
async def _run_stats_counters(self):
while True:
await RisingEdge(self.dut.clk)
for stat in self.stats:
self.stats[stat] += int(getattr(self.dut, stat).value)
def set_enable_generator(self, generator=None):
if self._enable_cr is not None:
self._enable_cr.kill()
@@ -95,9 +124,14 @@ async def run_test(dut, payload_lengths=None, payload_data=None, ifg=12, enable_
test_frames = [payload_data(x) for x in payload_lengths()]
tx_frames = []
total_bytes = 0
total_pkts = 0
for test_data in test_frames:
test_frame = GmiiFrame.from_payload(test_data, tx_complete=tx_frames.append)
await tb.source.send(test_frame)
total_bytes += max(len(test_data), 60)+4
total_pkts += 1
for test_data in test_frames:
rx_frame = await tb.sink.recv()
@@ -119,6 +153,26 @@ async def run_test(dut, payload_lengths=None, payload_data=None, ifg=12, enable_
assert tb.sink.empty()
for stat, val in tb.stats.items():
tb.log.info("%s: %d", stat, val)
assert tb.stats["rx_start_packet"] == total_pkts
assert tb.stats["stat_rx_byte"] == total_bytes
assert tb.stats["stat_rx_pkt_len"] == total_bytes
assert tb.stats["stat_rx_pkt_fragment"] == 0
assert tb.stats["stat_rx_pkt_jabber"] == 0
assert tb.stats["stat_rx_pkt_ucast"] == total_pkts
assert tb.stats["stat_rx_pkt_mcast"] == 0
assert tb.stats["stat_rx_pkt_bcast"] == 0
assert tb.stats["stat_rx_pkt_vlan"] == 0
assert tb.stats["stat_rx_pkt_good"] == total_pkts
assert tb.stats["stat_rx_pkt_bad"] == 0
assert tb.stats["stat_rx_err_oversize"] == 0
assert tb.stats["stat_rx_err_bad_fcs"] == 0
assert tb.stats["stat_rx_err_bad_block"] == 0
assert tb.stats["stat_rx_err_framing"] == 0
assert tb.stats["stat_rx_err_preamble"] == 0
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)
@@ -157,6 +211,26 @@ async def run_test_oversize(dut, ifg=12, enable_gen=None, mii_sel=False):
assert tb.sink.empty()
for stat, val in tb.stats.items():
tb.log.info("%s: %d", stat, val)
assert tb.stats["rx_start_packet"] == 3
assert tb.stats["stat_rx_byte"] == 64*2+1519
assert tb.stats["stat_rx_pkt_len"] == 64*2+1519
assert tb.stats["stat_rx_pkt_fragment"] == 0
assert tb.stats["stat_rx_pkt_jabber"] == 0
assert tb.stats["stat_rx_pkt_ucast"] == 3
assert tb.stats["stat_rx_pkt_mcast"] == 0
assert tb.stats["stat_rx_pkt_bcast"] == 0
assert tb.stats["stat_rx_pkt_vlan"] == 0
assert tb.stats["stat_rx_pkt_good"] == 2
assert tb.stats["stat_rx_pkt_bad"] == 1
assert tb.stats["stat_rx_err_oversize"] == 1
assert tb.stats["stat_rx_err_bad_fcs"] == 0
assert tb.stats["stat_rx_err_bad_block"] == 0
assert tb.stats["stat_rx_err_framing"] == 0
assert tb.stats["stat_rx_err_preamble"] == 0
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)