34 Commits

Author SHA1 Message Date
Alex Forencich
facd770568 Release v0.1.8 2021-03-17 18:52:05 -07:00
Alex Forencich
faad752b9a Use release version of cocotb for CI 2021-03-17 18:14:50 -07:00
Alex Forencich
5855644670 Use cocotb async queues 2021-03-17 18:13:04 -07:00
Alex Forencich
a81d43216e Bump to dev version v0.1.7 2021-03-06 18:39:03 -08:00
Alex Forencich
50a60b649e Release v0.1.6 2021-03-06 18:38:18 -08:00
Alex Forencich
bf19679007 Update readme 2021-03-06 18:34:50 -08:00
Alex Forencich
6835e921f8 Add reset_active_level parameters 2021-03-06 18:34:20 -08:00
Alex Forencich
1b180a1d64 Clean up reset implementation 2021-03-06 18:31:40 -08:00
Alex Forencich
63b60341ed Clear sim time fields on start transmit 2021-01-08 16:25:28 -08:00
Alex Forencich
f92bbaaa70 Update readme 2021-01-05 23:16:00 -08:00
Alex Forencich
a73c0b734e Support override of tx_complete 2021-01-04 22:42:53 -08:00
Alex Forencich
aa97848450 Store SFD transfer sim time 2021-01-03 23:28:33 -08:00
Alex Forencich
1bd01ae879 Fix RGMII error indication 2021-01-03 23:26:31 -08:00
Alex Forencich
cfbc80c0cb Improve transfer tracking 2021-01-03 22:55:09 -08:00
Alex Forencich
1d5688778a Add clear method 2021-01-03 12:52:21 -08:00
Alex Forencich
71d7c7e9d2 Remove extraneous code 2020-12-31 03:12:14 -08:00
Alex Forencich
30bc6f68a1 Rework sim_build output directory, fix default makefile target 2020-12-29 14:25:52 -08:00
Alex Forencich
16eaea6967 Bump to dev version v0.1.5 2020-12-27 23:29:34 -08:00
Alex Forencich
a1b88dfbf1 Release v0.1.4 2020-12-27 23:28:49 -08:00
Alex Forencich
15ff9f9907 Update readme 2020-12-27 23:06:20 -08:00
Alex Forencich
cda8910ccf Add PHY wrapper tests 2020-12-26 23:50:53 -08:00
Alex Forencich
7404c5cf5f Add GmiiPhy 2020-12-24 19:09:36 -08:00
Alex Forencich
2e0502fc6e Use F-string repr 2020-12-24 19:09:15 -08:00
Alex Forencich
5eeffc0c68 Rework resets 2020-12-24 19:08:41 -08:00
Alex Forencich
a3df4ed1f9 Add call to superclass init 2020-12-24 15:14:11 -08:00
Alex Forencich
318f48785a Remove inherit from object 2020-12-24 14:40:03 -08:00
Alex Forencich
4b768e267d Add RgmiiPhy 2020-12-24 14:35:51 -08:00
Alex Forencich
a8a1bbde30 Add mii_mode attribute to GMII and RGMII models 2020-12-24 14:35:16 -08:00
Alex Forencich
e7a3850dd2 Add MiiPhy 2020-12-24 14:34:35 -08:00
Alex Forencich
d97208d3c8 Remove await ReadOnly 2020-12-24 00:28:16 -08:00
Alex Forencich
0456e4af60 Update readme 2020-12-24 00:25:37 -08:00
Alex Forencich
c550a7315e Update readme 2020-12-22 16:58:16 -08:00
Alex Forencich
e858721dcc Add MII models 2020-12-22 16:54:41 -08:00
Alex Forencich
19306c9b55 Bump to dev version v0.1.3 2020-12-18 15:50:00 -08:00
30 changed files with 2397 additions and 322 deletions

278
README.md
View File

@@ -3,6 +3,7 @@
[![Build Status](https://github.com/alexforencich/cocotbext-eth/workflows/Regression%20Tests/badge.svg?branch=master)](https://github.com/alexforencich/cocotbext-eth/actions/) [![Build Status](https://github.com/alexforencich/cocotbext-eth/workflows/Regression%20Tests/badge.svg?branch=master)](https://github.com/alexforencich/cocotbext-eth/actions/)
[![codecov](https://codecov.io/gh/alexforencich/cocotbext-eth/branch/master/graph/badge.svg)](https://codecov.io/gh/alexforencich/cocotbext-eth) [![codecov](https://codecov.io/gh/alexforencich/cocotbext-eth/branch/master/graph/badge.svg)](https://codecov.io/gh/alexforencich/cocotbext-eth)
[![PyPI version](https://badge.fury.io/py/cocotbext-eth.svg)](https://pypi.org/project/cocotbext-eth) [![PyPI version](https://badge.fury.io/py/cocotbext-eth.svg)](https://pypi.org/project/cocotbext-eth)
[![Downloads](https://pepy.tech/badge/cocotbext-eth)](https://pepy.tech/project/cocotbext-eth)
GitHub repository: https://github.com/alexforencich/cocotbext-eth GitHub repository: https://github.com/alexforencich/cocotbext-eth
@@ -31,7 +32,7 @@ See the `tests` directory and [verilog-ethernet](https://github.com/alexforencic
### GMII ### GMII
The `GmiiSource` and `GmiiSink` classes can be used to drive, receive, and monitor GMII traffic. The `GmiiSource` drives GMII traffic into a design. The `GmiiSink` receives GMII traffic, including monitoring internal interfaces. The `GmiiSource` and `GmiiSink` classes can be used to drive, receive, and monitor GMII traffic. The `GmiiSource` drives GMII traffic into a design. The `GmiiSink` receives GMII traffic, including monitoring internal interfaces. The `GmiiPhy` class is a wrapper around `GmiiSource` and `GmiiSink` that also provides clocking and rate-switching to emulate a GMII PHY chip.
To use these modules, import the one you need and connect it to the DUT: To use these modules, import the one you need and connect it to the DUT:
@@ -40,15 +41,34 @@ To use these modules, import the one you need and connect it to the DUT:
gmii_source = GmiiSource(dut.rxd, dut.rx_er, dut.rx_en, dut.clk, dut.rst) gmii_source = GmiiSource(dut.rxd, dut.rx_er, dut.rx_en, dut.clk, dut.rst)
gmii_sink = GmiiSink(dut.txd, dut.tx_er, dut.tx_en, dut.clk, dut.rst) gmii_sink = GmiiSink(dut.txd, dut.tx_er, dut.tx_en, dut.clk, dut.rst)
To send data into a design with an `GmiiSource`, call `send()`. Accepted data types are iterables that can be converted to bytearray or `GmiiFrame` objects. Call `wait()` to wait for the transmit operation to complete. Example: To send data into a design with a `GmiiSource`, call `send()` or `send_nowait()`. Accepted data types are iterables that can be converted to bytearray or `GmiiFrame` objects. Optionally, call `wait()` to wait for the transmit operation to complete. Example:
gmii_source.send(GmiiFrame.from_payload(b'test data')) await gmii_source.send(GmiiFrame.from_payload(b'test data'))
# wait for operation to complete (optional)
await gmii_source.wait() await gmii_source.wait()
To receive data with a `GmiiSink`, call `recv()`. Call `wait()` to wait for new receive data. It is also possible to wait for the transmission of a specific frame to complete by passing an event in the tx_complete field of the `GmiiFrame` object, and then awaiting the event. The frame, with simulation time fields set, will be returned in the event data. Example:
await gmii_sink.wait() frame = GmiiFrame.from_payload(b'test data', tx_complete=Event())
data = gmii_sink.recv() await gmii_source.send(frame)
await frame.tx_complete.wait()
print(frame.tx_complete.data.sim_time_sfd)
To receive data with a `GmiiSink`, call `recv()` or `recv_nowait()`. Optionally call `wait()` to wait for new receive data.
data = await gmii_sink.recv()
The `GmiiPhy` class provides a model of a GMII PHY chip. It wraps instances of `GmiiSource` (`rx`) and `GmiiSink` (`tx`), provides the necessary clocking components, and provides the `set_speed()` method to change the link speed. `set_speed()` changes the `tx_clk` and `rx_clk` frequencies, switches between `gtx_clk` and `tx_clk`, and selects the appropriate mode (MII or GMII) on the source and sink instances. In general, the `GmiiPhy` class is intended to be used for integration tests where the design expects to be directly connected to an external GMII PHY chip and contains all of the necessary IO and clocking logic. Example:
from cocotbext.eth import GmiiFrame, GmiiPhy
gmii_phy = GmiiPhy(dut.txd, dut.tx_er, dut.tx_en, dut.tx_clk, dut.gtx_clk,
dut.rxd, dut.rx_er, dut.rx_en, dut.rx_clk, dut.rst, speed=1000e6)
gmii_phy.set_speed(100e6)
await gmii_phy.rx.send(GmiiFrame.from_payload(b'test RX data'))
tx_data = await gmii_phy.tx.recv()
#### Signals #### Signals
@@ -65,6 +85,124 @@ To receive data with a `GmiiSink`, call `recv()`. Call `wait()` to wait for new
* _reset_: reset signal (optional) * _reset_: reset signal (optional)
* _enable_: clock enable (optional) * _enable_: clock enable (optional)
* _mii_select_: MII mode select (optional) * _mii_select_: MII mode select (optional)
* _reset_active_level_: reset active level (optional, default `True`)
#### Attributes:
* _queue_occupancy_bytes_: number of bytes in queue
* _queue_occupancy_frames_: number of frames in queue
* _mii_mode_: control MII mode when _mii_select_ signal is not connected
#### Methods
* `send(frame)`: send _frame_ (blocking) (source)
* `send_nowait(frame)`: send _frame_ (non-blocking) (source)
* `recv()`: receive a frame as a `GmiiFrame` (blocking) (sink)
* `recv_nowait()`: receive a frame as a `GmiiFrame` (non-blocking) (sink)
* `count()`: returns the number of items in the queue (all)
* `empty()`: returns _True_ if the queue is empty (all)
* `idle()`: returns _True_ if no transfer is in progress (all) or if the queue is not empty (source)
* `clear()`: drop all data in queue (all)
* `wait()`: wait for idle (source)
* `wait(timeout=0, timeout_unit='ns')`: wait for frame received (sink)
#### GMII timing diagram
Example transfer via GMII at 1 Gbps:
__ __ __ __ _ __ __ __ __
tx_clk __/ \__/ \__/ \__/ \__/ ... _/ \__/ \__/ \__/ \__
_____ _____ _____ _ _ _____ _____
tx_d[7:0] XXXXXXXXX_55__X_55__X_55__X_ ... _X_72__X_fb__XXXXXXXXXXXX
tx_er ____________________________ ... _________________________
___________________ _____________
tx_en ________/ ... \___________
#### GmiiFrame object
The `GmiiFrame` object is a container for a frame to be transferred via GMII. The `data` field contains the packet data in the form of a list of bytes. `error` contains the `er` signal level state associated with each byte as a list of ints.
Attributes:
* `data`: bytearray
* `error`: error field, optional; list, each entry qualifies the corresponding entry in `data`.
* `sim_time_start`: simulation time of first transfer cycle of frame.
* `sim_time_sfd`: simulation time at which the SFD was transferred.
* `sim_time_end`: simulation time of last transfer cycle of frame.
* `start_lane`: byte lane in which the start control character was transferred.
* `tx_complete`: event or callable triggered when frame is transmitted.
Methods:
* `from_payload(payload, min_len=60)`: create `GmiiFrame` from payload data, inserts preamble, zero-pads frame to minimum length and computes and inserts FCS (class method)
* `from_raw_payload(payload)`: create `GmiiFrame` from payload data, inserts preamble only (class method)
* `get_preamble_len()`: locate SFD and return preamble length
* `get_preamble()`: return preamble
* `get_payload(strip_fcs=True)`: return payload, optionally strip FCS
* `get_fcs()`: return FCS
* `check_fcs()`: returns _True_ if FCS is correct
* `normalize()`: pack `error` to the same length as `data`, replicating last element if necessary, initialize to list of `0` if not specified.
* `compact()`: remove `error` if all zero
### MII
The `MiiSource` and `MiiSink` classes can be used to drive, receive, and monitor MII traffic. The `MiiSource` drives MII traffic into a design. The `MiiSink` receives MII traffic, including monitoring internal interfaces. The `MiiPhy` class is a wrapper around `MiiSource` and `MiiSink` that also provides clocking and rate-switching to emulate an MII PHY chip.
To use these modules, import the one you need and connect it to the DUT:
from cocotbext.eth import MiiSource, MiiSink
mii_source = MiiSource(dut.rxd, dut.rx_er, dut.rx_en, dut.clk, dut.rst)
mii_sink = MiiSink(dut.txd, dut.tx_er, dut.tx_en, dut.clk, dut.rst)
All signals must be passed separately into these classes.
To send data into a design with an `MiiSource`, call `send()` or `send_nowait()`. Accepted data types are iterables that can be converted to bytearray or `GmiiFrame` objects. Optionally, call `wait()` to wait for the transmit operation to complete. Example:
await mii_source.send(GmiiFrame.from_payload(b'test data'))
# wait for operation to complete (optional)
await mii_source.wait()
It is also possible to wait for the transmission of a specific frame to complete by passing an event in the tx_complete field of the `GmiiFrame` object, and then awaiting the event. The frame, with simulation time fields set, will be returned in the event data. Example:
frame = GmiiFrame.from_payload(b'test data', tx_complete=Event())
await mii_source.send(frame)
await frame.tx_complete.wait()
print(frame.tx_complete.data.sim_time_sfd)
To receive data with an `MiiSink`, call `recv()` or `recv_nowait()`. Optionally call `wait()` to wait for new receive data.
data = await mii_sink.recv()
The `MiiPhy` class provides a model of an MII PHY chip. It wraps instances of `MiiSource` (`rx`) and `MiiSink` (`tx`), provides the necessary clocking components, and provides the `set_speed()` method to change the link speed. `set_speed()` changes the `tx_clk` and `rx_clk` frequencies. In general, the `MiiPhy` class is intended to be used for integration tests where the design expects to be directly connected to an external MII PHY chip and contains all of the necessary IO and clocking logic. Example:
from cocotbext.eth import GmiiFrame, MiiPhy
mii_phy = MiiPhy(dut.txd, dut.tx_er, dut.tx_en, dut.tx_clk,
dut.rxd, dut.rx_er, dut.rx_en, dut.rx_clk, dut.rst, speed=100e6)
mii_phy.set_speed(10e6)
await mii_phy.rx.send(GmiiFrame.from_payload(b'test RX data'))
tx_data = await mii_phy.tx.recv()
#### Signals
* `txd`, `rxd`: data
* `tx_er`, `rx_er`: error (when asserted with `tx_en` or `rx_dv`)
* `tx_en`, `rx_dv`: data valid
#### Constructor parameters:
* _data_: data signal (txd, rxd, etc.)
* _er_: error signal (tx_er, rx_er, etc.) (optional)
* _dv_: data valid signal (tx_en, rx_dv, etc.)
* _clock_: clock signal
* _reset_: reset signal (optional)
* _enable_: clock enable (optional)
* _reset_active_level_: reset active level (optional, default `True`)
#### Attributes: #### Attributes:
@@ -80,34 +218,27 @@ To receive data with a `GmiiSink`, call `recv()`. Call `wait()` to wait for new
* `count()`: returns the number of items in the queue (all) * `count()`: returns the number of items in the queue (all)
* `empty()`: returns _True_ if the queue is empty (all) * `empty()`: returns _True_ if the queue is empty (all)
* `idle()`: returns _True_ if no transfer is in progress (all) or if the queue is not empty (source) * `idle()`: returns _True_ if no transfer is in progress (all) or if the queue is not empty (source)
* `clear()`: drop all data in queue (all)
* `wait()`: wait for idle (source) * `wait()`: wait for idle (source)
* `wait(timeout=0, timeout_unit='ns')`: wait for frame received (sink) * `wait(timeout=0, timeout_unit='ns')`: wait for frame received (sink)
#### GmiiFrame object #### MII timing diagram
The `GmiiFrame` object is a container for a frame to be transferred via GMII. The `data` field contains the packet data in the form of a list of bytes. `error` contains the `er` signal level state associated with each byte as a list of ints. Example transfer via MII at 100 Mbps:
Attributes: _ _ _ _ _ _ _ _ _ _
tx_clk _/ \_/ \_/ \_/ \_/ \_/ ... _/ \_/ \_/ \_/ \_
___ ___ ___ ___ _ _ ___ ___
tx_d[3:0] XXXXXX_5_X_5_X_5_X_5_X_ ... _X_f_X_b_XXXXXXXX
* `data`: bytearray tx_er _______________________ ... _________________
* `error`: error field, optional; list, each entry qualifies the corresponding entry in `data`. _________________ _________
* `rx_sim_time`: simulation time when packet was received by sink. tx_en _____/ ... \_______
Methods:
* `from_payload(payload, min_len=60)`: create `GmiiFrame` from payload data, inserts preamble, zero-pads frame to minimum length and computes and inserts FCS (class method)
* `from_raw_payload(payload)`: create `GmiiFrame` from payload data, inserts preamble only (class method)
* `get_preamble_len()`: locate SFD and return preamble length
* `get_preamble()`: return preamble
* `get_payload(strip_fcs=True)`: return payload, optionally strip FCS
* `get_fcs()`: return FCS
* `check_fcs()`: returns _True_ if FCS is correct
* `normalize()`: pack `error` to the same length as `data`, replicating last element if necessary, initialize to list of `0` if not specified.
* `compact()`: remove `error` if all zero
### RGMII ### RGMII
The `RgmiiSource` and `RgmiiSink` classes can be used to drive, receive, and monitor RGMII traffic. The `RgmiiSource` drives RGMII traffic into a design. The `RgmiiSink` receives RGMII traffic, including monitoring internal interfaces. The `RgmiiSource` and `RgmiiSink` classes can be used to drive, receive, and monitor RGMII traffic. The `RgmiiSource` drives RGMII traffic into a design. The `RgmiiSink` receives RGMII traffic, including monitoring internal interfaces. The `RgmiiPhy` class is a wrapper around `RgmiiSource` and `RgmiiSink` that also provides clocking and rate-switching to emulate an RGMII PHY chip.
To use these modules, import the one you need and connect it to the DUT: To use these modules, import the one you need and connect it to the DUT:
@@ -118,15 +249,34 @@ To use these modules, import the one you need and connect it to the DUT:
All signals must be passed separately into these classes. All signals must be passed separately into these classes.
To send data into a design with an `RgmiiSource`, call `send()`. Accepted data types are iterables that can be converted to bytearray or `GmiiFrame` objects. Call `wait()` to wait for the transmit operation to complete. Example: To send data into a design with an `RgmiiSource`, call `send()` or `send_nowait()`. Accepted data types are iterables that can be converted to bytearray or `GmiiFrame` objects. Optionally, call `wait()` to wait for the transmit operation to complete. Example:
rgmii_source.send(GmiiFrame.from_payload(b'test data')) await rgmii_source.send(GmiiFrame.from_payload(b'test data'))
# wait for operation to complete (optional)
await rgmii_source.wait() await rgmii_source.wait()
To receive data with an `RgmiiSink`, call `recv()`. Call `wait()` to wait for new receive data. It is also possible to wait for the transmission of a specific frame to complete by passing an event in the tx_complete field of the `GmiiFrame` object, and then awaiting the event. The frame, with simulation time fields set, will be returned in the event data. Example:
await rgmii_sink.wait() frame = GmiiFrame.from_payload(b'test data', tx_complete=Event())
data = rgmii_sink.recv() await rgmii_source.send(frame)
await frame.tx_complete.wait()
print(frame.tx_complete.data.sim_time_sfd)
To receive data with an `RgmiiSink`, call `recv()` or `recv_nowait()`. Optionally call `wait()` to wait for new receive data.
data = await rgmii_sink.recv()
The `RgmiiPhy` class provides a model of an RGMII PHY chip. It wraps instances of `RgmiiSource` (`rx`) and `RgmiiSink` (`tx`), provides the necessary clocking components, and provides the `set_speed()` method to change the link speed. `set_speed()` changes the `rx_clk` frequency and selects the appropriate mode (SDR or DDR) on the source and sink instances. In general, the `RgmiiPhy` class is intended to be used for integration tests where the design expects to be directly connected to an external RGMII PHY chip and contains all of the necessary IO and clocking logic. Example:
from cocotbext.eth import GmiiFrame, RgmiiPhy
rgmii_phy = RgmiiPhy(dut.txd, dut.tx_ctl, dut.tx_clk,
dut.rxd, dut.rx_ctl, dut.rx_clk, dut.rst, speed=1000e6)
rgmii_phy.set_speed(100e6)
await rgmii_phy.rx.send(GmiiFrame.from_payload(b'test RX data'))
tx_data = await rgmii_phy.tx.recv()
#### Signals #### Signals
@@ -141,11 +291,13 @@ To receive data with an `RgmiiSink`, call `recv()`. Call `wait()` to wait for n
* _reset_: reset signal (optional) * _reset_: reset signal (optional)
* _enable_: clock enable (optional) * _enable_: clock enable (optional)
* _mii_select_: MII mode select (optional) * _mii_select_: MII mode select (optional)
* _reset_active_level_: reset active level (optional, default `True`)
#### Attributes: #### Attributes:
* _queue_occupancy_bytes_: number of bytes in queue * _queue_occupancy_bytes_: number of bytes in queue
* _queue_occupancy_frames_: number of frames in queue * _queue_occupancy_frames_: number of frames in queue
* _mii_mode_: control MII mode when _mii_select_ signal is not connected
#### Methods #### Methods
@@ -156,9 +308,22 @@ To receive data with an `RgmiiSink`, call `recv()`. Call `wait()` to wait for n
* `count()`: returns the number of items in the queue (all) * `count()`: returns the number of items in the queue (all)
* `empty()`: returns _True_ if the queue is empty (all) * `empty()`: returns _True_ if the queue is empty (all)
* `idle()`: returns _True_ if no transfer is in progress (all) or if the queue is not empty (source) * `idle()`: returns _True_ if no transfer is in progress (all) or if the queue is not empty (source)
* `clear()`: drop all data in queue (all)
* `wait()`: wait for idle (source) * `wait()`: wait for idle (source)
* `wait(timeout=0, timeout_unit='ns')`: wait for frame received (sink) * `wait(timeout=0, timeout_unit='ns')`: wait for frame received (sink)
#### RGMII timing diagram
Example transfer via RGMII at 1 Gbps:
___ ___ ___ _ ___ ___
tx_clk _/ \___/ \___/ \___/ ... _/ \___/ \___
___ ___ ___ ___ ___ ___ ___
tx_d[3:0] XXXXXXXX_5_X_5_X_5_X_5_X_5_ ... _f_X_b_XXXXXXXXXX
___________________ _______
tx_ctl _______/ ... \_________
### XGMII ### XGMII
The `XgmiiSource` and `XgmiiSink` classes can be used to drive, receive, and monitor XGMII traffic. The `XgmiiSource` drives XGMII traffic into a design. The `XgmiiSink` receives XGMII traffic, including monitoring internal interfaces. The modules are capable of operating with XGMII interface widths of 32 or 64 bits. The `XgmiiSource` and `XgmiiSink` classes can be used to drive, receive, and monitor XGMII traffic. The `XgmiiSource` drives XGMII traffic into a design. The `XgmiiSink` receives XGMII traffic, including monitoring internal interfaces. The modules are capable of operating with XGMII interface widths of 32 or 64 bits.
@@ -172,15 +337,22 @@ To use these modules, import the one you need and connect it to the DUT:
All signals must be passed separately into these classes. All signals must be passed separately into these classes.
To send data into a design with an `XgmiiSource`, call `send()`. Accepted data types are iterables that can be converted to bytearray or `XgmiiFrame` objects. Call `wait()` to wait for the transmit operation to complete. Example: To send data into a design with an `XgmiiSource`, call `send()` or `send_nowait()`. Accepted data types are iterables that can be converted to bytearray or `XgmiiFrame` objects. Optionally, call `wait()` to wait for the transmit operation to complete. Example:
xgmii_source.send(XgmiiFrame.from_payload(b'test data')) await xgmii_source.send(XgmiiFrame.from_payload(b'test data'))
# wait for operation to complete (optional)
await xgmii_source.wait() await xgmii_source.wait()
To receive data with an `XgmiiSink`, call `recv()`. Call `wait()` to wait for new receive data. It is also possible to wait for the transmission of a specific frame to complete by passing an event in the tx_complete field of the `XgmiiFrame` object, and then awaiting the event. The frame, with simulation time fields set, will be returned in the event data. Example:
await xgmii_sink.wait() frame = XgmiiFrame.from_payload(b'test data', tx_complete=Event())
data = xgmii_sink.recv() await xgmii_source.send(frame)
await frame.tx_complete.wait()
print(frame.tx_complete.data.sim_time_sfd)
To receive data with an `XgmiiSink`, call `recv()` or `recv_nowait()`. Optionally call `wait()` to wait for new receive data.
data = await xgmii_sink.recv()
#### Signals #### Signals
@@ -194,6 +366,7 @@ To receive data with an `XgmiiSink`, call `recv()`. Call `wait()` to wait for n
* _clock_: clock signal * _clock_: clock signal
* _reset_: reset signal (optional) * _reset_: reset signal (optional)
* _enable_: clock enable (optional) * _enable_: clock enable (optional)
* _reset_active_level_: reset active level (optional, default `True`)
#### Attributes: #### Attributes:
@@ -209,9 +382,36 @@ To receive data with an `XgmiiSink`, call `recv()`. Call `wait()` to wait for n
* `count()`: returns the number of items in the queue (all) * `count()`: returns the number of items in the queue (all)
* `empty()`: returns _True_ if the queue is empty (all) * `empty()`: returns _True_ if the queue is empty (all)
* `idle()`: returns _True_ if no transfer is in progress (all) or if the queue is not empty (source) * `idle()`: returns _True_ if no transfer is in progress (all) or if the queue is not empty (source)
* `clear()`: drop all data in queue (all)
* `wait()`: wait for idle (source) * `wait()`: wait for idle (source)
* `wait(timeout=0, timeout_unit='ns')`: wait for frame received (sink) * `wait(timeout=0, timeout_unit='ns')`: wait for frame received (sink)
#### XGMII timing diagram
Example transfer via 64-bit XGMII:
__ __ __ __ __ _ __ __
tx_clk __/ \__/ \__/ \__/ \__/ \__/ ... _/ \__/ \__
__ _____ _____ _____ _____ _____ _ _ _____ _____
txd[63:56] __X_07__X_d5__X_51__X_01__X_09__X_ ... _X_fb__X_07__
__ _____ _____ _____ _____ _____ _ _ _____ _____
txd[55:48] __X_07__X_55__X_5a__X_00__X_08__X_ ... _X_72__X_07__
__ _____ _____ _____ _____ _____ _ _ _____ _____
txd[47:40] __X_07__X_55__X_d5__X_00__X_07__X_ ... _X_0d__X_07__
__ _____ _____ _____ _____ _____ _ _ _____ _____
txd[39:32] __X_07__X_55__X_d4__X_80__X_06__X_ ... _X_37__X_07__
__ _____ _____ _____ _____ _____ _ _ _____ _____
txd[31:24] __X_07__X_55__X_d3__X_55__X_05__X_ ... _X_2d__X_07__
__ _____ _____ _____ _____ _____ _ _ _____ _____
txd[23:16] __X_07__X_55__X_d2__X_54__X_04__X_ ... _X_2c__X_07__
__ _____ _____ _____ _____ _____ _ _ _____ _____
txd[15:8] __X_07__X_55__X_d1__X_53__X_03__X_ ... _X_2b__X_07__
__ _____ _____ _____ _____ _____ _ _ _____ _____
txd[7:0] __X_07__X_fb__X_da__X_52__X_02__X_ ... _X_2a__X_fd__
__ _____ _____ _____ _____ _____ _ _ _____ _____
txc[7:0] __X_ff__X_01__X_00__X_00__X_00__X_ ... _X_00__X_ff__
#### XgmiiFrame object #### XgmiiFrame object
The `XgmiiFrame` object is a container for a frame to be transferred via XGMII. The `data` field contains the packet data in the form of a list of bytes. `ctrl` contains the control signal level state associated with each byte as a list of ints. When `ctrl` is high, the corresponding `data` byte is interpreted as an XGMII control character. The `XgmiiFrame` object is a container for a frame to be transferred via XGMII. The `data` field contains the packet data in the form of a list of bytes. `ctrl` contains the control signal level state associated with each byte as a list of ints. When `ctrl` is high, the corresponding `data` byte is interpreted as an XGMII control character.
@@ -220,8 +420,11 @@ Attributes:
* `data`: bytearray * `data`: bytearray
* `ctrl`: control field, optional; list, each entry qualifies the corresponding entry in `data` as an XGMII control character. * `ctrl`: control field, optional; list, each entry qualifies the corresponding entry in `data` as an XGMII control character.
* `rx_sim_time`: simulation time when packet was received by sink. * `sim_time_start`: simulation time of first transfer cycle of frame.
* `rx_start_lane`: byte lane that the frame start control character was received in. * `sim_time_sfd`: simulation time at which the SFD was transferred.
* `sim_time_end`: simulation time of last transfer cycle of frame.
* `start_lane`: byte lane in which the start control character was transferred.
* `tx_complete`: event or callable triggered when frame is transmitted.
Methods: Methods:
@@ -270,7 +473,8 @@ Once the clock is instantiated, it will generate a continuous stream of monotoni
* _pps_: pulse-per-second signal (optional) * _pps_: pulse-per-second signal (optional)
* _clock_: clock * _clock_: clock
* _reset_: reset (optional) * _reset_: reset (optional)
* _period_ns_: clock period (nanoseconds) * _reset_active_level_: reset active level (optional, default `True`)
* _period_ns_: clock period (nanoseconds, default `6.4`)
#### Attributes: #### Attributes:

View File

@@ -24,8 +24,9 @@ THE SOFTWARE.
from .version import __version__ from .version import __version__
from .gmii import GmiiFrame, GmiiSource, GmiiSink from .gmii import GmiiFrame, GmiiSource, GmiiSink, GmiiPhy
from .rgmii import RgmiiSource, RgmiiSink from .mii import MiiSource, MiiSink, MiiPhy
from .rgmii import RgmiiSource, RgmiiSink, RgmiiPhy
from .xgmii import XgmiiFrame, XgmiiSource, XgmiiSink from .xgmii import XgmiiFrame, XgmiiSource, XgmiiSink
from .ptp import PtpClock from .ptp import PtpClock

View File

@@ -25,43 +25,53 @@ THE SOFTWARE.
import logging import logging
import struct import struct
import zlib import zlib
from collections import deque
import cocotb import cocotb
from cocotb.triggers import RisingEdge, ReadOnly, Timer, First, Event from cocotb.queue import Queue
from cocotb.utils import get_sim_time from cocotb.triggers import RisingEdge, Timer, First, Event
from cocotb.utils import get_sim_time, get_sim_steps
from .version import __version__ from .version import __version__
from .constants import EthPre, ETH_PREAMBLE from .constants import EthPre, ETH_PREAMBLE
from .reset import Reset
class GmiiFrame(object): class GmiiFrame:
def __init__(self, data=None, error=None): def __init__(self, data=None, error=None, tx_complete=None):
self.data = bytearray() self.data = bytearray()
self.error = None self.error = None
self.rx_sim_time = None self.sim_time_start = None
self.sim_time_sfd = None
self.sim_time_end = None
self.tx_complete = None
if type(data) is GmiiFrame: if type(data) is GmiiFrame:
self.data = bytearray(data.data) self.data = bytearray(data.data)
self.error = data.error self.error = data.error
self.rx_sim_time = data.rx_sim_time self.sim_time_start = data.sim_time_start
self.sim_time_sfd = data.sim_time_sfd
self.sim_time_end = data.sim_time_end
self.tx_complete = data.tx_complete
else: else:
self.data = bytearray(data) self.data = bytearray(data)
self.error = error self.error = error
if tx_complete is not None:
self.tx_complete = tx_complete
@classmethod @classmethod
def from_payload(cls, payload, min_len=60): def from_payload(cls, payload, min_len=60, tx_complete=None):
payload = bytearray(payload) payload = bytearray(payload)
if len(payload) < min_len: if len(payload) < min_len:
payload.extend(bytearray(min_len-len(payload))) payload.extend(bytearray(min_len-len(payload)))
payload.extend(struct.pack('<L', zlib.crc32(payload))) payload.extend(struct.pack('<L', zlib.crc32(payload)))
return cls.from_raw_payload(payload) return cls.from_raw_payload(payload, tx_complete=tx_complete)
@classmethod @classmethod
def from_raw_payload(cls, payload): def from_raw_payload(cls, payload, tx_complete=None):
data = bytearray(ETH_PREAMBLE) data = bytearray(ETH_PREAMBLE)
data.extend(payload) data.extend(payload)
return cls(data) return cls(data, tx_complete=tx_complete)
def get_preamble_len(self): def get_preamble_len(self):
return self.data.index(EthPre.SFD)+1 return self.data.index(EthPre.SFD)+1
@@ -93,15 +103,23 @@ class GmiiFrame(object):
if not any(self.error): if not any(self.error):
self.error = None self.error = None
def handle_tx_complete(self):
if isinstance(self.tx_complete, Event):
self.tx_complete.set(self)
elif callable(self.tx_complete):
self.tx_complete(self)
def __eq__(self, other): def __eq__(self, other):
if type(other) is GmiiFrame: if type(other) is GmiiFrame:
return self.data == other.data return self.data == other.data
def __repr__(self): def __repr__(self):
return ( return (
f"{type(self).__name__}(data={repr(self.data)}, " f"{type(self).__name__}(data={self.data!r}, "
f"error={repr(self.error)}, " f"error={self.error!r}, "
f"rx_sim_time={repr(self.rx_sim_time)})" f"sim_time_start={self.sim_time_start!r}, "
f"sim_time_sfd={self.sim_time_sfd!r}, "
f"sim_time_end={self.sim_time_end!r})"
) )
def __len__(self): def __len__(self):
@@ -114,9 +132,9 @@ class GmiiFrame(object):
return bytes(self.data) return bytes(self.data)
class GmiiSource(object): class GmiiSource(Reset):
def __init__(self, data, er, dv, clock, reset=None, enable=None, mii_select=None, *args, **kwargs): def __init__(self, data, er, dv, clock, reset=None, enable=None, mii_select=None, reset_active_level=True, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{data._path}") self.log = logging.getLogger(f"cocotb.{data._path}")
self.data = data self.data = data
self.er = er self.er = er
@@ -134,9 +152,12 @@ class GmiiSource(object):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.active = False self.active = False
self.queue = deque() self.queue = Queue()
self.idle_event = Event()
self.idle_event.set()
self.ifg = 12 self.ifg = 12
self.mii_mode = False
self.queue_occupancy_bytes = 0 self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0 self.queue_occupancy_frames = 0
@@ -144,8 +165,6 @@ class GmiiSource(object):
self.width = 8 self.width = 8
self.byte_width = 1 self.byte_width = 1
self.reset = reset
assert len(self.data) == 8 assert len(self.data) == 8
self.data.setimmediatevalue(0) self.data.setimmediatevalue(0)
if self.er is not None: if self.er is not None:
@@ -154,29 +173,59 @@ class GmiiSource(object):
assert len(self.dv) == 1 assert len(self.dv) == 1
self.dv.setimmediatevalue(0) self.dv.setimmediatevalue(0)
cocotb.fork(self._run()) self._run_cr = None
self._init_reset(reset, reset_active_level)
async def send(self, frame): async def send(self, frame):
self.send_nowait(frame) frame = GmiiFrame(frame)
await self.queue.put(frame)
self.idle_event.clear()
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
def send_nowait(self, frame): def send_nowait(self, frame):
frame = GmiiFrame(frame) frame = GmiiFrame(frame)
self.queue.put_nowait(frame)
self.idle_event.clear()
self.queue_occupancy_bytes += len(frame) self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1 self.queue_occupancy_frames += 1
self.queue.append(frame)
def count(self): def count(self):
return len(self.queue) return self.queue.qsize()
def empty(self): def empty(self):
return not self.queue return self.queue.empty()
def idle(self): def idle(self):
return self.empty() and not self.active return self.empty() and not self.active
def clear(self):
while not self.queue.empty():
self.queue.get_nowait()
self.idle_event.set()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
async def wait(self): async def wait(self):
while not self.idle(): await self.idle_event.wait()
await RisingEdge(self.clock)
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
self.active = False
self.data <= 0
if self.er is not None:
self.er <= 0
self.dv <= 0
async def _run(self): async def _run(self):
frame = None frame = None
@@ -184,19 +233,6 @@ class GmiiSource(object):
self.active = False self.active = False
while True: while True:
await ReadOnly()
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
frame = None
ifg_cnt = 0
self.active = False
self.data <= 0
if self.er is not None:
self.er <= 0
self.dv <= 0
continue
await RisingEdge(self.clock) await RisingEdge(self.clock)
if self.enable is None or self.enable.value: if self.enable is None or self.enable.value:
@@ -204,15 +240,21 @@ class GmiiSource(object):
# in IFG # in IFG
ifg_cnt -= 1 ifg_cnt -= 1
elif frame is None and self.queue: elif frame is None and not self.queue.empty():
# send frame # send frame
frame = self.queue.popleft() frame = self.queue.get_nowait()
self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1 self.queue_occupancy_frames -= 1
frame.sim_time_start = get_sim_time()
frame.sim_time_sfd = None
frame.sim_time_end = None
self.log.info("TX frame: %s", frame) self.log.info("TX frame: %s", frame)
frame.normalize() frame.normalize()
if self.mii_select is not None and self.mii_select.value: if self.mii_select is not None:
self.mii_mode = bool(self.mii_select.value.integer)
if self.mii_mode:
mii_data = [] mii_data = []
mii_error = [] mii_error = []
for b, e in zip(frame.data, frame.error): for b, e in zip(frame.data, frame.error):
@@ -226,13 +268,18 @@ class GmiiSource(object):
self.active = True self.active = True
if frame is not None: if frame is not None:
self.data <= frame.data.pop(0) d = frame.data.pop(0)
if frame.sim_time_sfd is None and d in (EthPre.SFD, 0xD):
frame.sim_time_sfd = get_sim_time()
self.data <= d
if self.er is not None: if self.er is not None:
self.er <= frame.error.pop(0) self.er <= frame.error.pop(0)
self.dv <= 1 self.dv <= 1
if not frame.data: if not frame.data:
ifg_cnt = max(self.ifg, 1) ifg_cnt = max(self.ifg, 1)
frame.sim_time_end = get_sim_time()
frame.handle_tx_complete()
frame = None frame = None
else: else:
self.data <= 0 self.data <= 0
@@ -240,11 +287,12 @@ class GmiiSource(object):
self.er <= 0 self.er <= 0
self.dv <= 0 self.dv <= 0
self.active = False self.active = False
self.idle_event.set()
class GmiiSink(object): class GmiiSink(Reset):
def __init__(self, data, er, dv, clock, reset=None, enable=None, mii_select=None, *args, **kwargs): def __init__(self, data, er, dv, clock, reset=None, enable=None, mii_select=None, reset_active_level=True, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{data._path}") self.log = logging.getLogger(f"cocotb.{data._path}")
self.data = data self.data = data
self.er = er self.er = er
@@ -262,8 +310,10 @@ class GmiiSink(object):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.active = False self.active = False
self.queue = deque() self.queue = Queue()
self.sync = Event() self.active_event = Event()
self.mii_mode = False
self.queue_occupancy_bytes = 0 self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0 self.queue_occupancy_frames = 0
@@ -271,60 +321,77 @@ class GmiiSink(object):
self.width = 8 self.width = 8
self.byte_width = 1 self.byte_width = 1
self.reset = reset
assert len(self.data) == 8 assert len(self.data) == 8
if self.er is not None: if self.er is not None:
assert len(self.er) == 1 assert len(self.er) == 1
if self.dv is not None: if self.dv is not None:
assert len(self.dv) == 1 assert len(self.dv) == 1
cocotb.fork(self._run()) self._run_cr = None
self._init_reset(reset, reset_active_level)
async def recv(self, compact=True): async def recv(self, compact=True):
while self.empty(): frame = await self.queue.get()
self.sync.clear() if self.queue.empty():
await self.sync.wait() self.active_event.clear()
return self.recv_nowait(compact) self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1
return frame
def recv_nowait(self, compact=True): def recv_nowait(self, compact=True):
if self.queue: if not self.queue.empty():
frame = self.queue.popleft() frame = self.queue.get_nowait()
if self.queue.empty():
self.active_event.clear()
self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1 self.queue_occupancy_frames -= 1
return frame return frame
return None return None
def count(self): def count(self):
return len(self.queue) return self.queue.qsize()
def empty(self): def empty(self):
return not self.queue return self.queue.empty()
def idle(self): def idle(self):
return not self.active return not self.active
def clear(self):
while not self.queue.empty():
self.queue.get_nowait()
self.active_event.clear()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
async def wait(self, timeout=0, timeout_unit=None): async def wait(self, timeout=0, timeout_unit=None):
if not self.empty(): if not self.empty():
return return
self.sync.clear()
if timeout: if timeout:
await First(self.sync.wait(), Timer(timeout, timeout_unit)) await First(self.active_event.wait(), Timer(timeout, timeout_unit))
else: else:
await self.sync.wait() await self.active_event.wait()
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
self.active = False
async def _run(self): async def _run(self):
frame = None frame = None
self.active = False self.active = False
while True: while True:
await ReadOnly()
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock) await RisingEdge(self.clock)
frame = None
self.active = False
continue
if self.enable is None or self.enable.value: if self.enable is None or self.enable.value:
d_val = self.data.value.integer d_val = self.data.value.integer
@@ -335,12 +402,15 @@ class GmiiSink(object):
if dv_val: if dv_val:
# start of frame # start of frame
frame = GmiiFrame(bytearray(), []) frame = GmiiFrame(bytearray(), [])
frame.rx_sim_time = get_sim_time() frame.sim_time_start = get_sim_time()
else: else:
if not dv_val: if not dv_val:
# end of frame # end of frame
if self.mii_select is not None and self.mii_select.value: if self.mii_select is not None:
self.mii_mode = bool(self.mii_select.value.integer)
if self.mii_mode:
odd = True odd = True
sync = False sync = False
b = 0 b = 0
@@ -362,18 +432,74 @@ class GmiiSink(object):
frame.error = error frame.error = error
frame.compact() frame.compact()
frame.sim_time_end = get_sim_time()
self.log.info("RX frame: %s", frame) self.log.info("RX frame: %s", frame)
self.queue_occupancy_bytes += len(frame) self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1 self.queue_occupancy_frames += 1
self.queue.append(frame) self.queue.put_nowait(frame)
self.sync.set() self.active_event.set()
frame = None frame = None
if frame is not None: if frame is not None:
if frame.sim_time_sfd is None and d_val in (EthPre.SFD, 0xD):
frame.sim_time_sfd = get_sim_time()
frame.data.append(d_val) frame.data.append(d_val)
frame.error.append(er_val) frame.error.append(er_val)
await RisingEdge(self.clock)
class GmiiPhy:
def __init__(self, txd, tx_er, tx_en, tx_clk, gtx_clk, rxd, rx_er, rx_dv, rx_clk,
reset=None, reset_active_level=True, speed=1000e6, *args, **kwargs):
self.gtx_clk = gtx_clk
self.tx_clk = tx_clk
self.rx_clk = rx_clk
super().__init__(*args, **kwargs)
self.tx = GmiiSink(txd, tx_er, tx_en, tx_clk, reset, reset_active_level=reset_active_level)
self.rx = GmiiSource(rxd, rx_er, rx_dv, rx_clk, reset_active_level=reset_active_level)
self.rx_clk.setimmediatevalue(0)
self._clock_cr = None
self.set_speed(speed)
def set_speed(self, speed):
if speed in (10e6, 100e6, 1000e6):
self.speed = speed
else:
raise ValueError("Invalid speed selection")
if self._clock_cr is not None:
self._clock_cr.kill()
if self.speed == 1000e6:
self._clock_cr = cocotb.fork(self._run_clocks(8*1e9/self.speed))
self.tx.mii_mode = False
self.rx.mii_mode = False
self.tx.clock = self.gtx_clk
else:
self._clock_cr = cocotb.fork(self._run_clocks(4*1e9/self.speed))
self.tx.mii_mode = True
self.rx.mii_mode = True
self.tx.clock = self.tx_clk
self.tx.assert_reset()
self.rx.assert_reset()
async def _run_clocks(self, period):
half_period = get_sim_steps(period / 2.0, 'ns')
t = Timer(half_period)
while True:
await t
self.rx_clk <= 1
self.tx_clk <= 1
await t
self.rx_clk <= 0
self.tx_clk <= 0

382
cocotbext/eth/mii.py Normal file
View File

@@ -0,0 +1,382 @@
"""
Copyright (c) 2020 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import logging
import cocotb
from cocotb.queue import Queue
from cocotb.triggers import RisingEdge, Timer, First, Event
from cocotb.utils import get_sim_time, get_sim_steps
from .version import __version__
from .gmii import GmiiFrame
from .constants import EthPre
from .reset import Reset
class MiiSource(Reset):
def __init__(self, data, er, dv, clock, reset=None, enable=None, reset_active_level=True, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{data._path}")
self.data = data
self.er = er
self.dv = dv
self.clock = clock
self.reset = reset
self.enable = enable
self.log.info("MII source")
self.log.info("cocotbext-eth version %s", __version__)
self.log.info("Copyright (c) 2020 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-eth")
super().__init__(*args, **kwargs)
self.active = False
self.queue = Queue()
self.idle_event = Event()
self.idle_event.set()
self.ifg = 12
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
self.width = 4
self.byte_width = 1
assert len(self.data) == 4
self.data.setimmediatevalue(0)
if self.er is not None:
assert len(self.er) == 1
self.er.setimmediatevalue(0)
assert len(self.dv) == 1
self.dv.setimmediatevalue(0)
self._run_cr = None
self._init_reset(reset, reset_active_level)
async def send(self, frame):
frame = GmiiFrame(frame)
await self.queue.put(frame)
self.idle_event.clear()
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
def send_nowait(self, frame):
frame = GmiiFrame(frame)
self.queue.put_nowait(frame)
self.idle_event.clear()
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
def count(self):
return self.queue.qsize()
def empty(self):
return self.queue.empty()
def idle(self):
return self.empty() and not self.active
def clear(self):
while not self.queue.empty():
self.queue.get_nowait()
self.idle_event.set()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
async def wait(self):
await self.idle_event.wait()
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
self.active = False
self.data <= 0
if self.er is not None:
self.er <= 0
self.dv <= 0
async def _run(self):
frame = None
ifg_cnt = 0
self.active = False
while True:
await RisingEdge(self.clock)
if self.enable is None or self.enable.value:
if ifg_cnt > 0:
# in IFG
ifg_cnt -= 1
elif frame is None and not self.queue.empty():
# send frame
frame = self.queue.get_nowait()
self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1
frame.sim_time_start = get_sim_time()
frame.sim_time_sfd = None
frame.sim_time_end = None
self.log.info("TX frame: %s", frame)
frame.normalize()
mii_data = []
mii_error = []
for b, e in zip(frame.data, frame.error):
mii_data.append(b & 0x0F)
mii_data.append(b >> 4)
mii_error.append(e)
mii_error.append(e)
frame.data = mii_data
frame.error = mii_error
self.active = True
if frame is not None:
d = frame.data.pop(0)
if frame.sim_time_sfd is None and d == 0xD:
frame.sim_time_sfd = get_sim_time()
self.data <= d
if self.er is not None:
self.er <= frame.error.pop(0)
self.dv <= 1
if not frame.data:
ifg_cnt = max(self.ifg, 1)
frame.sim_time_end = get_sim_time()
frame.handle_tx_complete()
frame = None
else:
self.data <= 0
if self.er is not None:
self.er <= 0
self.dv <= 0
self.active = False
self.idle_event.set()
class MiiSink(Reset):
def __init__(self, data, er, dv, clock, reset=None, enable=None, reset_active_level=True, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{data._path}")
self.data = data
self.er = er
self.dv = dv
self.clock = clock
self.reset = reset
self.enable = enable
self.log.info("MII sink")
self.log.info("cocotbext-eth version %s", __version__)
self.log.info("Copyright (c) 2020 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-eth")
super().__init__(*args, **kwargs)
self.active = False
self.queue = Queue()
self.active_event = Event()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
self.width = 4
self.byte_width = 1
assert len(self.data) == 4
if self.er is not None:
assert len(self.er) == 1
if self.dv is not None:
assert len(self.dv) == 1
self._run_cr = None
self._init_reset(reset, reset_active_level)
async def recv(self, compact=True):
frame = await self.queue.get()
if self.queue.empty():
self.active_event.clear()
self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1
return frame
def recv_nowait(self, compact=True):
if not self.queue.empty():
frame = self.queue.get_nowait()
if self.queue.empty():
self.active_event.clear()
self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1
return frame
return None
def count(self):
return self.queue.qsize()
def empty(self):
return self.queue.empty()
def idle(self):
return not self.active
def clear(self):
while not self.queue.empty():
self.queue.get_nowait()
self.active_event.clear()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
async def wait(self, timeout=0, timeout_unit=None):
if not self.empty():
return
if timeout:
await First(self.active_event.wait(), Timer(timeout, timeout_unit))
else:
await self.active_event.wait()
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
self.active = False
async def _run(self):
frame = None
self.active = False
while True:
await RisingEdge(self.clock)
if self.enable is None or self.enable.value:
d_val = self.data.value.integer
dv_val = self.dv.value.integer
er_val = 0 if self.er is None else self.er.value.integer
if frame is None:
if dv_val:
# start of frame
frame = GmiiFrame(bytearray(), [])
frame.sim_time_start = get_sim_time()
else:
if not dv_val:
# end of frame
odd = True
sync = False
b = 0
be = 0
data = bytearray()
error = []
for n, e in zip(frame.data, frame.error):
odd = not odd
b = (n & 0x0F) << 4 | b >> 4
be |= e
if not sync and b == EthPre.SFD:
odd = True
sync = True
if odd:
data.append(b)
error.append(be)
be = 0
frame.data = data
frame.error = error
frame.compact()
frame.sim_time_end = get_sim_time()
self.log.info("RX frame: %s", frame)
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
self.queue.put_nowait(frame)
self.active_event.set()
frame = None
if frame is not None:
if frame.sim_time_sfd is None and d_val == 0xD:
frame.sim_time_sfd = get_sim_time()
frame.data.append(d_val)
frame.error.append(er_val)
class MiiPhy:
def __init__(self, txd, tx_er, tx_en, tx_clk, rxd, rx_er, rx_dv, rx_clk, reset=None,
reset_active_level=True, speed=100e6, *args, **kwargs):
self.tx_clk = tx_clk
self.rx_clk = rx_clk
super().__init__(*args, **kwargs)
self.tx = MiiSink(txd, tx_er, tx_en, tx_clk, reset, reset_active_level=reset_active_level)
self.rx = MiiSource(rxd, rx_er, rx_dv, rx_clk, reset, reset_active_level=reset_active_level)
self.tx_clk.setimmediatevalue(0)
self.rx_clk.setimmediatevalue(0)
self._clock_cr = None
self.set_speed(speed)
def set_speed(self, speed):
if speed in (10e6, 100e6):
self.speed = speed
else:
raise ValueError("Invalid speed selection")
if self._clock_cr is not None:
self._clock_cr.kill()
self._clock_cr = cocotb.fork(self._run_clocks(4*1e9/self.speed))
async def _run_clocks(self, period):
half_period = get_sim_steps(period / 2.0, 'ns')
t = Timer(half_period)
while True:
await t
self.tx_clk <= 1
self.rx_clk <= 1
await t
self.tx_clk <= 0
self.rx_clk <= 0

View File

@@ -27,12 +27,13 @@ import math
from fractions import Fraction from fractions import Fraction
import cocotb import cocotb
from cocotb.triggers import RisingEdge, ReadOnly from cocotb.triggers import RisingEdge
from .version import __version__ from .version import __version__
from .reset import Reset
class PtpClock(object): class PtpClock(Reset):
def __init__( def __init__(
self, self,
@@ -42,6 +43,7 @@ class PtpClock(object):
pps=None, pps=None,
clock=None, clock=None,
reset=None, reset=None,
reset_active_level=True,
period_ns=6.4, period_ns=6.4,
*args, **kwargs): *args, **kwargs):
@@ -87,7 +89,9 @@ class PtpClock(object):
if self.pps is not None: if self.pps is not None:
self.pps.setimmediatevalue(0) self.pps.setimmediatevalue(0)
cocotb.fork(self._run()) self._run_cr = None
self._init_reset(reset, reset_active_level)
def set_period(self, ns, fns): def set_period(self, ns, fns):
self.period_ns = int(ns) self.period_ns = int(ns)
@@ -176,12 +180,17 @@ class PtpClock(object):
def get_ts_64_s(self): def get_ts_64_s(self):
return self.get_ts_64()*1e-9 return self.get_ts_64()*1e-9
async def _run(self): def _handle_reset(self, state):
while True: if state:
await ReadOnly() self.log.info("Reset asserted")
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.ts_96_s = 0 self.ts_96_s = 0
self.ts_96_ns = 0 self.ts_96_ns = 0
self.ts_96_fns = 0 self.ts_96_fns = 0
@@ -196,8 +205,9 @@ class PtpClock(object):
self.ts_step <= 0 self.ts_step <= 0
if self.pps is not None: if self.pps is not None:
self.pps <= 0 self.pps <= 0
continue
async def _run(self):
while True:
await RisingEdge(self.clock) await RisingEdge(self.clock)
if self.ts_step is not None: if self.ts_step is not None:

66
cocotbext/eth/reset.py Normal file
View File

@@ -0,0 +1,66 @@
"""
Copyright (c) 2020 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import cocotb
from cocotb.triggers import RisingEdge, FallingEdge
class Reset:
def _init_reset(self, reset_signal=None, active_level=True):
self._local_reset = False
self._ext_reset = False
self._reset_state = True
if reset_signal is not None:
cocotb.fork(self._run_reset(reset_signal, bool(active_level)))
self._update_reset()
def assert_reset(self, val=None):
if val is None:
self.assert_reset(True)
self.assert_reset(False)
else:
self._local_reset = bool(val)
self._update_reset()
def _update_reset(self):
new_state = self._local_reset or self._ext_reset
if self._reset_state != new_state:
self._reset_state = new_state
self._handle_reset(new_state)
def _handle_reset(self, state):
pass
async def _run_reset(self, reset_signal, active_level):
while True:
if bool(reset_signal.value):
await FallingEdge(reset_signal)
self._ext_reset = not active_level
self._update_reset()
else:
await RisingEdge(reset_signal)
self._ext_reset = active_level
self._update_reset()

View File

@@ -23,20 +23,23 @@ THE SOFTWARE.
""" """
import logging import logging
from collections import deque
import cocotb import cocotb
from cocotb.triggers import RisingEdge, FallingEdge, ReadOnly, Timer, First, Event from cocotb.queue import Queue
from cocotb.utils import get_sim_time from cocotb.triggers import RisingEdge, FallingEdge, Timer, First, Event
from cocotb.utils import get_sim_time, get_sim_steps
from .version import __version__ from .version import __version__
from .gmii import GmiiFrame from .gmii import GmiiFrame
from .constants import EthPre from .constants import EthPre
from .reset import Reset
class RgmiiSource(object): class RgmiiSource(Reset):
def __init__(self, data, ctrl, clock, reset=None, enable=None, mii_select=None,
reset_active_level=True, *args, **kwargs):
def __init__(self, data, ctrl, clock, reset=None, enable=None, mii_select=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{data._path}") self.log = logging.getLogger(f"cocotb.{data._path}")
self.data = data self.data = data
self.ctrl = ctrl self.ctrl = ctrl
@@ -53,9 +56,12 @@ class RgmiiSource(object):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.active = False self.active = False
self.queue = deque() self.queue = Queue()
self.idle_event = Event()
self.idle_event.set()
self.ifg = 12 self.ifg = 12
self.mii_mode = False
self.queue_occupancy_bytes = 0 self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0 self.queue_occupancy_frames = 0
@@ -63,36 +69,62 @@ class RgmiiSource(object):
self.width = 8 self.width = 8
self.byte_width = 1 self.byte_width = 1
self.reset = reset
assert len(self.data) == 4 assert len(self.data) == 4
self.data.setimmediatevalue(0) self.data.setimmediatevalue(0)
assert len(self.ctrl) == 1 assert len(self.ctrl) == 1
self.ctrl.setimmediatevalue(0) self.ctrl.setimmediatevalue(0)
cocotb.fork(self._run()) self._run_cr = None
self._init_reset(reset, reset_active_level)
async def send(self, frame): async def send(self, frame):
self.send_nowait(frame) frame = GmiiFrame(frame)
await self.queue.put(frame)
self.idle_event.clear()
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
def send_nowait(self, frame): def send_nowait(self, frame):
frame = GmiiFrame(frame) frame = GmiiFrame(frame)
self.queue.put_nowait(frame)
self.idle_event.clear()
self.queue_occupancy_bytes += len(frame) self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1 self.queue_occupancy_frames += 1
self.queue.append(frame)
def count(self): def count(self):
return len(self.queue) return self.queue.qsize()
def empty(self): def empty(self):
return not self.queue return self.queue.empty()
def idle(self): def idle(self):
return self.empty() and not self.active return self.empty() and not self.active
def clear(self):
while not self.queue.empty():
self.queue.get_nowait()
self.idle_event.set()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
async def wait(self): async def wait(self):
while not self.idle(): await self.idle_event.wait()
await RisingEdge(self.clock)
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
self.active = False
self.data <= 0
self.ctrl <= 0
async def _run(self): async def _run(self):
frame = None frame = None
@@ -103,20 +135,8 @@ class RgmiiSource(object):
en = 0 en = 0
while True: while True:
await ReadOnly()
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
frame = None
ifg_cnt = 0
self.active = False
self.data <= 0
self.ctrl <= 0
continue
await RisingEdge(self.clock) await RisingEdge(self.clock)
if self.mii_select is None or not self.mii_select.value:
# send high nibble after rising edge, leading in to falling edge # send high nibble after rising edge, leading in to falling edge
self.data <= d >> 4 self.data <= d >> 4
self.ctrl <= en ^ er self.ctrl <= en ^ er
@@ -126,20 +146,26 @@ class RgmiiSource(object):
# in IFG # in IFG
ifg_cnt -= 1 ifg_cnt -= 1
elif frame is None and self.queue: elif frame is None and not self.queue.empty():
# send frame # send frame
frame = self.queue.popleft() frame = self.queue.get_nowait()
self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1 self.queue_occupancy_frames -= 1
frame.sim_time_start = get_sim_time()
frame.sim_time_sfd = None
frame.sim_time_end = None
self.log.info("TX frame: %s", frame) self.log.info("TX frame: %s", frame)
frame.normalize() frame.normalize()
if self.mii_select is not None and self.mii_select.value: if self.mii_select is not None:
self.mii_mode = bool(self.mii_select.value.integer)
if self.mii_mode:
mii_data = [] mii_data = []
mii_error = [] mii_error = []
for b, e in zip(frame.data, frame.error): for b, e in zip(frame.data, frame.error):
mii_data.append(b & 0x0F) mii_data.append((b & 0x0F)*0x11)
mii_data.append(b >> 4) mii_data.append((b >> 4)*0x11)
mii_error.append(e) mii_error.append(e)
mii_error.append(e) mii_error.append(e)
frame.data = mii_data frame.data = mii_data
@@ -152,14 +178,20 @@ class RgmiiSource(object):
er = frame.error.pop(0) er = frame.error.pop(0)
en = 1 en = 1
if frame.sim_time_sfd is None and d in (EthPre.SFD, 0xD, 0xDD):
frame.sim_time_sfd = get_sim_time()
if not frame.data: if not frame.data:
ifg_cnt = max(self.ifg, 1) ifg_cnt = max(self.ifg, 1)
frame.sim_time_end = get_sim_time()
frame.handle_tx_complete()
frame = None frame = None
else: else:
d = 0 d = 0
er = 0 er = 0
en = 0 en = 0
self.active = False self.active = False
self.idle_event.set()
await FallingEdge(self.clock) await FallingEdge(self.clock)
@@ -168,9 +200,11 @@ class RgmiiSource(object):
self.ctrl <= en self.ctrl <= en
class RgmiiSink(object): class RgmiiSink(Reset):
def __init__(self, data, ctrl, clock, reset=None, enable=None, mii_select=None,
reset_active_level=True, *args, **kwargs):
def __init__(self, data, ctrl, clock, reset=None, enable=None, mii_select=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{data._path}") self.log = logging.getLogger(f"cocotb.{data._path}")
self.data = data self.data = data
self.ctrl = ctrl self.ctrl = ctrl
@@ -187,8 +221,10 @@ class RgmiiSink(object):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.active = False self.active = False
self.queue = deque() self.queue = Queue()
self.sync = Event() self.active_event = Event()
self.mii_mode = False
self.queue_occupancy_bytes = 0 self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0 self.queue_occupancy_frames = 0
@@ -196,44 +232,67 @@ class RgmiiSink(object):
self.width = 8 self.width = 8
self.byte_width = 1 self.byte_width = 1
self.reset = reset
assert len(self.data) == 4 assert len(self.data) == 4
assert len(self.ctrl) == 1 assert len(self.ctrl) == 1
cocotb.fork(self._run()) self._run_cr = None
self._init_reset(reset, reset_active_level)
async def recv(self, compact=True): async def recv(self, compact=True):
while self.empty(): frame = await self.queue.get()
self.sync.clear() if self.queue.empty():
await self.sync.wait() self.active_event.clear()
return self.recv_nowait(compact) self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1
return frame
def recv_nowait(self, compact=True): def recv_nowait(self, compact=True):
if self.queue: if not self.queue.empty():
frame = self.queue.popleft() frame = self.queue.get_nowait()
if self.queue.empty():
self.active_event.clear()
self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1 self.queue_occupancy_frames -= 1
return frame return frame
return None return None
def count(self): def count(self):
return len(self.queue) return self.queue.qsize()
def empty(self): def empty(self):
return not self.queue return self.queue.empty()
def idle(self): def idle(self):
return not self.active return not self.active
def clear(self):
while not self.queue.empty():
self.queue.get_nowait()
self.active_event.clear()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
async def wait(self, timeout=0, timeout_unit=None): async def wait(self, timeout=0, timeout_unit=None):
if not self.empty(): if not self.empty():
return return
self.sync.clear()
if timeout: if timeout:
await First(self.sync.wait(), Timer(timeout, timeout_unit)) await First(self.active_event.wait(), Timer(timeout, timeout_unit))
else: else:
await self.sync.wait() await self.active_event.wait()
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
self.active = False
async def _run(self): async def _run(self):
frame = None frame = None
@@ -243,15 +302,15 @@ class RgmiiSink(object):
er_val = 0 er_val = 0
while True: while True:
await ReadOnly()
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock) await RisingEdge(self.clock)
frame = None
self.active = False
continue
# capture high nibble after rising edge, leading in to falling edge # capture low nibble on rising edge
d_val = self.data.value.integer
dv_val = self.ctrl.value.integer
await FallingEdge(self.clock)
# capture high nibble on falling edge
d_val |= self.data.value.integer << 4 d_val |= self.data.value.integer << 4
er_val = dv_val ^ self.ctrl.value.integer er_val = dv_val ^ self.ctrl.value.integer
@@ -261,12 +320,15 @@ class RgmiiSink(object):
if dv_val: if dv_val:
# start of frame # start of frame
frame = GmiiFrame(bytearray(), []) frame = GmiiFrame(bytearray(), [])
frame.rx_sim_time = get_sim_time() frame.sim_time_start = get_sim_time()
else: else:
if not dv_val: if not dv_val:
# end of frame # end of frame
if self.mii_select is not None and self.mii_select.value: if self.mii_select is not None:
self.mii_mode = bool(self.mii_select.value.integer)
if self.mii_mode:
odd = True odd = True
sync = False sync = False
b = 0 b = 0
@@ -288,25 +350,66 @@ class RgmiiSink(object):
frame.error = error frame.error = error
frame.compact() frame.compact()
frame.sim_time_end = get_sim_time()
self.log.info("RX frame: %s", frame) self.log.info("RX frame: %s", frame)
self.queue_occupancy_bytes += len(frame) self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1 self.queue_occupancy_frames += 1
self.queue.append(frame) self.queue.put_nowait(frame)
self.sync.set() self.active_event.set()
frame = None frame = None
if frame is not None: if frame is not None:
if frame.sim_time_sfd is None and d_val in (EthPre.SFD, 0xD, 0xDD):
frame.sim_time_sfd = get_sim_time()
frame.data.append(d_val) frame.data.append(d_val)
frame.error.append(er_val) frame.error.append(er_val)
await FallingEdge(self.clock)
await ReadOnly()
# capture low nibble after falling edge, leading in to rising edge class RgmiiPhy:
d_val = self.data.value.integer def __init__(self, txd, tx_ctl, tx_clk, rxd, rx_ctl, rx_clk, reset=None,
dv_val = self.ctrl.value.integer reset_active_level=True, speed=1000e6, *args, **kwargs):
await RisingEdge(self.clock) self.tx_clk = tx_clk
self.rx_clk = rx_clk
super().__init__(*args, **kwargs)
self.tx = RgmiiSink(txd, tx_ctl, tx_clk, reset, reset_active_level=reset_active_level)
self.rx = RgmiiSource(rxd, rx_ctl, rx_clk, reset, reset_active_level=reset_active_level)
self.rx_clk.setimmediatevalue(0)
self._clock_cr = None
self.set_speed(speed)
def set_speed(self, speed):
if speed in (10e6, 100e6, 1000e6):
self.speed = speed
else:
raise ValueError("Invalid speed selection")
if self._clock_cr is not None:
self._clock_cr.kill()
if self.speed == 1000e6:
self._clock_cr = cocotb.fork(self._run_clock(8*1e9/self.speed))
self.tx.mii_mode = False
self.rx.mii_mode = False
else:
self._clock_cr = cocotb.fork(self._run_clock(4*1e9/self.speed))
self.tx.mii_mode = True
self.rx.mii_mode = True
async def _run_clock(self, period):
half_period = get_sim_steps(period / 2.0, 'ns')
t = Timer(half_period)
while True:
await t
self.rx_clk <= 1
await t
self.rx_clk <= 0

View File

@@ -1 +1 @@
__version__ = "0.1.2" __version__ = "0.1.8"

View File

@@ -25,45 +25,55 @@ THE SOFTWARE.
import logging import logging
import struct import struct
import zlib import zlib
from collections import deque
import cocotb import cocotb
from cocotb.triggers import RisingEdge, ReadOnly, Timer, First, Event from cocotb.queue import Queue
from cocotb.triggers import RisingEdge, Timer, First, Event
from cocotb.utils import get_sim_time from cocotb.utils import get_sim_time
from .version import __version__ from .version import __version__
from .constants import EthPre, ETH_PREAMBLE, XgmiiCtrl from .constants import EthPre, ETH_PREAMBLE, XgmiiCtrl
from .reset import Reset
class XgmiiFrame(object): class XgmiiFrame:
def __init__(self, data=None, ctrl=None): def __init__(self, data=None, ctrl=None, tx_complete=None):
self.data = bytearray() self.data = bytearray()
self.ctrl = None self.ctrl = None
self.rx_sim_time = None self.sim_time_start = None
self.rx_start_lane = None self.sim_time_sfd = None
self.sim_time_end = None
self.start_lane = None
self.tx_complete = None
if type(data) is XgmiiFrame: if type(data) is XgmiiFrame:
self.data = bytearray(data.data) self.data = bytearray(data.data)
self.ctrl = data.ctrl self.ctrl = data.ctrl
self.rx_sim_time = data.rx_sim_time self.sim_time_start = data.sim_time_start
self.rx_start_lane = data.rx_start_lane self.sim_time_sfd = data.sim_time_sfd
self.sim_time_end = data.sim_time_end
self.start_lane = data.start_lane
self.tx_complete = data.tx_complete
else: else:
self.data = bytearray(data) self.data = bytearray(data)
self.ctrl = ctrl self.ctrl = ctrl
if tx_complete is not None:
self.tx_complete = tx_complete
@classmethod @classmethod
def from_payload(cls, payload, min_len=60): def from_payload(cls, payload, min_len=60, tx_complete=None):
payload = bytearray(payload) payload = bytearray(payload)
if len(payload) < min_len: if len(payload) < min_len:
payload.extend(bytearray(min_len-len(payload))) payload.extend(bytearray(min_len-len(payload)))
payload.extend(struct.pack('<L', zlib.crc32(payload))) payload.extend(struct.pack('<L', zlib.crc32(payload)))
return cls.from_raw_payload(payload) return cls.from_raw_payload(payload, tx_complete=tx_complete)
@classmethod @classmethod
def from_raw_payload(cls, payload): def from_raw_payload(cls, payload, tx_complete=None):
data = bytearray(ETH_PREAMBLE) data = bytearray(ETH_PREAMBLE)
data.extend(payload) data.extend(payload)
return cls(data) return cls(data, tx_complete=tx_complete)
def get_preamble_len(self): def get_preamble_len(self):
return self.data.index(EthPre.SFD)+1 return self.data.index(EthPre.SFD)+1
@@ -95,16 +105,24 @@ class XgmiiFrame(object):
if not any(self.ctrl): if not any(self.ctrl):
self.ctrl = None self.ctrl = None
def handle_tx_complete(self):
if isinstance(self.tx_complete, Event):
self.tx_complete.set(self)
elif callable(self.tx_complete):
self.tx_complete(self)
def __eq__(self, other): def __eq__(self, other):
if type(other) is XgmiiFrame: if type(other) is XgmiiFrame:
return self.data == other.data return self.data == other.data
def __repr__(self): def __repr__(self):
return ( return (
f"{type(self).__name__}(data={repr(self.data)}, " f"{type(self).__name__}(data={self.data!r}, "
f"ctrl={repr(self.ctrl)}, " f"ctrl={self.ctrl!r}, "
f"rx_sim_time={repr(self.rx_sim_time)}, " f"sim_time_start={self.sim_time_start!r}, "
f"rx_start_lane={repr(self.rx_start_lane)})" f"sim_time_sfd={self.sim_time_sfd!r}, "
f"sim_time_end={self.sim_time_end!r}, "
f"start_lane={self.start_lane!r})"
) )
def __len__(self): def __len__(self):
@@ -117,9 +135,9 @@ class XgmiiFrame(object):
return bytes(self.data) return bytes(self.data)
class XgmiiSource(object): class XgmiiSource(Reset):
def __init__(self, data, ctrl, clock, reset=None, enable=None, *args, **kwargs): def __init__(self, data, ctrl, clock, reset=None, enable=None, reset_active_level=True, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{data._path}") self.log = logging.getLogger(f"cocotb.{data._path}")
self.data = data self.data = data
self.ctrl = ctrl self.ctrl = ctrl
@@ -135,7 +153,9 @@ class XgmiiSource(object):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.active = False self.active = False
self.queue = deque() self.queue = Queue()
self.idle_event = Event()
self.idle_event.set()
self.enable_dic = True self.enable_dic = True
self.ifg = 12 self.ifg = 12
@@ -147,8 +167,6 @@ class XgmiiSource(object):
self.width = len(self.data) self.width = len(self.data)
self.byte_width = len(self.ctrl) self.byte_width = len(self.ctrl)
self.reset = reset
assert self.width == self.byte_width * 8 assert self.width == self.byte_width * 8
self.idle_d = 0 self.idle_d = 0
@@ -161,29 +179,57 @@ class XgmiiSource(object):
self.data.setimmediatevalue(0) self.data.setimmediatevalue(0)
self.ctrl.setimmediatevalue(0) self.ctrl.setimmediatevalue(0)
cocotb.fork(self._run()) self._run_cr = None
self._init_reset(reset, reset_active_level)
async def send(self, frame): async def send(self, frame):
self.send_nowait(frame) frame = XgmiiFrame(frame)
await self.queue.put(frame)
self.idle_event.clear()
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
def send_nowait(self, frame): def send_nowait(self, frame):
frame = XgmiiFrame(frame) frame = XgmiiFrame(frame)
self.queue.put_nowait(frame)
self.idle_event.clear()
self.queue_occupancy_bytes += len(frame) self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1 self.queue_occupancy_frames += 1
self.queue.append(frame)
def count(self): def count(self):
return len(self.queue) return self.queue.qsize()
def empty(self): def empty(self):
return not self.queue return self.queue.empty()
def idle(self): def idle(self):
return self.empty() and not self.active return self.empty() and not self.active
def clear(self):
while not self.queue.empty():
self.queue.get_nowait()
self.idle_event.set()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
async def wait(self): async def wait(self):
while not self.idle(): await self.idle_event.wait()
await RisingEdge(self.clock)
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
self.active = False
self.data <= 0
self.ctrl <= 0
async def _run(self): async def _run(self):
frame = None frame = None
@@ -192,18 +238,6 @@ class XgmiiSource(object):
self.active = False self.active = False
while True: while True:
await ReadOnly()
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
frame = None
ifg_cnt = 0
deficit_idle_cnt = 0
self.active = False
self.data <= 0
self.ctrl <= 0
continue
await RisingEdge(self.clock) await RisingEdge(self.clock)
if self.enable is None or self.enable.value: if self.enable is None or self.enable.value:
@@ -217,13 +251,17 @@ class XgmiiSource(object):
elif frame is None: elif frame is None:
# idle # idle
if self.queue: if not self.queue.empty():
# send frame # send frame
frame = self.queue.popleft() frame = self.queue.get_nowait()
self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1 self.queue_occupancy_frames -= 1
frame.sim_time_start = get_sim_time()
frame.sim_time_sfd = None
frame.sim_time_end = None
self.log.info("TX frame: %s", frame) self.log.info("TX frame: %s", frame)
frame.normalize() frame.normalize()
frame.start_lane = 0
assert frame.data[0] == EthPre.PRE assert frame.data[0] == EthPre.PRE
assert frame.ctrl[0] == 0 assert frame.ctrl[0] == 0
frame.data[0] = XgmiiCtrl.START frame.data[0] = XgmiiCtrl.START
@@ -239,6 +277,7 @@ class XgmiiSource(object):
if self.byte_width > 4 and (ifg_cnt > min_ifg or self.force_offset_start): if self.byte_width > 4 and (ifg_cnt > min_ifg or self.force_offset_start):
ifg_cnt = ifg_cnt-4 ifg_cnt = ifg_cnt-4
frame.start_lane = 4
frame.data = bytearray([XgmiiCtrl.IDLE]*4)+frame.data frame.data = bytearray([XgmiiCtrl.IDLE]*4)+frame.data
frame.ctrl = [1]*4+frame.ctrl frame.ctrl = [1]*4+frame.ctrl
@@ -257,11 +296,16 @@ class XgmiiSource(object):
for k in range(self.byte_width): for k in range(self.byte_width):
if frame is not None: if frame is not None:
d_val |= frame.data.pop(0) << k*8 d = frame.data.pop(0)
if frame.sim_time_sfd is None and d == EthPre.SFD:
frame.sim_time_sfd = get_sim_time()
d_val |= d << k*8
c_val |= frame.ctrl.pop(0) << k c_val |= frame.ctrl.pop(0) << k
if not frame.data: if not frame.data:
ifg_cnt = max(self.ifg - (self.byte_width-k), 0) ifg_cnt = max(self.ifg - (self.byte_width-k), 0)
frame.sim_time_end = get_sim_time()
frame.handle_tx_complete()
frame = None frame = None
else: else:
d_val |= XgmiiCtrl.IDLE << k*8 d_val |= XgmiiCtrl.IDLE << k*8
@@ -273,11 +317,12 @@ class XgmiiSource(object):
self.data <= self.idle_d self.data <= self.idle_d
self.ctrl <= self.idle_c self.ctrl <= self.idle_c
self.active = False self.active = False
self.idle_event.set()
class XgmiiSink(object): class XgmiiSink(Reset):
def __init__(self, data, ctrl, clock, reset=None, enable=None, *args, **kwargs): def __init__(self, data, ctrl, clock, reset=None, enable=None, reset_active_level=True, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{data._path}") self.log = logging.getLogger(f"cocotb.{data._path}")
self.data = data self.data = data
self.ctrl = ctrl self.ctrl = ctrl
@@ -293,8 +338,8 @@ class XgmiiSink(object):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.active = False self.active = False
self.queue = deque() self.queue = Queue()
self.sync = Event() self.active_event = Event()
self.queue_occupancy_bytes = 0 self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0 self.queue_occupancy_frames = 0
@@ -302,56 +347,73 @@ class XgmiiSink(object):
self.width = len(self.data) self.width = len(self.data)
self.byte_width = len(self.ctrl) self.byte_width = len(self.ctrl)
self.reset = reset
assert self.width == self.byte_width * 8 assert self.width == self.byte_width * 8
cocotb.fork(self._run()) self._run_cr = None
self._init_reset(reset, reset_active_level)
async def recv(self, compact=True): async def recv(self, compact=True):
while self.empty(): frame = await self.queue.get()
self.sync.clear() if self.queue.empty():
await self.sync.wait() self.active_event.clear()
return self.recv_nowait(compact) self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1
return frame
def recv_nowait(self, compact=True): def recv_nowait(self, compact=True):
if self.queue: if not self.queue.empty():
frame = self.queue.popleft() frame = self.queue.get_nowait()
if self.queue.empty():
self.active_event.clear()
self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1 self.queue_occupancy_frames -= 1
return frame return frame
return None return None
def count(self): def count(self):
return len(self.queue) return self.queue.qsize()
def empty(self): def empty(self):
return not self.queue return self.queue.empty()
def idle(self): def idle(self):
return not self.active return not self.active
def clear(self):
while not self.queue.empty():
self.queue.get_nowait()
self.active_event.clear()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
async def wait(self, timeout=0, timeout_unit=None): async def wait(self, timeout=0, timeout_unit=None):
if not self.empty(): if not self.empty():
return return
self.sync.clear()
if timeout: if timeout:
await First(self.sync.wait(), Timer(timeout, timeout_unit)) await First(self.active_event.wait(), Timer(timeout, timeout_unit))
else: else:
await self.sync.wait() await self.active_event.wait()
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
self.active = False
async def _run(self): async def _run(self):
frame = None frame = None
self.active = False self.active = False
while True: while True:
await ReadOnly()
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock) await RisingEdge(self.clock)
frame = None
self.active = False
continue
if self.enable is None or self.enable.value: if self.enable is None or self.enable.value:
for offset in range(self.byte_width): for offset in range(self.byte_width):
@@ -362,8 +424,8 @@ class XgmiiSink(object):
if c_val and d_val == XgmiiCtrl.START: if c_val and d_val == XgmiiCtrl.START:
# start # start
frame = XgmiiFrame(bytearray([EthPre.PRE]), [0]) frame = XgmiiFrame(bytearray([EthPre.PRE]), [0])
frame.rx_sim_time = get_sim_time() frame.sim_time_start = get_sim_time()
frame.rx_start_lane = offset frame.start_lane = offset
else: else:
if c_val: if c_val:
# got a control character; terminate frame reception # got a control character; terminate frame reception
@@ -373,17 +435,19 @@ class XgmiiSink(object):
frame.ctrl.append(c_val) frame.ctrl.append(c_val)
frame.compact() frame.compact()
frame.sim_time_end = get_sim_time()
self.log.info("RX frame: %s", frame) self.log.info("RX frame: %s", frame)
self.queue_occupancy_bytes += len(frame) self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1 self.queue_occupancy_frames += 1
self.queue.append(frame) self.queue.put_nowait(frame)
self.sync.set() self.active_event.set()
frame = None frame = None
else: else:
if frame.sim_time_sfd is None and d_val == EthPre.SFD:
frame.sim_time_sfd = get_sim_time()
frame.data.append(d_val) frame.data.append(d_val)
frame.ctrl.append(c_val) frame.ctrl.append(c_val)
await RisingEdge(self.clock)

View File

@@ -65,7 +65,6 @@ deps =
cocotb-test cocotb-test
coverage coverage
pytest-cov pytest-cov
git+https://github.com/cocotb/cocotb.git@e892a3ea48#egg=cocotb
commands = commands =
pytest --cov=cocotbext --cov=tests --cov-branch -n auto pytest --cov=cocotbext --cov=tests --cov-branch -n auto

View File

@@ -31,8 +31,6 @@ TOPLEVEL = $(DUT)
MODULE = $(DUT) MODULE = $(DUT)
VERILOG_SOURCES += $(DUT).v VERILOG_SOURCES += $(DUT).v
SIM_BUILD ?= sim_build_$(MODULE)
ifeq ($(SIM), icarus) ifeq ($(SIM), icarus)
PLUSARGS += -fst PLUSARGS += -fst
@@ -48,6 +46,8 @@ else ifeq ($(SIM), verilator)
endif endif
endif endif
include $(shell cocotb-config --makefiles)/Makefile.sim
iverilog_dump.v: iverilog_dump.v:
echo 'module iverilog_dump();' > $@ echo 'module iverilog_dump();' > $@
echo 'initial begin' >> $@ echo 'initial begin' >> $@
@@ -57,9 +57,5 @@ iverilog_dump.v:
echo 'endmodule' >> $@ echo 'endmodule' >> $@
clean:: clean::
@rm -rf sim_build_*
@rm -rf iverilog_dump.v @rm -rf iverilog_dump.v
@rm -rf dump.fst $(TOPLEVEL).fst @rm -rf dump.fst $(TOPLEVEL).fst
include $(shell cocotb-config --makefiles)/Makefile.sim

View File

@@ -37,7 +37,7 @@ from cocotb.regression import TestFactory
from cocotbext.eth import GmiiFrame, GmiiSource, GmiiSink from cocotbext.eth import GmiiFrame, GmiiSource, GmiiSink
class TB(object): class TB:
def __init__(self, dut): def __init__(self, dut):
self.dut = dut self.dut = dut
@@ -144,7 +144,6 @@ if cocotb.SIM_NAME:
# cocotb-test # cocotb-test
tests_dir = os.path.dirname(__file__) tests_dir = os.path.dirname(__file__)
rtl_dir = os.path.abspath(os.path.join(tests_dir, '..', '..', 'rtl'))
def test_gmii(request): def test_gmii(request):
@@ -160,8 +159,8 @@ def test_gmii(request):
extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()} extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}
sim_build = os.path.join(tests_dir, sim_build = os.path.join(tests_dir, "sim_build",
"sim_build_"+request.node.name.replace('[', '-').replace(']', '')) request.node.name.replace('[', '-').replace(']', ''))
cocotb_test.simulator.run( cocotb_test.simulator.run(
python_search=[tests_dir], python_search=[tests_dir],

61
tests/gmii_phy/Makefile Normal file
View File

@@ -0,0 +1,61 @@
# Copyright (c) 2020 Alex Forencich
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
TOPLEVEL_LANG = verilog
SIM ?= icarus
WAVES ?= 0
COCOTB_HDL_TIMEUNIT = 1ns
COCOTB_HDL_TIMEPRECISION = 1ns
DUT = test_gmii_phy
TOPLEVEL = $(DUT)
MODULE = $(DUT)
VERILOG_SOURCES += $(DUT).v
ifeq ($(SIM), icarus)
PLUSARGS += -fst
ifeq ($(WAVES), 1)
VERILOG_SOURCES += iverilog_dump.v
COMPILE_ARGS += -s iverilog_dump
endif
else ifeq ($(SIM), verilator)
COMPILE_ARGS += -Wno-SELRANGE -Wno-WIDTH
ifeq ($(WAVES), 1)
COMPILE_ARGS += --trace-fst
endif
endif
include $(shell cocotb-config --makefiles)/Makefile.sim
iverilog_dump.v:
echo 'module iverilog_dump();' > $@
echo 'initial begin' >> $@
echo ' $$dumpfile("$(TOPLEVEL).fst");' >> $@
echo ' $$dumpvars(0, $(TOPLEVEL));' >> $@
echo 'end' >> $@
echo 'endmodule' >> $@
clean::
@rm -rf iverilog_dump.v
@rm -rf dump.fst $(TOPLEVEL).fst

View File

@@ -0,0 +1,183 @@
#!/usr/bin/env python
"""
Copyright (c) 2020 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import itertools
import logging
import os
import cocotb_test.simulator
import cocotb
from cocotb.clock import Clock
from cocotb.triggers import RisingEdge
from cocotb.regression import TestFactory
from cocotbext.eth import GmiiFrame, GmiiSource, GmiiSink, GmiiPhy
class TB:
def __init__(self, dut, speed=1000e6):
self.dut = dut
self.log = logging.getLogger("cocotb.tb")
self.log.setLevel(logging.DEBUG)
cocotb.fork(Clock(dut.phy_gtx_clk, 8, units="ns").start())
self.gmii_phy = GmiiPhy(dut.phy_txd, dut.phy_tx_er, dut.phy_tx_en, dut.phy_tx_clk, dut.phy_gtx_clk,
dut.phy_rxd, dut.phy_rx_er, dut.phy_rx_dv, dut.phy_rx_clk, dut.phy_rst, speed=speed)
if speed == 1000e6:
self.source = GmiiSource(dut.phy_txd, dut.phy_tx_er, dut.phy_tx_en, dut.phy_gtx_clk, dut.phy_rst)
self.source.mii_mode = False
self.sink = GmiiSink(dut.phy_rxd, dut.phy_rx_er, dut.phy_rx_dv, dut.phy_rx_clk, dut.phy_rst)
self.sink.mii_mode = False
else:
self.source = GmiiSource(dut.phy_txd, dut.phy_tx_er, dut.phy_tx_en, dut.phy_tx_clk, dut.phy_rst)
self.source.mii_mode = True
self.sink = GmiiSink(dut.phy_rxd, dut.phy_rx_er, dut.phy_rx_dv, dut.phy_rx_clk, dut.phy_rst)
self.sink.mii_mode = True
async def reset(self):
self.dut.phy_rst.setimmediatevalue(0)
await RisingEdge(self.dut.phy_tx_clk)
await RisingEdge(self.dut.phy_tx_clk)
self.dut.phy_rst <= 1
await RisingEdge(self.dut.phy_tx_clk)
await RisingEdge(self.dut.phy_tx_clk)
self.dut.phy_rst <= 0
await RisingEdge(self.dut.phy_tx_clk)
await RisingEdge(self.dut.phy_tx_clk)
async def run_test_tx(dut, payload_lengths=None, payload_data=None, ifg=12, speed=1000e6):
tb = TB(dut, speed)
tb.gmii_phy.rx.ifg = ifg
tb.source.ifg = ifg
await tb.reset()
test_frames = [payload_data(x) for x in payload_lengths()]
for test_data in test_frames:
test_frame = GmiiFrame.from_payload(test_data)
await tb.source.send(test_frame)
for test_data in test_frames:
rx_frame = await tb.gmii_phy.tx.recv()
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.gmii_phy.tx.empty()
await RisingEdge(dut.phy_tx_clk)
await RisingEdge(dut.phy_tx_clk)
async def run_test_rx(dut, payload_lengths=None, payload_data=None, ifg=12, speed=1000e6):
tb = TB(dut, speed)
tb.gmii_phy.rx.ifg = ifg
tb.source.ifg = ifg
await tb.reset()
test_frames = [payload_data(x) for x in payload_lengths()]
for test_data in test_frames:
test_frame = GmiiFrame.from_payload(test_data)
await tb.gmii_phy.rx.send(test_frame)
for test_data in test_frames:
rx_frame = await tb.sink.recv()
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.sink.empty()
await RisingEdge(dut.phy_rx_clk)
await RisingEdge(dut.phy_rx_clk)
def size_list():
return list(range(60, 128)) + [512, 1514] + [60]*10
def incrementing_payload(length):
return bytearray(itertools.islice(itertools.cycle(range(256)), length))
def cycle_en():
return itertools.cycle([0, 0, 0, 1])
if cocotb.SIM_NAME:
for test in [run_test_tx, run_test_rx]:
factory = TestFactory(test)
factory.add_option("payload_lengths", [size_list])
factory.add_option("payload_data", [incrementing_payload])
factory.add_option("speed", [1000e6, 100e6, 10e6])
factory.generate_tests()
# cocotb-test
tests_dir = os.path.dirname(__file__)
def test_gmii_phy(request):
dut = "test_gmii_phy"
module = os.path.splitext(os.path.basename(__file__))[0]
toplevel = dut
verilog_sources = [
os.path.join(tests_dir, f"{dut}.v"),
]
parameters = {}
extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}
sim_build = os.path.join(tests_dir, "sim_build",
request.node.name.replace('[', '-').replace(']', ''))
cocotb_test.simulator.run(
python_search=[tests_dir],
verilog_sources=verilog_sources,
toplevel=toplevel,
module=module,
parameters=parameters,
sim_build=sim_build,
extra_env=extra_env,
)

View File

@@ -0,0 +1,46 @@
/*
Copyright (c) 2020 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
// Language: Verilog 2001
`timescale 1ns / 1ns
/*
* GMII PHY test
*/
module test_gmii_phy
(
inout wire phy_rst,
inout wire [7:0] phy_txd,
inout wire phy_tx_er,
inout wire phy_tx_en,
inout wire phy_tx_clk,
inout wire phy_gtx_clk,
inout wire [7:0] phy_rxd,
inout wire phy_rx_er,
inout wire phy_rx_dv,
inout wire phy_rx_clk
);
endmodule

61
tests/mii/Makefile Normal file
View File

@@ -0,0 +1,61 @@
# Copyright (c) 2020 Alex Forencich
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
TOPLEVEL_LANG = verilog
SIM ?= icarus
WAVES ?= 0
COCOTB_HDL_TIMEUNIT = 1ns
COCOTB_HDL_TIMEPRECISION = 1ns
DUT = test_mii
TOPLEVEL = $(DUT)
MODULE = $(DUT)
VERILOG_SOURCES += $(DUT).v
ifeq ($(SIM), icarus)
PLUSARGS += -fst
ifeq ($(WAVES), 1)
VERILOG_SOURCES += iverilog_dump.v
COMPILE_ARGS += -s iverilog_dump
endif
else ifeq ($(SIM), verilator)
COMPILE_ARGS += -Wno-SELRANGE -Wno-WIDTH
ifeq ($(WAVES), 1)
COMPILE_ARGS += --trace-fst
endif
endif
include $(shell cocotb-config --makefiles)/Makefile.sim
iverilog_dump.v:
echo 'module iverilog_dump();' > $@
echo 'initial begin' >> $@
echo ' $$dumpfile("$(TOPLEVEL).fst");' >> $@
echo ' $$dumpvars(0, $(TOPLEVEL));' >> $@
echo 'end' >> $@
echo 'endmodule' >> $@
clean::
@rm -rf iverilog_dump.v
@rm -rf dump.fst $(TOPLEVEL).fst

170
tests/mii/test_mii.py Normal file
View File

@@ -0,0 +1,170 @@
#!/usr/bin/env python
"""
Copyright (c) 2020 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import itertools
import logging
import os
import cocotb_test.simulator
import cocotb
from cocotb.clock import Clock
from cocotb.triggers import RisingEdge
from cocotb.regression import TestFactory
from cocotbext.eth import GmiiFrame, MiiSource, MiiSink
class TB:
def __init__(self, dut):
self.dut = dut
self.log = logging.getLogger("cocotb.tb")
self.log.setLevel(logging.DEBUG)
self._enable_generator = None
self._enable_cr = None
cocotb.fork(Clock(dut.clk, 2, units="ns").start())
self.source = MiiSource(dut.mii_d, dut.mii_er, dut.mii_en,
dut.clk, dut.rst, dut.mii_clk_en)
self.sink = MiiSink(dut.mii_d, dut.mii_er, dut.mii_en,
dut.clk, dut.rst, dut.mii_clk_en)
dut.mii_clk_en.setimmediatevalue(1)
async def reset(self):
self.dut.rst.setimmediatevalue(0)
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
self.dut.rst <= 1
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
self.dut.rst <= 0
await RisingEdge(self.dut.clk)
await RisingEdge(self.dut.clk)
def set_enable_generator(self, generator=None):
if self._enable_cr is not None:
self._enable_cr.kill()
self._enable_cr = None
self._enable_generator = generator
if self._enable_generator is not None:
self._enable_cr = cocotb.fork(self._run_enable())
def clear_enable_generator(self):
self.set_enable_generator(None)
async def _run_enable(self):
for val in self._enable_generator:
self.dut.mii_clk_en <= val
await RisingEdge(self.dut.clk)
async def run_test(dut, payload_lengths=None, payload_data=None, ifg=12, enable_gen=None):
tb = TB(dut)
tb.source.ifg = ifg
if enable_gen is not None:
tb.set_enable_generator(enable_gen())
await tb.reset()
test_frames = [payload_data(x) for x in payload_lengths()]
for test_data in test_frames:
test_frame = GmiiFrame.from_payload(test_data)
await tb.source.send(test_frame)
for test_data in test_frames:
rx_frame = await tb.sink.recv()
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.sink.empty()
await RisingEdge(dut.clk)
await RisingEdge(dut.clk)
def size_list():
return list(range(60, 128)) + [512, 1514, 9214] + [60]*10
def incrementing_payload(length):
return bytearray(itertools.islice(itertools.cycle(range(256)), length))
def cycle_en():
return itertools.cycle([0, 0, 0, 1])
if cocotb.SIM_NAME:
factory = TestFactory(run_test)
factory.add_option("payload_lengths", [size_list])
factory.add_option("payload_data", [incrementing_payload])
factory.add_option("ifg", [12, 0])
factory.add_option("enable_gen", [None, cycle_en])
factory.generate_tests()
# cocotb-test
tests_dir = os.path.dirname(__file__)
def test_mii(request):
dut = "test_mii"
module = os.path.splitext(os.path.basename(__file__))[0]
toplevel = dut
verilog_sources = [
os.path.join(tests_dir, f"{dut}.v"),
]
parameters = {}
extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}
sim_build = os.path.join(tests_dir, "sim_build",
request.node.name.replace('[', '-').replace(']', ''))
cocotb_test.simulator.run(
python_search=[tests_dir],
verilog_sources=verilog_sources,
toplevel=toplevel,
module=module,
parameters=parameters,
sim_build=sim_build,
extra_env=extra_env,
)

46
tests/mii/test_mii.v Normal file
View File

@@ -0,0 +1,46 @@
/*
Copyright (c) 2020 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
// Language: Verilog 2001
`timescale 1ns / 1ns
/*
* MII test
*/
module test_mii #
(
parameter DATA_WIDTH = 4
)
(
input wire clk,
input wire rst,
inout wire [DATA_WIDTH-1:0] mii_d,
inout wire mii_er,
inout wire mii_en,
inout wire mii_clk_en
);
endmodule

61
tests/mii_phy/Makefile Normal file
View File

@@ -0,0 +1,61 @@
# Copyright (c) 2020 Alex Forencich
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
TOPLEVEL_LANG = verilog
SIM ?= icarus
WAVES ?= 0
COCOTB_HDL_TIMEUNIT = 1ns
COCOTB_HDL_TIMEPRECISION = 1ns
DUT = test_mii_phy
TOPLEVEL = $(DUT)
MODULE = $(DUT)
VERILOG_SOURCES += $(DUT).v
ifeq ($(SIM), icarus)
PLUSARGS += -fst
ifeq ($(WAVES), 1)
VERILOG_SOURCES += iverilog_dump.v
COMPILE_ARGS += -s iverilog_dump
endif
else ifeq ($(SIM), verilator)
COMPILE_ARGS += -Wno-SELRANGE -Wno-WIDTH
ifeq ($(WAVES), 1)
COMPILE_ARGS += --trace-fst
endif
endif
include $(shell cocotb-config --makefiles)/Makefile.sim
iverilog_dump.v:
echo 'module iverilog_dump();' > $@
echo 'initial begin' >> $@
echo ' $$dumpfile("$(TOPLEVEL).fst");' >> $@
echo ' $$dumpvars(0, $(TOPLEVEL));' >> $@
echo 'end' >> $@
echo 'endmodule' >> $@
clean::
@rm -rf iverilog_dump.v
@rm -rf dump.fst $(TOPLEVEL).fst

View File

@@ -0,0 +1,174 @@
#!/usr/bin/env python
"""
Copyright (c) 2020 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import itertools
import logging
import os
import cocotb_test.simulator
import cocotb
from cocotb.triggers import RisingEdge
from cocotb.regression import TestFactory
from cocotbext.eth import GmiiFrame, MiiSource, MiiSink, MiiPhy
class TB:
def __init__(self, dut, speed=100e6):
self.dut = dut
self.log = logging.getLogger("cocotb.tb")
self.log.setLevel(logging.DEBUG)
self.mii_phy = MiiPhy(dut.phy_txd, dut.phy_tx_er, dut.phy_tx_en, dut.phy_tx_clk,
dut.phy_rxd, dut.phy_rx_er, dut.phy_rx_dv, dut.phy_rx_clk, dut.phy_rst, speed=speed)
self.source = MiiSource(dut.phy_txd, dut.phy_tx_er, dut.phy_tx_en,
dut.phy_tx_clk, dut.phy_rst)
self.sink = MiiSink(dut.phy_rxd, dut.phy_rx_er, dut.phy_rx_dv,
dut.phy_rx_clk, dut.phy_rst)
async def reset(self):
self.dut.phy_rst.setimmediatevalue(0)
await RisingEdge(self.dut.phy_tx_clk)
await RisingEdge(self.dut.phy_tx_clk)
self.dut.phy_rst <= 1
await RisingEdge(self.dut.phy_tx_clk)
await RisingEdge(self.dut.phy_tx_clk)
self.dut.phy_rst <= 0
await RisingEdge(self.dut.phy_tx_clk)
await RisingEdge(self.dut.phy_tx_clk)
async def run_test_tx(dut, payload_lengths=None, payload_data=None, ifg=12, speed=100e6):
tb = TB(dut, speed)
tb.mii_phy.rx.ifg = ifg
tb.source.ifg = ifg
await tb.reset()
test_frames = [payload_data(x) for x in payload_lengths()]
for test_data in test_frames:
test_frame = GmiiFrame.from_payload(test_data)
await tb.source.send(test_frame)
for test_data in test_frames:
rx_frame = await tb.mii_phy.tx.recv()
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.mii_phy.tx.empty()
await RisingEdge(dut.phy_tx_clk)
await RisingEdge(dut.phy_tx_clk)
async def run_test_rx(dut, payload_lengths=None, payload_data=None, ifg=12, speed=100e6):
tb = TB(dut, speed)
tb.mii_phy.rx.ifg = ifg
tb.source.ifg = ifg
await tb.reset()
test_frames = [payload_data(x) for x in payload_lengths()]
for test_data in test_frames:
test_frame = GmiiFrame.from_payload(test_data)
await tb.mii_phy.rx.send(test_frame)
for test_data in test_frames:
rx_frame = await tb.sink.recv()
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.sink.empty()
await RisingEdge(dut.phy_rx_clk)
await RisingEdge(dut.phy_rx_clk)
def size_list():
return list(range(60, 128)) + [512, 1514] + [60]*10
def incrementing_payload(length):
return bytearray(itertools.islice(itertools.cycle(range(256)), length))
def cycle_en():
return itertools.cycle([0, 0, 0, 1])
if cocotb.SIM_NAME:
for test in [run_test_tx, run_test_rx]:
factory = TestFactory(test)
factory.add_option("payload_lengths", [size_list])
factory.add_option("payload_data", [incrementing_payload])
factory.add_option("speed", [100e6, 10e6])
factory.generate_tests()
# cocotb-test
tests_dir = os.path.dirname(__file__)
def test_mii_phy(request):
dut = "test_mii_phy"
module = os.path.splitext(os.path.basename(__file__))[0]
toplevel = dut
verilog_sources = [
os.path.join(tests_dir, f"{dut}.v"),
]
parameters = {}
extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}
sim_build = os.path.join(tests_dir, "sim_build",
request.node.name.replace('[', '-').replace(']', ''))
cocotb_test.simulator.run(
python_search=[tests_dir],
verilog_sources=verilog_sources,
toplevel=toplevel,
module=module,
parameters=parameters,
sim_build=sim_build,
extra_env=extra_env,
)

View File

@@ -0,0 +1,45 @@
/*
Copyright (c) 2020 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
// Language: Verilog 2001
`timescale 1ns / 1ns
/*
* MII PHY test
*/
module test_mii_phy
(
inout wire phy_rst,
inout wire [3:0] phy_txd,
inout wire phy_tx_er,
inout wire phy_tx_en,
inout wire phy_tx_clk,
inout wire [3:0] phy_rxd,
inout wire phy_rx_er,
inout wire phy_rx_dv,
inout wire phy_rx_clk
);
endmodule

View File

@@ -31,8 +31,6 @@ TOPLEVEL = $(DUT)
MODULE = $(DUT) MODULE = $(DUT)
VERILOG_SOURCES += $(DUT).v VERILOG_SOURCES += $(DUT).v
SIM_BUILD ?= sim_build_$(MODULE)
ifeq ($(SIM), icarus) ifeq ($(SIM), icarus)
PLUSARGS += -fst PLUSARGS += -fst
@@ -48,6 +46,8 @@ else ifeq ($(SIM), verilator)
endif endif
endif endif
include $(shell cocotb-config --makefiles)/Makefile.sim
iverilog_dump.v: iverilog_dump.v:
echo 'module iverilog_dump();' > $@ echo 'module iverilog_dump();' > $@
echo 'initial begin' >> $@ echo 'initial begin' >> $@
@@ -57,9 +57,5 @@ iverilog_dump.v:
echo 'endmodule' >> $@ echo 'endmodule' >> $@
clean:: clean::
@rm -rf sim_build_*
@rm -rf iverilog_dump.v @rm -rf iverilog_dump.v
@rm -rf dump.fst $(TOPLEVEL).fst @rm -rf dump.fst $(TOPLEVEL).fst
include $(shell cocotb-config --makefiles)/Makefile.sim

View File

@@ -36,7 +36,7 @@ from cocotb.utils import get_sim_time
from cocotbext.eth import PtpClock from cocotbext.eth import PtpClock
class TB(object): class TB:
def __init__(self, dut): def __init__(self, dut):
self.dut = dut self.dut = dut
@@ -164,6 +164,8 @@ async def run_seconds_increment(dut):
tb.ptp_clock.set_ts_64(999990000*2**16) tb.ptp_clock.set_ts_64(999990000*2**16)
await RisingEdge(dut.clk) await RisingEdge(dut.clk)
await RisingEdge(dut.clk)
start_time = get_sim_time('sec') start_time = get_sim_time('sec')
start_ts_96 = (dut.ts_96.value.integer >> 48) + ((dut.ts_96.value.integer & 0xffffffffffff)/2**16*1e-9) start_ts_96 = (dut.ts_96.value.integer >> 48) + ((dut.ts_96.value.integer & 0xffffffffffff)/2**16*1e-9)
start_ts_64 = dut.ts_64.value.integer/2**16*1e-9 start_ts_64 = dut.ts_64.value.integer/2**16*1e-9
@@ -292,7 +294,6 @@ async def run_drift_adjustment(dut):
# cocotb-test # cocotb-test
tests_dir = os.path.dirname(__file__) tests_dir = os.path.dirname(__file__)
rtl_dir = os.path.abspath(os.path.join(tests_dir, '..', '..', 'rtl'))
def test_ptp_clock(request): def test_ptp_clock(request):
@@ -308,8 +309,8 @@ def test_ptp_clock(request):
extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()} extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}
sim_build = os.path.join(tests_dir, sim_build = os.path.join(tests_dir, "sim_build",
"sim_build_"+request.node.name.replace('[', '-').replace(']', '')) request.node.name.replace('[', '-').replace(']', ''))
cocotb_test.simulator.run( cocotb_test.simulator.run(
python_search=[tests_dir], python_search=[tests_dir],

View File

@@ -31,8 +31,6 @@ TOPLEVEL = $(DUT)
MODULE = $(DUT) MODULE = $(DUT)
VERILOG_SOURCES += $(DUT).v VERILOG_SOURCES += $(DUT).v
SIM_BUILD ?= sim_build_$(MODULE)
ifeq ($(SIM), icarus) ifeq ($(SIM), icarus)
PLUSARGS += -fst PLUSARGS += -fst
@@ -48,6 +46,8 @@ else ifeq ($(SIM), verilator)
endif endif
endif endif
include $(shell cocotb-config --makefiles)/Makefile.sim
iverilog_dump.v: iverilog_dump.v:
echo 'module iverilog_dump();' > $@ echo 'module iverilog_dump();' > $@
echo 'initial begin' >> $@ echo 'initial begin' >> $@
@@ -57,9 +57,5 @@ iverilog_dump.v:
echo 'endmodule' >> $@ echo 'endmodule' >> $@
clean:: clean::
@rm -rf sim_build_*
@rm -rf iverilog_dump.v @rm -rf iverilog_dump.v
@rm -rf dump.fst $(TOPLEVEL).fst @rm -rf dump.fst $(TOPLEVEL).fst
include $(shell cocotb-config --makefiles)/Makefile.sim

View File

@@ -37,7 +37,7 @@ from cocotb.regression import TestFactory
from cocotbext.eth import GmiiFrame, RgmiiSource, RgmiiSink from cocotbext.eth import GmiiFrame, RgmiiSource, RgmiiSink
class TB(object): class TB:
def __init__(self, dut): def __init__(self, dut):
self.dut = dut self.dut = dut
@@ -141,7 +141,6 @@ if cocotb.SIM_NAME:
# cocotb-test # cocotb-test
tests_dir = os.path.dirname(__file__) tests_dir = os.path.dirname(__file__)
rtl_dir = os.path.abspath(os.path.join(tests_dir, '..', '..', 'rtl'))
def test_rgmii(request): def test_rgmii(request):
@@ -157,8 +156,8 @@ def test_rgmii(request):
extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()} extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}
sim_build = os.path.join(tests_dir, sim_build = os.path.join(tests_dir, "sim_build",
"sim_build_"+request.node.name.replace('[', '-').replace(']', '')) request.node.name.replace('[', '-').replace(']', ''))
cocotb_test.simulator.run( cocotb_test.simulator.run(
python_search=[tests_dir], python_search=[tests_dir],

61
tests/rgmii_phy/Makefile Normal file
View File

@@ -0,0 +1,61 @@
# Copyright (c) 2020 Alex Forencich
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
TOPLEVEL_LANG = verilog
SIM ?= icarus
WAVES ?= 0
COCOTB_HDL_TIMEUNIT = 1ns
COCOTB_HDL_TIMEPRECISION = 1ns
DUT = test_rgmii_phy
TOPLEVEL = $(DUT)
MODULE = $(DUT)
VERILOG_SOURCES += $(DUT).v
ifeq ($(SIM), icarus)
PLUSARGS += -fst
ifeq ($(WAVES), 1)
VERILOG_SOURCES += iverilog_dump.v
COMPILE_ARGS += -s iverilog_dump
endif
else ifeq ($(SIM), verilator)
COMPILE_ARGS += -Wno-SELRANGE -Wno-WIDTH
ifeq ($(WAVES), 1)
COMPILE_ARGS += --trace-fst
endif
endif
include $(shell cocotb-config --makefiles)/Makefile.sim
iverilog_dump.v:
echo 'module iverilog_dump();' > $@
echo 'initial begin' >> $@
echo ' $$dumpfile("$(TOPLEVEL).fst");' >> $@
echo ' $$dumpvars(0, $(TOPLEVEL));' >> $@
echo 'end' >> $@
echo 'endmodule' >> $@
clean::
@rm -rf iverilog_dump.v
@rm -rf dump.fst $(TOPLEVEL).fst

View File

@@ -0,0 +1,187 @@
#!/usr/bin/env python
"""
Copyright (c) 2020 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import itertools
import logging
import os
import cocotb_test.simulator
import cocotb
from cocotb.clock import Clock
from cocotb.triggers import RisingEdge
from cocotb.regression import TestFactory
from cocotbext.eth import GmiiFrame, RgmiiSource, RgmiiSink, RgmiiPhy
class TB:
def __init__(self, dut, speed=1000e6):
self.dut = dut
self.log = logging.getLogger("cocotb.tb")
self.log.setLevel(logging.DEBUG)
if speed == 1000e6:
cocotb.fork(Clock(dut.phy_tx_clk, 8, units="ns").start())
elif speed == 100e6:
cocotb.fork(Clock(dut.phy_tx_clk, 40, units="ns").start())
elif speed == 10e6:
cocotb.fork(Clock(dut.phy_tx_clk, 400, units="ns").start())
self.rgmii_phy = RgmiiPhy(dut.phy_txd, dut.phy_tx_ctl, dut.phy_tx_clk,
dut.phy_rxd, dut.phy_rx_ctl, dut.phy_rx_clk, dut.phy_rst, speed=speed)
self.source = RgmiiSource(dut.phy_txd, dut.phy_tx_ctl, dut.phy_tx_clk, dut.phy_rst)
self.sink = RgmiiSink(dut.phy_rxd, dut.phy_rx_ctl, dut.phy_rx_clk, dut.phy_rst)
if speed == 1000e6:
self.source.mii_mode = False
self.sink.mii_mode = False
else:
self.source.mii_mode = True
self.sink.mii_mode = True
async def reset(self):
self.dut.phy_rst.setimmediatevalue(0)
await RisingEdge(self.dut.phy_tx_clk)
await RisingEdge(self.dut.phy_tx_clk)
self.dut.phy_rst <= 1
await RisingEdge(self.dut.phy_tx_clk)
await RisingEdge(self.dut.phy_tx_clk)
self.dut.phy_rst <= 0
await RisingEdge(self.dut.phy_tx_clk)
await RisingEdge(self.dut.phy_tx_clk)
async def run_test_tx(dut, payload_lengths=None, payload_data=None, ifg=12, speed=1000e6):
tb = TB(dut, speed)
tb.rgmii_phy.rx.ifg = ifg
tb.source.ifg = ifg
await tb.reset()
test_frames = [payload_data(x) for x in payload_lengths()]
for test_data in test_frames:
test_frame = GmiiFrame.from_payload(test_data)
await tb.source.send(test_frame)
for test_data in test_frames:
rx_frame = await tb.rgmii_phy.tx.recv()
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.rgmii_phy.tx.empty()
await RisingEdge(dut.phy_tx_clk)
await RisingEdge(dut.phy_tx_clk)
async def run_test_rx(dut, payload_lengths=None, payload_data=None, ifg=12, speed=1000e6):
tb = TB(dut, speed)
tb.rgmii_phy.rx.ifg = ifg
tb.source.ifg = ifg
await tb.reset()
test_frames = [payload_data(x) for x in payload_lengths()]
for test_data in test_frames:
test_frame = GmiiFrame.from_payload(test_data)
await tb.rgmii_phy.rx.send(test_frame)
for test_data in test_frames:
rx_frame = await tb.sink.recv()
assert rx_frame.get_payload() == test_data
assert rx_frame.check_fcs()
assert rx_frame.error is None
assert tb.sink.empty()
await RisingEdge(dut.phy_rx_clk)
await RisingEdge(dut.phy_rx_clk)
def size_list():
return list(range(60, 128)) + [512, 1514] + [60]*10
def incrementing_payload(length):
return bytearray(itertools.islice(itertools.cycle(range(256)), length))
def cycle_en():
return itertools.cycle([0, 0, 0, 1])
if cocotb.SIM_NAME:
for test in [run_test_tx, run_test_rx]:
factory = TestFactory(test)
factory.add_option("payload_lengths", [size_list])
factory.add_option("payload_data", [incrementing_payload])
factory.add_option("speed", [1000e6, 100e6, 10e6])
factory.generate_tests()
# cocotb-test
tests_dir = os.path.dirname(__file__)
def test_rgmii_phy(request):
dut = "test_rgmii_phy"
module = os.path.splitext(os.path.basename(__file__))[0]
toplevel = dut
verilog_sources = [
os.path.join(tests_dir, f"{dut}.v"),
]
parameters = {}
extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}
sim_build = os.path.join(tests_dir, "sim_build",
request.node.name.replace('[', '-').replace(']', ''))
cocotb_test.simulator.run(
python_search=[tests_dir],
verilog_sources=verilog_sources,
toplevel=toplevel,
module=module,
parameters=parameters,
sim_build=sim_build,
extra_env=extra_env,
)

View File

@@ -0,0 +1,43 @@
/*
Copyright (c) 2020 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
// Language: Verilog 2001
`timescale 1ns / 1ns
/*
* RGMII PHY test
*/
module test_rgmii_phy
(
inout wire phy_rst,
inout wire [3:0] phy_txd,
inout wire phy_tx_ctl,
inout wire phy_tx_clk,
inout wire [3:0] phy_rxd,
inout wire phy_rx_ctl,
inout wire phy_rx_clk
);
endmodule

View File

@@ -35,8 +35,6 @@ VERILOG_SOURCES += $(DUT).v
export PARAM_DATA_WIDTH ?= 64 export PARAM_DATA_WIDTH ?= 64
export PARAM_CTRL_WIDTH ?= $(shell expr $(PARAM_DATA_WIDTH) / 8 ) export PARAM_CTRL_WIDTH ?= $(shell expr $(PARAM_DATA_WIDTH) / 8 )
SIM_BUILD ?= sim_build_$(MODULE)-$(PARAM_DATA_WIDTH)
ifeq ($(SIM), icarus) ifeq ($(SIM), icarus)
PLUSARGS += -fst PLUSARGS += -fst
@@ -58,6 +56,8 @@ else ifeq ($(SIM), verilator)
endif endif
endif endif
include $(shell cocotb-config --makefiles)/Makefile.sim
iverilog_dump.v: iverilog_dump.v:
echo 'module iverilog_dump();' > $@ echo 'module iverilog_dump();' > $@
echo 'initial begin' >> $@ echo 'initial begin' >> $@
@@ -67,9 +67,5 @@ iverilog_dump.v:
echo 'endmodule' >> $@ echo 'endmodule' >> $@
clean:: clean::
@rm -rf sim_build_*
@rm -rf iverilog_dump.v @rm -rf iverilog_dump.v
@rm -rf dump.fst $(TOPLEVEL).fst @rm -rf dump.fst $(TOPLEVEL).fst
include $(shell cocotb-config --makefiles)/Makefile.sim

View File

@@ -38,7 +38,7 @@ from cocotb.regression import TestFactory
from cocotbext.eth import XgmiiFrame, XgmiiSource, XgmiiSink from cocotbext.eth import XgmiiFrame, XgmiiSource, XgmiiSink
class TB(object): class TB:
def __init__(self, dut): def __init__(self, dut):
self.dut = dut self.dut = dut
@@ -150,7 +150,7 @@ async def run_test_alignment(dut, payload_data=None, ifg=12, enable_dic=True,
assert rx_frame.check_fcs() assert rx_frame.check_fcs()
assert rx_frame.ctrl is None assert rx_frame.ctrl is None
start_lane.append(rx_frame.rx_start_lane) start_lane.append(rx_frame.start_lane)
tb.log.info("length: %d", length) tb.log.info("length: %d", length)
tb.log.info("start_lane: %s", start_lane) tb.log.info("start_lane: %s", start_lane)
@@ -229,7 +229,6 @@ if cocotb.SIM_NAME:
# cocotb-test # cocotb-test
tests_dir = os.path.dirname(__file__) tests_dir = os.path.dirname(__file__)
rtl_dir = os.path.abspath(os.path.join(tests_dir, '..', '..', 'rtl'))
@pytest.mark.parametrize("data_width", [32, 64]) @pytest.mark.parametrize("data_width", [32, 64])
@@ -249,8 +248,8 @@ def test_xgmii(request, data_width):
extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()} extra_env = {f'PARAM_{k}': str(v) for k, v in parameters.items()}
sim_build = os.path.join(tests_dir, sim_build = os.path.join(tests_dir, "sim_build",
"sim_build_"+request.node.name.replace('[', '-').replace(']', '')) request.node.name.replace('[', '-').replace(']', ''))
cocotb_test.simulator.run( cocotb_test.simulator.run(
python_search=[tests_dir], python_search=[tests_dir],