"""cocotb testbench for a direct-mapped cache with 64 lines of 64 bytes.

The testbench drives the CPU-side port of the DUT from a list of
``(addr, data, we, sync)`` operations, models the next memory level that the
DUT talks to on its ``*_cache_*`` port, and compares every CPU read against a
per-index reference copy of the cache contents.  After most tests the DUT's
``eviction_count`` / ``cache_miss_count`` counters are compared against the
values expected for the access pattern.
"""
import logging
import random
from collections import defaultdict
from collections.abc import Mapping, Sequence
from enum import IntEnum

import cocotb
from cocotb.clock import Clock
from cocotb.handle import Immediate, LogicArray
from cocotb.simulator import get_sim_time
from cocotb.triggers import Timer, RisingEdge, FallingEdge, with_timeout

logger = logging.getLogger()
logger.setLevel(logging.INFO)

CLK_PERIOD = 5

# Cache geometry assumed by the reference model and by the expected counters.
CACHELINE_BYTES = 64
NUM_CACHELINES = 64

# Reference model of the DUT contents, keyed by cache index.
reference_cache_data = defaultdict(bytearray)
# Backing store of the next memory level, keyed by the address the DUT requests.
higher_cache_data = defaultdict(bytearray)


class CacheCmd(IntEnum):
    """Commands the DUT issues on ``o_cache_cmd`` to the next memory level."""

    CACHE_NONE = 0
    CACHE_READ = 1
    CACHE_WRITE = 2


async def cpu_sequencer(dut, sequence: Sequence[tuple[int, int, bool, bool]]):
    """Drive the CPU-side inputs of the DUT with ``sequence``.

    Each element is ``(addr, data, we, sync)``.  The first element is applied
    immediately (while reset is still asserted); after the falling edge of
    ``i_rst`` one further element is applied on every rising clock edge on
    which ``o_rdy`` is high.  Once the whole sequence has been issued the
    coroutine waits another 150 ns before returning.
    """
    addr, do, we, sync = sequence[0]
    dut.i_addr.value = addr
    dut.i_data.value = do
    dut.i_we.value = we
    dut.i_sync.value = sync

    await FallingEdge(dut.i_rst)

    index = 1
    while index < len(sequence):
        await RisingEdge(dut.i_clk)
        if not dut.o_rdy.value:
            # DUT is busy: keep the current operation on the bus.
            continue
        addr, do, we, sync = sequence[index]
        dut.i_addr.value = addr
        dut.i_data.value = do
        dut.i_we.value = we
        dut.i_sync.value = sync
        index += 1

    # Trailing delay before handing control back to the test.
    await Timer(150, "ns")


async def cpu_data_monitor(dut):
    """Check CPU reads against the reference model and record CPU writes.

    On every ready cycle the operation sampled on the *previous* ready cycle
    is evaluated: a write updates ``reference_cache_data``, a read compares
    ``o_data`` with the reference byte and logs an error on mismatch.
    A previous address of 0 is treated as "nothing to check".
    """
    previous_address = 0
    address = 0
    we = 0
    previous_we = 0
    i_data = 0
    previous_i_data = 0

    await FallingEdge(dut.i_rst)

    while True:
        await RisingEdge(dut.i_clk)
        if not dut.o_rdy.value:
            continue

        # Shift the sampled operation into the "previous" slot.
        previous_address = address
        previous_we = we
        previous_i_data = i_data
        address = int(dut.i_addr.value)
        we = int(dut.i_we.value)
        i_data = int(dut.i_data.value)
        data = int(dut.o_data.value)

        if previous_address == 0:
            continue

        index = (previous_address // CACHELINE_BYTES) % NUM_CACHELINES
        offset = previous_address % CACHELINE_BYTES
        cacheline = reference_cache_data[index]

        if previous_we:
            # Writes have no read data to check; just update the model.
            cacheline[offset] = previous_i_data
            logger.debug(f"We saw a write here {index=} {offset=} previous_data={previous_i_data:x}")
        else:
            expected_data = cacheline[offset]
            if data != expected_data:
                logger.error(f"{get_sim_time()} {address=:x} {previous_address=:x} {data=:x} {expected_data=:x}")


async def mmu_sequencer(dut):
    """Copy ``i_addr`` to ``i_phys_address`` on every rising clock edge."""
    while True:
        await RisingEdge(dut.i_clk)
        dut.i_phys_address.value = dut.i_addr.value


async def handle_higher_level_cache(dut):
    """Model the next memory level behind the DUT.

    When ``o_cache_valid`` is high on a rising clock edge, the command is
    served with ``i_cache_rdy`` asserted for one clock:

    * ``CACHE_READ``  - returns the stored line for ``o_cache_addr`` (random
      bytes are generated the first time an address is read) and installs it
      in the reference model at ``dut.read_index``.
    * ``CACHE_WRITE`` - stores ``o_cache_data`` for ``o_cache_addr``.
    """
    dut.i_cache_rdy.value = 0

    while True:
        await RisingEdge(dut.i_clk)
        dut.i_cache_rdy.value = 0
        if not dut.o_cache_valid.value:
            continue

        cmd = CacheCmd(int(dut.o_cache_cmd.value))
        addr = int(dut.o_cache_addr.value)
        logger.debug(f"{cmd=} {addr=}")

        if cmd == CacheCmd.CACHE_READ:
            if addr not in higher_cache_data:
                data = bytearray(random.randbytes(CACHELINE_BYTES))
                higher_cache_data[addr] = data
            dut.i_cache_data.value = LogicArray.from_bytes(higher_cache_data[addr], byteorder="little")
            dut.i_cache_rdy.value = 1
            # NOTE(review): the reference model stores the *same* bytearray
            # object as the backing store, so writes recorded by
            # cpu_data_monitor are visible in higher_cache_data as well.
            reference_cache_data[int(dut.read_index.value)] = higher_cache_data[addr]
            await RisingEdge(dut.i_clk)
            dut.i_cache_rdy.value = 0
        elif cmd == CacheCmd.CACHE_WRITE:
            dut.i_cache_rdy.value = 1
            data = dut.o_cache_data.value.to_bytes(byteorder="little")
            higher_cache_data[addr] = bytearray(data)
            await RisingEdge(dut.i_clk)
            dut.i_cache_rdy.value = 0


def _start_testbench(dut) -> None:
    """Start the clock and the background driver/monitor coroutines."""
    cocotb.start_soon(Clock(dut.i_clk, CLK_PERIOD, unit="ns").start())
    cocotb.start_soon(mmu_sequencer(dut))
    cocotb.start_soon(handle_higher_level_cache(dut))
    cocotb.start_soon(cpu_data_monitor(dut))


async def _reset_and_drive(dut, cpu_sequence) -> None:
    """Hold reset for 10 clocks, release it and play ``cpu_sequence``."""
    dut.i_rst.value = Immediate(1)
    for _ in range(10):
        await RisingEdge(dut.i_clk)
    dut.i_rst.value = 0
    await cpu_sequencer(dut, cpu_sequence)


def _check_counters(dut, expected_cache_misses: int, expected_evictions: int) -> None:
    """Log an error for each DUT performance counter that differs from the expectation."""
    dut_evictions = int(dut.eviction_count.value)
    dut_misses = int(dut.cache_miss_count.value)
    if dut_evictions != expected_evictions:
        logger.error(f"Eviction count mismatch! Expected {expected_evictions}, saw {dut_evictions}")
    if dut_misses != expected_cache_misses:
        logger.error(f"Miss count mismatch! Expected {expected_cache_misses}, saw {dut_misses}")


@cocotb.test
async def sanity_test(dut):
    """Write and read back a few bytes in two non-aliasing cachelines."""
    _start_testbench(dut)
    cpu_sequence = [
        (0x100, 0xaa, True, False),
        (0x101, 0xbb, True, False),
        (0x100, 0x00, False, False),
        (0x101, 0x00, False, False),
        (0x200, 0xcc, True, False),
        (0x201, 0xdd, True, False),
        (0x100, 0x00, False, False),
        (0x101, 0x00, False, False),
        (0x200, 0x00, False, False),
        (0x201, 0x00, False, False),
        (0x100, 0x11, True, False),
        (0x101, 0x22, True, False),
        (0x100, 0x00, False, False),
        (0x200, 0x33, True, False),
        (0x101, 0x00, False, False),
        (0x201, 0x44, True, False),
        (0x100, 0x00, False, False),
        (0x200, 0x00, False, False),
        (0x101, 0x00, False, False),
        (0x201, 0x00, False, False),
    ]
    await _reset_and_drive(dut, cpu_sequence)
    _check_counters(dut, expected_cache_misses=2, expected_evictions=0)


@cocotb.test
async def clean_evict_test(dut):
    """Replace a clean cacheline with an aliased one."""
    _start_testbench(dut)
    # Read from one cacheline, then read from an aliased cacheline without writing.
    # The cacheline should be overwritten without evicting.
    cpu_sequence = [
        (0x100, 0x00, False, False),
        (0x1100, 0x00, False, False),
    ]
    await _reset_and_drive(dut, cpu_sequence)
    _check_counters(dut, expected_cache_misses=2, expected_evictions=0)


@cocotb.test
async def dirty_evict_test(dut):
    """Replace dirty cachelines with aliased ones."""
    _start_testbench(dut)
    # Write to one cacheline, then read from and write to an aliased cacheline,
    # then go back to the first one. Both replacements hit a line that has
    # been written to, so two evictions are expected.
    cpu_sequence = [
        (0x100, 0x41, True, False),
        (0x101, 0x42, True, False),
        (0x1100, 0x00, False, False),
        (0x1100, 0xaa, True, False),
        (0x100, 0x00, False, False),
    ]
    await _reset_and_drive(dut, cpu_sequence)
    _check_counters(dut, expected_cache_misses=3, expected_evictions=2)


@cocotb.test
async def long_write_thrash_test(dut):
    """Write one byte to each cacheline of a 1 MiB range."""
    _start_testbench(dut)
    num_lines_read = 2**20 // CACHELINE_BYTES
    cpu_sequence = [(i * CACHELINE_BYTES, i % 256, True, False) for i in range(num_lines_read)]
    await _reset_and_drive(dut, cpu_sequence)
    # The last 64 lines aren't evicted
    _check_counters(
        dut,
        expected_cache_misses=num_lines_read,
        expected_evictions=num_lines_read - NUM_CACHELINES,
    )


@cocotb.test
async def long_write_read_thrash_test(dut):
    """Write one byte to each cacheline of a 1 MiB range, then read them all back."""
    _start_testbench(dut)
    num_lines_read = 2**20 // CACHELINE_BYTES
    cpu_sequence = [(i * CACHELINE_BYTES, i % 256, True, False) for i in range(num_lines_read)]
    cpu_sequence.extend([(i * CACHELINE_BYTES, 0, False, False) for i in range(num_lines_read)])
    await _reset_and_drive(dut, cpu_sequence)
    _check_counters(
        dut,
        expected_cache_misses=num_lines_read * 2,
        expected_evictions=num_lines_read,
    )


@cocotb.test
async def long_write_linear_test(dut):
    """Write 64 KiB byte by byte at consecutive addresses."""
    _start_testbench(dut)
    num_bytes_read = 2**16
    cpu_sequence = [(i, i % 256, True, False) for i in range(num_bytes_read)]
    await _reset_and_drive(dut, cpu_sequence)
    _check_counters(
        dut,
        expected_cache_misses=num_bytes_read // CACHELINE_BYTES,
        expected_evictions=num_bytes_read // CACHELINE_BYTES - NUM_CACHELINES,
    )


@cocotb.test
async def long_write_read_linear_test(dut):
    """Write 64 KiB byte by byte at consecutive addresses, then read it all back."""
    _start_testbench(dut)
    num_bytes_read = 2**16
    cpu_sequence = [(i, i % 256, True, False) for i in range(num_bytes_read)]
    cpu_sequence.extend([(i, 0, False, False) for i in range(num_bytes_read)])
    await _reset_and_drive(dut, cpu_sequence)
    _check_counters(
        dut,
        expected_cache_misses=(num_bytes_read // CACHELINE_BYTES) * 2,
        expected_evictions=num_bytes_read // CACHELINE_BYTES,
    )


@cocotb.test
async def short_write_read_linear_test(dut):
    """Write and read back exactly one cache's worth of consecutive bytes."""
    # What makes this test "short" is that we only touch 64 cachelines,
    # so we shouldn't have to make any evictions.
    _start_testbench(dut)
    num_bytes_read = CACHELINE_BYTES * NUM_CACHELINES  # 64 bytes times 64 cachelines
    cpu_sequence = [(i, i % 256, True, False) for i in range(num_bytes_read)]
    cpu_sequence.extend([(i, i % 256, False, False) for i in range(num_bytes_read)])
    await _reset_and_drive(dut, cpu_sequence)
    _check_counters(
        dut,
        expected_cache_misses=num_bytes_read // CACHELINE_BYTES,
        expected_evictions=num_bytes_read // CACHELINE_BYTES - NUM_CACHELINES,
    )


@cocotb.test
async def random_access_test(dut):
    """Issue fully random reads and writes; only the data monitor checks results."""
    # Just fully random accesses
    # This is also kind of a thrash test since this is not realistic
    _start_testbench(dut)
    num_accesses = 2**18
    # randint() includes both endpoints, so the upper bound is 2**32 - 1 to
    # stay inside a 32-bit address (assumed width of i_addr).
    cpu_sequence = [
        (random.randint(0, 2**32 - 1), random.randint(0, 255), random.randint(0, 1), random.randint(0, 1))
        for _ in range(num_accesses)
    ]
    await _reset_and_drive(dut, cpu_sequence)