""" Copyright (c) 2020 Alex Forencich Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ import logging import struct import zlib import cocotb from cocotb.queue import Queue from cocotb.triggers import RisingEdge, Timer, First, Event from cocotb.utils import get_sim_time from .version import __version__ from .constants import EthPre, ETH_PREAMBLE, XgmiiCtrl from .reset import Reset class XgmiiFrame: def __init__(self, data=None, ctrl=None, tx_complete=None): self.data = bytearray() self.ctrl = None self.sim_time_start = None self.sim_time_sfd = None self.sim_time_end = None self.start_lane = None self.tx_complete = None if type(data) is XgmiiFrame: self.data = bytearray(data.data) self.ctrl = data.ctrl self.sim_time_start = data.sim_time_start self.sim_time_sfd = data.sim_time_sfd self.sim_time_end = data.sim_time_end self.start_lane = data.start_lane self.tx_complete = data.tx_complete else: self.data = bytearray(data) self.ctrl = ctrl if tx_complete is not None: self.tx_complete = tx_complete @classmethod def from_payload(cls, payload, min_len=60, tx_complete=None): payload = bytearray(payload) if len(payload) < min_len: payload.extend(bytearray(min_len-len(payload))) payload.extend(struct.pack(' self.byte_width-1 or (not self.enable_dic and ifg_cnt > 4): # in IFG ifg_cnt = ifg_cnt - self.byte_width if ifg_cnt < 0: if self.enable_dic: deficit_idle_cnt = max(deficit_idle_cnt+ifg_cnt, 0) ifg_cnt = 0 elif frame is None: # idle if not self.queue.empty(): # send frame frame = self.queue.get_nowait() self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_frames -= 1 frame.sim_time_start = get_sim_time() frame.sim_time_sfd = None frame.sim_time_end = None self.log.info("TX frame: %s", frame) frame.normalize() frame.start_lane = 0 assert frame.data[0] == EthPre.PRE assert frame.ctrl[0] == 0 frame.data[0] = XgmiiCtrl.START frame.ctrl[0] = 1 frame.data.append(XgmiiCtrl.TERM) frame.ctrl.append(1) # offset start if self.enable_dic: min_ifg = 3 - deficit_idle_cnt else: min_ifg = 0 if self.byte_width > 4 and (ifg_cnt > min_ifg or self.force_offset_start): ifg_cnt = ifg_cnt-4 frame.start_lane = 4 frame.data = bytearray([XgmiiCtrl.IDLE]*4)+frame.data frame.ctrl = [1]*4+frame.ctrl if self.enable_dic: deficit_idle_cnt = max(deficit_idle_cnt+ifg_cnt, 0) ifg_cnt = 0 self.active = True else: # clear counters deficit_idle_cnt = 0 ifg_cnt = 0 if frame is not None: d_val = 0 c_val = 0 for k in range(self.byte_width): if frame is not None: d = frame.data.pop(0) if frame.sim_time_sfd is None and d == EthPre.SFD: frame.sim_time_sfd = get_sim_time() d_val |= d << k*8 c_val |= frame.ctrl.pop(0) << k if not frame.data: ifg_cnt = max(self.ifg - (self.byte_width-k), 0) frame.sim_time_end = get_sim_time() frame.handle_tx_complete() frame = None else: d_val |= XgmiiCtrl.IDLE << k*8 c_val |= 1 << k self.data <= d_val self.ctrl <= c_val else: self.data <= self.idle_d self.ctrl <= self.idle_c self.active = False self.idle_event.set() class XgmiiSink(Reset): def __init__(self, data, ctrl, clock, reset=None, enable=None, reset_active_level=True, *args, **kwargs): self.log = logging.getLogger(f"cocotb.{data._path}") self.data = data self.ctrl = ctrl self.clock = clock self.reset = reset self.enable = enable self.log.info("XGMII sink") self.log.info("cocotbext-eth version %s", __version__) self.log.info("Copyright (c) 2020 Alex Forencich") self.log.info("https://github.com/alexforencich/cocotbext-eth") super().__init__(*args, **kwargs) self.active = False self.queue = Queue() self.active_event = Event() self.queue_occupancy_bytes = 0 self.queue_occupancy_frames = 0 self.width = len(self.data) self.byte_width = len(self.ctrl) assert self.width == self.byte_width * 8 self._run_cr = None self._init_reset(reset, reset_active_level) async def recv(self, compact=True): frame = await self.queue.get() if self.queue.empty(): self.active_event.clear() self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_frames -= 1 return frame def recv_nowait(self, compact=True): if not self.queue.empty(): frame = self.queue.get_nowait() if self.queue.empty(): self.active_event.clear() self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_frames -= 1 return frame return None def count(self): return self.queue.qsize() def empty(self): return self.queue.empty() def idle(self): return not self.active def clear(self): while not self.queue.empty(): self.queue.get_nowait() self.active_event.clear() self.queue_occupancy_bytes = 0 self.queue_occupancy_frames = 0 async def wait(self, timeout=0, timeout_unit=None): if not self.empty(): return if timeout: await First(self.active_event.wait(), Timer(timeout, timeout_unit)) else: await self.active_event.wait() def _handle_reset(self, state): if state: self.log.info("Reset asserted") if self._run_cr is not None: self._run_cr.kill() self._run_cr = None else: self.log.info("Reset de-asserted") if self._run_cr is None: self._run_cr = cocotb.scheduler.start_soon(self._run()) self.active = False async def _run(self): frame = None self.active = False while True: await RisingEdge(self.clock) if self.enable is None or self.enable.value: for offset in range(self.byte_width): d_val = (self.data.value.integer >> (offset*8)) & 0xff c_val = (self.ctrl.value.integer >> offset) & 1 if frame is None: if c_val and d_val == XgmiiCtrl.START: # start frame = XgmiiFrame(bytearray([EthPre.PRE]), [0]) frame.sim_time_start = get_sim_time() frame.start_lane = offset else: if c_val: # got a control character; terminate frame reception if d_val != XgmiiCtrl.TERM: # store control character if it's not a termination frame.data.append(d_val) frame.ctrl.append(c_val) frame.compact() frame.sim_time_end = get_sim_time() self.log.info("RX frame: %s", frame) self.queue_occupancy_bytes += len(frame) self.queue_occupancy_frames += 1 self.queue.put_nowait(frame) self.active_event.set() frame = None else: if frame.sim_time_sfd is None and d_val == EthPre.SFD: frame.sim_time_sfd = get_sim_time() frame.data.append(d_val) frame.ctrl.append(c_val)