#!/usr/bin/env python
# SPDX-License-Identifier: CERN-OHL-S-2.0
"""

Copyright (c) 2021-2025 FPGA Ninja, LLC

Authors:
- Alex Forencich

"""

import logging

import cocotb
from cocotb.queue import Queue, QueueFull
from cocotb.triggers import RisingEdge, Timer, First, Event
from cocotb.utils import get_sim_time

from cocotbext.eth.constants import (EthPre, XgmiiCtrl, BaseRCtrl, BaseRO, BaseRSync, BaseRBlockType,
    xgmii_ctrl_to_baser_mapping, baser_ctrl_to_xgmii_mapping, block_type_term_lane_mapping)
from cocotbext.eth import XgmiiFrame


class BaseRSerdesSource():
    """Drive 64b/66b encoded blocks onto a 64-bit data bus and a 2-bit sync header.

    Queued frames are converted to XGMII form, encoded into BASE-R blocks,
    optionally scrambled, bit-slipped and bit-reversed, and written to the
    ``data`` and ``header`` signals on each rising edge of ``clock``.
    """

    def __init__(self, data, header, clock, enable=None, slip=None, scramble=True, reverse=False, *args, **kwargs):
        """Set up the source and start the transmit coroutine.

        Args:
            data: signal handle for the 64-bit block payload.
            header: signal handle for the sync header.
            clock: clock signal; outputs are updated on its rising edge.
            enable: optional signal; when given, a cycle is only processed
                while it is asserted.
            slip: optional signal; each enabled cycle it is asserted advances
                the output bit offset by one (modulo 66).
            scramble: apply the 64b/66b scrambler to the payload.
            reverse: bit-reverse the payload and the header before driving.
        """
        self.log = logging.getLogger(f"cocotb.{data._path}")
        self.data = data
        self.header = header
        self.clock = clock
        self.enable = enable
        self.slip = slip
        self.scramble = scramble
        self.reverse = reverse

        self.log.info("BASE-R serdes source")
        self.log.info("Copyright (c) 2021 Alex Forencich")
        self.log.info("https://github.com/alexforencich/verilog-ethernet")

        super().__init__(*args, **kwargs)

        self.active = False
        self.queue = Queue()
        self.dequeue_event = Event()
        self.current_frame = None
        self.idle_event = Event()
        self.idle_event.set()

        # inter-frame gap control
        self.enable_dic = True
        self.ifg = 12
        self.force_offset_start = False

        self.bit_offset = 0

        # a limit <= 0 disables the corresponding check in full()
        self.queue_occupancy_bytes = 0
        self.queue_occupancy_frames = 0
        self.queue_occupancy_limit_bytes = -1
        self.queue_occupancy_limit_frames = -1

        self.width = len(self.data)
        self.byte_size = 8
        self.byte_lanes = 8

        assert self.width == self.byte_lanes * self.byte_size

        self.log.info("BASE-R serdes source model configuration")
        self.log.info("  Byte size: %d bits", self.byte_size)
        self.log.info("  Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
        self.log.info("  Enable scrambler: %s", self.scramble)
        self.log.info("  Bit reverse: %s", self.reverse)

        self.data.setimmediatevalue(0)
        self.header.setimmediatevalue(0)

        self._run_cr = cocotb.start_soon(self._run())

    async def send(self, frame):
        """Queue a frame for transmission, waiting while the queue is full."""
        while self.full():
            self.dequeue_event.clear()
            await self.dequeue_event.wait()
        frame = XgmiiFrame(frame)
        await self.queue.put(frame)
        self.idle_event.clear()
        self.queue_occupancy_bytes += len(frame)
        self.queue_occupancy_frames += 1

    def send_nowait(self, frame):
        """Queue a frame for transmission; raise QueueFull if the queue is full."""
        if self.full():
            raise QueueFull()
        frame = XgmiiFrame(frame)
        self.queue.put_nowait(frame)
        self.idle_event.clear()
        self.queue_occupancy_bytes += len(frame)
        self.queue_occupancy_frames += 1

    def count(self):
        """Return the number of frames waiting in the queue."""
        return self.queue.qsize()

    def empty(self):
        """Return True when no frames are waiting in the queue."""
        return self.queue.empty()

    def full(self):
        """Return True when an enabled (positive) occupancy limit is exceeded."""
        if self.queue_occupancy_limit_bytes > 0 and self.queue_occupancy_bytes > self.queue_occupancy_limit_bytes:
            return True
        elif self.queue_occupancy_limit_frames > 0 and self.queue_occupancy_frames > self.queue_occupancy_limit_frames:
            return True
        else:
            return False

    def idle(self):
        """Return True when the queue is empty and no frame is being sent."""
        return self.empty() and not self.active

    def clear(self):
        """Drop all queued frames, completing each with no end timestamp."""
        while not self.queue.empty():
            frame = self.queue.get_nowait()
            frame.sim_time_end = None
            frame.handle_tx_complete()
        self.dequeue_event.set()
        self.idle_event.set()
        self.queue_occupancy_bytes = 0
        self.queue_occupancy_frames = 0

    async def wait(self):
        """Wait until the idle event is set."""
        await self.idle_event.wait()

    async def _run(self):
        """Transmit loop: emit one encoded block per enabled clock cycle."""
        frame = None
        frame_offset = 0
        ifg_cnt = 0
        deficit_idle_cnt = 0
        scrambler_state = 0
        last_d = 0
        self.active = False

        while True:
            await RisingEdge(self.clock)

            if self.enable is None or self.enable.value:
                if ifg_cnt + deficit_idle_cnt > self.byte_lanes-1 or (not self.enable_dic and ifg_cnt > 4):
                    # in IFG
                    ifg_cnt = ifg_cnt - self.byte_lanes
                    if ifg_cnt < 0:
                        if self.enable_dic:
                            deficit_idle_cnt = max(deficit_idle_cnt+ifg_cnt, 0)
                        ifg_cnt = 0

                elif frame is None:
                    # idle
                    if not self.queue.empty():
                        # send frame
                        frame = self.queue.get_nowait()
                        self.dequeue_event.set()
                        self.queue_occupancy_bytes -= len(frame)
                        self.queue_occupancy_frames -= 1
                        self.current_frame = frame
                        frame.sim_time_start = get_sim_time()
                        frame.sim_time_sfd = None
                        frame.sim_time_end = None
                        self.log.info("TX frame: %s", frame)
                        frame.normalize()
                        frame.start_lane = 0

                        # replace the first preamble byte with START and
                        # append TERM, both flagged as control characters
                        assert frame.data[0] == EthPre.PRE
                        assert frame.ctrl[0] == 0
                        frame.data[0] = XgmiiCtrl.START
                        frame.ctrl[0] = 1
                        frame.data.append(XgmiiCtrl.TERM)
                        frame.ctrl.append(1)

                        # offset start
                        if self.enable_dic:
                            min_ifg = 3 - deficit_idle_cnt
                        else:
                            min_ifg = 0

                        if self.byte_lanes > 4 and (ifg_cnt > min_ifg or self.force_offset_start):
                            # start in lane 4: pad lanes 0-3 with idles
                            ifg_cnt = ifg_cnt-4
                            frame.start_lane = 4
                            frame.data = bytearray([XgmiiCtrl.IDLE]*4)+frame.data
                            frame.ctrl = [1]*4+frame.ctrl

                        if self.enable_dic:
                            deficit_idle_cnt = max(deficit_idle_cnt+ifg_cnt, 0)
                        ifg_cnt = 0
                        self.active = True
                        frame_offset = 0
                    else:
                        # clear counters
                        deficit_idle_cnt = 0
                        ifg_cnt = 0

                if frame is not None:
                    dl = bytearray()
                    cl = []

                    # gather one byte per lane; pad with idles once the frame ends
                    for k in range(self.byte_lanes):
                        if frame is not None:
                            d = frame.data[frame_offset]
                            if frame.sim_time_sfd is None and d == EthPre.SFD:
                                frame.sim_time_sfd = get_sim_time()
                            dl.append(d)
                            cl.append(frame.ctrl[frame_offset])
                            frame_offset += 1

                            if frame_offset >= len(frame.data):
                                ifg_cnt = max(self.ifg - (self.byte_lanes-k), 0)
                                frame.sim_time_end = get_sim_time()
                                frame.handle_tx_complete()
                                frame = None
                                self.current_frame = None
                        else:
                            dl.append(XgmiiCtrl.IDLE)
                            cl.append(1)

                    # remap control characters
                    # lane i's 7-bit code sits at bits 7*i .. 7*i+6 of ctrl
                    ctrl = sum(xgmii_ctrl_to_baser_mapping.get(d, BaseRCtrl.ERROR) << i*7 for i, d in enumerate(dl))

                    if not any(cl):
                        # data
                        header = BaseRSync.DATA
                        data = int.from_bytes(dl, 'little')
                    else:
                        # control
                        header = BaseRSync.CTRL
                        if cl[0] and dl[0] == XgmiiCtrl.START and not any(cl[1:]):
                            # start in lane 0
                            data = BaseRBlockType.START_0
                            for i in range(1, 8):
                                data |= dl[i] << i*8
                        elif cl[4] and dl[4] == XgmiiCtrl.START and not any(cl[5:]):
                            # start in lane 4
                            if cl[0] and (dl[0] == XgmiiCtrl.SEQ_OS or dl[0] == XgmiiCtrl.SIG_OS) and not any(cl[1:4]):
                                # ordered set in lane 0
                                data = BaseRBlockType.OS_START
                                for i in range(1, 4):
                                    data |= dl[i] << i*8
                                if dl[0] == XgmiiCtrl.SIG_OS:
                                    # signal ordered set
                                    data |= BaseRO.SIG_OS << 32
                            else:
                                # other control
                                data = BaseRBlockType.START_4 | (ctrl & 0xfffffff) << 8

                            # data bytes after the start character (both variants)
                            for i in range(5, 8):
                                data |= dl[i] << i*8
                        elif cl[0] and (dl[0] == XgmiiCtrl.SEQ_OS or dl[0] == XgmiiCtrl.SIG_OS) and not any(cl[1:4]):
                            # ordered set in lane 0
                            if cl[4] and (dl[4] == XgmiiCtrl.SEQ_OS or dl[4] == XgmiiCtrl.SIG_OS) and not any(cl[5:8]):
                                # ordered set in lane 4
                                data = BaseRBlockType.OS_04
                                for i in range(5, 8):
                                    data |= dl[i] << i*8
                                if dl[4] == XgmiiCtrl.SIG_OS:
                                    # signal ordered set
                                    data |= BaseRO.SIG_OS << 36
                            else:
                                # Control codes for lanes 4-7 occupy bits 28..55
                                # of ctrl; shifted by 8 they land at bits 36..63,
                                # which is where the sink decodes ctrl[4:] from.
                                # (The previous "(ctrl & 0xfffffff) << 40" placed
                                # the lane 0-3 codes at bits 40..67, overflowing
                                # the 64-bit payload.)
                                data = BaseRBlockType.OS_0 | (ctrl & 0xfffffff0000000) << 8

                            # lane 0 ordered set payload (both variants)
                            for i in range(1, 4):
                                data |= dl[i] << i*8
                            if dl[0] == XgmiiCtrl.SIG_OS:
                                # signal ordered set
                                data |= BaseRO.SIG_OS << 32
                        elif cl[4] and (dl[4] == XgmiiCtrl.SEQ_OS or dl[4] == XgmiiCtrl.SIG_OS) and not any(cl[5:8]):
                            # ordered set in lane 4
                            data = BaseRBlockType.OS_4 | (ctrl & 0xfffffff) << 8
                            for i in range(5, 8):
                                data |= dl[i] << i*8
                            if dl[4] == XgmiiCtrl.SIG_OS:
                                # signal ordered set
                                data |= BaseRO.SIG_OS << 36
                        elif cl[0] and dl[0] == XgmiiCtrl.TERM:
                            # terminate in lane 0
                            data = BaseRBlockType.TERM_0 | (ctrl & 0xffffffffffff80) << 8
                        elif cl[1] and dl[1] == XgmiiCtrl.TERM and not cl[0]:
                            # terminate in lane 1
                            data = BaseRBlockType.TERM_1 | (ctrl & 0xffffffffffc000) << 8 | dl[0] << 8
                        elif cl[2] and dl[2] == XgmiiCtrl.TERM and not any(cl[0:2]):
                            # terminate in lane 2
                            data = BaseRBlockType.TERM_2 | (ctrl & 0xffffffffe00000) << 8
                            for i in range(2):
                                data |= dl[i] << ((i+1)*8)
                        elif cl[3] and dl[3] == XgmiiCtrl.TERM and not any(cl[0:3]):
                            # terminate in lane 3
                            data = BaseRBlockType.TERM_3 | (ctrl & 0xfffffff0000000) << 8
                            for i in range(3):
                                data |= dl[i] << ((i+1)*8)
                        elif cl[4] and dl[4] == XgmiiCtrl.TERM and not any(cl[0:4]):
                            # terminate in lane 4
                            data = BaseRBlockType.TERM_4 | (ctrl & 0xfffff800000000) << 8
                            for i in range(4):
                                data |= dl[i] << ((i+1)*8)
                        elif cl[5] and dl[5] == XgmiiCtrl.TERM and not any(cl[0:5]):
                            # terminate in lane 5
                            data = BaseRBlockType.TERM_5 | (ctrl & 0xfffc0000000000) << 8
                            for i in range(5):
                                data |= dl[i] << ((i+1)*8)
                        elif cl[6] and dl[6] == XgmiiCtrl.TERM and not any(cl[0:6]):
                            # terminate in lane 6
                            data = BaseRBlockType.TERM_6 | (ctrl & 0xfe000000000000) << 8
                            for i in range(6):
                                data |= dl[i] << ((i+1)*8)
                        elif cl[7] and dl[7] == XgmiiCtrl.TERM and not any(cl[0:7]):
                            # terminate in lane 7
                            data = BaseRBlockType.TERM_7
                            for i in range(7):
                                data |= dl[i] << ((i+1)*8)
                        else:
                            # all control
                            data = BaseRBlockType.CTRL | ctrl << 8
                else:
                    # nothing to send: emit a control block with all-zero codes
                    data = BaseRBlockType.CTRL
                    header = BaseRSync.CTRL
                    self.active = False
                    self.idle_event.set()

                if self.scramble:
                    # 64b/66b scrambler
                    # each output bit is the input bit XORed with state bits 38 and 57;
                    # the output bit is shifted into the 58-bit state
                    b = 0
                    for i in range(self.width):
                        if bool(scrambler_state & (1 << 38)) ^ bool(scrambler_state & (1 << 57)) ^ bool(data & (1 << i)):
                            scrambler_state = ((scrambler_state & 0x1ffffffffffffff) << 1) | 1
                            b = b | (1 << i)
                        else:
                            scrambler_state = (scrambler_state & 0x1ffffffffffffff) << 1
                    data = b

                if self.slip is not None and self.slip.value:
                    self.bit_offset += 1

                self.bit_offset = max(0, self.bit_offset) % 66

                if self.bit_offset != 0:
                    # splice the previous and current 66-bit words at the bit offset
                    d = data << 2 | header
                    out_d = ((last_d | d << 66) >> 66-self.bit_offset) & 0x3ffffffffffffffff
                    last_d = d
                    data = out_d >> 2
                    header = out_d & 3

                if self.reverse:
                    # bit reverse
                    data = sum(1 << (63-i) for i in range(64) if (data >> i) & 1)
                    header = sum(1 << (1-i) for i in range(2) if (header >> i) & 1)

                self.data.value = data
                self.header.value = header


class BaseRSerdesSink:
    """Sample 64b/66b encoded blocks and reassemble them into XGMII frames.

    On each rising edge of ``clock`` the ``data`` and ``header`` signals are
    read, optionally bit-reversed and descrambled, decoded into eight XGMII
    data/control lanes, and assembled into frames placed on the receive queue.
    """

    def __init__(self, data, header, clock, enable=None, scramble=True, reverse=False, *args, **kwargs):
        """Set up the sink and start the receive coroutine.

        Args:
            data: signal handle for the 64-bit block payload.
            header: signal handle for the sync header.
            clock: clock signal; inputs are sampled on its rising edge.
            enable: optional signal; when given, a cycle is only processed
                while it is asserted.
            scramble: descramble the payload before decoding.
            reverse: bit-reverse the payload and the header after sampling.
        """
        self.log = logging.getLogger(f"cocotb.{data._path}")
        self.data = data
        self.header = header
        self.clock = clock
        self.enable = enable
        self.scramble = scramble
        self.reverse = reverse

        self.log.info("BASE-R serdes sink")
        self.log.info("Copyright (c) 2021 Alex Forencich")
        self.log.info("https://github.com/alexforencich/verilog-ethernet")

        super().__init__(*args, **kwargs)

        self.active = False
        self.queue = Queue()
        self.active_event = Event()

        self.queue_occupancy_bytes = 0
        self.queue_occupancy_frames = 0

        self.width = len(self.data)
        self.byte_size = 8
        self.byte_lanes = 8

        assert self.width == self.byte_lanes * self.byte_size

        self.log.info("BASE-R serdes sink model configuration")
        self.log.info("  Byte size: %d bits", self.byte_size)
        self.log.info("  Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
        self.log.info("  Enable scrambler: %s", self.scramble)
        self.log.info("  Bit reverse: %s", self.reverse)

        self._run_cr = cocotb.start_soon(self._run())

    def _recv(self, frame, compact=True):
        """Update occupancy bookkeeping for a dequeued frame and return it."""
        if self.queue.empty():
            self.active_event.clear()
        self.queue_occupancy_bytes -= len(frame)
        self.queue_occupancy_frames -= 1
        if compact:
            frame.compact()
        return frame

    async def recv(self, compact=True):
        """Wait for a received frame and return it, compacted when ``compact`` is true."""
        frame = await self.queue.get()
        return self._recv(frame, compact)

    def recv_nowait(self, compact=True):
        """Return a received frame without waiting (queue must not be empty)."""
        frame = self.queue.get_nowait()
        return self._recv(frame, compact)

    def count(self):
        """Return the number of received frames waiting in the queue."""
        return self.queue.qsize()

    def empty(self):
        """Return True when no received frames are waiting in the queue."""
        return self.queue.empty()

    def idle(self):
        """Return True when ``self.active`` is false.

        NOTE(review): nothing in this class sets ``self.active`` to True, so
        this currently always returns True -- confirm whether that is intended.
        """
        return not self.active

    def clear(self):
        """Discard all received frames and reset the occupancy counters."""
        while not self.queue.empty():
            self.queue.get_nowait()
        self.active_event.clear()
        self.queue_occupancy_bytes = 0
        self.queue_occupancy_frames = 0

    async def wait(self, timeout=0, timeout_unit=None):
        """Wait until a frame has been received, or until ``timeout`` elapses.

        Returns immediately if the queue already holds a frame. A falsy
        ``timeout`` waits indefinitely.
        """
        if not self.empty():
            return
        if timeout:
            await First(self.active_event.wait(), Timer(timeout, timeout_unit))
        else:
            await self.active_event.wait()

    async def _run(self):
        """Receive loop: decode one block per enabled clock cycle."""
        frame = None
        scrambler_state = 0
        self.active = False

        while True:
            await RisingEdge(self.clock)

            if self.enable is None or self.enable.value:
                data = self.data.value.integer
                header = self.header.value.integer

                if self.reverse:
                    # bit reverse
                    data = sum(1 << (63-i) for i in range(64) if (data >> i) & 1)
                    header = sum(1 << (1-i) for i in range(2) if (header >> i) & 1)

                if self.scramble:
                    # 64b/66b descrambler
                    # the received (still scrambled) bit is shifted into the state
                    b = 0
                    for i in range(self.width):
                        if bool(scrambler_state & (1 << 38)) ^ bool(scrambler_state & (1 << 57)) ^ bool(data & (1 << i)):
                            b = b | (1 << i)
                        scrambler_state = (scrambler_state & 0x1ffffffffffffff) << 1 | bool(data & (1 << i))
                    data = b

                # 10GBASE-R decoding

                # remap control characters
                # lane i's 7-bit code is read from bits 7*i+8 .. 7*i+14
                ctrl = bytearray(baser_ctrl_to_xgmii_mapping.get((data >> i*7+8) & 0x7f, XgmiiCtrl.ERROR) for i in range(8))

                data = data.to_bytes(8, 'little')

                dl = bytearray()
                cl = []
                if header == BaseRSync.DATA:
                    # data
                    dl = data
                    cl = [0]*8
                elif header == BaseRSync.CTRL:
                    if data[0] == BaseRBlockType.CTRL:
                        # C7 C6 C5 C4 C3 C2 C1 C0 BT
                        dl = ctrl
                        cl = [1]*8
                    elif data[0] == BaseRBlockType.OS_4:
                        # D7 D6 D5 O4 C3 C2 C1 C0 BT
                        dl = ctrl[0:4]
                        cl = [1]*4
                        if (data[4] >> 4) & 0xf == BaseRO.SEQ_OS:
                            dl.append(XgmiiCtrl.SEQ_OS)
                        elif (data[4] >> 4) & 0xf == BaseRO.SIG_OS:
                            dl.append(XgmiiCtrl.SIG_OS)
                        else:
                            dl.append(XgmiiCtrl.ERROR)
                        cl.append(1)
                        dl += data[5:]
                        cl += [0]*3
                    elif data[0] == BaseRBlockType.START_4:
                        # D7 D6 D5 C3 C2 C1 C0 BT
                        dl = ctrl[0:4]
                        cl = [1]*4
                        dl.append(XgmiiCtrl.START)
                        cl.append(1)
                        dl += data[5:]
                        cl += [0]*3
                    elif data[0] == BaseRBlockType.OS_START:
                        # D7 D6 D5 O0 D3 D2 D1 BT
                        if data[4] & 0xf == BaseRO.SEQ_OS:
                            dl.append(XgmiiCtrl.SEQ_OS)
                        elif data[4] & 0xf == BaseRO.SIG_OS:
                            dl.append(XgmiiCtrl.SIG_OS)
                        else:
                            dl.append(XgmiiCtrl.ERROR)
                        cl.append(1)
                        dl += data[1:4]
                        cl += [0]*3
                        dl.append(XgmiiCtrl.START)
                        cl.append(1)
                        dl += data[5:]
                        cl += [0]*3
                    elif data[0] == BaseRBlockType.OS_04:
                        # D7 D6 D5 O4 O0 D3 D2 D1 BT
                        if data[4] & 0xf == BaseRO.SEQ_OS:
                            dl.append(XgmiiCtrl.SEQ_OS)
                        elif data[4] & 0xf == BaseRO.SIG_OS:
                            dl.append(XgmiiCtrl.SIG_OS)
                        else:
                            dl.append(XgmiiCtrl.ERROR)
                        cl.append(1)
                        dl += data[1:4]
                        cl += [0]*3
                        if (data[4] >> 4) & 0xf == BaseRO.SEQ_OS:
                            dl.append(XgmiiCtrl.SEQ_OS)
                        elif (data[4] >> 4) & 0xf == BaseRO.SIG_OS:
                            dl.append(XgmiiCtrl.SIG_OS)
                        else:
                            dl.append(XgmiiCtrl.ERROR)
                        cl.append(1)
                        dl += data[5:]
                        cl += [0]*3
                    elif data[0] == BaseRBlockType.START_0:
                        # D7 D6 D5 D4 D3 D2 D1 BT
                        dl.append(XgmiiCtrl.START)
                        cl.append(1)
                        dl += data[1:]
                        cl += [0]*7
                    elif data[0] == BaseRBlockType.OS_0:
                        # C7 C6 C5 C4 O0 D3 D2 D1 BT
                        if data[4] & 0xf == BaseRO.SEQ_OS:
                            dl.append(XgmiiCtrl.SEQ_OS)
                        elif data[4] & 0xf == BaseRO.SIG_OS:
                            # signal ordered set (was wrongly reported as SEQ_OS)
                            dl.append(XgmiiCtrl.SIG_OS)
                        else:
                            dl.append(XgmiiCtrl.ERROR)
                        cl.append(1)
                        dl += data[1:4]
                        cl += [0]*3
                        dl += ctrl[4:]
                        cl += [1]*4
                    elif data[0] in {BaseRBlockType.TERM_0, BaseRBlockType.TERM_1,
                            BaseRBlockType.TERM_2, BaseRBlockType.TERM_3, BaseRBlockType.TERM_4,
                            BaseRBlockType.TERM_5, BaseRBlockType.TERM_6, BaseRBlockType.TERM_7}:
                        # C7 C6 C5 C4 C3 C2 C1 BT
                        # C7 C6 C5 C4 C3 C2 D0 BT
                        # C7 C6 C5 C4 C3 D1 D0 BT
                        # C7 C6 C5 C4 D2 D1 D0 BT
                        # C7 C6 C5 D3 D2 D1 D0 BT
                        # C7 C6 D4 D3 D2 D1 D0 BT
                        # C7 D5 D4 D3 D2 D1 D0 BT
                        # D6 D5 D4 D3 D2 D1 D0 BT
                        term_lane = block_type_term_lane_mapping[data[0]]
                        dl += data[1:term_lane+1]
                        cl += [0]*term_lane
                        dl.append(XgmiiCtrl.TERM)
                        cl.append(1)
                        dl += ctrl[term_lane+1:]
                        cl += [1]*(7-term_lane)
                    else:
                        # invalid block type
                        self.log.warning("Invalid block type")
                        dl = [XgmiiCtrl.ERROR]*8
                        cl = [1]*8
                else:
                    # invalid sync header
                    self.log.warning("Invalid sync header")
                    dl = [XgmiiCtrl.ERROR]*8
                    cl = [1]*8

                for offset in range(self.byte_lanes):
                    d_val = dl[offset]
                    c_val = cl[offset]

                    if frame is None:
                        if c_val and d_val == XgmiiCtrl.START:
                            # start
                            # the START character is stored as a preamble byte
                            frame = XgmiiFrame(bytearray([EthPre.PRE]), [0])
                            frame.sim_time_start = get_sim_time()
                            frame.start_lane = offset
                    else:
                        if c_val:
                            # got a control character; terminate frame reception
                            if d_val != XgmiiCtrl.TERM:
                                # store control character if it's not a termination
                                frame.data.append(d_val)
                                frame.ctrl.append(c_val)

                            frame.compact()
                            frame.sim_time_end = get_sim_time()
                            self.log.info("RX frame: %s", frame)

                            self.queue_occupancy_bytes += len(frame)
                            self.queue_occupancy_frames += 1

                            self.queue.put_nowait(frame)
                            self.active_event.set()

                            frame = None
                        else:
                            if frame.sim_time_sfd is None and d_val == EthPre.SFD:
                                frame.sim_time_sfd = get_sim_time()

                            frame.data.append(d_val)
                            frame.ctrl.append(c_val)