Ethernet interface modules for Cocotb
GitHub repository: https://github.com/alexforencich/cocotbext-eth
Introduction
Ethernet interface models for cocotb.
Installation
Installation from pip (release version, stable):
$ pip install cocotbext-eth
Installation from git (latest development version, potentially unstable):
$ pip install https://github.com/alexforencich/cocotbext-eth/archive/master.zip
Installation for active development:
$ git clone https://github.com/alexforencich/cocotbext-eth
$ pip install -e cocotbext-eth
Documentation and usage examples
See the tests directory and verilog-ethernet for complete testbenches using these modules.
GMII
The GmiiSource and GmiiSink classes can be used to drive, receive, and monitor GMII traffic. The GmiiSource drives GMII traffic into a design. The GmiiSink receives GMII traffic, including monitoring internal interfaces. The GmiiPhy class is a wrapper around GmiiSource and GmiiSink that also provides clocking and rate-switching to emulate a GMII PHY chip.
To use these modules, import the one you need and connect it to the DUT:
from cocotbext.eth import GmiiFrame, GmiiSource, GmiiSink
gmii_source = GmiiSource(dut.rxd, dut.rx_er, dut.rx_en, dut.clk, dut.rst)
gmii_sink = GmiiSink(dut.txd, dut.tx_er, dut.tx_en, dut.clk, dut.rst)
To send data into a design with a GmiiSource, call send() or send_nowait(). Accepted data types are iterables that can be converted to bytearray or GmiiFrame objects. Optionally, call wait() to wait for the transmit operation to complete. Example:
await gmii_source.send(GmiiFrame.from_payload(b'test data'))
# wait for operation to complete (optional)
await gmii_source.wait()
It is also possible to wait for the transmission of a specific frame to complete by passing an event in the tx_complete field of the GmiiFrame object, and then awaiting the event. The frame, with simulation time fields set, will be returned in the event data. Example:
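from cocotb.triggers import Event
# attach an event that fires once this frame has been transmitted
frame = GmiiFrame.from_payload(b'test data', tx_complete=Event())
await gmii_source.send(frame)
await frame.tx_complete.wait()
# the event data is the frame with its simulation time fields set
print(frame.tx_complete.data.sim_time_sfd)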
To receive data with a GmiiSink, call recv() or recv_nowait(). Optionally call wait() to wait for new receive data.
data = await gmii_sink.recv()
The GmiiPhy class provides a model of a GMII PHY chip. It wraps instances of GmiiSource (rx) and GmiiSink (tx), provides the necessary clocking components, and provides the set_speed() method to change the link speed. set_speed() changes the tx_clk and rx_clk frequencies, switches between gtx_clk and tx_clk, and selects the appropriate mode (MII or GMII) on the source and sink instances. In general, the GmiiPhy class is intended to be used for integration tests where the design expects to be directly connected to an external GMII PHY chip and contains all of the necessary IO and clocking logic. Example:
from cocotbext.eth import GmiiFrame, GmiiPhy
gmii_phy = GmiiPhy(dut.txd, dut.tx_er, dut.tx_en, dut.tx_clk, dut.gtx_clk,
    dut.rxd, dut.rx_er, dut.rx_en, dut.rx_clk, dut.rst, speed=1000e6)
gmii_phy.set_speed(100e6)
await gmii_phy.rx.send(GmiiFrame.from_payload(b'test RX data'))
tx_data = await gmii_phy.tx.recv()
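The rate switching described above can be pictured as a lookup from link speed to interface clock period and data-path mode. The following is a minimal sketch, not the GmiiPhy implementation: gmii_clocking is a hypothetical helper, and the values are the standard GMII/MII clock rates (125 MHz, 25 MHz and 2.5 MHz).

```python
def gmii_clocking(speed):
    """Return (clock_period_ns, mii_mode, tx_clock_name) for a link speed in bit/s.

    Hypothetical helper for illustration only; not part of cocotbext-eth.
    """
    if speed == 1000e6:
        # 1 Gbps: GMII mode, 8 bits per cycle at 125 MHz, transmit clocked by gtx_clk
        return 8.0, False, "gtx_clk"
    if speed == 100e6:
        # 100 Mbps: MII mode, 4 bits per cycle at 25 MHz, transmit clocked by tx_clk
        return 40.0, True, "tx_clk"
    if speed == 10e6:
        # 10 Mbps: MII mode, 4 bits per cycle at 2.5 MHz, transmit clocked by tx_clk
        return 400.0, True, "tx_clk"
    raise ValueError(f"unsupported speed: {speed}")
```

Under these assumptions, calling set_speed(100e6) corresponds to a 40 ns clock period with the source and sink in MII mode.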
Signals
- txd, rxd: data
- tx_er, rx_er: error (when asserted with tx_en or rx_dv)
- tx_en, rx_dv: data valid
Constructor parameters:
- data: data signal (txd, rxd, etc.)
- er: error signal (tx_er, rx_er, etc.) (optional)
- dv: data valid signal (tx_en, rx_dv, etc.)
- clock: clock signal
- reset: reset signal (optional)
- enable: clock enable (optional)
- mii_select: MII mode select (optional)
Attributes:
- queue_occupancy_bytes: number of bytes in queue
- queue_occupancy_frames: number of frames in queue
- mii_mode: control MII mode when mii_select signal is not connected
Methods
- send(frame): send frame (blocking) (source)
- send_nowait(frame): send frame (non-blocking) (source)
- recv(): receive a frame as a GmiiFrame (blocking) (sink)
- recv_nowait(): receive a frame as a GmiiFrame (non-blocking) (sink)
- count(): returns the number of items in the queue (all)
- empty(): returns True if the queue is empty (all)
- idle(): returns True if no transfer is in progress (all) and the queue is empty (source)
- clear(): drop all data in queue (all)
- wait(): wait for idle (source)
- wait(timeout=0, timeout_unit='ns'): wait for frame received (sink)
GMII timing diagram
Example transfer via GMII at 1 Gbps:
             __    __    __    __    _       __    __    __    __
tx_clk    __/  \__/  \__/  \__/  \__/  ... _/  \__/  \__/  \__/  \__
                   _____ _____ _____ _     _ _____ _____
tx_d[7:0] XXXXXXXXX_55__X_55__X_55__X_ ... _X_72__X_fb__XXXXXXXXXXXX
tx_er     ____________________________ ... _________________________
                   ___________________     _____________
tx_en     ________/                    ...              \___________
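To make the diagram concrete, the sketch below expands a frame into one (txd, tx_er, tx_en) tuple per clock cycle. It is an illustration of the signalling only, not the GmiiSource implementation. gmii_cycles is a hypothetical helper, and the idle value of 0x00 on txd is an arbitrary choice, since the data lines are don't-care while tx_en is low.

```python
def gmii_cycles(data, error=None):
    """Yield one (txd, tx_er, tx_en) tuple per clock cycle for a frame.

    Hypothetical helper for illustration only; not part of cocotbext-eth.
    """
    if error is None:
        error = [0] * len(data)
    yield (0x00, 0, 0)  # idle cycle before the frame (txd value is arbitrary)
    for byte, err in zip(data, error):
        # one byte per cycle, tx_en held high for the whole frame
        yield (byte, err, 1)
    yield (0x00, 0, 0)  # idle cycle after the frame


# seven preamble bytes followed by the SFD, then two payload bytes
preamble = bytes([0x55] * 7 + [0xD5])
cycles = list(gmii_cycles(preamble + b"\x01\x02"))
```

Each byte of the frame occupies exactly one cycle, so an n-byte frame keeps tx_en asserted for n consecutive cycles.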
GmiiFrame object
The GmiiFrame object is a container for a frame to be transferred via GMII. The data field contains the packet data in the form of a list of bytes. error contains the er signal level state associated with each byte as a list of ints.
Attributes:
- data: bytearray
- error: error field, optional; list, each entry qualifies the corresponding entry in data.
- sim_time_start: simulation time of first transfer cycle of frame.
- sim_time_sfd: simulation time at which the SFD was transferred.
- sim_time_end: simulation time of last transfer cycle of frame.
- start_lane: byte lane in which the start control character was transferred.
- tx_complete: event or callable triggered when frame is transmitted.
Methods:
- from_payload(payload, min_len=60): create GmiiFrame from payload data, inserts preamble, zero-pads frame to minimum length and computes and inserts FCS (class method)
- from_raw_payload(payload): create GmiiFrame from payload data, inserts preamble only (class method)
- get_preamble_len(): locate SFD and return preamble length
- get_preamble(): return preamble
- get_payload(strip_fcs=True): return payload, optionally strip FCS
- get_fcs(): return FCS
- check_fcs(): returns True if FCS is correct
- normalize(): pack error to the same length as data, replicating last element if necessary, initialize to list of 0 if not specified.
- compact(): remove error if all zero
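The steps that from_payload() is described as performing (insert preamble, zero-pad, append FCS) can be sketched with the standard library alone. This is an illustration under stated assumptions rather than the GmiiFrame code: build_frame and check_fcs are hypothetical stand-ins, the preamble is the usual seven 0x55 bytes plus the 0xD5 SFD, and the FCS is the Ethernet CRC-32 stored least significant byte first.

```python
import struct
import zlib

# seven preamble bytes followed by the start frame delimiter (SFD)
ETH_PREAMBLE = b"\x55\x55\x55\x55\x55\x55\x55\xd5"


def build_frame(payload, min_len=60):
    """Return preamble + zero-padded payload + FCS as a bytearray (illustrative)."""
    data = bytearray(payload)
    if len(data) < min_len:
        # zero-pad short payloads up to the minimum length
        data.extend(bytes(min_len - len(data)))
    # Ethernet FCS: CRC-32 over the padded payload, little-endian byte order
    data.extend(struct.pack("<L", zlib.crc32(data)))
    return bytearray(ETH_PREAMBLE) + data


def check_fcs(frame):
    """Recompute the CRC over the payload and compare it with the stored FCS."""
    body = frame[len(ETH_PREAMBLE):]
    return zlib.crc32(body[:-4]) == struct.unpack("<L", body[-4:])[0]


frame = build_frame(b"test data")
```

With the default min_len of 60, a 9-byte payload yields a 72-byte frame: 8 bytes of preamble, 60 bytes of padded payload and 4 bytes of FCS.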
MII
The MiiSource and MiiSink classes can be used to drive, receive, and monitor MII traffic. The MiiSource drives MII traffic into a design. The MiiSink receives MII traffic, including monitoring internal interfaces. The MiiPhy class is a wrapper around MiiSource and MiiSink that also provides clocking and rate-switching to emulate an MII PHY chip.
To use these modules, import the one you need and connect it to the DUT:
from cocotbext.eth import GmiiFrame, MiiSource, MiiSink
mii_source = MiiSource(dut.rxd, dut.rx_er, dut.rx_en, dut.clk, dut.rst)
mii_sink = MiiSink(dut.txd, dut.tx_er, dut.tx_en, dut.clk, dut.rst)
All signals must be passed separately into these classes.
To send data into a design with an MiiSource, call send() or send_nowait(). Accepted data types are iterables that can be converted to bytearray or GmiiFrame objects. Optionally, call wait() to wait for the transmit operation to complete. Example:
await mii_source.send(GmiiFrame.from_payload(b'test data'))
# wait for operation to complete (optional)
await mii_source.wait()
It is also possible to wait for the transmission of a specific frame to complete by passing an event in the tx_complete field of the GmiiFrame object, and then awaiting the event. The frame, with simulation time fields set, will be returned in the event data. Example:
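from cocotb.triggers import Event
# attach an event that fires once this frame has been transmitted
frame = GmiiFrame.from_payload(b'test data', tx_complete=Event())
await mii_source.send(frame)
await frame.tx_complete.wait()
# the event data is the frame with its simulation time fields set
print(frame.tx_complete.data.sim_time_sfd)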
To receive data with an MiiSink, call recv() or recv_nowait(). Optionally call wait() to wait for new receive data.
data = await mii_sink.recv()
The MiiPhy class provides a model of an MII PHY chip. It wraps instances of MiiSource (rx) and MiiSink (tx), provides the necessary clocking components, and provides the set_speed() method to change the link speed. set_speed() changes the tx_clk and rx_clk frequencies. In general, the MiiPhy class is intended to be used for integration tests where the design expects to be directly connected to an external MII PHY chip and contains all of the necessary IO and clocking logic. Example:
from cocotbext.eth import GmiiFrame, MiiPhy
mii_phy = MiiPhy(dut.txd, dut.tx_er, dut.tx_en, dut.tx_clk,
    dut.rxd, dut.rx_er, dut.rx_en, dut.rx_clk, dut.rst, speed=100e6)
mii_phy.set_speed(10e6)
await mii_phy.rx.send(GmiiFrame.from_payload(b'test RX data'))
tx_data = await mii_phy.tx.recv()
Signals
- txd, rxd: data
- tx_er, rx_er: error (when asserted with tx_en or rx_dv)
- tx_en, rx_dv: data valid
Constructor parameters:
- data: data signal (txd, rxd, etc.)
- er: error signal (tx_er, rx_er, etc.) (optional)
- dv: data valid signal (tx_en, rx_dv, etc.)
- clock: clock signal
- reset: reset signal (optional)
- enable: clock enable (optional)
Attributes:
- queue_occupancy_bytes: number of bytes in queue
- queue_occupancy_frames: number of frames in queue
Methods
- send(frame): send frame (blocking) (source)
- send_nowait(frame): send frame (non-blocking) (source)
- recv(): receive a frame as a GmiiFrame (blocking) (sink)
- recv_nowait(): receive a frame as a GmiiFrame (non-blocking) (sink)
- count(): returns the number of items in the queue (all)
- empty(): returns True if the queue is empty (all)
- idle(): returns True if no transfer is in progress (all) and the queue is empty (source)
- clear(): drop all data in queue (all)
- wait(): wait for idle (source)
- wait(timeout=0, timeout_unit='ns'): wait for frame received (sink)
MII timing diagram
Example transfer via MII at 100 Mbps:
            _   _   _   _   _   _       _   _   _   _
tx_clk    _/ \_/ \_/ \_/ \_/ \_/  ... _/ \_/ \_/ \_/ \_
                ___ ___ ___ ___ _     _ ___ ___
tx_d[3:0] XXXXXX_5_X_5_X_5_X_5_X_ ... _X_f_X_b_XXXXXXXX
tx_er     _______________________ ... _________________
                _________________     _________
tx_en     _____/                  ...          \_______
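Because MII carries four bits per clock cycle, every frame byte takes two cycles on the wire. The sketch below shows the split, assuming the standard MII ordering in which the low nibble (bits 3:0) is transferred first. mii_nibbles is a hypothetical helper for illustration and is not part of the cocotbext-eth API.

```python
def mii_nibbles(data):
    """Split each byte into two 4-bit values, low nibble first (illustrative)."""
    nibbles = []
    for byte in data:
        nibbles.append(byte & 0x0F)  # bits 3:0 go out on the first cycle
        nibbles.append(byte >> 4)    # bits 7:4 go out on the second cycle
    return nibbles


# last preamble byte followed by the SFD
sfd_nibbles = mii_nibbles(b"\x55\xd5")
```

A preamble byte 0x55 becomes two 0x5 nibbles, and the SFD 0xD5 becomes 0x5 followed by 0xD, so an n-byte frame keeps tx_en asserted for 2n cycles.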
RGMII
The RgmiiSource and RgmiiSink classes can be used to drive, receive, and monitor RGMII traffic. The RgmiiSource drives RGMII traffic into a design. The RgmiiSink receives RGMII traffic, including monitoring internal interfaces. The RgmiiPhy class is a wrapper around RgmiiSource and RgmiiSink that also provides clocking and rate-switching to emulate an RGMII PHY chip.
To use these modules, import the one you need and connect it to the DUT:
from cocotbext.eth import GmiiFrame, RgmiiSource, RgmiiSink
rgmii_source = RgmiiSource(dut.rxd, dut.rx_ctl, dut.clk, dut.rst)
rgmii_sink = RgmiiSink(dut.txd, dut.tx_ctl, dut.clk, dut.rst)
All signals must be passed separately into these classes.
To send data into a design with an RgmiiSource, call send() or send_nowait(). Accepted data types are iterables that can be converted to bytearray or GmiiFrame objects. Optionally, call wait() to wait for the transmit operation to complete. Example:
await rgmii_source.send(GmiiFrame.from_payload(b'test data'))
# wait for operation to complete (optional)
await rgmii_source.wait()
It is also possible to wait for the transmission of a specific frame to complete by passing an event in the tx_complete field of the GmiiFrame object, and then awaiting the event. The frame, with simulation time fields set, will be returned in the event data. Example:
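from cocotb.triggers import Event
# attach an event that fires once this frame has been transmitted
frame = GmiiFrame.from_payload(b'test data', tx_complete=Event())
await rgmii_source.send(frame)
await frame.tx_complete.wait()
# the event data is the frame with its simulation time fields set
print(frame.tx_complete.data.sim_time_sfd)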
To receive data with an RgmiiSink, call recv() or recv_nowait(). Optionally call wait() to wait for new receive data.
data = await rgmii_sink.recv()
The RgmiiPhy class provides a model of an RGMII PHY chip. It wraps instances of RgmiiSource (rx) and RgmiiSink (tx), provides the necessary clocking components, and provides the set_speed() method to change the link speed. set_speed() changes the rx_clk frequency and selects the appropriate mode (SDR or DDR) on the source and sink instances. In general, the RgmiiPhy class is intended to be used for integration tests where the design expects to be directly connected to an external RGMII PHY chip and contains all of the necessary IO and clocking logic. Example:
from cocotbext.eth import GmiiFrame, RgmiiPhy
rgmii_phy = RgmiiPhy(dut.txd, dut.tx_ctl, dut.tx_clk,
    dut.rxd, dut.rx_ctl, dut.rx_clk, dut.rst, speed=1000e6)
rgmii_phy.set_speed(100e6)
await rgmii_phy.rx.send(GmiiFrame.from_payload(b'test RX data'))
tx_data = await rgmii_phy.tx.recv()
Signals
- txd, rxd: data (DDR)
- tx_ctl, rx_ctl: control (DDR, combination of valid and error)
Constructor parameters:
- data: data signal (txd, rxd, etc.)
- ctrl: control signal (tx_ctl, rx_ctl, etc.)
- clock: clock signal
- reset: reset signal (optional)
- enable: clock enable (optional)
- mii_select: MII mode select (optional)
Attributes:
- queue_occupancy_bytes: number of bytes in queue
- queue_occupancy_frames: number of frames in queue
- mii_mode: control MII mode when mii_select signal is not connected
Methods
- send(frame): send frame (blocking) (source)
- send_nowait(frame): send frame (non-blocking) (source)
- recv(): receive a frame as a GmiiFrame (blocking) (sink)
- recv_nowait(): receive a frame as a GmiiFrame (non-blocking) (sink)
- count(): returns the number of items in the queue (all)
- empty(): returns True if the queue is empty (all)
- idle(): returns True if no transfer is in progress (all) and the queue is empty (source)
- clear(): drop all data in queue (all)
- wait(): wait for idle (source)
- wait(timeout=0, timeout_unit='ns'): wait for frame received (sink)
RGMII timing diagram
Example transfer via RGMII at 1 Gbps:
            ___     ___     ___     _       ___     ___
tx_clk    _/   \___/   \___/   \___/  ... _/   \___/   \___
                  ___ ___ ___ ___ ___     ___ ___
tx_d[3:0] XXXXXXXX_5_X_5_X_5_X_5_X_5_ ... _f_X_b_XXXXXXXXXX
                  ___________________     _______
tx_ctl    _______/                    ...        \_________
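The "combination of valid and error" on the control line follows the usual RGMII convention at 1 Gbps: the rising edge carries the low data nibble with the valid bit, and the falling edge carries the high nibble with valid XOR error. The sketch below encodes a frame that way. rgmii_ddr_cycles is a hypothetical helper written for illustration, not the RgmiiSource implementation.

```python
def rgmii_ddr_cycles(data, error=None):
    """Return one ((d_rise, ctl_rise), (d_fall, ctl_fall)) pair per clock cycle.

    Hypothetical helper for illustration only; not part of cocotbext-eth.
    """
    if error is None:
        error = [0] * len(data)
    cycles = []
    for byte, err in zip(data, error):
        valid = 1
        # rising edge: low nibble, control carries the valid bit
        rising = (byte & 0x0F, valid)
        # falling edge: high nibble, control carries valid XOR error
        falling = (byte >> 4, valid ^ (1 if err else 0))
        cycles.append((rising, falling))
    return cycles


clean = rgmii_ddr_cycles(b"\xd5")
errored = rgmii_ddr_cycles(b"\xd5", [1])
```

For an error-free byte the control line stays high across both edges, while an errored byte shows up as the control line dropping on the falling edge.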
XGMII
The XgmiiSource and XgmiiSink classes can be used to drive, receive, and monitor XGMII traffic. The XgmiiSource drives XGMII traffic into a design. The XgmiiSink receives XGMII traffic, including monitoring internal interfaces. The modules are capable of operating with XGMII interface widths of 32 or 64 bits.
To use these modules, import the one you need and connect it to the DUT:
from cocotbext.eth import XgmiiFrame, XgmiiSource, XgmiiSink
xgmii_source = XgmiiSource(dut.rxd, dut.rxc, dut.clk, dut.rst)
xgmii_sink = XgmiiSink(dut.txd, dut.txc, dut.clk, dut.rst)
All signals must be passed separately into these classes.
To send data into a design with an XgmiiSource, call send() or send_nowait(). Accepted data types are iterables that can be converted to bytearray or XgmiiFrame objects. Optionally, call wait() to wait for the transmit operation to complete. Example:
await xgmii_source.send(XgmiiFrame.from_payload(b'test data'))
# wait for operation to complete (optional)
await xgmii_source.wait()
It is also possible to wait for the transmission of a specific frame to complete by passing an event in the tx_complete field of the XgmiiFrame object, and then awaiting the event. The frame, with simulation time fields set, will be returned in the event data. Example:
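from cocotb.triggers import Event
# attach an event that fires once this frame has been transmitted
frame = XgmiiFrame.from_payload(b'test data', tx_complete=Event())
await xgmii_source.send(frame)
await frame.tx_complete.wait()
# the event data is the frame with its simulation time fields set
print(frame.tx_complete.data.sim_time_sfd)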
To receive data with an XgmiiSink, call recv() or recv_nowait(). Optionally call wait() to wait for new receive data.
data = await xgmii_sink.recv()
Signals
- txd, rxd: data
- txc, rxc: control
Constructor parameters:
- data: data signal (txd, rxd, etc.)
- ctrl: control signal (txc, rxc, etc.)
- clock: clock signal
- reset: reset signal (optional)
- enable: clock enable (optional)
Attributes:
- queue_occupancy_bytes: number of bytes in queue
- queue_occupancy_frames: number of frames in queue
Methods
- send(frame): send frame (blocking) (source)
- send_nowait(frame): send frame (non-blocking) (source)
- recv(): receive a frame as an XgmiiFrame (blocking) (sink)
- recv_nowait(): receive a frame as an XgmiiFrame (non-blocking) (sink)
- count(): returns the number of items in the queue (all)
- empty(): returns True if the queue is empty (all)
- idle(): returns True if no transfer is in progress (all) and the queue is empty (source)
- clear(): drop all data in queue (all)
- wait(): wait for idle (source)
- wait(timeout=0, timeout_unit='ns'): wait for frame received (sink)
XGMII timing diagram
Example transfer via 64-bit XGMII:
              __    __    __    __    __    _       __    __
tx_clk     __/  \__/  \__/  \__/  \__/  \__/  ... _/  \__/  \__
           __ _____ _____ _____ _____ _____ _     _ _____ _____
txd[63:56] __X_07__X_d5__X_51__X_01__X_09__X_ ... _X_fb__X_07__
           __ _____ _____ _____ _____ _____ _     _ _____ _____
txd[55:48] __X_07__X_55__X_5a__X_00__X_08__X_ ... _X_72__X_07__
           __ _____ _____ _____ _____ _____ _     _ _____ _____
txd[47:40] __X_07__X_55__X_d5__X_00__X_07__X_ ... _X_0d__X_07__
           __ _____ _____ _____ _____ _____ _     _ _____ _____
txd[39:32] __X_07__X_55__X_d4__X_80__X_06__X_ ... _X_37__X_07__
           __ _____ _____ _____ _____ _____ _     _ _____ _____
txd[31:24] __X_07__X_55__X_d3__X_55__X_05__X_ ... _X_2d__X_07__
           __ _____ _____ _____ _____ _____ _     _ _____ _____
txd[23:16] __X_07__X_55__X_d2__X_54__X_04__X_ ... _X_2c__X_07__
           __ _____ _____ _____ _____ _____ _     _ _____ _____
txd[15:8]  __X_07__X_55__X_d1__X_53__X_03__X_ ... _X_2b__X_07__
           __ _____ _____ _____ _____ _____ _     _ _____ _____
txd[7:0]   __X_07__X_fb__X_da__X_52__X_02__X_ ... _X_2a__X_fd__
           __ _____ _____ _____ _____ _____ _     _ _____ _____
txc[7:0]   __X_ff__X_01__X_00__X_00__X_00__X_ ... _X_00__X_ff__
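The word layout in the diagram can be reproduced with a small packing routine. The sketch below is an illustration, not the XgmiiSource implementation: xgmii_words is a hypothetical helper that assumes the frame starts in lane 0, replaces the first preamble byte with the start character 0xfb, appends the terminate character 0xfd and fills the remaining lanes with idle characters 0x07.

```python
XGMII_IDLE = 0x07
XGMII_START = 0xFB
XGMII_TERM = 0xFD


def xgmii_words(frame, byte_lanes=8):
    """Pack a frame (preamble included) into (data, ctrl) words, start in lane 0.

    Hypothetical helper for illustration only; not part of cocotbext-eth.
    """
    data = bytearray(frame)
    ctrl = [0] * len(data)
    # the start control character replaces the first preamble byte
    data[0] = XGMII_START
    ctrl[0] = 1
    # terminate follows the last data byte, idle fills the rest of the word
    data.append(XGMII_TERM)
    ctrl.append(1)
    while len(data) % byte_lanes:
        data.append(XGMII_IDLE)
        ctrl.append(1)
    words = []
    for i in range(0, len(data), byte_lanes):
        # lane 0 is the least significant byte of the word
        d = int.from_bytes(data[i:i + byte_lanes], "little")
        c = sum(bit << lane for lane, bit in enumerate(ctrl[i:i + byte_lanes]))
        words.append((d, c))
    return words


preamble = bytes([0x55] * 7 + [0xD5])
words = xgmii_words(preamble + bytes([0xDA, 0xD1, 0xD2, 0xD3, 0xD4, 0xD5, 0x5A, 0x51]))
```

The first word comes out as data 0xd5555555555555fb with control 0x01, which matches the second column of the diagram, and the final word carries the terminate character in lane 0 with control 0xff.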
XgmiiFrame object
The XgmiiFrame object is a container for a frame to be transferred via XGMII. The data field contains the packet data in the form of a list of bytes. ctrl contains the control signal level state associated with each byte as a list of ints. When ctrl is high, the corresponding data byte is interpreted as an XGMII control character.
Attributes:
- data: bytearray
- ctrl: control field, optional; list, each entry qualifies the corresponding entry in data as an XGMII control character.
- sim_time_start: simulation time of first transfer cycle of frame.
- sim_time_sfd: simulation time at which the SFD was transferred.
- sim_time_end: simulation time of last transfer cycle of frame.
- start_lane: byte lane in which the start control character was transferred.
- tx_complete: event or callable triggered when frame is transmitted.
Methods:
- from_payload(payload, min_len=60): create XgmiiFrame from payload data, inserts preamble, zero-pads frame to minimum length and computes and inserts FCS (class method)
- from_raw_payload(payload): create XgmiiFrame from payload data, inserts preamble only (class method)
- get_preamble_len(): locate SFD and return preamble length
- get_preamble(): return preamble
- get_payload(strip_fcs=True): return payload, optionally strip FCS
- get_fcs(): return FCS
- check_fcs(): returns True if FCS is correct
- normalize(): pack ctrl to the same length as data, replicating last element if necessary, initialize to list of 0 if not specified.
- compact(): remove ctrl if all zero
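The behavior described for normalize() and compact() amounts to a pair of list transformations on the optional per-byte field (ctrl for an XgmiiFrame). The functions below are a minimal sketch written from that description as standalone helpers. They are hypothetical and are not the methods of the frame classes, which operate on the frame in place.

```python
def normalize(data, ctrl=None):
    """Return a per-byte list exactly as long as data (illustrative)."""
    n = len(data)
    if not ctrl:
        # not specified (or empty): initialize to a list of zeros
        return [0] * n
    ctrl = list(ctrl)
    # truncate if too long, otherwise replicate the last element
    return ctrl[:n] + [ctrl[-1]] * (n - len(ctrl))


def compact(ctrl):
    """Return None when every entry is zero, otherwise return ctrl unchanged."""
    if ctrl is not None and not any(ctrl):
        return None
    return ctrl
```

For example, normalize(b"abcd", [1]) gives [1, 1, 1, 1], and compact([0, 0, 0]) gives None, so a frame without control characters does not need to carry the list at all.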
PTP clock
The PtpClock class implements a PTP hardware clock that produces IEEE 1588 format 96-bit and 64-bit PTP timestamps.
To use this module, import it and connect it to the DUT:
from cocotbext.eth import PtpClock
ptp_clock = PtpClock(
    ts_96=dut.ts_96,
    ts_64=dut.ts_64,
    ts_step=dut.ts_step,
    pps=dut.pps,
    clock=dut.clk,
    reset=dut.reset,
    period_ns=6.4
)
Once the clock is instantiated, it will generate a continuous stream of monotonically increasing PTP timestamps on every clock edge.
Signals
- ts_96: 96-bit timestamp (48 bit seconds, 32 bit ns, 16 bit fractional ns)
- ts_64: 64-bit timestamp (48 bit ns, 16 bit fractional ns)
- ts_step: step output, pulsed when non-monotonic step occurs
- pps: pulse-per-second output, pulsed when ts_96 seconds field increments
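The field layout of the two timestamp formats can be illustrated with plain integer packing. This is a sketch based only on the bit widths listed above. The helper names pack_ts_96, unpack_ts_96, pack_ts_64 and ts_96_to_ns are hypothetical and are not part of the PtpClock API.

```python
def pack_ts_96(seconds, ns, fns=0):
    """Pack 48-bit seconds, 32-bit ns and 16-bit fractional ns into one integer."""
    return ((seconds & 0xFFFFFFFFFFFF) << 48) | ((ns & 0xFFFFFFFF) << 16) | (fns & 0xFFFF)


def unpack_ts_96(ts):
    """Split a 96-bit timestamp into (seconds, ns, fns)."""
    return (ts >> 48) & 0xFFFFFFFFFFFF, (ts >> 16) & 0xFFFFFFFF, ts & 0xFFFF


def pack_ts_64(ns, fns=0):
    """Pack 48-bit ns and 16-bit fractional ns into one integer."""
    return ((ns & 0xFFFFFFFFFFFF) << 16) | (fns & 0xFFFF)


def ts_96_to_ns(ts):
    """Convert a 96-bit timestamp to nanoseconds as a float."""
    seconds, ns, fns = unpack_ts_96(ts)
    # the fractional field counts in units of 1/65536 ns
    return seconds * 1_000_000_000 + ns + fns / 65536


ts = pack_ts_96(1, 500, 0x8000)
```

Here ts encodes 1 s, 500 ns and half a nanosecond, so ts_96_to_ns(ts) evaluates to 1000000500.5.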
Constructor parameters:
- ts_96: 96-bit timestamp signal (optional)
- ts_64: 64-bit timestamp signal (optional)
- ts_step: timestamp step signal (optional)
- pps: pulse-per-second signal (optional)
- clock: clock
- reset: reset (optional)
- period_ns: clock period (nanoseconds)
Attributes:
- ts_96_s: current 96-bit timestamp seconds field
- ts_96_ns: current 96-bit timestamp ns field
- ts_96_fns: current 96-bit timestamp fractional ns field
- ts_64_ns: current 64-bit timestamp ns field
- ts_64_fns: current 64-bit timestamp fractional ns field
Methods
- set_period(ns, fns): set clock period from separate fields
- set_drift(ns, fns, rate): set clock drift from separate fields
- set_period_ns(t): set clock period in ns (float)
- get_period_ns(): return current clock period in ns (float)
- set_ts_96(ts_s, ts_ns=None, ts_fns=None): set 96-bit timestamp from integer or from separate fields
- set_ts_96_ns(t): set 96-bit timestamp from ns (float)
- set_ts_96_s(t): set 96-bit timestamp from seconds (float)
- get_ts_96(): return current 96-bit timestamp as an integer
- get_ts_96_ns(): return current 96-bit timestamp in ns (float)
- get_ts_96_s(): return current 96-bit timestamp in seconds (float)
- set_ts_64(ts_ns, ts_fns=None): set 64-bit timestamp from integer or from separate fields
- set_ts_64_ns(t): set 64-bit timestamp from ns (float)
- set_ts_64_s(t): set 64-bit timestamp from seconds (float)
- get_ts_64(): return current 64-bit timestamp as an integer
- get_ts_64_ns(): return current 64-bit timestamp in ns (float)
- get_ts_64_s(): return current 64-bit timestamp in seconds (float)