"""cocotb sanity testbench.

Connects a cocotbext-pcie root complex model and UltraScale+ PCIe device
model to the DUT, attaches BASE-R serdes source/sink models to every PHY
channel found in the DUT, and runs a single test that programs the DMA
engines through BAR0.
"""

import logging

import cocotb
from cocotb.triggers import Timer, FallingEdge
from cocotb.clock import Clock

from cocotbext.axi import AxiStreamBus
from cocotbext.pcie.core import RootComplex
from cocotbext.pcie.xilinx.us import UltraScalePlusPcieDevice

from baser import BaseRSerdesSource, BaseRSerdesSink
from alibaba_pcie_top_regs import alibaba_pcie_top_regsClass

CLK_PERIOD = 4


class TB:
    """Testbench harness that owns the simulation models attached to the DUT.

    Attributes:
        dut: The top-level DUT handle passed in by cocotb.
        log: Logger named ``cocotb.tb`` with level INFO.
        rc: PCIe root complex model.
        dev: UltraScale+ PCIe device model bound to the DUT's AXI-Stream
            RQ/RC/CQ/CC interfaces under ``dut.u_pcie_top``.
        serdes_sources: One ``BaseRSerdesSource`` per PHY channel, driving
            the channel's ``serdes_rx_*`` signals.
        serdes_sinks: One ``BaseRSerdesSink`` per PHY channel, observing
            the channel's ``serdes_tx_*`` signals.
    """

    def __init__(self, dut):
        """Build all models, connect them to ``dut`` and start the clocks.

        Args:
            dut: Top-level DUT handle provided by cocotb.
        """
        self.dut = dut

        self.log = logging.getLogger("cocotb.tb")
        self.log.setLevel(logging.INFO)

        self.rc = RootComplex()

        pcie_top = dut.u_pcie_top

        self.dev = UltraScalePlusPcieDevice(
            # configuration options
            pcie_generation=3,
            # pcie_link_width=2,
            # user_clk_frequency=250e6,
            alignment="dword",
            cq_straddle=False,
            cc_straddle=False,
            rq_straddle=False,
            rc_straddle=False,
            rc_4tlp_straddle=False,
            pf_count=1,
            max_payload_size=1024,
            enable_client_tag=True,
            enable_extended_tag=True,
            enable_parity=False,
            enable_rx_msg_interface=False,
            enable_sriov=False,
            enable_extended_configuration=False,

            pf0_msi_enable=True,
            pf0_msi_count=32,
            pf1_msi_enable=False,
            pf1_msi_count=1,
            pf2_msi_enable=False,
            pf2_msi_count=1,
            pf3_msi_enable=False,
            pf3_msi_count=1,
            pf0_msix_enable=False,
            pf0_msix_table_size=0,
            pf0_msix_table_bir=0,
            pf0_msix_table_offset=0x00000000,
            pf0_msix_pba_bir=0,
            pf0_msix_pba_offset=0x00000000,
            pf1_msix_enable=False,
            pf1_msix_table_size=0,
            pf1_msix_table_bir=0,
            pf1_msix_table_offset=0x00000000,
            pf1_msix_pba_bir=0,
            pf1_msix_pba_offset=0x00000000,
            pf2_msix_enable=False,
            pf2_msix_table_size=0,
            pf2_msix_table_bir=0,
            pf2_msix_table_offset=0x00000000,
            pf2_msix_pba_bir=0,
            pf2_msix_pba_offset=0x00000000,
            pf3_msix_enable=False,
            pf3_msix_table_size=0,
            pf3_msix_table_bir=0,
            pf3_msix_table_offset=0x00000000,
            pf3_msix_pba_bir=0,
            pf3_msix_pba_offset=0x00000000,

            # signals
            user_clk=pcie_top.clk_250,
            user_reset=pcie_top.rst_250,
            user_lnk_up=pcie_top.user_lnk_up,

            rq_bus=AxiStreamBus.from_entity(pcie_top.s_axis_rq),
            rc_bus=AxiStreamBus.from_entity(pcie_top.m_axis_rc),
            cq_bus=AxiStreamBus.from_entity(pcie_top.m_axis_cq),
            cc_bus=AxiStreamBus.from_entity(pcie_top.s_axis_cc),
        )

        # Expose a 64 KiB BAR0 on function 0 and attach the device to a new
        # root complex port.
        self.dev.functions[0].configure_bar(0, 64*1024)
        self.rc.make_port().connect(self.dev)

        # 6.4 ns period reference clock on the SFP MGT clock input.
        cocotb.start_soon(Clock(dut.sfp_mgt_clk_p, 6.4, units="ns").start())

        self.serdes_sources = []
        self.serdes_sinks = []

        # Identical for every channel, so set up once outside the loop.
        clk = 2.56  # serdes TX/RX clock period in ns
        gbx_cfg = None

        for ch in dut.u_eth_dma_wrapper.u_taxi_eth_phy_25g_us.ch:
            gt_inst = ch.ch_inst.gt.gt_inst

            cocotb.start_soon(Clock(gt_inst.tx_clk, clk, units="ns").start())
            cocotb.start_soon(Clock(gt_inst.rx_clk, clk, units="ns").start())

            # The source drives the channel's receive-side serdes signals.
            self.serdes_sources.append(BaseRSerdesSource(
                data=gt_inst.serdes_rx_data,
                data_valid=gt_inst.serdes_rx_data_valid,
                hdr=gt_inst.serdes_rx_hdr,
                hdr_valid=gt_inst.serdes_rx_hdr_valid,
                clock=gt_inst.rx_clk,
                slip=gt_inst.serdes_rx_bitslip,
                reverse=True,
                gbx_cfg=gbx_cfg
            ))

            # The sink observes the channel's transmit-side serdes signals.
            self.serdes_sinks.append(BaseRSerdesSink(
                data=gt_inst.serdes_tx_data,
                data_valid=gt_inst.serdes_tx_data_valid,
                hdr=gt_inst.serdes_tx_hdr,
                hdr_valid=gt_inst.serdes_tx_hdr_valid,
                gbx_sync=gt_inst.serdes_tx_gbx_sync,
                clock=gt_inst.tx_clk,
                reverse=True,
                gbx_cfg=gbx_cfg
            ))


# The called form ``@cocotb.test()`` is accepted by cocotb 1.x as well as 2.x;
# the bare ``@cocotb.test`` form relies on support added in cocotb 2.0.
@cocotb.test()
async def test_sanity(dut):
    """Program the DMA engines via BAR0 and loop one frame through the serdes.

    Sequence:
      1. Wait for ``dut.rst_250`` to fall, then 10 us, then enumerate the bus.
      2. Write a test message into host memory at offset 0.
      3. Program the PCIe read DMA (host -> DMA memory).
      4. Program the Ethernet write DMA (stream -> memory) and read DMA
         (memory -> stream).
      5. Receive the frame from serdes sink 0 and send it into serdes
         source 1.
      6. Program the PCIe write DMA (DMA memory -> host) and log the host
         memory contents.

    The test logs the observed data only; it does not assert on it.

    Args:
        dut: Top-level DUT handle provided by cocotb.
    """
    tb = TB(dut)

    await FallingEdge(dut.rst_250)
    await Timer(10, 'us')

    await tb.rc.enumerate()

    mem = tb.rc.mem_pool.alloc_region(16*1024*1024)

    dev = tb.rc.find_device(tb.dev.functions[0].pcie_id)
    await dev.enable_device()
    await dev.set_master()

    dev_bar0 = dev.bar_window[0]

    message = b"Hello, world! This is a long string of data with many letters and words."
    await mem.write(0, message)

    regmap = alibaba_pcie_top_regsClass()
    pcie_dma_rd = regmap.pcie_top_regs.pcie_dma_regs.dma_rd
    pcie_dma_wr = regmap.pcie_top_regs.pcie_dma_regs.dma_wr
    eth_dma_rd = regmap.eth_dma_wrapper_regs.pcie_dma_regs.dma_rd
    eth_dma_wr = regmap.eth_dma_wrapper_regs.pcie_dma_regs.dma_wr

    # DMA from host to dma memory
    await dev_bar0.write_dword(pcie_dma_rd.src_addr_low.addr, 0x00000000)
    await dev_bar0.write_dword(pcie_dma_rd.src_addr_high.addr, 0x00000000)
    await dev_bar0.write_dword(pcie_dma_rd.dst_addr.addr, 0x00000000)
    await dev_bar0.write_dword(pcie_dma_rd.length.addr, len(message))
    await dev_bar0.write_dword(pcie_dma_rd.trigger.addr, 0x00000001)

    await Timer(1, "us")

    # Set up stream to memory DMA to store ethernet frame
    await dev_bar0.write_dword(eth_dma_wr.src_addr.addr, 0x00000000)
    await dev_bar0.write_dword(eth_dma_wr.dst_addr_low.addr, 0x00000000)
    await dev_bar0.write_dword(eth_dma_wr.dst_addr_high.addr, 0x00000000)
    await dev_bar0.write_dword(eth_dma_wr.length.addr, len(message))
    await dev_bar0.write_dword(eth_dma_wr.trigger.addr, 0x00000001)

    # Trigger memory to stream dma to send ethernet frame
    await dev_bar0.write_dword(eth_dma_rd.src_addr_low.addr, 0x00000000)
    await dev_bar0.write_dword(eth_dma_rd.src_addr_high.addr, 0x00000000)
    await dev_bar0.write_dword(eth_dma_rd.dst_addr.addr, 0x00000000)
    await dev_bar0.write_dword(eth_dma_rd.length.addr, len(message))
    await dev_bar0.write_dword(eth_dma_rd.trigger.addr, 0x00000001)

    # Take the frame transmitted on channel 0 and feed it into the receive
    # side of channel 1.
    rx_frame = await tb.serdes_sinks[0].recv()
    tb.log.info(rx_frame)

    await tb.serdes_sources[1].send(rx_frame)

    await Timer(1, "us")

    # DMA from dma memory to host
    await dev_bar0.write_dword(pcie_dma_wr.src_addr.addr, 0x00000000)
    await dev_bar0.write_dword(pcie_dma_wr.dst_addr_low.addr, 0x00000000)
    await dev_bar0.write_dword(pcie_dma_wr.dst_addr_high.addr, 0x00000000)
    await dev_bar0.write_dword(pcie_dma_wr.length.addr, len(message))
    await dev_bar0.write_dword(pcie_dma_wr.trigger.addr, 0x00000001)

    await Timer(1, "us")

    # NOTE(review): the message was written at region offset 0 and every DMA
    # address register above was programmed with 0, yet the read-back uses
    # offset 0x100 and the result is only logged, never checked -- confirm
    # the intended offset and whether an assertion should be added.
    msg = await mem.read(0x100, len(message))
    tb.log.info(msg)