From 4bee96ea9aa096b3b7201b669bc4d1827c054fcb Mon Sep 17 00:00:00 2001 From: Alex Forencich Date: Sun, 21 Mar 2021 22:24:59 -0700 Subject: [PATCH] Enforce max queue depth on streaming sources --- README.md | 2 +- cocotbext/axi/axis.py | 26 +++++++++++++++++++++++++- cocotbext/axi/stream.py | 21 ++++++++++++++++++++- 3 files changed, 46 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 59ac178..c519cac 100644 --- a/README.md +++ b/README.md @@ -270,7 +270,7 @@ Note: _byte_size_, _byte_lanes_, `len(tdata)`, and `len(tkeep)` are all related, * `read_nowait(count)`: read _count_ bytes from buffer (non-blocking) (sink/monitor) * `count()`: returns the number of items in the queue (all) * `empty()`: returns _True_ if the queue is empty (all) -* `full()`: returns _True_ if the queue occupancy limits are met (sink) +* `full()`: returns _True_ if the queue occupancy limits are met (source/sink) * `idle()`: returns _True_ if no transfer is in progress (all) or if the queue is not empty (source) * `clear()`: drop all data in queue (all) * `wait()`: wait for idle (source) diff --git a/cocotbext/axi/axis.py b/cocotbext/axi/axis.py index e746c1d..bb663cc 100644 --- a/cocotbext/axi/axis.py +++ b/cocotbext/axi/axis.py @@ -25,7 +25,7 @@ THE SOFTWARE. import logging import cocotb -from cocotb.queue import Queue +from cocotb.queue import Queue, QueueFull from cocotb.triggers import RisingEdge, Timer, First, Event from cocotb.utils import get_sim_time from cocotb_bus.bus import Bus @@ -277,6 +277,7 @@ class AxiStreamBase(Reset): self.active = False self.queue = Queue() + self.dequeue_event = Event() self.current_frame = None self.idle_event = Event() self.idle_event.set() @@ -357,6 +358,7 @@ class AxiStreamBase(Reset): frame = self.queue.get_nowait() frame.sim_time_end = None frame.handle_tx_complete() + self.dequeue_event.set() self.idle_event.set() self.active_event.clear() self.queue_occupancy_bytes = 0 @@ -418,7 +420,18 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause): _valid_init = 0 _ready_init = None + def __init__(self, bus, clock, reset=None, reset_active_level=True, + byte_size=None, byte_lanes=None, *args, **kwargs): + + super().__init__(bus, clock, reset, reset_active_level, byte_size, byte_lanes, *args, **kwargs) + + self.queue_occupancy_limit_bytes = -1 + self.queue_occupancy_limit_frames = -1 + async def send(self, frame): + while self.full(): + self.dequeue_event.clear() + await self.dequeue_event.wait() frame = AxiStreamFrame(frame) await self.queue.put(frame) self.idle_event.clear() @@ -426,6 +439,8 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause): self.queue_occupancy_frames += 1 def send_nowait(self, frame): + if self.full(): + raise QueueFull() frame = AxiStreamFrame(frame) self.queue.put_nowait(frame) self.idle_event.clear() @@ -438,6 +453,14 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause): def write_nowait(self, data): self.send_nowait(data) + def full(self): + if self.queue_occupancy_limit_bytes > 0 and self.queue_occupancy_bytes > self.queue_occupancy_limit_bytes: + return True + elif self.queue_occupancy_limit_frames > 0 and self.queue_occupancy_frames > self.queue_occupancy_limit_frames: + return True + else: + return False + def idle(self): return self.empty() and not self.active @@ -481,6 +504,7 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause): if (tready_sample and tvalid_sample) or not tvalid_sample: if frame is None and not self.queue.empty(): frame = self.queue.get_nowait() + self.dequeue_event.set() self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_frames -= 1 self.current_frame = frame diff --git a/cocotbext/axi/stream.py b/cocotbext/axi/stream.py index 299172a..7dafd3a 100644 --- a/cocotbext/axi/stream.py +++ b/cocotbext/axi/stream.py @@ -25,7 +25,7 @@ THE SOFTWARE. import logging import cocotb -from cocotb.queue import Queue +from cocotb.queue import Queue, QueueFull from cocotb.triggers import RisingEdge, Event, First, Timer from cocotb_bus.bus import Bus @@ -95,6 +95,7 @@ class StreamBase(Reset): self.active = False self.queue = Queue() + self.dequeue_event = Event() self.idle_event = Event() self.idle_event.set() self.active_event = Event() @@ -134,6 +135,7 @@ class StreamBase(Reset): def clear(self): while not self.queue.empty(): self.queue.get_nowait() + self.dequeue_event.set() self.idle_event.set() self.active_event.clear() @@ -191,14 +193,30 @@ class StreamSource(StreamBase, StreamPause): _valid_init = 0 _ready_init = None + def __init__(self, bus, clock, reset=None, reset_active_level=True, *args, **kwargs): + super().__init__(bus, clock, reset, reset_active_level, *args, **kwargs) + + self.queue_occupancy_limit = -1 + async def send(self, obj): + while self.full(): + self.dequeue_event.clear() + await self.dequeue_event.wait() await self.queue.put(obj) self.idle_event.clear() def send_nowait(self, obj): + if self.full(): + raise QueueFull() self.queue.put_nowait(obj) self.idle_event.clear() + def full(self): + if self.queue_occupancy_limit > 0 and self.count() >= self.queue_occupancy_limit: + return True + else: + return False + def idle(self): return self.empty() and not self.active @@ -223,6 +241,7 @@ class StreamSource(StreamBase, StreamPause): if (ready_sample and valid_sample) or (not valid_sample): if not self.queue.empty() and not self.pause: self.bus.drive(self.queue.get_nowait()) + self.dequeue_event.set() if self.valid is not None: self.valid <= 1 self.active = True