Use cocotb async queues
This commit is contained in:
@@ -23,9 +23,9 @@ THE SOFTWARE.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from collections import deque
|
||||
|
||||
import cocotb
|
||||
from cocotb.queue import Queue
|
||||
from cocotb.triggers import RisingEdge, Event, First, Timer
|
||||
from cocotb_bus.bus import Bus
|
||||
|
||||
@@ -94,8 +94,10 @@ class StreamBase(Reset):
|
||||
|
||||
self.active = False
|
||||
|
||||
self.queue = deque()
|
||||
self.queue_sync = Event()
|
||||
self.queue = Queue()
|
||||
self.idle_event = Event()
|
||||
self.idle_event.set()
|
||||
self.active_event = Event()
|
||||
|
||||
self.ready = None
|
||||
self.valid = None
|
||||
@@ -124,13 +126,16 @@ class StreamBase(Reset):
|
||||
self._init_reset(reset, reset_active_level)
|
||||
|
||||
def count(self):
|
||||
return len(self.queue)
|
||||
return self.queue.qsize()
|
||||
|
||||
def empty(self):
|
||||
return not self.queue
|
||||
return self.queue.empty()
|
||||
|
||||
def clear(self):
|
||||
self.queue.clear()
|
||||
while not self.queue.empty():
|
||||
self.queue.get_nowait()
|
||||
self.idle_event.set()
|
||||
self.active_event.clear()
|
||||
|
||||
def _handle_reset(self, state):
|
||||
if state:
|
||||
@@ -184,17 +189,18 @@ class StreamSource(StreamBase, StreamPause):
|
||||
_ready_init = None
|
||||
|
||||
async def send(self, obj):
|
||||
self.send_nowait(obj)
|
||||
await self.queue.put(obj)
|
||||
self.idle_event.clear()
|
||||
|
||||
def send_nowait(self, obj):
|
||||
self.queue.append(obj)
|
||||
self.queue.put_nowait(obj)
|
||||
self.idle_event.clear()
|
||||
|
||||
def idle(self):
|
||||
return self.empty() and not self.active
|
||||
|
||||
async def wait(self):
|
||||
while not self.idle():
|
||||
await RisingEdge(self.clock)
|
||||
await self.idle_event.wait()
|
||||
|
||||
def _handle_reset(self, state):
|
||||
super()._handle_reset(state)
|
||||
@@ -211,15 +217,17 @@ class StreamSource(StreamBase, StreamPause):
|
||||
valid_sample = self.valid is None or self.valid.value
|
||||
|
||||
if (ready_sample and valid_sample) or (not valid_sample):
|
||||
if self.queue and not self.pause:
|
||||
self.bus.drive(self.queue.popleft())
|
||||
if not self.queue.empty() and not self.pause:
|
||||
self.bus.drive(self.queue.get_nowait())
|
||||
if self.queue.empty():
|
||||
self.idle_event.set()
|
||||
if self.valid is not None:
|
||||
self.valid <= 1
|
||||
self.active = True
|
||||
else:
|
||||
if self.valid is not None:
|
||||
self.valid <= 0
|
||||
self.active = bool(self.queue)
|
||||
self.active = not self.queue.empty()
|
||||
|
||||
|
||||
class StreamMonitor(StreamBase):
|
||||
@@ -230,24 +238,26 @@ class StreamMonitor(StreamBase):
|
||||
_ready_init = None
|
||||
|
||||
async def recv(self):
|
||||
while self.empty():
|
||||
self.queue_sync.clear()
|
||||
await self.queue_sync.wait()
|
||||
return self.recv_nowait()
|
||||
item = await self.queue.get()
|
||||
if self.queue.empty():
|
||||
self.active_event.clear()
|
||||
return item
|
||||
|
||||
def recv_nowait(self):
|
||||
if self.queue:
|
||||
return self.queue.popleft()
|
||||
if not self.queue.empty():
|
||||
item = self.queue.get_nowait()
|
||||
if self.queue.empty():
|
||||
self.active_event.clear()
|
||||
return item
|
||||
return None
|
||||
|
||||
async def wait(self, timeout=0, timeout_unit=None):
|
||||
if not self.empty():
|
||||
return
|
||||
self.queue_sync.clear()
|
||||
if timeout:
|
||||
await First(self.queue_sync.wait(), Timer(timeout, timeout_unit))
|
||||
await First(self.active_event.wait(), Timer(timeout, timeout_unit))
|
||||
else:
|
||||
await self.queue_sync.wait()
|
||||
await self.active_event.wait()
|
||||
|
||||
async def _run(self):
|
||||
while True:
|
||||
@@ -260,8 +270,8 @@ class StreamMonitor(StreamBase):
|
||||
if ready_sample and valid_sample:
|
||||
obj = self._transaction_obj()
|
||||
self.bus.sample(obj)
|
||||
self.queue.append(obj)
|
||||
self.queue_sync.set()
|
||||
self.queue.put_nowait(obj)
|
||||
self.active_event.set()
|
||||
|
||||
|
||||
class StreamSink(StreamMonitor, StreamPause):
|
||||
@@ -277,7 +287,7 @@ class StreamSink(StreamMonitor, StreamPause):
|
||||
self.queue_occupancy_limit = -1
|
||||
|
||||
def full(self):
|
||||
if self.queue_occupancy_limit > 0 and len(self.queue) >= self.queue_occupancy_limit:
|
||||
if self.queue_occupancy_limit > 0 and self.count() >= self.queue_occupancy_limit:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
@@ -299,8 +309,8 @@ class StreamSink(StreamMonitor, StreamPause):
|
||||
if ready_sample and valid_sample:
|
||||
obj = self._transaction_obj()
|
||||
self.bus.sample(obj)
|
||||
self.queue.append(obj)
|
||||
self.queue_sync.set()
|
||||
self.queue.put_nowait(obj)
|
||||
self.active_event.set()
|
||||
|
||||
if self.ready is not None:
|
||||
self.ready <= (not self.full() and not self.pause)
|
||||
|
||||
Reference in New Issue
Block a user