"""cocotb testbench comparing a DUT against a shift-and-reduce reference model.

Each stimulus is a ``(value, shift_amount)`` pair; the expected result is
``(value << (shift_amount * 26)) % PRIME``.
"""

import logging
import random

import cocotb
from cocotb.clock import Clock
from cocotb.queue import Queue
from cocotb.triggers import Timer, RisingEdge, FallingEdge
from cocotbext.axi import AxiStreamBus, AxiStreamSource

# Modulus used by the reference model (2**130 - 5).
PRIME = 2**130 - 5
# Clock period passed to Clock(); interpreted in nanoseconds (units="ns").
CLK_PERIOD = 4


class TB:
    """Testbench harness: owns the clock, the input driver and the output monitor."""

    def __init__(self, dut):
        """Attach to *dut* and start the clock, driver and monitor coroutines.

        Args:
            dut: Simulator handle for the design under test. The harness uses
                its ``i_clk``, ``i_rst``, ``i_valid``, ``i_val``,
                ``i_shift_amount``, ``o_valid`` and ``o_result`` signals.
        """
        self.dut = dut

        self.log = logging.getLogger("cocotb.tb")
        self.log.setLevel(logging.INFO)

        # Stimulus waiting to be driven, as (value, shift_amount) tuples.
        self.input_queue = Queue()
        # Reference-model results, in the same order as the stimulus.
        self.expected_queue = Queue()
        # Results sampled from the DUT by run_output().
        self.output_queue = Queue()

        cocotb.start_soon(Clock(self.dut.i_clk, CLK_PERIOD, units="ns").start())
        cocotb.start_soon(self.run_input())
        cocotb.start_soon(self.run_output())

    async def cycle_reset(self):
        """Pulse the DUT's ``i_rst`` using ``i_clk`` as the timing reference."""
        await self._cycle_reset(self.dut.i_rst, self.dut.i_clk)

    async def _cycle_reset(self, rst, clk):
        """Drive *rst* low, high, then low again, two *clk* rising edges each.

        Args:
            rst: Reset signal handle (driven high to assert).
            clk: Clock signal handle used to pace the sequence.
        """
        rst.setimmediatevalue(0)
        await RisingEdge(clk)
        await RisingEdge(clk)
        rst.value = 1
        await RisingEdge(clk)
        await RisingEdge(clk)
        rst.value = 0
        await RisingEdge(clk)
        await RisingEdge(clk)

    async def write_input(self, value: int, shift_amount: int):
        """Queue one stimulus item together with its expected result.

        Args:
            value: Integer to present on ``i_val``.
            shift_amount: Value for ``i_shift_amount``; the reference model
                shifts *value* left by ``shift_amount * 26`` bits.
        """
        await self.input_queue.put((value, shift_amount))
        # Reference model: shift in 26-bit steps, then reduce modulo PRIME.
        await self.expected_queue.put((value << (shift_amount * 26)) % PRIME)

    async def run_input(self):
        """Driver loop: present each queued item to the DUT for one clock edge."""
        while True:
            value, shift_amount = await self.input_queue.get()

            self.dut.i_valid.value = 1
            self.dut.i_val.value = value
            self.dut.i_shift_amount.value = shift_amount
            await RisingEdge(self.dut.i_clk)

            # Return the inputs to idle; these values apply until the next
            # item is dequeued and driven.
            self.dut.i_valid.value = 0
            self.dut.i_shift_amount.value = 0
            self.dut.i_val.value = 0

    async def run_output(self):
        """Monitor loop: on each rising edge, capture ``o_result`` if ``o_valid``."""
        while True:
            await RisingEdge(self.dut.i_clk)
            if self.dut.o_valid.value:
                await self.output_queue.put(self.dut.o_result.value.integer)


# The called form ``@cocotb.test()`` is accepted by both cocotb 1.x and 2.x;
# the bare ``@cocotb.test`` form is not registered as a test on 1.x.
@cocotb.test()
async def test_sanity(dut):
    """Drive 1024 random inputs and check every DUT result against the model.

    Values are drawn from ``[1, 2**146]`` and shift amounts from ``[0, 4]``
    (both ranges inclusive). Results are compared in order; every mismatch is
    logged before the test fails so that all failures are visible in one run.
    """
    tb = TB(dut)
    await tb.cycle_reset()

    count = 1024
    for _ in range(count):
        await tb.write_input(random.randint(1, 2**(130 + 16)), random.randint(0, 4))

    mismatches = 0
    for _ in range(count):
        sim_val = await tb.expected_queue.get()
        dut_val = await tb.output_queue.get()
        if sim_val != dut_val:
            # Logged as "<expected> -> <observed>", both in hexadecimal.
            tb.log.error(f"{sim_val:x} -> {dut_val:x}")
            mismatches += 1

    assert mismatches == 0, (
        f"{mismatches} of {count} results did not match the reference model"
    )