diff --git a/cocotbext/axi/stream.py b/cocotbext/axi/stream.py index a2af96c..3d94db5 100644 --- a/cocotbext/axi/stream.py +++ b/cocotbext/axi/stream.py @@ -216,17 +216,17 @@ class StreamSource(StreamBase, StreamPause): await self.drive(self.queue.popleft()) -class StreamSink(StreamBase, StreamPause): +class StreamMonitor(StreamBase): _init_x = False _valid_init = None - _ready_init = 0 + _ready_init = None def __init__(self, entity, name, clock, reset=None, *args, **kwargs): super().__init__(entity, name, clock, reset, *args, **kwargs) - cocotb.fork(self._run_sink()) + cocotb.fork(self._run()) async def recv(self): while self.empty(): @@ -248,7 +248,33 @@ class StreamSink(StreamBase, StreamPause): else: await self.queue_sync.wait() - async def _run_sink(self): + async def _run(self): + while True: + await RisingEdge(self.clock) + + # read handshake signals + ready_sample = self.ready is None or self.ready.value + valid_sample = self.valid is None or self.valid.value + + if self.reset is not None and self.reset.value: + self.clear() + continue + + if ready_sample and valid_sample: + obj = self._transaction_obj() + self.bus.sample(obj) + self.queue.append(obj) + self.queue_sync.set() + + +class StreamSink(StreamMonitor, StreamPause): + + _init_x = False + + _valid_init = None + _ready_init = 0 + + async def _run(self): while True: await RisingEdge(self.clock) @@ -272,63 +298,6 @@ class StreamSink(StreamBase, StreamPause): self.ready <= (not self.pause) -class StreamMonitor(StreamBase): - - _init_x = False - - _valid_init = None - _ready_init = None - - def __init__(self, entity, name, clock, reset=None, *args, **kwargs): - super().__init__(entity, name, clock, reset, *args, **kwargs) - - cocotb.fork(self._run_monitor()) - - async def recv(self): - while self.empty(): - self.queue_sync.clear() - await self.queue_sync.wait() - return self.recv_nowait() - - def recv_nowait(self): - if self.queue: - return self.queue.popleft() - return None - - def count(self): - return len(self.queue) - - def empty(self): - return not self.queue - - async def wait(self, timeout=0, timeout_unit=None): - if not self.empty(): - return - self.queue_sync.clear() - if timeout: - await First(self.queue_sync.wait(), Timer(timeout, timeout_unit)) - else: - await self.queue_sync.wait() - - async def _run_monitor(self): - while True: - await RisingEdge(self.clock) - - # read handshake signals - ready_sample = self.ready is None or self.ready.value - valid_sample = self.valid is None or self.valid.value - - if self.reset is not None and self.reset.value: - self.clear() - continue - - if ready_sample and valid_sample: - obj = self._transaction_obj() - self.bus.sample(obj) - self.queue.append(obj) - self.queue_sync.set() - - def define_stream(name, signals, optional_signals=None, valid_signal=None, ready_signal=None, signal_widths=None): all_signals = signals.copy()