"""cocotb testbench: PCIe root complex + UltraScale+ PCIe device model + BaseR serdes models."""

import logging

import cocotb
from cocotb.triggers import Timer, FallingEdge
from cocotb.clock import Clock

from cocotbext.axi import AxiStreamBus
from cocotbext.pcie.core import RootComplex
from cocotbext.pcie.xilinx.us import UltraScalePlusPcieDevice

from baser import BaseRSerdesSource, BaseRSerdesSink

CLK_PERIOD = 4


class TB:
    """Testbench harness wrapped around ``dut``.

    Construction performs the following, in order:

    * creates a PCIe ``RootComplex`` and an ``UltraScalePlusPcieDevice`` model
      attached to the DUT's ``s_axis_rq`` / ``m_axis_rc`` / ``m_axis_cq`` /
      ``s_axis_cc`` AXI-stream buses;
    * configures BAR 0 of function 0 with a size of 64 KiB and connects the
      device to a new root-complex port;
    * starts a 6.4 ns clock on ``dut.sfp_mgt_clk_p``;
    * for every channel of the Ethernet PHY instance, starts 2.56 ns TX and RX
      clocks and attaches one serdes source (drives the RX pins) and one
      serdes sink (observes the TX pins).

    Attributes:
        dut: the simulator handle passed in.
        log: logger named ``cocotb.tb`` at INFO level.
        rc: the PCIe root complex model.
        dev: the UltraScale+ PCIe device model.
        serdes_sources: one ``BaseRSerdesSource`` per PHY channel.
        serdes_sinks: one ``BaseRSerdesSink`` per PHY channel.
    """

    def __init__(self, dut):
        self.dut = dut

        self.log = logging.getLogger("cocotb.tb")
        self.log.setLevel(logging.INFO)

        # PCIe
        self.rc = RootComplex()

        self.dev = UltraScalePlusPcieDevice(
            # configuration options
            pcie_generation=3,
            # Link width and user clock frequency are intentionally not
            # passed, so the model's own defaults apply.
            # pcie_link_width=2,
            # user_clk_frequency=250e6,
            alignment="dword",
            cq_straddle=False,
            cc_straddle=False,
            rq_straddle=False,
            rc_straddle=False,
            rc_4tlp_straddle=False,
            pf_count=1,
            max_payload_size=1024,
            enable_client_tag=True,
            enable_extended_tag=True,
            enable_parity=False,
            enable_rx_msg_interface=False,
            enable_sriov=False,
            enable_extended_configuration=False,

            pf0_msi_enable=True,
            pf0_msi_count=32,
            pf1_msi_enable=False,
            pf1_msi_count=1,
            pf2_msi_enable=False,
            pf2_msi_count=1,
            pf3_msi_enable=False,
            pf3_msi_count=1,
            pf0_msix_enable=False,
            pf0_msix_table_size=0,
            pf0_msix_table_bir=0,
            pf0_msix_table_offset=0x00000000,
            pf0_msix_pba_bir=0,
            pf0_msix_pba_offset=0x00000000,
            pf1_msix_enable=False,
            pf1_msix_table_size=0,
            pf1_msix_table_bir=0,
            pf1_msix_table_offset=0x00000000,
            pf1_msix_pba_bir=0,
            pf1_msix_pba_offset=0x00000000,
            pf2_msix_enable=False,
            pf2_msix_table_size=0,
            pf2_msix_table_bir=0,
            pf2_msix_table_offset=0x00000000,
            pf2_msix_pba_bir=0,
            pf2_msix_pba_offset=0x00000000,
            pf3_msix_enable=False,
            pf3_msix_table_size=0,
            pf3_msix_table_bir=0,
            pf3_msix_table_offset=0x00000000,
            pf3_msix_pba_bir=0,
            pf3_msix_pba_offset=0x00000000,

            # signals
            user_clk=dut.clk_250,
            user_reset=dut.rst_250,
            user_lnk_up=dut.user_lnk_up,

            rq_bus=AxiStreamBus.from_entity(dut.s_axis_rq),
            rc_bus=AxiStreamBus.from_entity(dut.m_axis_rc),
            cq_bus=AxiStreamBus.from_entity(dut.m_axis_cq),
            cc_bus=AxiStreamBus.from_entity(dut.s_axis_cc),
        )

        # BAR 0 of function 0: 64 KiB
        self.dev.functions[0].configure_bar(0, 64*1024)

        self.rc.make_port().connect(self.dev)

        # Ethernet
        cocotb.start_soon(Clock(dut.sfp_mgt_clk_p, 6.4, units="ns").start())

        self.serdes_sources = []
        self.serdes_sinks = []

        # Same for every channel, so set up once outside the loop.
        clk = 2.56
        gbx_cfg = None

        for ch in dut.u_eth_dma_wrapper.u_taxi_eth_phy_25g_us.ch:
            gt_inst = ch.ch_inst.gt.gt_inst

            cocotb.start_soon(Clock(gt_inst.tx_clk, clk, units="ns").start())
            cocotb.start_soon(Clock(gt_inst.rx_clk, clk, units="ns").start())

            # Source feeds the channel's serdes RX pins.
            self.serdes_sources.append(BaseRSerdesSource(
                data=gt_inst.serdes_rx_data,
                data_valid=gt_inst.serdes_rx_data_valid,
                hdr=gt_inst.serdes_rx_hdr,
                hdr_valid=gt_inst.serdes_rx_hdr_valid,
                clock=gt_inst.rx_clk,
                slip=gt_inst.serdes_rx_bitslip,
                reverse=True,
                gbx_cfg=gbx_cfg
            ))
            # Sink captures the channel's serdes TX pins.
            self.serdes_sinks.append(BaseRSerdesSink(
                data=gt_inst.serdes_tx_data,
                data_valid=gt_inst.serdes_tx_data_valid,
                hdr=gt_inst.serdes_tx_hdr,
                hdr_valid=gt_inst.serdes_tx_hdr_valid,
                gbx_sync=gt_inst.serdes_tx_gbx_sync,
                clock=gt_inst.tx_clk,
                reverse=True,
                gbx_cfg=gbx_cfg
            ))


async def _program_regs(bar, base, word0, length):
    """Write one five-dword register group through ``bar`` starting at ``base``.

    The writes are issued in ascending offset order: ``word0`` at +0x00,
    zero at +0x04 and +0x08, ``length`` at +0x0C, and 1 at +0x10.

    NOTE(review): the layout looks like address / length / start for a DMA
    channel, but the register map is not visible here -- confirm against the RTL.
    """
    await bar.write_dword(base + 0x00, word0)
    await bar.write_dword(base + 0x04, 0x00000000)
    await bar.write_dword(base + 0x08, 0x00000000)
    await bar.write_dword(base + 0x0c, length)
    await bar.write_dword(base + 0x10, 0x00000001)


# Called form of the decorator: the bare ``@cocotb.test`` form is not
# registered as a test by cocotb 1.x, while ``@cocotb.test()`` works on
# both 1.x and 2.x.
@cocotb.test()
async def test_sanity(dut):
    """Smoke test: enumerate the device, program BAR0 registers, loop a frame.

    Sequence:

    1. wait for a falling edge on ``rst_250`` plus 10 us, then enumerate the bus;
    2. allocate a 16 MiB host memory region and store a test message at offset 0;
    3. program the register groups at BAR0 offsets 0x00, 0x40 and 0x60;
    4. take the frame received on serdes sink 0 and send it into serdes source 1;
    5. program the register group at BAR0 offset 0x20 with first word 0x100;
    6. read ``len(message)`` bytes from host memory offset 0x100 and log them.

    The test only logs the received frame and read-back data; it makes no
    assertions about their contents.
    """
    tb = TB(dut)

    # rst_250 is the signal handed to the PCIe model as user_reset.
    await FallingEdge(dut.rst_250)
    await Timer(10, 'us')

    await tb.rc.enumerate()

    mem = tb.rc.mem_pool.alloc_region(16*1024*1024)

    dev = tb.rc.find_device(tb.dev.functions[0].pcie_id)
    await dev.enable_device()
    await dev.set_master()

    dev_bar0 = dev.bar_window[0]
    # NOTE(review): this logs the bound method object itself, not any data;
    # it looks like a debugging leftover. Kept to preserve the log output.
    tb.log.info(dev_bar0.write)

    message = b"Hello, world! This is a long string of data with many letters and words."
    msg_len = len(message)
    await mem.write(0, message)

    await _program_regs(dev_bar0, 0x00, 0x00000000, msg_len)

    await Timer(1, "us")

    await _program_regs(dev_bar0, 0x40, 0x00000000, msg_len)
    await _program_regs(dev_bar0, 0x60, 0x00000000, msg_len)

    # Loop the frame transmitted on channel 0 back into channel 1's receiver.
    rx_frame = await tb.serdes_sinks[0].recv()
    tb.log.info(rx_frame)
    await tb.serdes_sources[1].send(rx_frame)

    await Timer(1, "us")

    await _program_regs(dev_bar0, 0x20, 0x00000100, msg_len)

    await Timer(1, "us")

    msg = await mem.read(0x100, msg_len)
    tb.log.info(msg)