diff --git a/cocotbext/axi/stream.py b/cocotbext/axi/stream.py index 89c1f4d..ef48794 100644 --- a/cocotbext/axi/stream.py +++ b/cocotbext/axi/stream.py @@ -29,6 +29,8 @@ import cocotb from cocotb.triggers import RisingEdge, Event, First, Timer from cocotb.bus import Bus +from .reset import Reset + class StreamTransaction: @@ -48,7 +50,7 @@ class StreamTransaction: return f"{type(self).__name__}({', '.join(f'{s}={int(getattr(self, s))}' for s in self._signals)})" -class StreamBase: +class StreamBase(Reset): _signals = ["data", "valid", "ready"] _optional_signals = [] @@ -73,6 +75,8 @@ class StreamBase: super().__init__(*args, **kwargs) + self.active = False + self.queue = deque() self.queue_sync = Event() @@ -98,6 +102,10 @@ class StreamBase: v.binstr = 'x'*len(v) getattr(self.bus, sig).setimmediatevalue(v) + self._run_cr = None + + self._init_reset(reset) + def count(self): return len(self.queue) @@ -107,6 +115,22 @@ class StreamBase: def clear(self): self.queue.clear() + def _handle_reset(self, state): + if state: + self.log.info("Reset asserted") + if self._run_cr is not None: + self._run_cr.kill() + self._run_cr = None + else: + self.log.info("Reset de-asserted") + if self._run_cr is None: + self._run_cr = cocotb.fork(self._run()) + + self.active = False + + async def _run(self): + raise NotImplementedError() + class StreamPause: def __init__(self, *args, **kwargs): @@ -142,13 +166,6 @@ class StreamSource(StreamBase, StreamPause): _valid_init = 0 _ready_init = None - def __init__(self, entity, name, clock, reset=None, *args, **kwargs): - super().__init__(entity, name, clock, reset, *args, **kwargs) - - self.active = False - - cocotb.fork(self._run()) - async def send(self, obj): self.send_nowait(obj) @@ -162,6 +179,12 @@ class StreamSource(StreamBase, StreamPause): while not self.idle(): await RisingEdge(self.clock) + def _handle_reset(self, state): + super()._handle_reset(state) + + if self.valid is not None: + self.valid <= 0 + async def _run(self): while True: await RisingEdge(self.clock) @@ -170,13 +193,6 @@ class StreamSource(StreamBase, StreamPause): ready_sample = self.ready is None or self.ready.value valid_sample = self.valid is None or self.valid.value - if self.reset is not None and self.reset.value: - self.clear() - if self.valid is not None: - self.valid <= 0 - self.active = False - continue - if (ready_sample and valid_sample) or (not valid_sample): if self.queue and not self.pause: self.bus.drive(self.queue.popleft()) @@ -196,11 +212,6 @@ class StreamMonitor(StreamBase): _valid_init = None _ready_init = None - def __init__(self, entity, name, clock, reset=None, *args, **kwargs): - super().__init__(entity, name, clock, reset, *args, **kwargs) - - cocotb.fork(self._run()) - async def recv(self): while self.empty(): self.queue_sync.clear() @@ -229,10 +240,6 @@ class StreamMonitor(StreamBase): ready_sample = self.ready is None or self.ready.value valid_sample = self.valid is None or self.valid.value - if self.reset is not None and self.reset.value: - self.clear() - continue - if ready_sample and valid_sample: obj = self._transaction_obj() self.bus.sample(obj) @@ -258,6 +265,12 @@ class StreamSink(StreamMonitor, StreamPause): else: return False + def _handle_reset(self, state): + super()._handle_reset(state) + + if self.ready is not None: + self.ready <= 0 + async def _run(self): while True: await RisingEdge(self.clock) @@ -266,12 +279,6 @@ class StreamSink(StreamMonitor, StreamPause): ready_sample = self.ready is None or self.ready.value valid_sample = self.valid is None or self.valid.value - if self.reset is not None and self.reset.value: - self.clear() - if self.ready is not None: - self.ready <= 0 - continue - if ready_sample and valid_sample: obj = self._transaction_obj() self.bus.sample(obj)