Get it roughly working

This commit is contained in:
2026-05-24 15:53:50 -07:00
parent aa8c4a64df
commit 61ee654b18
5 changed files with 644 additions and 59 deletions

View File

@@ -0,0 +1,299 @@
import cocotb
from cocotb.handle import LogicArray, Array, Immediate
from cocotb.clock import Clock
from cocotb.triggers import ReadOnly, NextTimeStep, RisingEdge, Timer
import logging
import random
from enum import IntEnum
logger = logging.getLogger()
logger.setLevel(logging.INFO)
CLK_PERIOD = 5
SETS = 64
WAYS = 4
TAG_WIDTH = 20
data_arrays = [{}, {}, {}, {}]
meta_arrays = [{}, {}, {}, {}]
lru_array = {}
class MesiState(IntEnum):
MESI_INVALID = 0
MESI_SHARED = 1,
MESI_EXCLUSIVE = 2,
MESI_MODIFIED = 3,
def write_cacheline(index: int, way: int, data: bytes, mesi_state: MesiState, tag: int):
data_arrays[way][index] = data
meta_arrays[way][index] = (mesi_state << 20) | tag
async def handle_cache_arrays(dut):
while True:
await RisingEdge(dut.i_clk)
if dut.o_write_valid.value:
index = int(dut.o_write_index.value)
write_enables = [bool(int(dut.o_write_valid.value) & (1 << i)) for i in range(4)]
write_data = dut.o_write_data.value.to_bytes(byteorder="little")
write_meta = int(dut.o_write_meta.value)
logger.debug(f"Write Valid: {index=} {write_enables=} {write_data=} {write_meta=:#x}")
for data_array, meta_array, write_enable in zip(data_arrays, meta_arrays, write_enables):
if write_enable:
data_array[index] = write_data
meta_array[index] = write_meta
if dut.o_read_valid.value:
index = int(dut.o_read_index.value)
logger.debug(f"Read Valid: {index=}")
read_data = [LogicArray.from_bytes(data[index], byteorder="little") for data in data_arrays]
read_meta = [meta[index] for meta in meta_arrays]
dut.i_read_data.value = read_data
dut.i_read_meta.value = read_meta
async def handle_lru_arrays(dut):
while True:
await RisingEdge(dut.i_clk)
if dut.o_lru_write_valid.value:
logger.debug("lru write")
lru_write_index = int(dut.o_lru_write_index.value)
lru_write_data = int(dut.o_lru_write_data.value)
lru_array[lru_write_index] = lru_write_data
if dut.o_lru_read_valid.value:
logger.debug("lru read")
lru_read_index = int(dut.o_lru_read_index.value)
dut.i_lru_read_data.value = lru_array[lru_read_index]
async def handle_writeback(dut):
dut.i_writeback_done.value = 0
while True:
await RisingEdge(dut.i_clk)
if not dut.o_writeback_valid.value:
continue
logger.info("Writeback valid")
await RisingEdge(dut.i_clk)
await RisingEdge(dut.i_clk)
dut.i_writeback_done.value = 1
await RisingEdge(dut.i_clk)
dut.i_writeback_done.value = 0
async def handle_bus_interface(dut):
dut.i_memory_done.value = 0
dut.i_memory_resp.value = 0
while True:
await RisingEdge(dut.i_clk)
if not dut.o_memory_valid.value:
continue
logger.debug("Bus Interface Access")
await RisingEdge(dut.i_clk)
await RisingEdge(dut.i_clk)
dut.i_memory_done.value = 1
dut.i_memory_resp.value = 2
await RisingEdge(dut.i_clk)
dut.i_memory_done.value = 0
dut.i_memory_resp.value = 0
@cocotb.test
async def test_sanity(dut):
# Request a read from the cache, then request a write to the cache
cocotb.start_soon(Clock(dut.i_clk, CLK_PERIOD, unit="ns").start())
cocotb.start_soon(handle_cache_arrays(dut))
dut.i_rst.value = Immediate(1)
for _ in range(10):
await RisingEdge(dut.i_clk)
dut.i_rst.value = 0
await RisingEdge(dut.o_rdy)
for way in range(WAYS):
for index in range(SETS):
write_cacheline(index, way, bytes([0] * 64), MesiState.MESI_EXCLUSIVE, 0)
for i in range(32):
if not dut.o_rdy.value:
continue
dut.i_cpu_tag.value = 0
dut.i_cpu_index.value = i
dut.i_cpu_offset.value = 0
dut.i_rdy.value = 1
dut.i_cpu_we.value = 0
await RisingEdge(dut.i_clk)
@cocotb.test
async def test_clean_eviction(dut):
cocotb.start_soon(Clock(dut.i_clk, CLK_PERIOD, unit="ns").start())
cocotb.start_soon(handle_cache_arrays(dut))
cocotb.start_soon(handle_lru_arrays(dut))
cocotb.start_soon(handle_writeback(dut))
cocotb.start_soon(handle_bus_interface(dut))
dut.i_rst.value = Immediate(1)
for _ in range(10):
await RisingEdge(dut.i_clk)
dut.i_rst.value = 0
await RisingEdge(dut.o_rdy)
INDEX = 2
# Write with tag 0x55
for way in range(WAYS):
write_cacheline(INDEX, way, bytes([0xaa] * 64), MesiState.MESI_SHARED, way+1)
# read with tag 0xaa
dut.i_cpu_tag.value = 0x0
dut.i_cpu_index.value = INDEX
dut.i_cpu_offset.value = 2
dut.i_rdy.value = 1
dut.i_cpu_we.value = 0
await RisingEdge(dut.i_clk)
dut.i_cpu_tag.value = 0xaa
await RisingEdge(dut.i_clk)
dut.i_cpu_tag.value = 0
await Timer(1, "us")
@cocotb.test
async def test_eviction(dut):
cocotb.start_soon(Clock(dut.i_clk, CLK_PERIOD, unit="ns").start())
cocotb.start_soon(handle_cache_arrays(dut))
cocotb.start_soon(handle_lru_arrays(dut))
cocotb.start_soon(handle_writeback(dut))
cocotb.start_soon(handle_bus_interface(dut))
dut.i_rst.value = Immediate(1)
for _ in range(10):
await RisingEdge(dut.i_clk)
dut.i_rst.value = 0
await RisingEdge(dut.o_rdy)
INDEX = 2
# Write with tag 0x55
for way in range(WAYS):
write_cacheline(INDEX, way, bytes([0xaa] * 64), MesiState.MESI_MODIFIED, way+1)
# read with tag 0xaa
dut.i_cpu_tag.value = 0x0
dut.i_cpu_index.value = INDEX
dut.i_cpu_offset.value = 2
dut.i_rdy.value = 1
dut.i_cpu_we.value = 0
await RisingEdge(dut.i_clk)
dut.i_cpu_tag.value = 0xaa
await RisingEdge(dut.i_clk)
dut.i_cpu_tag.value = 0
await Timer(1, "us")
@cocotb.test
async def test_request_ownership(dut):
cocotb.start_soon(Clock(dut.i_clk, CLK_PERIOD, unit="ns").start())
cocotb.start_soon(handle_cache_arrays(dut))
cocotb.start_soon(handle_lru_arrays(dut))
cocotb.start_soon(handle_writeback(dut))
cocotb.start_soon(handle_bus_interface(dut))
dut.i_rst.value = Immediate(1)
for _ in range(10):
await RisingEdge(dut.i_clk)
dut.i_rst.value = 0
await RisingEdge(dut.o_rdy)
INDEX = 2
# Write with tag way + 1
for way in range(WAYS):
write_cacheline(INDEX, way, bytes([0xaa] * 64), MesiState.MESI_SHARED, way+1)
# write with tag 0x2
dut.i_cpu_tag.value = 0
dut.i_cpu_index.value = INDEX
dut.i_cpu_offset.value = 2
dut.i_cpu_data.value = 0xaa
dut.i_rdy.value = 1
dut.i_cpu_we.value = 1
await RisingEdge(dut.i_clk)
dut.i_cpu_data.value = 0
dut.i_cpu_tag.value = 2
await RisingEdge(dut.i_clk)
dut.i_cpu_tag.value = 0
await Timer(1, "us")
@cocotb.test
async def test_way_read_thrash(dut):
cocotb.start_soon(Clock(dut.i_clk, CLK_PERIOD, unit="ns").start())
cocotb.start_soon(handle_cache_arrays(dut))
cocotb.start_soon(handle_lru_arrays(dut))
cocotb.start_soon(handle_writeback(dut))
cocotb.start_soon(handle_bus_interface(dut))
dut.i_rst.value = Immediate(1)
for _ in range(10):
await RisingEdge(dut.i_clk)
dut.i_rst.value = 0
await RisingEdge(dut.o_rdy)
for tag in range(32):
dut.i_cpu_tag.value = tag
dut.i_cpu_index.value = 0
dut.i_cpu_offset.value = 0
dut.i_rdy.value = 1
await RisingEdge(dut.i_clk)
while not dut.o_rdy.value:
await RisingEdge(dut.i_clk)
await Timer(1, "us")

View File

@@ -4,4 +4,10 @@ tests:
modules:
- "application_wrapper_cache_arrays_test"
sources: "sources.list"
waves: True
- name: "application_wrapper_cache_miss_handler_test"
toplevel: "application_wrapper_cache_miss_handler"
modules:
- "application_wrapper_cache_miss_handler_test"
sources: "sources.list"
waves: True