Files
alibaba_pcie/sim/alibaba_pcie.py
2025-11-22 17:02:25 -08:00

187 lines
5.8 KiB
Python

import logging
import cocotb
from cocotb.triggers import Timer, FallingEdge
from cocotb.clock import Clock
from cocotbext.axi import AxiStreamBus
from cocotbext.pcie.core import RootComplex
from cocotbext.pcie.xilinx.us import UltraScalePlusPcieDevice
from baser import BaseRSerdesSource, BaseRSerdesSink
CLK_PERIOD = 4
class TB:
def __init__(self, dut):
self.dut = dut
self.log = logging.getLogger("cocotb.tb")
self.log.setLevel(logging.INFO)
self.rc = RootComplex()
self.dev = UltraScalePlusPcieDevice(
# configuration options
pcie_generation=3,
# pcie_link_width=2,
# user_clk_frequency=250e6,
alignment="dword",
cq_straddle=False,
cc_straddle=False,
rq_straddle=False,
rc_straddle=False,
rc_4tlp_straddle=False,
pf_count=1,
max_payload_size=1024,
enable_client_tag=True,
enable_extended_tag=True,
enable_parity=False,
enable_rx_msg_interface=False,
enable_sriov=False,
enable_extended_configuration=False,
pf0_msi_enable=True,
pf0_msi_count=32,
pf1_msi_enable=False,
pf1_msi_count=1,
pf2_msi_enable=False,
pf2_msi_count=1,
pf3_msi_enable=False,
pf3_msi_count=1,
pf0_msix_enable=False,
pf0_msix_table_size=0,
pf0_msix_table_bir=0,
pf0_msix_table_offset=0x00000000,
pf0_msix_pba_bir=0,
pf0_msix_pba_offset=0x00000000,
pf1_msix_enable=False,
pf1_msix_table_size=0,
pf1_msix_table_bir=0,
pf1_msix_table_offset=0x00000000,
pf1_msix_pba_bir=0,
pf1_msix_pba_offset=0x00000000,
pf2_msix_enable=False,
pf2_msix_table_size=0,
pf2_msix_table_bir=0,
pf2_msix_table_offset=0x00000000,
pf2_msix_pba_bir=0,
pf2_msix_pba_offset=0x00000000,
pf3_msix_enable=False,
pf3_msix_table_size=0,
pf3_msix_table_bir=0,
pf3_msix_table_offset=0x00000000,
pf3_msix_pba_bir=0,
pf3_msix_pba_offset=0x00000000,
# signals
user_clk=dut.clk_250,
user_reset=dut.rst_250,
user_lnk_up=dut.u_pcie_top.user_lnk_up,
rq_bus=AxiStreamBus.from_entity(dut.u_pcie_top.s_axis_rq),
rc_bus=AxiStreamBus.from_entity(dut.u_pcie_top.m_axis_rc),
cq_bus=AxiStreamBus.from_entity(dut.u_pcie_top.m_axis_cq),
cc_bus=AxiStreamBus.from_entity(dut.u_pcie_top.s_axis_cc),
)
self.dev.functions[0].configure_bar(0, 64*1024)
self.rc.make_port().connect(self.dev)
cocotb.start_soon(Clock(dut.sfp_mgt_clk_p, 6.4, units="ns").start())
self.serdes_sources = []
self.serdes_sinks = []
for ch in dut.u_eth_dma_wrapper.u_taxi_eth_phy_25g_us.ch:
gt_inst = ch.ch_inst.gt.gt_inst
clk = 2.56
gbx_cfg = None
cocotb.start_soon(Clock(gt_inst.tx_clk, clk, units="ns").start())
cocotb.start_soon(Clock(gt_inst.rx_clk, clk, units="ns").start())
self.serdes_sources.append(BaseRSerdesSource(
data=gt_inst.serdes_rx_data,
data_valid=gt_inst.serdes_rx_data_valid,
hdr=gt_inst.serdes_rx_hdr,
hdr_valid=gt_inst.serdes_rx_hdr_valid,
clock=gt_inst.rx_clk,
slip=gt_inst.serdes_rx_bitslip,
reverse=True,
gbx_cfg=gbx_cfg
))
self.serdes_sinks.append(BaseRSerdesSink(
data=gt_inst.serdes_tx_data,
data_valid=gt_inst.serdes_tx_data_valid,
hdr=gt_inst.serdes_tx_hdr,
hdr_valid=gt_inst.serdes_tx_hdr_valid,
gbx_sync=gt_inst.serdes_tx_gbx_sync,
clock=gt_inst.tx_clk,
reverse=True,
gbx_cfg=gbx_cfg
))
@cocotb.test
async def test_sanity(dut):
tb = TB(dut)
await FallingEdge(dut.rst_250)
await Timer(10, 'us')
await tb.rc.enumerate()
mem = tb.rc.mem_pool.alloc_region(16*1024*1024)
dev = tb.rc.find_device(tb.dev.functions[0].pcie_id)
await dev.enable_device()
await dev.set_master()
dev_bar0 = dev.bar_window[0]
message = b"Hello, world! This is a long string of data with many letters and words."
await mem.write(0, message)
await dev_bar0.write_dword(0x0, 0x00000000)
await dev_bar0.write_dword(0x4, 0x00000000)
await dev_bar0.write_dword(0x8, 0x00000000)
await dev_bar0.write_dword(0xc, len(message))
await dev_bar0.write_dword(0x10, 0x00000001)
await Timer(1, "us")
await dev_bar0.write_dword(0x40, 0x00000000)
await dev_bar0.write_dword(0x44, 0x00000000)
await dev_bar0.write_dword(0x48, 0x00000000)
await dev_bar0.write_dword(0x4c, len(message))
await dev_bar0.write_dword(0x50, 0x00000001)
await dev_bar0.write_dword(0x60, 0x00000000)
await dev_bar0.write_dword(0x64, 0x00000000)
await dev_bar0.write_dword(0x68, 0x00000000)
await dev_bar0.write_dword(0x6c, len(message))
await dev_bar0.write_dword(0x70, 0x00000001)
rx_frame = await tb.serdes_sinks[0].recv()
tb.log.info(rx_frame)
await tb.serdes_sources[1].send(rx_frame)
await Timer(1, "us")
await dev_bar0.write_dword(0x20, 0x00000100)
await dev_bar0.write_dword(0x24, 0x00000000)
await dev_bar0.write_dword(0x28, 0x00000000)
await dev_bar0.write_dword(0x2c, len(message))
await dev_bar0.write_dword(0x30, 0x00000001)
await Timer(1, "us")
msg = await mem.read(0x100, len(message))
tb.log.info(msg)