AXI interface modules for Cocotb
GitHub repository: https://github.com/alexforencich/cocotbext-axi
Introduction
AXI, AXI lite, and AXI stream simulation models for cocotb.
Installation
Installation from pip (release version, stable):
$ pip install cocotbext-axi
Installation from git (latest development version, potentially unstable):
$ pip install https://github.com/alexforencich/cocotbext-axi/archive/master.zip
Installation for active development:
$ git clone https://github.com/alexforencich/cocotbext-axi
$ pip install -e cocotbext-axi
Documentation and usage examples
See the tests directory, verilog-axi, and verilog-axis for complete testbenches using these modules.
AXI and AXI lite master
The AxiMaster and AxiLiteMaster classes implement AXI masters and are capable of generating read and write operations against AXI slaves. Requested operations will be split and aligned according to the AXI specification. The AxiMaster module is capable of generating narrow bursts, handling multiple in-flight operations, and handling reordering and interleaving in responses across different transaction IDs.
The AxiMaster is a wrapper around AxiMasterWrite and AxiMasterRead. Similarly, AxiLiteMaster is a wrapper around AxiLiteMasterWrite and AxiLiteMasterRead. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same.
To use these modules, import the one you need and connect it to the DUT:
from cocotbext.axi import AxiMaster
axi_master = AxiMaster(dut, "s_axi", dut.clk, dut.rst)
The modules use cocotb.bus.Bus internally to automatically connect to the corresponding signals in the bus, presuming they are named according to the AXI spec and have a common prefix.
Once the module is instantiated, read and write operations can be initiated in a few different ways.
First, non-blocking operations can be started with init_read() and init_write(). These methods will queue up a read or write operation to be carried out over the interface. The result of the operation can be retrieved with get_read_data() and get_write_resp(). To monitor the status of the module, idle(), wait(), wait_read(), and wait_write() can be used. For example:
axi_master.init_write(0x0000, b'test')
await axi_master.wait()
resp = axi_master.get_write_resp()
axi_master.init_read(0x0000, 4)
await axi_master.wait()
data = axi_master.get_read_data()
Alternatively, an event object can be provided as an argument to init_read() and init_write(), and the result can be retrieved from Event.data. For example:
event = Event()
axi_master.init_write(0x0000, b'test', event=event)
await event.wait()
resp = event.data
event = Event()
axi_master.init_read(0x0000, 4, event=event)
await event.wait()
resp = event.data
Second, blocking operations can be carried out with read() and write() and their associated word-access wrappers. Multiple concurrent operations started from different coroutines are handled correctly. For example:
await axi_master.write(0x0000, b'test')
data = await axi_master.read(0x0000, 4)
read(), write(), get_read_data(), and get_write_resp() return namedtuple objects containing address, data or length, and resp.
AxiMaster and AxiLiteMaster constructor parameters
- entity: object that contains the AXI slave interface signals
- name: signal name prefix (e.g. for
s_axi_awaddr, the prefix iss_axi) - clock: clock signal
- reset: reset signal (optional)
Additional parameters for AxiMaster
- max_burst_len: maximum burst length in cycles, range 1-256, default 256.
Methods
init_read(address, length, ...): initiate reading length bytes, starting at addressinit_write(address, data, ...): initiate writing data (bytes), starting from addressidle(): returns True when there are no outstanding operations in progresswait(): blocking wait until all outstanding operations completewait_read(): wait until all outstanding read operations completewait_write(): wait until all outstanding write operations completeread_data_ready(): determine if any read read data is availableget_read_data(): fetch first available read datawrite_resp_ready(): determine if any write response is availableget_write_resp(): fetch first available write responseread(address, length, ...): read length bytes, starting at addressread_words(address, count, byteorder='little', ws=2, ...): read count ws-byte words, starting at addressread_dwords(address, count, byteorder='little', ...): read count 4-byte dwords, starting at addressread_qwords(address, count, byteorder='little', ...): read count 8-byte qwords, starting at addressread_byte(address, ...): read single byte at addressread_word(address, byteorder='little', ws=2, ...): read single ws-byte word at addressread_dword(address, byteorder='little', ...): read single 4-byte dword at addressread_qword(address, byteorder='little', ...): read single 8-byte qword at addresswrite(address, data, ...): write data (bytes), starting at addresswrite_words(address, data, byteorder='little', ws=2, ...): write data (ws-byte words), starting at addresswrite_dwords(address, data, byteorder='little', ...): write data (4-byte dwords), starting at addresswrite_qwords(address, data, byteorder='little', ...): write data (8-byte qwords), starting at addresswrite_byte(address, data, ...): write single byte at addresswrite_word(address, data, byteorder='little', ws=2, ...): write single ws-byte word at addresswrite_dword(address, data, byteorder='little', ...): write single 4-byte dword at addresswrite_qword(address, data, byteorder='little', ...): write single 8-byte qword at address
Additional optional arguments for AxiMaster
- arid,awid: AXI ID for bursts, default automatically assigned
- burst: AXI burst type, default
AxiBurstType.INCR - size: AXI burst size, default maximum supported by interface
- lock: AXI lock type, default
AxiLockType.NORMAL - cache: AXI cache field, default
0b0011 - prot: AXI protection flags, default
AxiProt.NONSECURE - qos: AXI QOS field, default
0 - region: AXI region field, default
0 - user: AXI user signal (awuser/aruser), default
0 - wuser: AXI wuser signal, default
0(write-related methods only) - event:
Eventobject used to wait on and retrieve result for specific operation, defaultNone(init_read()andinit_write()only). If provided, the event will be triggered when the operation completes and the result returned viaEvent.datainstead ofget_read_data()orget_write_resp().
Additional optional arguments for AxiLiteMaster
- prot: AXI protection flags, default
AxiProt.NONSECURE - event:
Eventobject used to wait on and retrieve result for specific operation, defaultNone(init_read()andinit_write()only). If provided, the event will be triggered when the operation completes and the result returned viaEvent.datainstead ofget_read_data()orget_write_resp().
AXI and AXI lite RAM
The AxiRam and AxiLiteRam classes implement AXI RAMs and are capable of completing read and write operations from upstream AXI masters. The AxiRam module is capable of handling narrow bursts.
The AxiRam is a wrapper around AxiRamWrite and AxiRamRead. Similarly, AxiLiteRam is a wrapper around AxiLiteRamWrite and AxiLiteRamRead. If a read-only or write-only interface is required instead of a full interface, use the corresponding read-only or write-only variant, the usage and API are exactly the same.
To use these modules, import the one you need and connect it to the DUT:
from cocotbext.axi import AxiRam
axi_ram = AxiRam(dut, "m_axi", dut.clk, dut.rst, size=2**16)
The modules use cocotb.bus.Bus internally to automatically connect to the corresponding signals in the bus, presuming they are named according to the AXI spec and have a common prefix.
Once the module is instantiated, the memory contents can be accessed in a couple of different ways. First, the mmap object can be accessed directly via the mem attribute. Second, read(), write(), and various word-access wrappers are available. Hex dump helper methods are also provided for debugging. For example:
axi_ram.write(0x0000, b'test')
data = axi_ram.read(0x0000, 4)
Multi-port memories can be constructed by passing the mem object of the first instance to the other instances. For example, here is how to create a four-port RAM:
axi_ram_p1 = AxiRam(dut, "m00_axi", dut.clk, dut.rst, size=2**16)
axi_ram_p2 = AxiRam(dut, "m01_axi", dut.clk, dut.rst, mem=axi_ram_p1.mem)
axi_ram_p3 = AxiRam(dut, "m02_axi", dut.clk, dut.rst, mem=axi_ram_p1.mem)
axi_ram_p4 = AxiRam(dut, "m03_axi", dut.clk, dut.rst, mem=axi_ram_p1.mem)
AxiRam and AxiLiteRam constructor parameters
- entity: object that contains the AXI master interface signals
- name: signal name prefix (e.g. for
m_axi_awaddr, the prefix ism_axi) - clock: clock signal
- reset: reset signal (optional)
- size: memory size in bytes (optional, default 1024)
- mem: mmap object to use (optional, overrides size)
Attributes:
- mem: directly access shared
mmapobject
Methods
read(address, length): read length bytes, starting at addressread_words(address, count, byteorder='little', ws=2): read count ws-byte words, starting at addressread_dwords(address, count, byteorder='little'): read count 4-byte dwords, starting at addressread_qwords(address, count, byteorder='little'): read count 8-byte qwords, starting at addressread_byte(address): read single byte at addressread_word(address, byteorder='little', ws=2): read single ws-byte word at addressread_dword(address, byteorder='little'): read single 4-byte dword at addressread_qword(address, byteorder='little'): read single 8-byte qword at addresswrite(address, data): write data (bytes), starting at addresswrite_words(address, data, byteorder='little', ws=2): write data (ws-byte words), starting at addresswrite_dwords(address, data, byteorder='little'): write data (4-byte dwords), starting at addresswrite_qwords(address, data, byteorder='little'): write data (8-byte qwords), starting at addresswrite_byte(address, data): write single byte at addresswrite_word(address, data, byteorder='little', ws=2): write single ws-byte word at addresswrite_dword(address, data, byteorder='little'): write single 4-byte dword at addresswrite_qword(address, data, byteorder='little'): write single 8-byte qword at addresshexdump(address, length, prefix=''): print hex dump of length bytes starting fromaddress, prefix lines with optionalprefixhexdump_line(address, length, prefix=''): return hex dump (list of str) of length bytes starting fromaddress, prefix lines with optionalprefixhexdump_str(address, length, prefix=''): return hex dump (str) of length bytes starting fromaddress, prefix lines with optionalprefix
AXI stream
The AxiStreamSource, AxiStreamSink, and AxiStreamMonitor classes can be used to drive, receive, and monitor traffic on AXI stream interfaces. The AxiStreamSource drives all signals except for tready and can be used to drive AXI stream traffic into a design. The AxiStreamSink drives the tready line only and as such can receive AXI stream traffic and exert backpressure. The AxiStreamMonitor drives no signals and as such can be connected to internal AXI stream interfaces to monitor traffic.
To use these modules, import the one you need and connect it to the DUT:
from cocotbext.axi import AxiStreamSource, AxiStreamSink, AxiStreamMonitor
axis_source = AxiStreamSource(dut, "s_axis", dut.clk, dut.rst)
axis_sink = AxiStreamSink(dut, "m_axis", dut.clk, dut.rst)
axis_monitor = AxiStreamMonitor(dut.inst, "int_axis", dut.clk, dut.rst)
The modules use cocotb.bus.Bus internally to automatically connect to the corresponding signals in the bus, presuming they are named according to the AXI spec and have a common prefix.
To send data into a design with an AxiStreamSource, call send() or write(). Accepted data types are iterables or AxiStreamFrame objects. Call wait() to wait for the transmit operation to complete. Example:
await axis_source.send(b'test data')
To receive data with an AxiStreamSink or AxiStreamMonitor, call recv() or read(). recv() is intended for use with a frame-oriented interface, and by default compacts AxiStreamFrames before returning them. read() is intended for non-frame-oriented streams. Calling read() internally calls recv() for all frames currently in the queue, then compacts and coalesces tdata from all frames into a separate read queue, from which read data is returned. All sideband data is discarded.
data = await axis_sink.recv()
Signals
tdata: data, requiredtvalid: qualifies all other signals; optional, assumed1when absenttready: indicates sink is ready for data; optional, assumed1when absenttlast: marks the last cycle of a frame; optional, assumed1when absenttkeep: qualifies data byte, data bus width must be evenly divisible bytkeepsignal width; optional, assumed1when absenttid: ID signal, can be used for routing; optional, assumed0when absenttdest: destination signal, can be used for routing; optional, assumed0when absenttuser: additional user data; optional, assumed0when absent
Constructor parameters:
- entity: object that contains the AXI stream interface signals
- name: signal name prefix (e.g. for
m_axis_tdata, the prefix ism_axis) - clock: clock signal
- reset: reset signal (optional)
- byte_size: byte size (optional)
- byte_lanes: byte lane count (optional)
Note: byte_size, byte_lanes, len(tdata), and len(tkeep) are all related, in that byte_lanes is set from tkeep if it is connected, and byte_size*byte_lanes == len(tdata). So, if tkeep is connected, both byte_size and byte_lanes will be computed internally and cannot be overridden. If tkeep is not connected, then either byte_size or byte_lanes can be specified, and the other will be computed such that byte_size*byte_lanes == len(tdata).
Attributes:
- pause: stall the interface (deassert
treadyortvalid) (source/sink only) - queue_occupancy_bytes: number of bytes in queue (all)
- queue_occupancy_frames: number of frames in queue (all)
- queue_occupancy_limit_bytes: max number of bytes in queue allowed before tready deassert (sink only)
- queue_occupancy_limit_frames: max number of frames in queue allowed before tready deassert (sink only)
Methods
send(frame): send frame (blocking) (source)send_nowait(frame): send frame (non-blocking) (source)write(data): send data (alias of send) (blocking) (source)write_nowait(data): send data (alias of send_nowait) (non-blocking) (source)recv(compact=True): receive a frame as aGmiiFrame(blocking) (sink)recv_nowait(compact=True): receive a frame as aGmiiFrame(non-blocking) (sink)read(count): read count bytes from buffer (blocking) (sink/monitor)read_nowait(count): read count bytes from buffer (non-blocking) (sink/monitor)read(count): read count bytes from buffer (sink/monitor)count(): returns the number of items in the queue (all)empty(): returns True if the queue is empty (all)full(): returns True if the queue occupancy limits are met (sink)idle(): returns True if no transfer is in progress (all) or if the queue is not empty (source)wait(): wait for idle (source)wait(timeout=0, timeout_unit='ns'): wait for frame received (sink)set_pause_generator(generator): set generator for pause signal, generator will be advanced on every clock cycle (source/sink)clear_pause_generator(): remove generator for pause signal (source/sink)
AxiStreamFrame object
The AxiStreamFrame object is a container for a frame to be transferred via AXI stream. The tdata field contains the packet data in the form of a list of bytes, which is either a bytearray if the byte size is 8 bits or a list of ints otherwise. tkeep, tid, tdest, and tuser can either be None, an int, or a list of ints.
Attributes:
tdata: bytes, bytearray, or listtkeep: tkeep field, optional; list, each entry qualifies the corresponding entry intdata. Can be used to insert gaps on the source side.tid: tid field, optional; int or list with one entry pertdata, last value used per cycle when sending.tdest: tdest field, optional; int or list with one entry pertdata, last value used per cycle when sending.tuser: tuser field, optional; int or list with one entry pertdata, last value used per cycle when sending.
Methods:
normalize(): packtkeep,tid,tdest, andtuserto the same length astdata, replicating last element if necessary, initializetkeepto list of1andtid,tdest, andtuserto list of0if not specified.compact(): removetdata,tid,tdest, andtuservalues based ontkeep, removetkeep, compacttid,tdest, andtuserto an int if all values are identical.
AXI signals
- Write address channel
awid: transaction IDawaddr: addressawlen: burst length (cycles)awsize: burst size (bytes/cycle)awburst: burst typeawlock: lock typeawcache: cache controlawprot: protection bitsawqos: QoS fieldawregion: region fieldawuser: additional user sideband dataawvalid: valid signal, qualifies all channel fieldsawready: ready signal, back-pressure from sink
- Write data channel
wdata: write datawstrb: write strobewlast: end of burst flagwuser: additional user sideband datawvalid: valid signal, qualifies all channel fieldswready: ready signal, back-pressure from sink
- Write response channel
bid: transaction IDbresp: write responsebuser: additional user sideband databvalid: valid signal, qualifies all channel fieldsbready: ready signal, back-pressure from sink
- Read address channel
arid: transaction IDaraddr: addressarlen: burst length (cycles)arsize: burst size (bytes/cycle)arburst: burst typearlock: lock typearcache: cache controlarprot: protection bitsarqos: QoS fieldarregion: region fieldaruser: additional user sideband dataarvalid: valid signal, qualifies all channel fieldsarready: ready signal, back-pressure from sink
- Read data channel
rid: transaction IDrdata: read datarresp: read responserlast: end of burst flagruser: additional user sideband datarvalid: valid signal, qualifies all channel fieldsrready: ready signal, back-pressure from sink
AXI lite signals
- Write address channel
awaddr: addressawprot: protection bitsawvalid: valid signal, qualifies all channel fieldsawready: ready signal, back-pressure from sink
- Write data channel
wdata: write datawstrb: write strobewvalid: valid signal, qualifies all channel fieldswready: ready signal, back-pressure from sink
- Write response channel
bresp: write responsebvalid: valid signal, qualifies all channel fieldsbready: ready signal, back-pressure from sink
- Read address channel
araddr: addressarprot: protection bitsarvalid: valid signal, qualifies all channel fieldsarready: ready signal, back-pressure from sink
- Read data channel
rdata: read datarresp: read responservalid: valid signal, qualifies all channel fieldsrready: ready signal, back-pressure from sink
AXI stream signals
tdata: datatvalid: qualifies all other signalstready: indicates sink is ready for datatlast: marks the last cycle of a frametkeep: qualifies data bytes intdatatid: ID signal, can be used for routingtdest: destination signal, can be used for routingtuser: additional sideband data