diff --git a/cocotbext/axi/axis.py b/cocotbext/axi/axis.py index 088a2d7..2dad41c 100644 --- a/cocotbext/axi/axis.py +++ b/cocotbext/axi/axis.py @@ -231,11 +231,18 @@ class AxiStreamFrame: return bytes(self.tdata) -class AxiStreamSource(Reset): +class AxiStreamBase(Reset): _signals = ["tdata"] _optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"] + _type = "base" + + _init_x = False + + _valid_init = None + _ready_init = None + def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs): self.log = logging.getLogger(f"cocotb.{entity._name}.{name}") self.entity = entity @@ -243,7 +250,7 @@ class AxiStreamSource(Reset): self.reset = reset self.bus = Bus(self.entity, name, self._signals, optional_signals=self._optional_signals, **kwargs) - self.log.info("AXI stream source") + self.log.info("AXI stream %s", self._type) self.log.info("cocotbext-axi version %s", __version__) self.log.info("Copyright (c) 2020 Alex Forencich") self.log.info("https://github.com/alexforencich/cocotbext-axi") @@ -252,10 +259,7 @@ class AxiStreamSource(Reset): self.active = False self.queue = deque() - - self.pause = False - self._pause_generator = None - self._pause_cr = None + self.queue_sync = Event() self.queue_occupancy_bytes = 0 self.queue_occupancy_frames = 0 @@ -263,19 +267,17 @@ class AxiStreamSource(Reset): self.width = len(self.bus.tdata) self.byte_lanes = 1 - self.bus.tdata.setimmediatevalue(0) - if hasattr(self.bus, "tvalid"): - self.bus.tvalid.setimmediatevalue(0) - if hasattr(self.bus, "tlast"): - self.bus.tlast.setimmediatevalue(0) - if hasattr(self.bus, "tkeep"): - self.bus.tkeep.setimmediatevalue(0) - if hasattr(self.bus, "tid"): - self.bus.tid.setimmediatevalue(0) - if hasattr(self.bus, "tdest"): - self.bus.tdest.setimmediatevalue(0) - if hasattr(self.bus, "tuser"): - self.bus.tuser.setimmediatevalue(0) + if self._valid_init is not None and hasattr(self.bus, "tvalid"): + self.bus.tvalid.setimmediatevalue(self._valid_init) + if self._ready_init is not None and hasattr(self.bus, "tready"): + self.bus.tready.setimmediatevalue(self._ready_init) + + for sig in self._signals+self._optional_signals: + if hasattr(self.bus, sig): + if self._init_x and sig not in ("tvalid", "tready"): + v = getattr(self.bus, sig).value + v.binstr = 'x'*len(v) + getattr(self.bus, sig).setimmediatevalue(v) if hasattr(self.bus, "tkeep"): self.byte_lanes = len(self.bus.tkeep) @@ -292,7 +294,7 @@ class AxiStreamSource(Reset): self.byte_size = self.width // self.byte_lanes self.byte_mask = 2**self.byte_size-1 - self.log.info("AXI stream source configuration:") + self.log.info("AXI stream %s configuration:", self._type) self.log.info(" Byte size: %d bits", self.byte_size) self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes) self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present") @@ -323,6 +325,70 @@ class AxiStreamSource(Reset): self._init_reset(reset) + def count(self): + return len(self.queue) + + def empty(self): + return not self.queue + + def clear(self): + self.queue.clear() + self.queue_occupancy_bytes = 0 + self.queue_occupancy_frames = 0 + + def _handle_reset(self, state): + if state: + self.log.info("Reset asserted") + if self._run_cr is not None: + self._run_cr.kill() + self._run_cr = None + else: + self.log.info("Reset de-asserted") + if self._run_cr is None: + self._run_cr = cocotb.fork(self._run()) + + self.active = False + + async def _run(self): + raise NotImplementedError() + + +class AxiStreamPause: + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.pause = False + self._pause_generator = None + self._pause_cr = None + + def set_pause_generator(self, generator=None): + if self._pause_cr is not None: + self._pause_cr.kill() + self._pause_cr = None + + self._pause_generator = generator + + if self._pause_generator is not None: + self._pause_cr = cocotb.fork(self._run_pause()) + + def clear_pause_generator(self): + self.set_pause_generator(None) + + async def _run_pause(self): + for val in self._pause_generator: + self.pause = val + await RisingEdge(self.clock) + + +class AxiStreamSource(AxiStreamBase, AxiStreamPause): + + _type = "source" + + _init_x = True + + _valid_init = 0 + _ready_init = None + async def send(self, frame): self.send_nowait(frame) @@ -338,49 +404,16 @@ class AxiStreamSource(Reset): def write_nowait(self, data): self.send_nowait(data) - def count(self): - return len(self.queue) - - def empty(self): - return not self.queue - def idle(self): return self.empty() and not self.active - def clear(self): - self.queue.clear() - self.queue_occupancy_bytes = 0 - self.queue_occupancy_frames = 0 - async def wait(self): while not self.idle(): await RisingEdge(self.clock) - def set_pause_generator(self, generator=None): - if self._pause_cr is not None: - self._pause_cr.kill() - self._pause_cr = None - - self._pause_generator = generator - - if self._pause_generator is not None: - self._pause_cr = cocotb.fork(self._run_pause()) - - def clear_pause_generator(self): - self.set_pause_generator(None) - def _handle_reset(self, state): - if state: - self.log.info("Reset asserted") - if self._run_cr is not None: - self._run_cr.kill() - self._run_cr = None - else: - self.log.info("Reset de-asserted") - if self._run_cr is None: - self._run_cr = cocotb.fork(self._run()) + super()._handle_reset(state) - self.active = False self.bus.tdata <= 0 if hasattr(self.bus, "tvalid"): self.bus.tvalid <= 0 @@ -459,101 +492,25 @@ class AxiStreamSource(Reset): self.bus.tlast <= 0 self.active = bool(frame) - async def _run_pause(self): - for val in self._pause_generator: - self.pause = val - await RisingEdge(self.clock) +class AxiStreamMonitor(AxiStreamBase): -class AxiStreamSink(Reset): + _type = "monitor" - _signals = ["tdata"] - _optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"] + _init_x = False + + _valid_init = None + _ready_init = None def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs): - self.log = logging.getLogger(f"cocotb.{entity._name}.{name}") - self.entity = entity - self.clock = clock - self.reset = reset - self.bus = Bus(self.entity, name, self._signals, optional_signals=self._optional_signals, **kwargs) + super().__init__(entity, name, clock, reset, byte_size, byte_lanes, *args, **kwargs) - self.log.info("AXI stream sink") - self.log.info("cocotbext-axi version %s", __version__) - self.log.info("Copyright (c) 2020 Alex Forencich") - self.log.info("https://github.com/alexforencich/cocotbext-axi") - - super().__init__(*args, **kwargs) - - self.active = False - self.queue = deque() - self.sync = Event() self.read_queue = [] - self.pause = False - self._pause_generator = None - self._pause_cr = None - - self.queue_occupancy_bytes = 0 - self.queue_occupancy_frames = 0 - self.queue_occupancy_limit_bytes = None - self.queue_occupancy_limit_frames = None - - self.width = len(self.bus.tdata) - self.byte_lanes = 1 - - if hasattr(self.bus, "tready"): - self.bus.tready.setimmediatevalue(0) - - if hasattr(self.bus, "tkeep"): - self.byte_lanes = len(self.bus.tkeep) - if byte_size is not None or byte_lanes is not None: - raise ValueError("Cannot specify byte_size or byte_lanes if tkeep is connected") - else: - if byte_lanes is not None: - self.byte_lanes = byte_lanes - if byte_size is not None: - raise ValueError("Cannot specify both byte_size and byte_lanes") - elif byte_size is not None: - self.byte_lanes = self.width // byte_size - - self.byte_size = self.width // self.byte_lanes - self.byte_mask = 2**self.byte_size-1 - - self.log.info("AXI stream sink configuration:") - self.log.info(" Byte size: %d bits", self.byte_size) - self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes) - self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present") - self.log.info(" tready: %s", "present" if hasattr(self.bus, "tready") else "not present") - self.log.info(" tlast: %s", "present" if hasattr(self.bus, "tlast") else "not present") - if hasattr(self.bus, "tkeep"): - self.log.info(" tkeep width: %d bits", len(self.bus.tkeep)) - else: - self.log.info(" tkeep: not present") - if hasattr(self.bus, "tid"): - self.log.info(" tid width: %d bits", len(self.bus.tid)) - else: - self.log.info(" tid: not present") - if hasattr(self.bus, "tdest"): - self.log.info(" tdest width: %d bits", len(self.bus.tdest)) - else: - self.log.info(" tdest: not present") - if hasattr(self.bus, "tuser"): - self.log.info(" tuser width: %d bits", len(self.bus.tuser)) - else: - self.log.info(" tuser: not present") - - if self.byte_lanes * self.byte_size != self.width: - raise ValueError(f"Bus does not evenly divide into byte lanes " - f"({self.byte_lanes} * {self.byte_size} != {self.width})") - - self._run_cr = None - - self._init_reset(reset) - async def recv(self, compact=True): while self.empty(): - self.sync.clear() - await self.sync.wait() + self.queue_sync.clear() + await self.queue_sync.wait() return self.recv_nowait(compact) def recv_nowait(self, compact=True): @@ -582,62 +539,88 @@ class AxiStreamSink(Reset): del self.read_queue[:count] return data - def count(self): - return len(self.queue) - - def empty(self): - return not self.queue - - def full(self): - if self.queue_occupancy_limit_bytes and self.queue_occupancy_bytes > self.queue_occupancy_limit_bytes: - return True - elif self.queue_occupancy_limit_frames and self.queue_occupancy_frames > self.queue_occupancy_limit_frames: - return True - else: - return False - def idle(self): return not self.active - def clear(self): - self.queue.clear() - self.queue_occupancy_bytes = 0 - self.queue_occupancy_frames = 0 - async def wait(self, timeout=0, timeout_unit='ns'): if not self.empty(): return - self.sync.clear() + self.queue_sync.clear() if timeout: - await First(self.sync.wait(), Timer(timeout, timeout_unit)) + await First(self.queue_sync.wait(), Timer(timeout, timeout_unit)) else: - await self.sync.wait() + await self.queue_sync.wait() - def set_pause_generator(self, generator=None): - if self._pause_cr is not None: - self._pause_cr.kill() - self._pause_cr = None + async def _run(self): + frame = None + self.active = False - self._pause_generator = generator + while True: + await RisingEdge(self.clock) - if self._pause_generator is not None: - self._pause_cr = cocotb.fork(self._run_pause()) + # read handshake signals + tready_sample = (not hasattr(self.bus, "tready")) or self.bus.tready.value + tvalid_sample = (not hasattr(self.bus, "tvalid")) or self.bus.tvalid.value - def clear_pause_generator(self): - self.set_pause_generator(None) + if tready_sample and tvalid_sample: + if frame is None: + if self.byte_size == 8: + frame = AxiStreamFrame(bytearray(), [], [], [], []) + else: + frame = AxiStreamFrame([], [], [], [], []) + frame.sim_time_start = get_sim_time() + + for offset in range(self.byte_lanes): + + frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask) + if hasattr(self.bus, "tkeep"): + frame.tkeep.append((self.bus.tkeep.value.integer >> offset) & 1) + if hasattr(self.bus, "tid"): + frame.tid.append(self.bus.tid.value.integer) + if hasattr(self.bus, "tdest"): + frame.tdest.append(self.bus.tdest.value.integer) + if hasattr(self.bus, "tuser"): + frame.tuser.append(self.bus.tuser.value.integer) + + if not hasattr(self.bus, "tlast") or self.bus.tlast.value: + frame.sim_time_end = get_sim_time() + self.log.info("RX frame: %s", frame) + + self.queue_occupancy_bytes += len(frame) + self.queue_occupancy_frames += 1 + + self.queue.append(frame) + self.queue_sync.set() + + frame = None + + +class AxiStreamSink(AxiStreamMonitor, AxiStreamPause): + + _type = "sink" + + _init_x = False + + _valid_init = None + _ready_init = 0 + + def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs): + super().__init__(entity, name, clock, reset, byte_size, byte_lanes, *args, **kwargs) + + self.queue_occupancy_limit_bytes = -1 + self.queue_occupancy_limit_frames = -1 + + def full(self): + if self.queue_occupancy_limit_bytes > 0 and self.queue_occupancy_bytes > self.queue_occupancy_limit_bytes: + return True + elif self.queue_occupancy_limit_frames > 0 and self.queue_occupancy_frames > self.queue_occupancy_limit_frames: + return True + else: + return False def _handle_reset(self, state): - if state: - self.log.info("Reset asserted") - if self._run_cr is not None: - self._run_cr.kill() - self._run_cr = None - else: - self.log.info("Reset de-asserted") - if self._run_cr is None: - self._run_cr = cocotb.fork(self._run()) + super()._handle_reset(state) - self.active = False if hasattr(self.bus, "tready"): self.bus.tready <= 0 @@ -680,199 +663,9 @@ class AxiStreamSink(Reset): self.queue_occupancy_frames += 1 self.queue.append(frame) - self.sync.set() + self.queue_sync.set() frame = None if hasattr(self.bus, "tready"): self.bus.tready <= (not self.full() and not self.pause) - - async def _run_pause(self): - for val in self._pause_generator: - self.pause = val - await RisingEdge(self.clock) - - -class AxiStreamMonitor(Reset): - - _signals = ["tdata"] - _optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"] - - def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs): - self.log = logging.getLogger(f"cocotb.{entity._name}.{name}") - self.entity = entity - self.clock = clock - self.reset = reset - self.bus = Bus(self.entity, name, self._signals, optional_signals=self._optional_signals, **kwargs) - - self.log.info("AXI stream monitor") - self.log.info("cocotbext-axi version %s", __version__) - self.log.info("Copyright (c) 2020 Alex Forencich") - self.log.info("https://github.com/alexforencich/cocotbext-axi") - - super().__init__(*args, **kwargs) - - self.active = False - self.queue = deque() - self.sync = Event() - self.read_queue = [] - - self.queue_occupancy_bytes = 0 - self.queue_occupancy_frames = 0 - - self.width = len(self.bus.tdata) - self.byte_lanes = 1 - - if hasattr(self.bus, "tkeep"): - self.byte_lanes = len(self.bus.tkeep) - if byte_size is not None or byte_lanes is not None: - raise ValueError("Cannot specify byte_size or byte_lanes if tkeep is connected") - else: - if byte_lanes is not None: - self.byte_lanes = byte_lanes - if byte_size is not None: - raise ValueError("Cannot specify both byte_size and byte_lanes") - elif byte_size is not None: - self.byte_lanes = self.width // byte_size - - self.byte_size = self.width // self.byte_lanes - self.byte_mask = 2**self.byte_size-1 - - self.log.info("AXI stream monitor configuration:") - self.log.info(" Byte size: %d bits", self.byte_size) - self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes) - self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present") - self.log.info(" tready: %s", "present" if hasattr(self.bus, "tready") else "not present") - self.log.info(" tlast: %s", "present" if hasattr(self.bus, "tlast") else "not present") - if hasattr(self.bus, "tkeep"): - self.log.info(" tkeep width: %d bits", len(self.bus.tkeep)) - else: - self.log.info(" tkeep: not present") - if hasattr(self.bus, "tid"): - self.log.info(" tid width: %d bits", len(self.bus.tid)) - else: - self.log.info(" tid: not present") - if hasattr(self.bus, "tdest"): - self.log.info(" tdest width: %d bits", len(self.bus.tdest)) - else: - self.log.info(" tdest: not present") - if hasattr(self.bus, "tuser"): - self.log.info(" tuser width: %d bits", len(self.bus.tuser)) - else: - self.log.info(" tuser: not present") - - if self.byte_lanes * self.byte_size != self.width: - raise ValueError(f"Bus does not evenly divide into byte lanes " - f"({self.byte_lanes} * {self.byte_size} != {self.width})") - - self._run_cr = None - - self._init_reset(reset) - - async def recv(self, compact=True): - while self.empty(): - self.sync.clear() - await self.sync.wait() - return self.recv_nowait(compact) - - def recv_nowait(self, compact=True): - if self.queue: - frame = self.queue.popleft() - self.queue_occupancy_bytes -= len(frame) - self.queue_occupancy_frames -= 1 - if compact: - frame.compact() - return frame - return None - - async def read(self, count=-1): - while not self.read_queue: - frame = await self.recv(compact=True) - self.read_queue.extend(frame.tdata) - return self.read_nowait(count) - - def read_nowait(self, count=-1): - while not self.empty(): - frame = self.recv_nowait(compact=True) - self.read_queue.extend(frame.tdata) - if count < 0: - count = len(self.read_queue) - data = self.read_queue[:count] - del self.read_queue[:count] - return data - - def count(self): - return len(self.queue) - - def empty(self): - return not self.queue - - def idle(self): - return not self.active - - def clear(self): - self.queue.clear() - self.queue_occupancy_bytes = 0 - self.queue_occupancy_frames = 0 - - async def wait(self, timeout=0, timeout_unit='ns'): - if not self.empty(): - return - self.sync.clear() - if timeout: - await First(self.sync.wait(), Timer(timeout, timeout_unit)) - else: - await self.sync.wait() - - def _handle_reset(self, state): - if state: - self.log.info("Reset asserted") - if self._run_cr is not None: - self._run_cr.kill() - self._run_cr = None - else: - self.log.info("Reset de-asserted") - if self._run_cr is None: - self._run_cr = cocotb.fork(self._run()) - - self.active = False - - async def _run(self): - frame = None - self.active = False - - while True: - await RisingEdge(self.clock) - - # read handshake signals - tready_sample = (not hasattr(self.bus, "tready")) or self.bus.tready.value - tvalid_sample = (not hasattr(self.bus, "tvalid")) or self.bus.tvalid.value - - if tready_sample and tvalid_sample: - if frame is None: - if self.byte_size == 8: - frame = AxiStreamFrame(bytearray(), [], [], [], []) - else: - frame = AxiStreamFrame([], [], [], [], []) - frame.sim_time_start = get_sim_time() - - for offset in range(self.byte_lanes): - - frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask) - if hasattr(self.bus, "tkeep"): - frame.tkeep.append((self.bus.tkeep.value.integer >> offset) & 1) - if hasattr(self.bus, "tid"): - frame.tid.append(self.bus.tid.value.integer) - if hasattr(self.bus, "tdest"): - frame.tdest.append(self.bus.tdest.value.integer) - if hasattr(self.bus, "tuser"): - frame.tuser.append(self.bus.tuser.value.integer) - - if not hasattr(self.bus, "tlast") or self.bus.tlast.value: - frame.sim_time_end = get_sim_time() - self.log.info("RX frame: %s", frame) - - self.queue.append(frame) - self.sync.set() - - frame = None