dma: Add AXI DMA interface module and testbench

Signed-off-by: Alex Forencich <alex@alexforencich.com>
This commit is contained in:
Alex Forencich
2025-11-04 12:41:07 -08:00
parent 851919f16f
commit 14d988d1f2
17 changed files with 4005 additions and 0 deletions

View File

@@ -66,6 +66,7 @@ To facilitate the dual-license model, contributions to the project can only be a
* AXI central DMA
* AXI streaming DMA
* DMA client for AXI stream
* DMA interface for AXI
* Segmented SDP RAM
* Segmented dual-clock SDP RAM
* Ethernet

View File

@@ -0,0 +1,6 @@
taxi_dma_if_axi.sv
taxi_dma_if_axi_rd.sv
taxi_dma_if_axi_wr.sv
taxi_dma_desc_if.sv
taxi_dma_ram_if.sv
../lib/taxi/src/axi/rtl/taxi_axi_if.sv

View File

@@ -0,0 +1,210 @@
// SPDX-License-Identifier: CERN-OHL-S-2.0
/*
Copyright (c) 2021-2025 FPGA Ninja, LLC
Authors:
- Alex Forencich
*/
`resetall
`timescale 1ns / 1ps
`default_nettype none
/*
* AXI DMA interface
*/
// Combined AXI DMA interface: instantiates the read engine (taxi_dma_if_axi_rd)
// and the write engine (taxi_dma_if_axi_wr) and passes every port straight
// through to the matching engine.  No logic lives at this level.
module taxi_dma_if_axi #
(
    // Longest AXI burst the engines are allowed to generate
    parameter AXI_MAX_BURST_LEN = 256,
    // Operation table size of the read engine
    parameter RD_OP_TBL_SIZE = 32,
    // Operation table size of the write engine
    parameter WR_OP_TBL_SIZE = 32,
    // Read engine: use AXI ID signals
    parameter RD_USE_AXI_ID = 0,
    // Write engine: use AXI ID signals
    parameter WR_USE_AXI_ID = 1
)
(
    input  wire logic                               clk,
    input  wire logic                               rst,

    // AXI master interface (write and read sides are separate modports)
    taxi_axi_if.wr_mst                              m_axi_wr,
    taxi_axi_if.rd_mst                              m_axi_rd,

    // Read descriptor request input and status output
    taxi_dma_desc_if.req_snk                        rd_desc_req,
    taxi_dma_desc_if.sts_src                        rd_desc_sts,

    // Write descriptor request input and status output
    taxi_dma_desc_if.req_snk                        wr_desc_req,
    taxi_dma_desc_if.sts_src                        wr_desc_sts,

    // RAM interface: the read engine drives the RAM write port,
    // the write engine drives the RAM read port
    taxi_dma_ram_if.wr_mst                          dma_ram_wr,
    taxi_dma_ram_if.rd_mst                          dma_ram_rd,

    // Configuration: per-direction enables
    input  wire logic                               read_enable,
    input  wire logic                               write_enable,

    // Status: per-direction busy flags
    output wire logic                               status_rd_busy,
    output wire logic                               status_wr_busy,

    // Statistics from the read engine (tags are indexes sized by RD_OP_TBL_SIZE)
    output wire logic [$clog2(RD_OP_TBL_SIZE)-1:0]  stat_rd_op_start_tag,
    output wire logic                               stat_rd_op_start_valid,
    output wire logic [$clog2(RD_OP_TBL_SIZE)-1:0]  stat_rd_op_finish_tag,
    output wire logic [3:0]                         stat_rd_op_finish_status,
    output wire logic                               stat_rd_op_finish_valid,
    output wire logic [$clog2(RD_OP_TBL_SIZE)-1:0]  stat_rd_req_start_tag,
    output wire logic [12:0]                        stat_rd_req_start_len,
    output wire logic                               stat_rd_req_start_valid,
    output wire logic [$clog2(RD_OP_TBL_SIZE)-1:0]  stat_rd_req_finish_tag,
    output wire logic [3:0]                         stat_rd_req_finish_status,
    output wire logic                               stat_rd_req_finish_valid,
    output wire logic                               stat_rd_op_tbl_full,
    output wire logic                               stat_rd_tx_stall,

    // Statistics from the write engine (tags are indexes sized by WR_OP_TBL_SIZE)
    output wire logic [$clog2(WR_OP_TBL_SIZE)-1:0]  stat_wr_op_start_tag,
    output wire logic                               stat_wr_op_start_valid,
    output wire logic [$clog2(WR_OP_TBL_SIZE)-1:0]  stat_wr_op_finish_tag,
    output wire logic [3:0]                         stat_wr_op_finish_status,
    output wire logic                               stat_wr_op_finish_valid,
    output wire logic [$clog2(WR_OP_TBL_SIZE)-1:0]  stat_wr_req_start_tag,
    output wire logic [12:0]                        stat_wr_req_start_len,
    output wire logic                               stat_wr_req_start_valid,
    output wire logic [$clog2(WR_OP_TBL_SIZE)-1:0]  stat_wr_req_finish_tag,
    output wire logic [3:0]                         stat_wr_req_finish_status,
    output wire logic                               stat_wr_req_finish_valid,
    output wire logic                               stat_wr_op_tbl_full,
    output wire logic                               stat_wr_tx_stall
);

// Read engine: AXI read channels in, DMA RAM write port out
taxi_dma_if_axi_rd #(
    .AXI_MAX_BURST_LEN(AXI_MAX_BURST_LEN),
    .OP_TBL_SIZE(RD_OP_TBL_SIZE),
    .USE_AXI_ID(RD_USE_AXI_ID)
)
dma_rd_inst (
    .clk(clk),
    .rst(rst),

    // AXI master interface
    .m_axi_rd(m_axi_rd),

    // Read descriptor
    .rd_desc_req(rd_desc_req),
    .rd_desc_sts(rd_desc_sts),

    // RAM interface
    .dma_ram_wr(dma_ram_wr),

    // Configuration
    .enable(read_enable),

    // Status
    .status_busy(status_rd_busy),

    // Statistics
    .stat_rd_op_start_tag(stat_rd_op_start_tag),
    .stat_rd_op_start_valid(stat_rd_op_start_valid),
    .stat_rd_op_finish_tag(stat_rd_op_finish_tag),
    .stat_rd_op_finish_status(stat_rd_op_finish_status),
    .stat_rd_op_finish_valid(stat_rd_op_finish_valid),
    .stat_rd_req_start_tag(stat_rd_req_start_tag),
    .stat_rd_req_start_len(stat_rd_req_start_len),
    .stat_rd_req_start_valid(stat_rd_req_start_valid),
    .stat_rd_req_finish_tag(stat_rd_req_finish_tag),
    .stat_rd_req_finish_status(stat_rd_req_finish_status),
    .stat_rd_req_finish_valid(stat_rd_req_finish_valid),
    .stat_rd_op_tbl_full(stat_rd_op_tbl_full),
    .stat_rd_tx_stall(stat_rd_tx_stall)
);

// Write engine: DMA RAM read port in, AXI write channels out
taxi_dma_if_axi_wr #(
    .AXI_MAX_BURST_LEN(AXI_MAX_BURST_LEN),
    .OP_TBL_SIZE(WR_OP_TBL_SIZE),
    .USE_AXI_ID(WR_USE_AXI_ID)
)
dma_wr_inst (
    .clk(clk),
    .rst(rst),

    // AXI master interface
    .m_axi_wr(m_axi_wr),

    // Write descriptor
    .wr_desc_req(wr_desc_req),
    .wr_desc_sts(wr_desc_sts),

    // RAM interface
    .dma_ram_rd(dma_ram_rd),

    // Configuration
    .enable(write_enable),

    // Status
    .status_busy(status_wr_busy),

    // Statistics
    .stat_wr_op_start_tag(stat_wr_op_start_tag),
    .stat_wr_op_start_valid(stat_wr_op_start_valid),
    .stat_wr_op_finish_tag(stat_wr_op_finish_tag),
    .stat_wr_op_finish_status(stat_wr_op_finish_status),
    .stat_wr_op_finish_valid(stat_wr_op_finish_valid),
    .stat_wr_req_start_tag(stat_wr_req_start_tag),
    .stat_wr_req_start_len(stat_wr_req_start_len),
    .stat_wr_req_start_valid(stat_wr_req_start_valid),
    .stat_wr_req_finish_tag(stat_wr_req_finish_tag),
    .stat_wr_req_finish_status(stat_wr_req_finish_status),
    .stat_wr_req_finish_valid(stat_wr_req_finish_valid),
    .stat_wr_op_tbl_full(stat_wr_op_tbl_full),
    .stat_wr_tx_stall(stat_wr_tx_stall)
);

endmodule
`resetall

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,65 @@
# SPDX-License-Identifier: CERN-OHL-S-2.0
#
# Copyright (c) 2020-2025 FPGA Ninja, LLC
#
# Authors:
# - Alex Forencich
# cocotb build settings: HDL language, default simulator (overridable), and
# waveform dumping (off unless WAVES=1 is given)
TOPLEVEL_LANG = verilog
SIM ?= verilator
WAVES ?= 0
# simulation time unit / precision
COCOTB_HDL_TIMEUNIT = 1ns
COCOTB_HDL_TIMEPRECISION = 1ps
# source tree locations, relative to this test directory
RTL_DIR = ../../rtl
LIB_DIR = ../../lib
TAXI_SRC_DIR = $(LIB_DIR)/taxi/src
# design under test; the Python test module and the HDL top level are both
# named test_$(DUT)
DUT = taxi_dma_if_axi
COCOTB_TEST_MODULES = test_$(DUT)
COCOTB_TOPLEVEL = test_$(DUT)
# MODULE / TOPLEVEL mirror the COCOTB_-prefixed variables above
# (presumably kept for cocotb releases that read the older names - TODO confirm)
MODULE = $(COCOTB_TEST_MODULES)
TOPLEVEL = $(COCOTB_TOPLEVEL)
# testbench wrapper plus the DUT file list; the .f entry is expanded below
VERILOG_SOURCES += $(COCOTB_TOPLEVEL).sv
VERILOG_SOURCES += $(RTL_DIR)/$(DUT).f
# handle file list files
# process_f_file:  read list file $1 (via cat) and prefix every entry with the
#                  directory of $1, then expand the result again
# process_f_files: for each word of $1, substitute the expanded contents when
#                  the word matches %.f, otherwise keep the word unchanged
# uniq_base:       remove entries whose base name (notdir) occurs again later in
#                  the list, so the last occurrence of each base name is kept
process_f_file = $(call process_f_files,$(addprefix $(dir $1),$(shell cat $1)))
process_f_files = $(foreach f,$1,$(if $(filter %.f,$f),$(call process_f_file,$f),$f))
uniq_base = $(if $1,$(call uniq_base,$(foreach f,$1,$(if $(filter-out $(notdir $(lastword $1)),$(notdir $f)),$f,))) $(lastword $1))
VERILOG_SOURCES := $(call uniq_base,$(call process_f_files,$(VERILOG_SOURCES)))
# module parameters
# every PARAM_<NAME> variable is exported to the environment and is also turned
# into a top-level parameter override by the simulator-specific rules below
export PARAM_AXI_DATA_W := 64
export PARAM_AXI_ADDR_W := 16
export PARAM_AXI_STRB_W := $(shell expr $(PARAM_AXI_DATA_W) / 8 )
export PARAM_AXI_ID_W := 8
export PARAM_AXI_MAX_BURST_LEN := 256
export PARAM_RAM_SEL_W := 2
export PARAM_RAM_ADDR_W := 16
export PARAM_RAM_SEGS := 2
export PARAM_IMM_EN := 1
export PARAM_IMM_W := $(PARAM_AXI_DATA_W)
export PARAM_LEN_W := 16
export PARAM_TAG_W := 8
# operation table sizes are 2**AXI_ID_W, computed with a python one-liner
export PARAM_RD_OP_TBL_SIZE := $(shell python -c "print(2**$(PARAM_AXI_ID_W))")
export PARAM_WR_OP_TBL_SIZE := $(shell python -c "print(2**$(PARAM_AXI_ID_W))")
export PARAM_RD_USE_AXI_ID := 0
export PARAM_WR_USE_AXI_ID := 1
# simulator-specific parameter passing:
#   icarus    -> -P <toplevel>.<NAME>=<value>, plus the -fst plusarg
#   verilator -> -G<NAME>=<value>, with FST tracing enabled only when WAVES=1
ifeq ($(SIM), icarus)
PLUSARGS += -fst
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-P $(COCOTB_TOPLEVEL).$(subst PARAM_,,$(v))=$($(v)))
else ifeq ($(SIM), verilator)
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-G$(subst PARAM_,,$(v))=$($(v)))
ifeq ($(WAVES), 1)
COMPILE_ARGS += --trace-fst
VERILATOR_TRACE = 1
endif
endif
# pull in the cocotb simulation makefile located by cocotb-config
include $(shell cocotb-config --makefiles)/Makefile.sim

View File

@@ -0,0 +1 @@
../dma_psdp_ram.py

View File

@@ -0,0 +1,347 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: CERN-OHL-S-2.0
"""
Copyright (c) 2021-2025 FPGA Ninja, LLC
Authors:
- Alex Forencich
"""
import itertools
import logging
import os
import sys
import cocotb_test.simulator
import pytest
import cocotb
from cocotb.clock import Clock
from cocotb.triggers import RisingEdge
from cocotb.regression import TestFactory
from cocotbext.axi import AxiBus, AxiRam
from cocotbext.axi.stream import define_stream
try:
from dma_psdp_ram import PsdpRam, PsdpRamBus
except ImportError:
# attempt import from current directory
sys.path.insert(0, os.path.join(os.path.dirname(__file__)))
try:
from dma_psdp_ram import PsdpRam, PsdpRamBus
finally:
del sys.path[0]
# Stream classes for the descriptor request channel ("req_*" signals).
DescBus, DescTransaction, DescSource, DescSink, DescMonitor = define_stream(
    "Desc",
    signals=[
        "req_src_addr",
        "req_src_sel",
        "req_src_asid",
        "req_dst_addr",
        "req_dst_sel",
        "req_dst_asid",
        "req_len",
        "req_tag",
        "req_valid",
        "req_ready",
    ],
    optional_signals=[
        "req_imm",
        "req_imm_en",
        "req_id",
        "req_dest",
        "req_user",
    ],
)

# Stream classes for the descriptor status channel ("sts_*" signals); note that
# no ready signal is listed for this channel.
DescStatusBus, DescStatusTransaction, DescStatusSource, DescStatusSink, DescStatusMonitor = define_stream(
    "DescStatus",
    signals=[
        "sts_tag",
        "sts_error",
        "sts_valid",
    ],
    optional_signals=[
        "sts_len",
        "sts_id",
        "sts_dest",
        "sts_user",
    ],
)
class TB(object):
    """Testbench harness for the combined AXI DMA interface.

    Starts the clock, attaches an AXI RAM model to ``dut.m_axi``, a DMA RAM
    model to ``dut.dma_ram``, and descriptor sources / status sinks to
    ``dut.rd_desc`` and ``dut.wr_desc``.  Both enables are initialised to 0.
    """

    def __init__(self, dut):
        self.dut = dut

        self.log = logging.getLogger("cocotb.tb")
        self.log.setLevel(logging.DEBUG)

        clk = dut.clk
        rst = dut.rst

        cocotb.start_soon(Clock(clk, 10, units="ns").start())

        # memory models on both sides of the DMA engine
        self.axi_ram = AxiRam(AxiBus.from_entity(dut.m_axi), clk, rst, size=2**16)
        self.dma_ram = PsdpRam(PsdpRamBus.from_entity(dut.dma_ram), clk, rst, size=2**16)

        # descriptor request drivers and status receivers
        self.read_desc_source = DescSource(DescBus.from_entity(dut.rd_desc), clk, rst)
        self.read_desc_status_sink = DescStatusSink(DescStatusBus.from_entity(dut.rd_desc), clk, rst)
        self.write_desc_source = DescSource(DescBus.from_entity(dut.wr_desc), clk, rst)
        self.write_desc_status_sink = DescStatusSink(DescStatusBus.from_entity(dut.wr_desc), clk, rst)

        dut.read_enable.setimmediatevalue(0)
        dut.write_enable.setimmediatevalue(0)

    def set_idle_generator(self, generator=None):
        """Install a fresh pause generator on the AXI B and R channels, if one is given."""
        if not generator:
            return
        for channel in (self.axi_ram.write_if.b_channel, self.axi_ram.read_if.r_channel):
            channel.set_pause_generator(generator())

    def set_backpressure_generator(self, generator=None):
        """Install a fresh pause generator on AW/W/AR and both DMA RAM ports, if one is given."""
        if not generator:
            return
        targets = (
            self.axi_ram.write_if.aw_channel,
            self.axi_ram.write_if.w_channel,
            self.axi_ram.read_if.ar_channel,
            self.dma_ram.write_if,
            self.dma_ram.read_if,
        )
        for target in targets:
            target.set_pause_generator(generator())

    async def cycle_reset(self):
        """Pulse ``rst`` high for two clock edges, with two edges of settling before and after."""
        clk = self.dut.clk
        rst = self.dut.rst

        rst.setimmediatevalue(0)
        for level in (1, 0):
            await RisingEdge(clk)
            await RisingEdge(clk)
            rst.value = level
        await RisingEdge(clk)
        await RisingEdge(clk)
async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None):
    """Write test: copy data from the DMA RAM model into the AXI RAM model.

    For each length / AXI offset combination the source and destination
    regions are pre-filled, one write descriptor is issued, and the status and
    destination contents (including one guard byte on each side) are checked.
    """
    tb = TB(dut)

    axi_byte_lanes = tb.axi_ram.write_if.byte_lanes
    ram_byte_lanes = tb.dma_ram.write_if.byte_lanes
    tag_count = 1 << len(tb.write_desc_source.bus.req_tag)

    # offsets just above a 4 KiB boundary and just below the next one
    axi_offsets = [*range(axi_byte_lanes+1), *range(4096-axi_byte_lanes, 4096)]

    # optionally run only every 8th offset, selected by OFFSET_GROUP
    offset_group = os.getenv("OFFSET_GROUP")
    if offset_group is not None:
        axi_offsets = axi_offsets[int(offset_group)::8]

    cur_tag = 1

    tb.set_idle_generator(idle_inserter)
    tb.set_backpressure_generator(backpressure_inserter)

    await tb.cycle_reset()

    tb.dut.write_enable.value = 1

    lengths = [*range(1, ram_byte_lanes+3), *range(128-4, 128+4), 1024]

    for length, axi_offset, ram_offset in itertools.product(lengths, axi_offsets, range(1)):
        tb.log.info("length %d, axi_offset %d, ram_offset %d", length, axi_offset, ram_offset)

        axi_addr = 0x1000 + axi_offset
        ram_addr = 0x1000 + ram_offset
        payload = bytearray(i % 256 for i in range(length))

        # background fill on both sides, then the payload at the source
        tb.dma_ram.write(ram_addr & 0xffff80, b'\x55'*(len(payload)+256))
        tb.axi_ram.write(axi_addr-128, b'\xaa'*(len(payload)+256))
        tb.dma_ram.write(ram_addr, payload)

        tb.log.debug("%s", tb.dma_ram.hexdump_str((ram_addr & ~0xf)-16, (((ram_addr & 0xf)+length-1) & ~0xf)+48, prefix="RAM "))

        desc = DescTransaction(
            req_dst_addr=axi_addr,
            req_src_addr=ram_addr,
            req_src_sel=0,
            req_len=len(payload),
            req_tag=cur_tag,
        )
        await tb.write_desc_source.send(desc)

        status = await tb.write_desc_status_sink.recv()

        tb.log.info("status: %s", status)

        assert int(status.sts_tag) == cur_tag
        assert int(status.sts_error) == 0

        tb.log.debug("%s", tb.axi_ram.hexdump_str((axi_addr & ~0xf)-16, (((axi_addr & 0xf)+length-1) & ~0xf)+48, prefix="AXI "))

        # payload must arrive intact with the 0xaa fill untouched on both sides
        guard = b'\xaa'
        assert tb.axi_ram.read(axi_addr-1, len(payload)+2) == guard+payload+guard

        cur_tag = (cur_tag + 1) % tag_count

    await RisingEdge(dut.clk)
    await RisingEdge(dut.clk)
async def run_test_read(dut, idle_inserter=None, backpressure_inserter=None):
    """Read test: copy data from the AXI RAM model into the DMA RAM model.

    For each length / AXI offset combination the payload is placed in the AXI
    RAM, the destination is pre-filled with 0xaa, one read descriptor is
    issued, and the status and destination contents (with 8 guard bytes on
    each side) are checked.
    """
    tb = TB(dut)

    axi_byte_lanes = tb.axi_ram.write_if.byte_lanes
    ram_byte_lanes = tb.dma_ram.write_if.byte_lanes
    tag_count = 1 << len(tb.read_desc_source.bus.req_tag)

    # offsets just above a 4 KiB boundary and just below the next one
    axi_offsets = [*range(axi_byte_lanes+1), *range(4096-axi_byte_lanes, 4096)]

    # optionally run only every 8th offset, selected by OFFSET_GROUP
    offset_group = os.getenv("OFFSET_GROUP")
    if offset_group is not None:
        axi_offsets = axi_offsets[int(offset_group)::8]

    cur_tag = 1

    tb.set_idle_generator(idle_inserter)
    tb.set_backpressure_generator(backpressure_inserter)

    await tb.cycle_reset()

    tb.dut.read_enable.value = 1

    lengths = [*range(1, ram_byte_lanes+3), *range(128-4, 128+4), 1024]

    for length, axi_offset, ram_offset in itertools.product(lengths, axi_offsets, range(1)):
        tb.log.info("length %d, axi_offset %d, ram_offset %d", length, axi_offset, ram_offset)

        axi_addr = 0x1000 + axi_offset
        ram_addr = 0x1000 + ram_offset
        payload = bytearray(i % 256 for i in range(length))

        tb.axi_ram.write(axi_addr, payload)

        tb.log.debug("%s", tb.axi_ram.hexdump_str((axi_addr & ~0xf)-16, (((axi_addr & 0xf)+length-1) & ~0xf)+48, prefix="AXI "))

        # background fill around the destination
        tb.dma_ram.write(ram_addr-256, b'\xaa'*(len(payload)+512))

        desc = DescTransaction(
            req_src_addr=axi_addr,
            req_dst_addr=ram_addr,
            req_dst_sel=0,
            req_len=len(payload),
            req_tag=cur_tag,
        )
        await tb.read_desc_source.send(desc)

        status = await tb.read_desc_status_sink.recv()

        tb.log.info("status: %s", status)

        assert int(status.sts_tag) == cur_tag
        assert int(status.sts_error) == 0

        tb.log.debug("%s", tb.dma_ram.hexdump_str((ram_addr & ~0xf)-16, (((ram_addr & 0xf)+length-1) & ~0xf)+48, prefix="RAM "))

        # payload must arrive intact with the 0xaa fill untouched on both sides
        guard = b'\xaa'*8
        assert tb.dma_ram.read(ram_addr-8, len(payload)+16) == guard+payload+guard

        cur_tag = (cur_tag + 1) % tag_count

    await RisingEdge(dut.clk)
    await RisingEdge(dut.clk)
async def run_test_write_imm(dut, idle_inserter=None, backpressure_inserter=None):
    """Immediate write test: write data carried in the descriptor into the AXI RAM model.

    The payload is packed little-endian into ``req_imm`` with ``req_imm_en``
    set; afterwards the status and the destination contents (with one guard
    byte on each side) are checked.
    """
    tb = TB(dut)

    axi_byte_lanes = tb.axi_ram.write_if.byte_lanes
    tag_count = 1 << len(tb.write_desc_source.bus.req_tag)

    # offsets just above a 4 KiB boundary and just below the next one
    axi_offsets = [*range(axi_byte_lanes+1), *range(4096-axi_byte_lanes, 4096)]

    # optionally run only every 8th offset, selected by OFFSET_GROUP
    offset_group = os.getenv("OFFSET_GROUP")
    if offset_group is not None:
        axi_offsets = axi_offsets[int(offset_group)::8]

    cur_tag = 1

    tb.set_idle_generator(idle_inserter)
    tb.set_backpressure_generator(backpressure_inserter)

    await tb.cycle_reset()

    tb.dut.write_enable.value = 1

    # NOTE(review): range() stops one byte short of the full immediate width
    # (len(req_imm) // 8); confirm whether full-width immediates should be covered.
    lengths = range(1, len(dut.wr_desc.req_imm) // 8)

    for length, axi_offset in itertools.product(lengths, axi_offsets):
        tb.log.info("length %d, axi_offset %d", length, axi_offset)

        axi_addr = 0x1000 + axi_offset
        payload = bytearray(i % 256 for i in range(length))
        imm = int.from_bytes(payload, 'little')

        # background fill around the destination
        tb.axi_ram.write(axi_addr-128, b'\xaa'*(len(payload)+256))

        tb.log.debug("Immediate: 0x%x", imm)

        desc = DescTransaction(
            req_dst_addr=axi_addr,
            req_src_addr=0,
            req_src_sel=0,
            req_imm=imm,
            req_imm_en=1,
            req_len=len(payload),
            req_tag=cur_tag,
        )
        await tb.write_desc_source.send(desc)

        status = await tb.write_desc_status_sink.recv()

        tb.log.info("status: %s", status)

        assert int(status.sts_tag) == cur_tag
        assert int(status.sts_error) == 0

        tb.log.debug("%s", tb.axi_ram.hexdump_str((axi_addr & ~0xf)-16, (((axi_addr & 0xf)+length-1) & ~0xf)+48, prefix="AXI "))

        # payload must arrive intact with the 0xaa fill untouched on both sides
        guard = b'\xaa'
        assert tb.axi_ram.read(axi_addr-1, len(payload)+2) == guard+payload+guard

        cur_tag = (cur_tag + 1) % tag_count

    await RisingEdge(dut.clk)
    await RisingEdge(dut.clk)
def cycle_pause():
    """Return an endless iterator repeating the pattern 1, 1, 1, 0."""
    pattern = (1, 1, 1, 0)
    return itertools.cycle(pattern)
# Register the cocotb tests only when cocotb exposes a top-level handle; each
# coroutine is expanded over every idle / backpressure inserter combination.
if getattr(cocotb, 'top', None) is not None:
    for coro in (run_test_write, run_test_read, run_test_write_imm):
        factory = TestFactory(coro)
        for option_name in ("idle_inserter", "backpressure_inserter"):
            factory.add_option(option_name, [None, cycle_pause])
        factory.generate_tests()

# cocotb-test
# directory layout used by the pytest entry point below
tests_dir = os.path.dirname(__file__)
rtl_dir = os.path.abspath(os.path.join(tests_dir, os.pardir, os.pardir, 'rtl'))
lib_dir = os.path.abspath(os.path.join(tests_dir, os.pardir, os.pardir, 'lib'))
taxi_src_dir = os.path.abspath(os.path.join(lib_dir, 'taxi', 'src'))
def process_f_files(files):
    """Expand ``.f`` file lists into a flat list of source paths.

    Each entry ending in ``.f`` (case-insensitive) is read and split on
    whitespace; its entries are taken relative to the list file's directory
    and expanded recursively.  Results are keyed by base name, so a later path
    with the same base name replaces an earlier one while keeping the earlier
    one's position in the returned list.
    """
    by_basename = {}
    for entry in files:
        if entry[-2:].lower() != '.f':
            by_basename[os.path.basename(entry)] = entry
            continue

        with open(entry, 'r') as list_file:
            names = list_file.read().split()

        base_dir = os.path.dirname(entry)
        nested = process_f_files([os.path.join(base_dir, name) for name in names])
        for path in nested:
            by_basename[os.path.basename(path)] = path

    return list(by_basename.values())
@pytest.mark.parametrize("offset_group", list(range(8)))
@pytest.mark.parametrize("axi_data_w", [64, 128])
def test_taxi_dma_if_axi(request, axi_data_w, offset_group):
    """pytest entry point: build and simulate the combined DMA interface testbench.

    Runs once per AXI data width and offset group; the offset group is handed
    to the cocotb tests through the OFFSET_GROUP environment variable.
    """
    dut = "taxi_dma_if_axi"
    module = os.path.splitext(os.path.basename(__file__))[0]
    toplevel = module

    verilog_sources = process_f_files([
        os.path.join(tests_dir, f"{toplevel}.sv"),
        os.path.join(rtl_dir, f"{dut}.f"),
    ])

    axi_id_w = 8
    op_tbl_size = 2**axi_id_w

    parameters = {
        'AXI_DATA_W': axi_data_w,
        'AXI_ADDR_W': 16,
        'AXI_STRB_W': axi_data_w // 8,
        'AXI_ID_W': axi_id_w,
        'AXI_MAX_BURST_LEN': 256,
        'RAM_SEL_W': 2,
        'RAM_ADDR_W': 16,
        'RAM_SEGS': 2,
        'IMM_EN': 1,
        'IMM_W': axi_data_w,
        'LEN_W': 16,
        'TAG_W': 8,
        'RD_OP_TBL_SIZE': op_tbl_size,
        'WR_OP_TBL_SIZE': op_tbl_size,
        'RD_USE_AXI_ID': 0,
        'WR_USE_AXI_ID': 1,
    }

    # mirror every parameter into the environment as PARAM_<NAME>
    extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}
    extra_env['OFFSET_GROUP'] = str(offset_group)

    # one build directory per parametrized test instance
    build_name = request.node.name.replace('[', '-').replace(']', '')
    sim_build = os.path.join(tests_dir, "sim_build", build_name)

    cocotb_test.simulator.run(
        simulator="verilator",
        python_search=[tests_dir],
        verilog_sources=verilog_sources,
        toplevel=toplevel,
        module=module,
        parameters=parameters,
        sim_build=sim_build,
        extra_env=extra_env,
    )

View File

@@ -0,0 +1,215 @@
// SPDX-License-Identifier: CERN-OHL-S-2.0
/*
Copyright (c) 2025 FPGA Ninja, LLC
Authors:
- Alex Forencich
*/
`resetall
`timescale 1ns / 1ps
`default_nettype none
/*
* AXI DMA interface testbench
*/
// Port-less testbench wrapper around taxi_dma_if_axi: declares the interface
// instances and signals the DUT needs and wires them to the unit under test.
module test_taxi_dma_if_axi #
(
    /* verilator lint_off WIDTHTRUNC */
    // AXI interface geometry
    parameter AXI_DATA_W = 64,
    parameter AXI_ADDR_W = 16,
    parameter AXI_STRB_W = AXI_DATA_W / 8,
    parameter AXI_ID_W = 8,
    parameter AXI_MAX_BURST_LEN = 256,
    // DMA RAM interface geometry
    parameter RAM_SEL_W = 2,
    parameter RAM_ADDR_W = 16,
    parameter RAM_SEGS = 2,
    // immediate-data support on the write descriptor interface
    parameter logic IMM_EN = 1,
    parameter IMM_W = AXI_DATA_W,
    // descriptor field widths
    parameter LEN_W = 16,
    parameter TAG_W = 8,
    // DUT configuration
    parameter RD_OP_TBL_SIZE = 2**AXI_ID_W,
    parameter WR_OP_TBL_SIZE = 2**AXI_ID_W,
    parameter logic RD_USE_AXI_ID = 1'b0,
    parameter logic WR_USE_AXI_ID = 1'b1
    /* verilator lint_on WIDTHTRUNC */
)
();

// The DMA RAM is twice as wide as the AXI data bus, divided into RAM_SEGS segments
localparam RAM_DATA_W = AXI_DATA_W*2;
localparam RAM_SEG_DATA_W = RAM_DATA_W / RAM_SEGS;
localparam RAM_SEG_BE_W = RAM_SEG_DATA_W / 8;
localparam RAM_SEG_ADDR_W = RAM_ADDR_W - $clog2(RAM_SEGS*RAM_SEG_BE_W);

logic clk;
logic rst;

// AXI interface instance, shared by the DUT's write and read master ports
taxi_axi_if #(
    .DATA_W(AXI_DATA_W),
    .ADDR_W(AXI_ADDR_W),
    .STRB_W(AXI_STRB_W),
    .ID_W(AXI_ID_W),
    .AWUSER_EN(1'b0),
    .WUSER_EN(1'b0),
    .BUSER_EN(1'b0),
    .ARUSER_EN(1'b0),
    .RUSER_EN(1'b0),
    .MAX_BURST_LEN(AXI_MAX_BURST_LEN)
) m_axi();

// Read descriptors: AXI-sized source address, RAM-sized destination address with select
taxi_dma_desc_if #(
    .SRC_ADDR_W(AXI_ADDR_W),
    .SRC_SEL_EN(1'b0),
    .SRC_ASID_EN(1'b0),
    .DST_ADDR_W(RAM_ADDR_W),
    .DST_SEL_EN(1'b1),
    .DST_SEL_W(RAM_SEL_W),
    .DST_ASID_EN(1'b0),
    .IMM_EN(1'b0),
    .LEN_W(LEN_W),
    .TAG_W(TAG_W),
    .ID_EN(1'b0),
    .DEST_EN(1'b0),
    .USER_EN(1'b0)
) rd_desc();

// Write descriptors: RAM-sized source address with select, AXI-sized destination
// address, immediate data controlled by IMM_EN / IMM_W
taxi_dma_desc_if #(
    .SRC_ADDR_W(RAM_ADDR_W),
    .SRC_SEL_EN(1'b1),
    .SRC_SEL_W(RAM_SEL_W),
    .SRC_ASID_EN(1'b0),
    .DST_ADDR_W(AXI_ADDR_W),
    .DST_SEL_EN(1'b0),
    .DST_ASID_EN(1'b0),
    .IMM_EN(IMM_EN),
    .IMM_W(IMM_W),
    .LEN_W(LEN_W),
    .TAG_W(TAG_W),
    .ID_EN(1'b0),
    .DEST_EN(1'b0),
    .USER_EN(1'b0)
) wr_desc();

// Segmented DMA RAM interface, shared by the DUT's RAM write and read ports
taxi_dma_ram_if #(
    .SEGS(RAM_SEGS),
    .SEG_ADDR_W(RAM_SEG_ADDR_W),
    .SEG_DATA_W(RAM_SEG_DATA_W),
    .SEG_BE_W(RAM_SEG_BE_W)
) dma_ram();

// Configuration inputs and status outputs of the DUT
logic read_enable;
logic write_enable;

logic status_rd_busy;
logic status_wr_busy;

// Statistics outputs, read side
logic [$clog2(RD_OP_TBL_SIZE)-1:0] stat_rd_op_start_tag;
logic                              stat_rd_op_start_valid;
logic [$clog2(RD_OP_TBL_SIZE)-1:0] stat_rd_op_finish_tag;
logic [3:0]                        stat_rd_op_finish_status;
logic                              stat_rd_op_finish_valid;
logic [$clog2(RD_OP_TBL_SIZE)-1:0] stat_rd_req_start_tag;
logic [12:0]                       stat_rd_req_start_len;
logic                              stat_rd_req_start_valid;
logic [$clog2(RD_OP_TBL_SIZE)-1:0] stat_rd_req_finish_tag;
logic [3:0]                        stat_rd_req_finish_status;
logic                              stat_rd_req_finish_valid;
logic                              stat_rd_op_tbl_full;
logic                              stat_rd_tx_stall;

// Statistics outputs, write side
logic [$clog2(WR_OP_TBL_SIZE)-1:0] stat_wr_op_start_tag;
logic                              stat_wr_op_start_valid;
logic [$clog2(WR_OP_TBL_SIZE)-1:0] stat_wr_op_finish_tag;
logic [3:0]                        stat_wr_op_finish_status;
logic                              stat_wr_op_finish_valid;
logic [$clog2(WR_OP_TBL_SIZE)-1:0] stat_wr_req_start_tag;
logic [12:0]                       stat_wr_req_start_len;
logic                              stat_wr_req_start_valid;
logic [$clog2(WR_OP_TBL_SIZE)-1:0] stat_wr_req_finish_tag;
logic [3:0]                        stat_wr_req_finish_status;
logic                              stat_wr_req_finish_valid;
logic                              stat_wr_op_tbl_full;
logic                              stat_wr_tx_stall;

// Unit under test
taxi_dma_if_axi #(
    .AXI_MAX_BURST_LEN(AXI_MAX_BURST_LEN),
    .RD_OP_TBL_SIZE(RD_OP_TBL_SIZE),
    .WR_OP_TBL_SIZE(WR_OP_TBL_SIZE),
    .RD_USE_AXI_ID(RD_USE_AXI_ID),
    .WR_USE_AXI_ID(WR_USE_AXI_ID)
)
uut (
    .clk(clk),
    .rst(rst),

    // AXI master interface
    .m_axi_wr(m_axi),
    .m_axi_rd(m_axi),

    // Read descriptor
    .rd_desc_req(rd_desc),
    .rd_desc_sts(rd_desc),

    // Write descriptor
    .wr_desc_req(wr_desc),
    .wr_desc_sts(wr_desc),

    // RAM interface
    .dma_ram_wr(dma_ram),
    .dma_ram_rd(dma_ram),

    // Configuration
    .read_enable(read_enable),
    .write_enable(write_enable),

    // Status
    .status_rd_busy(status_rd_busy),
    .status_wr_busy(status_wr_busy),

    // Statistics
    .stat_rd_op_start_tag(stat_rd_op_start_tag),
    .stat_rd_op_start_valid(stat_rd_op_start_valid),
    .stat_rd_op_finish_tag(stat_rd_op_finish_tag),
    .stat_rd_op_finish_status(stat_rd_op_finish_status),
    .stat_rd_op_finish_valid(stat_rd_op_finish_valid),
    .stat_rd_req_start_tag(stat_rd_req_start_tag),
    .stat_rd_req_start_len(stat_rd_req_start_len),
    .stat_rd_req_start_valid(stat_rd_req_start_valid),
    .stat_rd_req_finish_tag(stat_rd_req_finish_tag),
    .stat_rd_req_finish_status(stat_rd_req_finish_status),
    .stat_rd_req_finish_valid(stat_rd_req_finish_valid),
    .stat_rd_op_tbl_full(stat_rd_op_tbl_full),
    .stat_rd_tx_stall(stat_rd_tx_stall),
    .stat_wr_op_start_tag(stat_wr_op_start_tag),
    .stat_wr_op_start_valid(stat_wr_op_start_valid),
    .stat_wr_op_finish_tag(stat_wr_op_finish_tag),
    .stat_wr_op_finish_status(stat_wr_op_finish_status),
    .stat_wr_op_finish_valid(stat_wr_op_finish_valid),
    .stat_wr_req_start_tag(stat_wr_req_start_tag),
    .stat_wr_req_start_len(stat_wr_req_start_len),
    .stat_wr_req_start_valid(stat_wr_req_start_valid),
    .stat_wr_req_finish_tag(stat_wr_req_finish_tag),
    .stat_wr_req_finish_status(stat_wr_req_finish_status),
    .stat_wr_req_finish_valid(stat_wr_req_finish_valid),
    .stat_wr_op_tbl_full(stat_wr_op_tbl_full),
    .stat_wr_tx_stall(stat_wr_tx_stall)
);

endmodule
`resetall

View File

@@ -0,0 +1,64 @@
# SPDX-License-Identifier: CERN-OHL-S-2.0
#
# Copyright (c) 2020-2025 FPGA Ninja, LLC
#
# Authors:
# - Alex Forencich
# cocotb build settings: HDL language, default simulator (overridable), and
# waveform dumping (off unless WAVES=1 is given)
TOPLEVEL_LANG = verilog
SIM ?= verilator
WAVES ?= 0
# simulation time unit / precision
COCOTB_HDL_TIMEUNIT = 1ns
COCOTB_HDL_TIMEPRECISION = 1ps
# source tree locations, relative to this test directory
RTL_DIR = ../../rtl
LIB_DIR = ../../lib
TAXI_SRC_DIR = $(LIB_DIR)/taxi/src
# design under test; the Python test module and the HDL top level are both
# named test_$(DUT)
DUT = taxi_dma_if_axi_rd
COCOTB_TEST_MODULES = test_$(DUT)
COCOTB_TOPLEVEL = test_$(DUT)
# MODULE / TOPLEVEL mirror the COCOTB_-prefixed variables above
# (presumably kept for cocotb releases that read the older names - TODO confirm)
MODULE = $(COCOTB_TEST_MODULES)
TOPLEVEL = $(COCOTB_TOPLEVEL)
# testbench wrapper, the DUT itself, and the interface definitions it uses
VERILOG_SOURCES += $(COCOTB_TOPLEVEL).sv
VERILOG_SOURCES += $(RTL_DIR)/$(DUT).sv
VERILOG_SOURCES += $(RTL_DIR)/taxi_dma_desc_if.sv
VERILOG_SOURCES += $(RTL_DIR)/taxi_dma_ram_if.sv
VERILOG_SOURCES += $(TAXI_SRC_DIR)/axi/rtl/taxi_axi_if.sv
# handle file list files
# process_f_file:  read list file $1 (via cat) and prefix every entry with the
#                  directory of $1, then expand the result again
# process_f_files: for each word of $1, substitute the expanded contents when
#                  the word matches %.f, otherwise keep the word unchanged
# uniq_base:       remove entries whose base name (notdir) occurs again later in
#                  the list, so the last occurrence of each base name is kept
process_f_file = $(call process_f_files,$(addprefix $(dir $1),$(shell cat $1)))
process_f_files = $(foreach f,$1,$(if $(filter %.f,$f),$(call process_f_file,$f),$f))
uniq_base = $(if $1,$(call uniq_base,$(foreach f,$1,$(if $(filter-out $(notdir $(lastword $1)),$(notdir $f)),$f,))) $(lastword $1))
VERILOG_SOURCES := $(call uniq_base,$(call process_f_files,$(VERILOG_SOURCES)))
# module parameters
# every PARAM_<NAME> variable is exported to the environment and is also turned
# into a top-level parameter override by the simulator-specific rules below
export PARAM_AXI_DATA_W := 64
export PARAM_AXI_ADDR_W := 16
export PARAM_AXI_STRB_W := $(shell expr $(PARAM_AXI_DATA_W) / 8 )
export PARAM_AXI_ID_W := 8
export PARAM_AXI_MAX_BURST_LEN := 256
export PARAM_RAM_SEL_W := 2
export PARAM_RAM_ADDR_W := 16
export PARAM_RAM_SEGS := 2
export PARAM_LEN_W := 16
export PARAM_TAG_W := 8
# operation table size is 2**AXI_ID_W, computed with a python one-liner
export PARAM_OP_TBL_SIZE := $(shell python -c "print(2**$(PARAM_AXI_ID_W))")
# NOTE(review): the pytest runner in this directory's test script passes
# USE_AXI_ID = 0, while this Makefile flow uses 1 - confirm which is intended
export PARAM_USE_AXI_ID := 1
# simulator-specific parameter passing:
#   icarus    -> -P <toplevel>.<NAME>=<value>, plus the -fst plusarg
#   verilator -> -G<NAME>=<value>, with FST tracing enabled only when WAVES=1
ifeq ($(SIM), icarus)
PLUSARGS += -fst
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-P $(COCOTB_TOPLEVEL).$(subst PARAM_,,$(v))=$($(v)))
else ifeq ($(SIM), verilator)
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-G$(subst PARAM_,,$(v))=$($(v)))
ifeq ($(WAVES), 1)
COMPILE_ARGS += --trace-fst
VERILATOR_TRACE = 1
endif
endif
# pull in the cocotb simulation makefile located by cocotb-config
include $(shell cocotb-config --makefiles)/Makefile.sim

View File

@@ -0,0 +1 @@
../dma_psdp_ram.py

View File

@@ -0,0 +1,228 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: CERN-OHL-S-2.0
"""
Copyright (c) 2021-2025 FPGA Ninja, LLC
Authors:
- Alex Forencich
"""
import itertools
import logging
import os
import sys
import cocotb_test.simulator
import pytest
import cocotb
from cocotb.clock import Clock
from cocotb.triggers import RisingEdge
from cocotb.regression import TestFactory
from cocotbext.axi import AxiReadBus, AxiRamRead
from cocotbext.axi.stream import define_stream
try:
from dma_psdp_ram import PsdpRamWrite, PsdpRamWriteBus
except ImportError:
# attempt import from current directory
sys.path.insert(0, os.path.join(os.path.dirname(__file__)))
try:
from dma_psdp_ram import PsdpRamWrite, PsdpRamWriteBus
finally:
del sys.path[0]
# Stream classes for the descriptor request channel ("req_*" signals).
DescBus, DescTransaction, DescSource, DescSink, DescMonitor = define_stream(
    "Desc",
    signals=[
        "req_src_addr",
        "req_src_sel",
        "req_src_asid",
        "req_dst_addr",
        "req_dst_sel",
        "req_dst_asid",
        "req_len",
        "req_tag",
        "req_valid",
        "req_ready",
    ],
    optional_signals=[
        "req_imm",
        "req_imm_en",
        "req_id",
        "req_dest",
        "req_user",
    ],
)

# Stream classes for the descriptor status channel ("sts_*" signals); note that
# no ready signal is listed for this channel.
DescStatusBus, DescStatusTransaction, DescStatusSource, DescStatusSink, DescStatusMonitor = define_stream(
    "DescStatus",
    signals=[
        "sts_tag",
        "sts_error",
        "sts_valid",
    ],
    optional_signals=[
        "sts_len",
        "sts_id",
        "sts_dest",
        "sts_user",
    ],
)
class TB(object):
    """Testbench harness for the read-only AXI DMA interface.

    Starts the clock, attaches a read-only AXI RAM model to ``dut.m_axi``, a
    write-only DMA RAM model to ``dut.dma_ram``, and a descriptor source /
    status sink to ``dut.rd_desc``.  The enable input is initialised to 0.
    """

    def __init__(self, dut):
        self.dut = dut

        self.log = logging.getLogger("cocotb.tb")
        self.log.setLevel(logging.DEBUG)

        clk = dut.clk
        rst = dut.rst

        cocotb.start_soon(Clock(clk, 10, units="ns").start())

        # memory models on both sides of the DMA engine
        self.axi_ram = AxiRamRead(AxiReadBus.from_entity(dut.m_axi), clk, rst, size=2**16)
        self.dma_ram = PsdpRamWrite(PsdpRamWriteBus.from_entity(dut.dma_ram), clk, rst, size=2**16)

        # descriptor request driver and status receiver
        self.read_desc_source = DescSource(DescBus.from_entity(dut.rd_desc), clk, rst)
        self.read_desc_status_sink = DescStatusSink(DescStatusBus.from_entity(dut.rd_desc), clk, rst)

        dut.enable.setimmediatevalue(0)

    def set_idle_generator(self, generator=None):
        """Install a fresh pause generator on the AXI R channel, if one is given."""
        if not generator:
            return
        self.axi_ram.r_channel.set_pause_generator(generator())

    def set_backpressure_generator(self, generator=None):
        """Install a fresh pause generator on the AXI AR channel and the DMA RAM, if one is given."""
        if not generator:
            return
        for target in (self.axi_ram.ar_channel, self.dma_ram):
            target.set_pause_generator(generator())

    async def cycle_reset(self):
        """Pulse ``rst`` high for two clock edges, with two edges of settling before and after."""
        clk = self.dut.clk
        rst = self.dut.rst

        rst.setimmediatevalue(0)
        for level in (1, 0):
            await RisingEdge(clk)
            await RisingEdge(clk)
            rst.value = level
        await RisingEdge(clk)
        await RisingEdge(clk)
async def run_test_read(dut, idle_inserter=None, backpressure_inserter=None):
    """Read test: copy data from the AXI RAM model into the DMA RAM model.

    Sweeps transfer length (starting at zero), AXI offset and RAM offset.  For
    each combination the payload is placed in the AXI RAM, the destination is
    pre-filled with 0xaa, one read descriptor is issued, and the status and
    destination contents (with 8 guard bytes on each side) are checked.
    """
    tb = TB(dut)

    axi_byte_lanes = tb.axi_ram.byte_lanes
    ram_byte_lanes = tb.dma_ram.byte_lanes
    tag_count = 1 << len(tb.read_desc_source.bus.req_tag)

    # offsets just above a 4 KiB boundary and just below the next one
    axi_offsets = [*range(axi_byte_lanes+1), *range(4096-axi_byte_lanes, 4096)]

    # optionally run only every 8th offset, selected by OFFSET_GROUP
    offset_group = os.getenv("OFFSET_GROUP")
    if offset_group is not None:
        axi_offsets = axi_offsets[int(offset_group)::8]

    cur_tag = 1

    tb.set_idle_generator(idle_inserter)
    tb.set_backpressure_generator(backpressure_inserter)

    await tb.cycle_reset()

    tb.dut.enable.value = 1

    lengths = [*range(0, ram_byte_lanes+3), *range(128-4, 128+4), 1024]
    ram_offsets = range(ram_byte_lanes+1)

    for length, axi_offset, ram_offset in itertools.product(lengths, axi_offsets, ram_offsets):
        tb.log.info("length %d, axi_offset %d, ram_offset %d", length, axi_offset, ram_offset)

        axi_addr = 0x1000 + axi_offset
        ram_addr = 0x1000 + ram_offset
        payload = bytearray(i % 256 for i in range(length))

        tb.axi_ram.write(axi_addr, payload)

        tb.log.debug("%s", tb.axi_ram.hexdump_str((axi_addr & ~0xf)-16, (((axi_addr & 0xf)+length-1) & ~0xf)+48, prefix="AXI "))

        # background fill around the destination
        tb.dma_ram.write(ram_addr-256, b'\xaa'*(len(payload)+512))

        desc = DescTransaction(
            req_src_addr=axi_addr,
            req_dst_addr=ram_addr,
            req_dst_sel=0,
            req_len=len(payload),
            req_tag=cur_tag,
        )
        await tb.read_desc_source.send(desc)

        status = await tb.read_desc_status_sink.recv()

        tb.log.info("status: %s", status)

        assert int(status.sts_tag) == cur_tag
        assert int(status.sts_error) == 0

        tb.log.debug("%s", tb.dma_ram.hexdump_str((ram_addr & ~0xf)-16, (((ram_addr & 0xf)+length-1) & ~0xf)+48, prefix="RAM "))

        # payload must arrive intact with the 0xaa fill untouched on both sides
        guard = b'\xaa'*8
        assert tb.dma_ram.read(ram_addr-8, len(payload)+16) == guard+payload+guard

        cur_tag = (cur_tag + 1) % tag_count

    await RisingEdge(dut.clk)
    await RisingEdge(dut.clk)
def cycle_pause():
    """Return an endless iterator repeating the pattern 1, 1, 1, 0."""
    pattern = (1, 1, 1, 0)
    return itertools.cycle(pattern)
# Register the cocotb test only when cocotb exposes a top-level handle; it is
# expanded over every idle / backpressure inserter combination.
if getattr(cocotb, 'top', None) is not None:
    factory = TestFactory(run_test_read)
    for option_name in ("idle_inserter", "backpressure_inserter"):
        factory.add_option(option_name, [None, cycle_pause])
    factory.generate_tests()

# cocotb-test
# directory layout used by the pytest entry point below
tests_dir = os.path.dirname(__file__)
rtl_dir = os.path.abspath(os.path.join(tests_dir, os.pardir, os.pardir, 'rtl'))
lib_dir = os.path.abspath(os.path.join(tests_dir, os.pardir, os.pardir, 'lib'))
taxi_src_dir = os.path.abspath(os.path.join(lib_dir, 'taxi', 'src'))
def process_f_files(files):
    """Expand ``.f`` file lists into a flat list of source paths.

    Each entry ending in ``.f`` (case-insensitive) is read and split on
    whitespace; its entries are taken relative to the list file's directory
    and expanded recursively.  Results are keyed by base name, so a later path
    with the same base name replaces an earlier one while keeping the earlier
    one's position in the returned list.
    """
    by_basename = {}
    for entry in files:
        if entry[-2:].lower() != '.f':
            by_basename[os.path.basename(entry)] = entry
            continue

        with open(entry, 'r') as list_file:
            names = list_file.read().split()

        base_dir = os.path.dirname(entry)
        nested = process_f_files([os.path.join(base_dir, name) for name in names])
        for path in nested:
            by_basename[os.path.basename(path)] = path

    return list(by_basename.values())
@pytest.mark.parametrize("offset_group", list(range(8)))
@pytest.mark.parametrize("axi_data_w", [64, 128])
def test_taxi_dma_if_axi_rd(request, axi_data_w, offset_group):
    """pytest entry point: build and simulate the read-only DMA interface testbench.

    Runs once per AXI data width and offset group; the offset group is handed
    to the cocotb test through the OFFSET_GROUP environment variable.
    """
    dut = "taxi_dma_if_axi_rd"
    module = os.path.splitext(os.path.basename(__file__))[0]
    toplevel = module

    verilog_sources = process_f_files([
        os.path.join(tests_dir, f"{toplevel}.sv"),
        os.path.join(rtl_dir, f"{dut}.sv"),
        os.path.join(rtl_dir, "taxi_dma_desc_if.sv"),
        os.path.join(rtl_dir, "taxi_dma_ram_if.sv"),
        os.path.join(taxi_src_dir, "axi", "rtl", "taxi_axi_if.sv"),
    ])

    axi_id_w = 8

    parameters = {
        'AXI_DATA_W': axi_data_w,
        'AXI_ADDR_W': 16,
        'AXI_STRB_W': axi_data_w // 8,
        'AXI_ID_W': axi_id_w,
        'AXI_MAX_BURST_LEN': 256,
        'RAM_SEL_W': 2,
        'RAM_ADDR_W': 16,
        'RAM_SEGS': 2,
        'LEN_W': 16,
        'TAG_W': 8,
        'OP_TBL_SIZE': 2**axi_id_w,
        'USE_AXI_ID': 0,
    }

    # mirror every parameter into the environment as PARAM_<NAME>
    extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}
    extra_env['OFFSET_GROUP'] = str(offset_group)

    # one build directory per parametrized test instance
    build_name = request.node.name.replace('[', '-').replace(']', '')
    sim_build = os.path.join(tests_dir, "sim_build", build_name)

    cocotb_test.simulator.run(
        simulator="verilator",
        python_search=[tests_dir],
        verilog_sources=verilog_sources,
        toplevel=toplevel,
        module=module,
        parameters=parameters,
        sim_build=sim_build,
        extra_env=extra_env,
    )

View File

@@ -0,0 +1,156 @@
// SPDX-License-Identifier: CERN-OHL-S-2.0
/*
Copyright (c) 2025 FPGA Ninja, LLC
Authors:
- Alex Forencich
*/
`resetall
`timescale 1ns / 1ps
`default_nettype none
/*
* AXI DMA interface testbench
*/
// Port-less top-level wrapper around taxi_dma_if_axi_rd.  It only declares
// the interface instances and signals and connects them to the DUT; there is
// no stimulus or checking logic in this module.
module test_taxi_dma_if_axi_rd #
(
    /* verilator lint_off WIDTHTRUNC */
    // AXI interface dimensions
    parameter AXI_DATA_W = 64,
    parameter AXI_ADDR_W = 16,
    parameter AXI_STRB_W = AXI_DATA_W / 8,
    parameter AXI_ID_W = 8,
    parameter AXI_MAX_BURST_LEN = 256,
    // Segmented RAM interface dimensions
    parameter RAM_SEL_W = 2,
    parameter RAM_ADDR_W = 16,
    parameter RAM_SEGS = 2,
    // NOTE(review): IMM_EN and IMM_W are not referenced anywhere in this
    // module (rd_desc below hard-codes .IMM_EN(1'b0)).
    parameter logic IMM_EN = 1,
    parameter IMM_W = AXI_DATA_W,
    // Descriptor field widths
    parameter LEN_W = 16,
    parameter TAG_W = 8,
    // DUT configuration
    parameter OP_TBL_SIZE = 2**AXI_ID_W,
    parameter logic USE_AXI_ID = 1'b0
    /* verilator lint_on WIDTHTRUNC */
)
();

    // RAM data path is twice the AXI data width, split evenly over RAM_SEGS segments
    localparam RAM_DATA_W = AXI_DATA_W*2;
    localparam RAM_SEG_DATA_W = RAM_DATA_W / RAM_SEGS;
    localparam RAM_SEG_BE_W = RAM_SEG_DATA_W / 8;
    // Per-segment address width: RAM_ADDR_W minus the bits needed to index a
    // byte within the full RAM word (RAM_SEGS*RAM_SEG_BE_W byte lanes)
    localparam RAM_SEG_ADDR_W = RAM_ADDR_W - $clog2(RAM_SEGS*RAM_SEG_BE_W);

    // No driver inside this module; presumably driven by the cocotb test
    logic clk;
    logic rst;

    // AXI interface instance; all *USER_EN parameters are set to 0
    taxi_axi_if #(
        .DATA_W(AXI_DATA_W),
        .ADDR_W(AXI_ADDR_W),
        .STRB_W(AXI_STRB_W),
        .ID_W(AXI_ID_W),
        .AWUSER_EN(1'b0),
        .WUSER_EN(1'b0),
        .BUSER_EN(1'b0),
        .ARUSER_EN(1'b0),
        .RUSER_EN(1'b0),
        .MAX_BURST_LEN(AXI_MAX_BURST_LEN)
    ) m_axi();

    // Read descriptor interface: source side sized by the AXI address width
    // (SEL/ASID disabled), destination side sized by the RAM address width
    // with a RAM_SEL_W-bit select; immediate, ID, DEST and USER are disabled
    taxi_dma_desc_if #(
        .SRC_ADDR_W(AXI_ADDR_W),
        .SRC_SEL_EN(1'b0),
        .SRC_ASID_EN(1'b0),
        .DST_ADDR_W(RAM_ADDR_W),
        .DST_SEL_EN(1'b1),
        .DST_SEL_W(RAM_SEL_W),
        .DST_ASID_EN(1'b0),
        .IMM_EN(1'b0),
        .LEN_W(LEN_W),
        .TAG_W(TAG_W),
        .ID_EN(1'b0),
        .DEST_EN(1'b0),
        .USER_EN(1'b0)
    ) rd_desc();

    // Segmented RAM interface connected to the DUT's dma_ram_wr port
    taxi_dma_ram_if #(
        .SEGS(RAM_SEGS),
        .SEG_ADDR_W(RAM_SEG_ADDR_W),
        .SEG_DATA_W(RAM_SEG_DATA_W),
        .SEG_BE_W(RAM_SEG_BE_W)
    ) dma_ram();

    // Configuration input and status output of the DUT
    logic enable;
    logic status_busy;

    // Statistics signals: only connected to uut below, not used otherwise
    // in this module.  Tag widths follow $clog2(OP_TBL_SIZE).
    logic [$clog2(OP_TBL_SIZE)-1:0] stat_rd_op_start_tag;
    logic stat_rd_op_start_valid;
    logic [$clog2(OP_TBL_SIZE)-1:0] stat_rd_op_finish_tag;
    logic [3:0] stat_rd_op_finish_status;
    logic stat_rd_op_finish_valid;
    logic [$clog2(OP_TBL_SIZE)-1:0] stat_rd_req_start_tag;
    logic [12:0] stat_rd_req_start_len;
    logic stat_rd_req_start_valid;
    logic [$clog2(OP_TBL_SIZE)-1:0] stat_rd_req_finish_tag;
    logic [3:0] stat_rd_req_finish_status;
    logic stat_rd_req_finish_valid;
    logic stat_rd_op_tbl_full;
    logic stat_rd_tx_stall;

    // Device under test
    taxi_dma_if_axi_rd #(
        .AXI_MAX_BURST_LEN(AXI_MAX_BURST_LEN),
        .OP_TBL_SIZE(OP_TBL_SIZE),
        .USE_AXI_ID(USE_AXI_ID)
    )
    uut (
        .clk(clk),
        .rst(rst),

        /*
         * AXI master interface
         */
        .m_axi_rd(m_axi),

        /*
         * Read descriptor
         * (the same rd_desc instance is bound to both the request and the
         * status port)
         */
        .rd_desc_req(rd_desc),
        .rd_desc_sts(rd_desc),

        /*
         * RAM interface
         */
        .dma_ram_wr(dma_ram),

        /*
         * Configuration
         */
        .enable(enable),

        /*
         * Status
         */
        .status_busy(status_busy),

        /*
         * Statistics
         */
        .stat_rd_op_start_tag(stat_rd_op_start_tag),
        .stat_rd_op_start_valid(stat_rd_op_start_valid),
        .stat_rd_op_finish_tag(stat_rd_op_finish_tag),
        .stat_rd_op_finish_status(stat_rd_op_finish_status),
        .stat_rd_op_finish_valid(stat_rd_op_finish_valid),
        .stat_rd_req_start_tag(stat_rd_req_start_tag),
        .stat_rd_req_start_len(stat_rd_req_start_len),
        .stat_rd_req_start_valid(stat_rd_req_start_valid),
        .stat_rd_req_finish_tag(stat_rd_req_finish_tag),
        .stat_rd_req_finish_status(stat_rd_req_finish_status),
        .stat_rd_req_finish_valid(stat_rd_req_finish_valid),
        .stat_rd_op_tbl_full(stat_rd_op_tbl_full),
        .stat_rd_tx_stall(stat_rd_tx_stall)
    );

endmodule
`resetall

View File

@@ -0,0 +1,66 @@
# SPDX-License-Identifier: CERN-OHL-S-2.0
#
# Copyright (c) 2020-2025 FPGA Ninja, LLC
#
# Authors:
# - Alex Forencich
# cocotb build settings.  SIM and WAVES use ?= so a value given on the
# command line or in the environment takes precedence.
TOPLEVEL_LANG = verilog
SIM ?= verilator
WAVES ?= 0
COCOTB_HDL_TIMEUNIT = 1ns
COCOTB_HDL_TIMEPRECISION = 1ps
# Source tree locations, relative to this Makefile's directory
RTL_DIR = ../../rtl
LIB_DIR = ../../lib
TAXI_SRC_DIR = $(LIB_DIR)/taxi/src
# Device under test; the test module and HDL top level are both test_$(DUT).
# The same values are assigned to the COCOTB_* names and to MODULE/TOPLEVEL
# (presumably to cover both naming schemes used by different cocotb versions).
DUT = taxi_dma_if_axi_wr
COCOTB_TEST_MODULES = test_$(DUT)
COCOTB_TOPLEVEL = test_$(DUT)
MODULE = $(COCOTB_TEST_MODULES)
TOPLEVEL = $(COCOTB_TOPLEVEL)
# HDL sources: testbench wrapper, DUT, and the interface definitions it uses
VERILOG_SOURCES += $(COCOTB_TOPLEVEL).sv
VERILOG_SOURCES += $(RTL_DIR)/$(DUT).sv
VERILOG_SOURCES += $(RTL_DIR)/taxi_dma_desc_if.sv
VERILOG_SOURCES += $(RTL_DIR)/taxi_dma_ram_if.sv
VERILOG_SOURCES += $(TAXI_SRC_DIR)/axi/rtl/taxi_axi_if.sv
# handle file list files
# process_f_file:  read one .f file and expand its entries, each prefixed with
#                  the directory of the .f file itself
# process_f_files: pass non-.f entries through, expand .f entries recursively
# uniq_base:       drop entries whose base name matches a later entry, so the
#                  last occurrence of each base name is the one kept
process_f_file = $(call process_f_files,$(addprefix $(dir $1),$(shell cat $1)))
process_f_files = $(foreach f,$1,$(if $(filter %.f,$f),$(call process_f_file,$f),$f))
uniq_base = $(if $1,$(call uniq_base,$(foreach f,$1,$(if $(filter-out $(notdir $(lastword $1)),$(notdir $f)),$f,))) $(lastword $1))
VERILOG_SOURCES := $(call uniq_base,$(call process_f_files,$(VERILOG_SOURCES)))
# module parameters
# Every PARAM_<name> variable is exported and is also turned into a
# simulator parameter override for <name> further below.
export PARAM_AXI_DATA_W := 64
export PARAM_AXI_ADDR_W := 16
export PARAM_AXI_STRB_W := $(shell expr $(PARAM_AXI_DATA_W) / 8 )
export PARAM_AXI_ID_W := 8
export PARAM_AXI_MAX_BURST_LEN := 256
export PARAM_RAM_SEL_W := 2
export PARAM_RAM_ADDR_W := 16
export PARAM_RAM_SEGS := 2
export PARAM_IMM_EN := 1
export PARAM_IMM_W := $(PARAM_AXI_DATA_W)
export PARAM_LEN_W := 16
export PARAM_TAG_W := 8
# NOTE(review): this shells out to `python`, so a `python` executable must be
# on PATH when make runs.
export PARAM_OP_TBL_SIZE := $(shell python -c "print(2**$(PARAM_AXI_ID_W))")
export PARAM_USE_AXI_ID := 1
# Translate each PARAM_<name> into the simulator's parameter override syntax:
# icarus gets -P <toplevel>.<name>=<value>, verilator gets -G<name>=<value>.
ifeq ($(SIM), icarus)
PLUSARGS += -fst
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-P $(COCOTB_TOPLEVEL).$(subst PARAM_,,$(v))=$($(v)))
else ifeq ($(SIM), verilator)
COMPILE_ARGS += $(foreach v,$(filter PARAM_%,$(.VARIABLES)),-G$(subst PARAM_,,$(v))=$($(v)))
# FST tracing is only enabled for verilator when WAVES is 1
ifeq ($(WAVES), 1)
COMPILE_ARGS += --trace-fst
VERILATOR_TRACE = 1
endif
endif
# Pull in the cocotb simulator makefile located via cocotb-config
include $(shell cocotb-config --makefiles)/Makefile.sim

View File

@@ -0,0 +1 @@
../dma_psdp_ram.py

View File

@@ -0,0 +1,285 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: CERN-OHL-S-2.0
"""
Copyright (c) 2021-2025 FPGA Ninja, LLC
Authors:
- Alex Forencich
"""
import itertools
import logging
import os
import sys
import cocotb_test.simulator
import pytest
import cocotb
from cocotb.clock import Clock
from cocotb.triggers import RisingEdge
from cocotb.regression import TestFactory
from cocotbext.axi import AxiWriteBus, AxiRamWrite
from cocotbext.axi.stream import define_stream
try:
from dma_psdp_ram import PsdpRamRead, PsdpRamReadBus
except ImportError:
# attempt import from current directory
sys.path.insert(0, os.path.join(os.path.dirname(__file__)))
try:
from dma_psdp_ram import PsdpRamRead, PsdpRamReadBus
finally:
del sys.path[0]
# Descriptor request channel.  define_stream() returns five classes, unpacked
# here as bus, transaction, source, sink and monitor.  The req_* names in
# `signals` are the required fields; those in `optional_signals` are
# presumably allowed to be absent from the interface (confirm against
# cocotbext-axi).
DescBus, DescTransaction, DescSource, DescSink, DescMonitor = define_stream("Desc",
    signals=["req_src_addr", "req_src_sel", "req_src_asid", "req_dst_addr", "req_dst_sel", "req_dst_asid", "req_len", "req_tag", "req_valid", "req_ready"],
    optional_signals=["req_imm", "req_imm_en", "req_id", "req_dest", "req_user"]
)
# Descriptor status channel.  Only sts_valid is listed; no sts_ready signal
# is declared for this stream.
DescStatusBus, DescStatusTransaction, DescStatusSource, DescStatusSink, DescStatusMonitor = define_stream("DescStatus",
    signals=["sts_tag", "sts_error", "sts_valid"],
    optional_signals=["sts_len", "sts_id", "sts_dest", "sts_user"]
)
class TB(object):
    """Testbench harness for the taxi_dma_if_axi_wr wrapper.

    Starts the clock and attaches an AXI write-side RAM model to dut.m_axi,
    a segmented RAM read model to dut.dma_ram, and a descriptor source and
    status sink to dut.wr_desc.  The DUT's enable input is initialised to 0.
    """

    def __init__(self, dut):
        self.dut = dut

        self.log = logging.getLogger("cocotb.tb")
        self.log.setLevel(logging.DEBUG)

        clock = Clock(dut.clk, 10, units="ns")
        cocotb.start_soon(clock.start())

        # AXI RAM
        axi_bus = AxiWriteBus.from_entity(dut.m_axi)
        self.axi_ram = AxiRamWrite(axi_bus, dut.clk, dut.rst, size=2**16)

        # DMA RAM
        ram_bus = PsdpRamReadBus.from_entity(dut.dma_ram)
        self.dma_ram = PsdpRamRead(ram_bus, dut.clk, dut.rst, size=2**16)

        # Control
        desc_bus = DescBus.from_entity(dut.wr_desc)
        self.write_desc_source = DescSource(desc_bus, dut.clk, dut.rst)
        status_bus = DescStatusBus.from_entity(dut.wr_desc)
        self.write_desc_status_sink = DescStatusSink(status_bus, dut.clk, dut.rst)

        dut.enable.setimmediatevalue(0)

    def set_idle_generator(self, generator=None):
        """Install a pause generator on the AXI B channel, if one is given."""
        if not generator:
            return
        self.axi_ram.b_channel.set_pause_generator(generator())

    def set_backpressure_generator(self, generator=None):
        """Install pause generators on the AXI AW and W channels and the DMA RAM.

        The factory is called once per target so each gets its own generator.
        """
        if not generator:
            return
        self.axi_ram.aw_channel.set_pause_generator(generator())
        self.axi_ram.w_channel.set_pause_generator(generator())
        self.dma_ram.set_pause_generator(generator())

    async def cycle_reset(self):
        """Pulse rst high for two clock edges, with two idle edges before and after."""
        clk = self.dut.clk
        rst = self.dut.rst

        rst.setimmediatevalue(0)
        for _ in range(2):
            await RisingEdge(clk)
        rst.value = 1
        for _ in range(2):
            await RisingEdge(clk)
        rst.value = 0
        for _ in range(2):
            await RisingEdge(clk)
async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None):
    """Sweep RAM-to-AXI write descriptors over lengths and address offsets.

    For each (length, AXI offset, RAM offset) combination the source data is
    placed in the DMA RAM, a descriptor is sent, and the returned status is
    checked for a matching tag and a zero error code.  The AXI RAM is then
    compared against the source data, including one 0xaa guard byte on each
    side.

    idle_inserter and backpressure_inserter are optional generator factories
    passed to TB.set_idle_generator and TB.set_backpressure_generator.  When
    the OFFSET_GROUP environment variable is set, only every 8th AXI offset
    starting at that index is exercised.
    """
    tb = TB(dut)

    axi_lanes = tb.axi_ram.byte_lanes
    ram_lanes = tb.dma_ram.byte_lanes
    tag_count = 2**len(tb.write_desc_source.bus.req_tag)

    # offsets 0..axi_lanes plus the last axi_lanes offsets below 4096
    axi_offsets = [*range(axi_lanes+1), *range(4096-axi_lanes, 4096)]
    group_env = os.getenv("OFFSET_GROUP")
    if group_env is not None:
        axi_offsets = axi_offsets[int(group_env)::8]

    def dump_window(addr, length):
        # 16-byte aligned start address and size used for the debug hexdumps
        return (addr & ~0xf)-16, (((addr & 0xf)+length-1) & ~0xf)+48

    cur_tag = 1

    tb.set_idle_generator(idle_inserter)
    tb.set_backpressure_generator(backpressure_inserter)

    await tb.cycle_reset()

    tb.dut.enable.value = 1

    lengths = [*range(0, ram_lanes+3), *range(128-4, 128+4), 1024]
    ram_offsets = range(ram_lanes+1)

    # product() iterates in the same order as three nested loops
    for length, axi_offset, ram_offset in itertools.product(lengths, axi_offsets, ram_offsets):
        tb.log.info("length %d, axi_offset %d, ram_offset %d", length, axi_offset, ram_offset)

        axi_addr = 0x1000+axi_offset
        ram_addr = 0x1000+ram_offset
        test_data = bytearray(x % 256 for x in range(length))
        fill_len = len(test_data)+256

        # pre-fill both memories with distinct patterns, then load the payload
        tb.dma_ram.write(ram_addr & 0xffff80, b'\x55'*fill_len)
        tb.axi_ram.write(axi_addr-128, b'\xaa'*fill_len)
        tb.dma_ram.write(ram_addr, test_data)

        tb.log.debug("%s", tb.dma_ram.hexdump_str(*dump_window(ram_addr, length), prefix="RAM "))

        desc = DescTransaction(
            req_dst_addr=axi_addr,
            req_src_addr=ram_addr,
            req_src_sel=0,
            req_len=len(test_data),
            req_tag=cur_tag,
        )
        await tb.write_desc_source.send(desc)

        status = await tb.write_desc_status_sink.recv()
        tb.log.info("status: %s", status)

        assert int(status.sts_tag) == cur_tag
        assert int(status.sts_error) == 0

        tb.log.debug("%s", tb.axi_ram.hexdump_str(*dump_window(axi_addr, length), prefix="AXI "))

        # one guard byte on each side must still hold the fill pattern
        assert tb.axi_ram.read(axi_addr-1, len(test_data)+2) == b'\xaa'+test_data+b'\xaa'

        cur_tag = (cur_tag + 1) % tag_count

    await RisingEdge(dut.clk)
    await RisingEdge(dut.clk)
async def run_test_write_imm(dut, idle_inserter=None, backpressure_inserter=None):
    """Sweep immediate-data write descriptors over lengths and AXI offsets.

    The payload travels in the descriptor's req_imm field (little-endian)
    with req_imm_en set, instead of being read from the DMA RAM.  After each
    descriptor the status tag and error code are checked and the AXI RAM is
    compared against the payload, including one 0xaa guard byte on each side.

    Lengths run from 1 up to, but not including, the byte width of req_imm.
    idle_inserter, backpressure_inserter and OFFSET_GROUP behave as in
    run_test_write.
    """
    tb = TB(dut)

    axi_lanes = tb.axi_ram.byte_lanes
    tag_count = 2**len(tb.write_desc_source.bus.req_tag)

    # offsets 0..axi_lanes plus the last axi_lanes offsets below 4096
    axi_offsets = [*range(axi_lanes+1), *range(4096-axi_lanes, 4096)]
    group_env = os.getenv("OFFSET_GROUP")
    if group_env is not None:
        axi_offsets = axi_offsets[int(group_env)::8]

    cur_tag = 1

    tb.set_idle_generator(idle_inserter)
    tb.set_backpressure_generator(backpressure_inserter)

    await tb.cycle_reset()

    tb.dut.enable.value = 1

    imm_bytes = len(dut.wr_desc.req_imm) // 8
    lengths = list(range(1, imm_bytes))

    # product() iterates in the same order as two nested loops
    for length, axi_offset in itertools.product(lengths, axi_offsets):
        tb.log.info("length %d, axi_offset %d", length, axi_offset)

        axi_addr = 0x1000+axi_offset
        test_data = bytearray(x % 256 for x in range(length))
        imm = int.from_bytes(test_data, 'little')

        # pre-fill the destination region with a known pattern
        tb.axi_ram.write(axi_addr-128, b'\xaa'*(len(test_data)+256))

        tb.log.debug("Immediate: 0x%x", imm)

        desc = DescTransaction(
            req_dst_addr=axi_addr,
            req_src_addr=0,
            req_src_sel=0,
            req_imm=imm,
            req_imm_en=1,
            req_len=len(test_data),
            req_tag=cur_tag,
        )
        await tb.write_desc_source.send(desc)

        status = await tb.write_desc_status_sink.recv()
        tb.log.info("status: %s", status)

        assert int(status.sts_tag) == cur_tag
        assert int(status.sts_error) == 0

        dump_addr = (axi_addr & ~0xf)-16
        dump_len = (((axi_addr & 0xf)+length-1) & ~0xf)+48
        tb.log.debug("%s", tb.axi_ram.hexdump_str(dump_addr, dump_len, prefix="AXI "))

        # one guard byte on each side must still hold the fill pattern
        assert tb.axi_ram.read(axi_addr-1, len(test_data)+2) == b'\xaa'+test_data+b'\xaa'

        cur_tag = (cur_tag + 1) % tag_count

    await RisingEdge(dut.clk)
    await RisingEdge(dut.clk)
def cycle_pause():
    """Return an endless iterator repeating the pattern 1, 1, 1, 0."""
    pattern = (1, 1, 1, 0)
    return itertools.cycle(pattern)
# Register the cocotb tests only when cocotb exposes a non-None `top`
# (presumably only inside a running simulation, so that importing this file
# from pytest skips the block; confirm against the cocotb version in use).
if getattr(cocotb, 'top', None) is not None:
    for test in [run_test_write, run_test_write_imm]:
        # each test function is generated for every combination of the two
        # options below: 2 x 2 = 4 variants per function
        factory = TestFactory(test)
        factory.add_option("idle_inserter", [None, cycle_pause])
        factory.add_option("backpressure_inserter", [None, cycle_pause])
        factory.generate_tests()
# cocotb-test

# Directory layout, resolved from the location of this test file: rtl/ and
# lib/ sit two levels above the test directory.
tests_dir = os.path.dirname(__file__)
rtl_dir = os.path.abspath(os.path.join(tests_dir, os.pardir, os.pardir, 'rtl'))
lib_dir = os.path.abspath(os.path.join(tests_dir, os.pardir, os.pardir, 'lib'))
# lib_dir is already absolute and normalised, so a plain join is enough here
taxi_src_dir = os.path.join(lib_dir, 'taxi', 'src')
def process_f_files(files):
    """Resolve a list of HDL source paths, expanding nested ``.f`` file lists.

    Entries ending in ``.f`` (in any letter case) are opened and read as
    whitespace-separated paths relative to the list file's own directory;
    those paths are expanded recursively.  All other entries pass through.

    Results are de-duplicated by base name: a later path with a base name
    already seen replaces the earlier path in place.

    Returns a list of the resolved paths.
    """
    resolved = {}

    def remember(source):
        # key by base name so a later duplicate overrides an earlier one
        resolved[os.path.basename(source)] = source

    for path in files:
        if path.lower().endswith('.f'):
            with open(path, 'r') as list_file:
                names = list_file.read().split()
            parent = os.path.dirname(path)
            for nested in process_f_files([os.path.join(parent, name) for name in names]):
                remember(nested)
        else:
            remember(path)

    return list(resolved.values())
@pytest.mark.parametrize("offset_group", list(range(8)))
@pytest.mark.parametrize("axi_data_w", [64, 128])
def test_taxi_dma_if_axi_wr(request, axi_data_w, offset_group):
    """Build and run the taxi_dma_if_axi_wr testbench under Verilator.

    Parametrized over the AXI data width and over eight offset groups; the
    group index reaches the simulation through the OFFSET_GROUP environment
    variable.  Each parametrization uses its own sim_build directory, named
    after the pytest node.
    """
    dut = "taxi_dma_if_axi_wr"
    module = os.path.splitext(os.path.basename(__file__))[0]
    toplevel = module

    verilog_sources = [
        os.path.join(tests_dir, f"{toplevel}.sv"),
        os.path.join(rtl_dir, f"{dut}.sv"),
        os.path.join(rtl_dir, "taxi_dma_desc_if.sv"),
        os.path.join(rtl_dir, "taxi_dma_ram_if.sv"),
        os.path.join(taxi_src_dir, "axi", "rtl", "taxi_axi_if.sv"),
    ]

    verilog_sources = process_f_files(verilog_sources)

    parameters = {}

    parameters['AXI_DATA_W'] = axi_data_w
    parameters['AXI_ADDR_W'] = 16
    parameters['AXI_STRB_W'] = parameters['AXI_DATA_W'] // 8
    parameters['AXI_ID_W'] = 8
    parameters['AXI_MAX_BURST_LEN'] = 256
    parameters['RAM_SEL_W'] = 2
    parameters['RAM_ADDR_W'] = 16
    parameters['RAM_SEGS'] = 2
    parameters['IMM_EN'] = 1
    parameters['IMM_W'] = parameters['AXI_DATA_W']
    parameters['LEN_W'] = 16
    parameters['TAG_W'] = 8
    parameters['OP_TBL_SIZE'] = 2**parameters['AXI_ID_W']
    # Use AXI IDs on the write path, matching the Makefile for this testbench
    # (PARAM_USE_AXI_ID := 1) and the testbench wrapper's default (1'b1), so
    # that pytest and make exercise the same configuration.
    parameters['USE_AXI_ID'] = 1

    # mirror every HDL parameter into the environment as PARAM_<name>
    extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}

    extra_env['OFFSET_GROUP'] = str(offset_group)

    # pytest node names contain brackets; turn them into a plain directory name
    sim_build = os.path.join(tests_dir, "sim_build",
        request.node.name.replace('[', '-').replace(']', ''))

    cocotb_test.simulator.run(
        simulator="verilator",
        python_search=[tests_dir],
        verilog_sources=verilog_sources,
        toplevel=toplevel,
        module=module,
        parameters=parameters,
        sim_build=sim_build,
        extra_env=extra_env,
    )

View File

@@ -0,0 +1,157 @@
// SPDX-License-Identifier: CERN-OHL-S-2.0
/*
Copyright (c) 2025 FPGA Ninja, LLC
Authors:
- Alex Forencich
*/
`resetall
`timescale 1ns / 1ps
`default_nettype none
/*
* AXI DMA interface testbench
*/
// Port-less top-level wrapper around taxi_dma_if_axi_wr.  It only declares
// the interface instances and signals and connects them to the DUT; there is
// no stimulus or checking logic in this module.
module test_taxi_dma_if_axi_wr #
(
    /* verilator lint_off WIDTHTRUNC */
    // AXI interface dimensions
    parameter AXI_DATA_W = 64,
    parameter AXI_ADDR_W = 16,
    parameter AXI_STRB_W = AXI_DATA_W / 8,
    parameter AXI_ID_W = 8,
    parameter AXI_MAX_BURST_LEN = 256,
    // Segmented RAM interface dimensions
    parameter RAM_SEL_W = 2,
    parameter RAM_ADDR_W = 16,
    parameter RAM_SEGS = 2,
    // Immediate-data support in the write descriptor (forwarded to wr_desc)
    parameter logic IMM_EN = 1,
    parameter IMM_W = AXI_DATA_W,
    // Descriptor field widths
    parameter LEN_W = 16,
    parameter TAG_W = 8,
    // DUT configuration
    parameter OP_TBL_SIZE = 2**AXI_ID_W,
    parameter logic USE_AXI_ID = 1'b1
    /* verilator lint_on WIDTHTRUNC */
)
();

    // RAM data path is twice the AXI data width, split evenly over RAM_SEGS segments
    localparam RAM_DATA_W = AXI_DATA_W*2;
    localparam RAM_SEG_DATA_W = RAM_DATA_W / RAM_SEGS;
    localparam RAM_SEG_BE_W = RAM_SEG_DATA_W / 8;
    // Per-segment address width: RAM_ADDR_W minus the bits needed to index a
    // byte within the full RAM word (RAM_SEGS*RAM_SEG_BE_W byte lanes)
    localparam RAM_SEG_ADDR_W = RAM_ADDR_W - $clog2(RAM_SEGS*RAM_SEG_BE_W);

    // No driver inside this module; presumably driven by the cocotb test
    logic clk;
    logic rst;

    // AXI interface instance; all *USER_EN parameters are set to 0
    taxi_axi_if #(
        .DATA_W(AXI_DATA_W),
        .ADDR_W(AXI_ADDR_W),
        .STRB_W(AXI_STRB_W),
        .ID_W(AXI_ID_W),
        .AWUSER_EN(1'b0),
        .WUSER_EN(1'b0),
        .BUSER_EN(1'b0),
        .ARUSER_EN(1'b0),
        .RUSER_EN(1'b0),
        .MAX_BURST_LEN(AXI_MAX_BURST_LEN)
    ) m_axi();

    // Write descriptor interface: source side sized by the RAM address width
    // with a RAM_SEL_W-bit select, destination side sized by the AXI address
    // width (SEL/ASID disabled); immediate data follows IMM_EN/IMM_W, while
    // ID, DEST and USER are disabled
    taxi_dma_desc_if #(
        .SRC_ADDR_W(RAM_ADDR_W),
        .SRC_SEL_EN(1'b1),
        .SRC_SEL_W(RAM_SEL_W),
        .SRC_ASID_EN(1'b0),
        .DST_ADDR_W(AXI_ADDR_W),
        .DST_SEL_EN(1'b0),
        .DST_ASID_EN(1'b0),
        .IMM_EN(IMM_EN),
        .IMM_W(IMM_W),
        .LEN_W(LEN_W),
        .TAG_W(TAG_W),
        .ID_EN(1'b0),
        .DEST_EN(1'b0),
        .USER_EN(1'b0)
    ) wr_desc();

    // Segmented RAM interface connected to the DUT's dma_ram_rd port
    taxi_dma_ram_if #(
        .SEGS(RAM_SEGS),
        .SEG_ADDR_W(RAM_SEG_ADDR_W),
        .SEG_DATA_W(RAM_SEG_DATA_W),
        .SEG_BE_W(RAM_SEG_BE_W)
    ) dma_ram();

    // Configuration input and status output of the DUT
    logic enable;
    logic status_busy;

    // Statistics signals: only connected to uut below, not used otherwise
    // in this module.  Tag widths follow $clog2(OP_TBL_SIZE).
    logic [$clog2(OP_TBL_SIZE)-1:0] stat_wr_op_start_tag;
    logic stat_wr_op_start_valid;
    logic [$clog2(OP_TBL_SIZE)-1:0] stat_wr_op_finish_tag;
    logic [3:0] stat_wr_op_finish_status;
    logic stat_wr_op_finish_valid;
    logic [$clog2(OP_TBL_SIZE)-1:0] stat_wr_req_start_tag;
    logic [12:0] stat_wr_req_start_len;
    logic stat_wr_req_start_valid;
    logic [$clog2(OP_TBL_SIZE)-1:0] stat_wr_req_finish_tag;
    logic [3:0] stat_wr_req_finish_status;
    logic stat_wr_req_finish_valid;
    logic stat_wr_op_tbl_full;
    logic stat_wr_tx_stall;

    // Device under test
    taxi_dma_if_axi_wr #(
        .AXI_MAX_BURST_LEN(AXI_MAX_BURST_LEN),
        .OP_TBL_SIZE(OP_TBL_SIZE),
        .USE_AXI_ID(USE_AXI_ID)
    )
    uut (
        .clk(clk),
        .rst(rst),

        /*
         * AXI master interface
         */
        .m_axi_wr(m_axi),

        /*
         * Write descriptor
         * (the same wr_desc instance is bound to both the request and the
         * status port)
         */
        .wr_desc_req(wr_desc),
        .wr_desc_sts(wr_desc),

        /*
         * RAM interface
         */
        .dma_ram_rd(dma_ram),

        /*
         * Configuration
         */
        .enable(enable),

        /*
         * Status
         */
        .status_busy(status_busy),

        /*
         * Statistics
         */
        .stat_wr_op_start_tag(stat_wr_op_start_tag),
        .stat_wr_op_start_valid(stat_wr_op_start_valid),
        .stat_wr_op_finish_tag(stat_wr_op_finish_tag),
        .stat_wr_op_finish_status(stat_wr_op_finish_status),
        .stat_wr_op_finish_valid(stat_wr_op_finish_valid),
        .stat_wr_req_start_tag(stat_wr_req_start_tag),
        .stat_wr_req_start_len(stat_wr_req_start_len),
        .stat_wr_req_start_valid(stat_wr_req_start_valid),
        .stat_wr_req_finish_tag(stat_wr_req_finish_tag),
        .stat_wr_req_finish_status(stat_wr_req_finish_status),
        .stat_wr_req_finish_valid(stat_wr_req_finish_valid),
        .stat_wr_op_tbl_full(stat_wr_op_tbl_full),
        .stat_wr_tx_stall(stat_wr_tx_stall)
    );

endmodule
`resetall