Enforce max queue depth on streaming sources

This commit is contained in:
Alex Forencich
2021-03-21 22:24:59 -07:00
parent a66dfea6f7
commit 4bee96ea9a
3 changed files with 46 additions and 3 deletions

View File

@@ -270,7 +270,7 @@ Note: _byte_size_, _byte_lanes_, `len(tdata)`, and `len(tkeep)` are all related,
* `read_nowait(count)`: read _count_ bytes from buffer (non-blocking) (sink/monitor) * `read_nowait(count)`: read _count_ bytes from buffer (non-blocking) (sink/monitor)
* `count()`: returns the number of items in the queue (all) * `count()`: returns the number of items in the queue (all)
* `empty()`: returns _True_ if the queue is empty (all) * `empty()`: returns _True_ if the queue is empty (all)
* `full()`: returns _True_ if the queue occupancy limits are met (sink) * `full()`: returns _True_ if the queue occupancy limits are met (source/sink)
* `idle()`: returns _True_ if no transfer is in progress (all) or if the queue is not empty (source) * `idle()`: returns _True_ if no transfer is in progress (all) or if the queue is not empty (source)
* `clear()`: drop all data in queue (all) * `clear()`: drop all data in queue (all)
* `wait()`: wait for idle (source) * `wait()`: wait for idle (source)

View File

@@ -25,7 +25,7 @@ THE SOFTWARE.
import logging import logging
import cocotb import cocotb
from cocotb.queue import Queue from cocotb.queue import Queue, QueueFull
from cocotb.triggers import RisingEdge, Timer, First, Event from cocotb.triggers import RisingEdge, Timer, First, Event
from cocotb.utils import get_sim_time from cocotb.utils import get_sim_time
from cocotb_bus.bus import Bus from cocotb_bus.bus import Bus
@@ -277,6 +277,7 @@ class AxiStreamBase(Reset):
self.active = False self.active = False
self.queue = Queue() self.queue = Queue()
self.dequeue_event = Event()
self.current_frame = None self.current_frame = None
self.idle_event = Event() self.idle_event = Event()
self.idle_event.set() self.idle_event.set()
@@ -357,6 +358,7 @@ class AxiStreamBase(Reset):
frame = self.queue.get_nowait() frame = self.queue.get_nowait()
frame.sim_time_end = None frame.sim_time_end = None
frame.handle_tx_complete() frame.handle_tx_complete()
self.dequeue_event.set()
self.idle_event.set() self.idle_event.set()
self.active_event.clear() self.active_event.clear()
self.queue_occupancy_bytes = 0 self.queue_occupancy_bytes = 0
@@ -418,7 +420,18 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
_valid_init = 0 _valid_init = 0
_ready_init = None _ready_init = None
def __init__(self, bus, clock, reset=None, reset_active_level=True,
byte_size=None, byte_lanes=None, *args, **kwargs):
super().__init__(bus, clock, reset, reset_active_level, byte_size, byte_lanes, *args, **kwargs)
self.queue_occupancy_limit_bytes = -1
self.queue_occupancy_limit_frames = -1
async def send(self, frame): async def send(self, frame):
while self.full():
self.dequeue_event.clear()
await self.dequeue_event.wait()
frame = AxiStreamFrame(frame) frame = AxiStreamFrame(frame)
await self.queue.put(frame) await self.queue.put(frame)
self.idle_event.clear() self.idle_event.clear()
@@ -426,6 +439,8 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
self.queue_occupancy_frames += 1 self.queue_occupancy_frames += 1
def send_nowait(self, frame): def send_nowait(self, frame):
if self.full():
raise QueueFull()
frame = AxiStreamFrame(frame) frame = AxiStreamFrame(frame)
self.queue.put_nowait(frame) self.queue.put_nowait(frame)
self.idle_event.clear() self.idle_event.clear()
@@ -438,6 +453,14 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
def write_nowait(self, data): def write_nowait(self, data):
self.send_nowait(data) self.send_nowait(data)
def full(self):
if self.queue_occupancy_limit_bytes > 0 and self.queue_occupancy_bytes > self.queue_occupancy_limit_bytes:
return True
elif self.queue_occupancy_limit_frames > 0 and self.queue_occupancy_frames > self.queue_occupancy_limit_frames:
return True
else:
return False
def idle(self): def idle(self):
return self.empty() and not self.active return self.empty() and not self.active
@@ -481,6 +504,7 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
if (tready_sample and tvalid_sample) or not tvalid_sample: if (tready_sample and tvalid_sample) or not tvalid_sample:
if frame is None and not self.queue.empty(): if frame is None and not self.queue.empty():
frame = self.queue.get_nowait() frame = self.queue.get_nowait()
self.dequeue_event.set()
self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1 self.queue_occupancy_frames -= 1
self.current_frame = frame self.current_frame = frame

View File

@@ -25,7 +25,7 @@ THE SOFTWARE.
import logging import logging
import cocotb import cocotb
from cocotb.queue import Queue from cocotb.queue import Queue, QueueFull
from cocotb.triggers import RisingEdge, Event, First, Timer from cocotb.triggers import RisingEdge, Event, First, Timer
from cocotb_bus.bus import Bus from cocotb_bus.bus import Bus
@@ -95,6 +95,7 @@ class StreamBase(Reset):
self.active = False self.active = False
self.queue = Queue() self.queue = Queue()
self.dequeue_event = Event()
self.idle_event = Event() self.idle_event = Event()
self.idle_event.set() self.idle_event.set()
self.active_event = Event() self.active_event = Event()
@@ -134,6 +135,7 @@ class StreamBase(Reset):
def clear(self): def clear(self):
while not self.queue.empty(): while not self.queue.empty():
self.queue.get_nowait() self.queue.get_nowait()
self.dequeue_event.set()
self.idle_event.set() self.idle_event.set()
self.active_event.clear() self.active_event.clear()
@@ -191,14 +193,30 @@ class StreamSource(StreamBase, StreamPause):
_valid_init = 0 _valid_init = 0
_ready_init = None _ready_init = None
def __init__(self, bus, clock, reset=None, reset_active_level=True, *args, **kwargs):
super().__init__(bus, clock, reset, reset_active_level, *args, **kwargs)
self.queue_occupancy_limit = -1
async def send(self, obj): async def send(self, obj):
while self.full():
self.dequeue_event.clear()
await self.dequeue_event.wait()
await self.queue.put(obj) await self.queue.put(obj)
self.idle_event.clear() self.idle_event.clear()
def send_nowait(self, obj): def send_nowait(self, obj):
if self.full():
raise QueueFull()
self.queue.put_nowait(obj) self.queue.put_nowait(obj)
self.idle_event.clear() self.idle_event.clear()
def full(self):
if self.queue_occupancy_limit > 0 and self.count() >= self.queue_occupancy_limit:
return True
else:
return False
def idle(self): def idle(self):
return self.empty() and not self.active return self.empty() and not self.active
@@ -223,6 +241,7 @@ class StreamSource(StreamBase, StreamPause):
if (ready_sample and valid_sample) or (not valid_sample): if (ready_sample and valid_sample) or (not valid_sample):
if not self.queue.empty() and not self.pause: if not self.queue.empty() and not self.pause:
self.bus.drive(self.queue.get_nowait()) self.bus.drive(self.queue.get_nowait())
self.dequeue_event.set()
if self.valid is not None: if self.valid is not None:
self.valid <= 1 self.valid <= 1
self.active = True self.active = True