Files
crypto/ChaCha20_Poly1305_64/sim/poly1305_friendly_modulo.py

110 lines
2.7 KiB
Python

import logging
import cocotb
from cocotb.clock import Clock
from cocotb.triggers import Timer, RisingEdge, FallingEdge
from cocotb.queue import Queue
from cocotbext.axi import AxiStreamBus, AxiStreamSource
import random
PRIME = 2**130-5
CLK_PERIOD = 4
class TB:
def __init__(self, dut):
self.dut = dut
self.log = logging.getLogger("cocotb.tb")
self.log.setLevel(logging.INFO)
self.input_queue = Queue()
self.expected_queue = Queue()
self.output_queue = Queue()
cocotb.start_soon(Clock(self.dut.i_clk, CLK_PERIOD, units="ns").start())
cocotb.start_soon(self.run_input())
cocotb.start_soon(self.run_output())
async def cycle_reset(self):
await self._cycle_reset(self.dut.i_rst, self.dut.i_clk)
async def _cycle_reset(self, rst, clk):
rst.setimmediatevalue(0)
await RisingEdge(clk)
await RisingEdge(clk)
rst.value = 1
await RisingEdge(clk)
await RisingEdge(clk)
rst.value = 0
await RisingEdge(clk)
await RisingEdge(clk)
async def write_input(self, value: int, shift_amount: int):
await self.input_queue.put((value, shift_amount))
await self.expected_queue.put((value << (shift_amount*26)) % PRIME)
async def run_input(self):
while True:
value, shift_amount = await self.input_queue.get()
self.dut.i_valid.value = 1
self.dut.i_val.value = value
self.dut.i_shift_amount.value = shift_amount
await RisingEdge(self.dut.i_clk)
self.dut.i_valid.value = 0
self.dut.i_shift_amount.value = 0
self.dut.i_val.value = 0
async def run_output(self):
while True:
await RisingEdge(self.dut.i_clk)
if self.dut.o_valid.value:
await self.output_queue.put(self.dut.o_result.value.integer)
@cocotb.test
async def test_sanity(dut):
tb = TB(dut)
await tb.cycle_reset()
count = 1024
for _ in range(count):
await tb.write_input(random.randint(1,2**(130+16)), random.randint(0, 4))
fail = False
for _ in range(count):
sim_val = await tb.expected_queue.get()
dut_val = await tb.output_queue.get()
if sim_val != dut_val:
tb.log.info(f"{sim_val:x} -> {dut_val:x}")
fail = True
assert not fail
@cocotb.test
async def test_directed(dut):
tb = TB(dut)
await tb.cycle_reset()
await tb.write_input(0x14C0D69391E7116E057E7AD833B00B706AA2390C, 4)
fail = False
sim_val = await tb.expected_queue.get()
dut_val = await tb.output_queue.get()
if sim_val != dut_val:
tb.log.info(f"{sim_val:x} -> {dut_val:x}")
fail = True
assert not fail