diff --git a/cocotbext/axi/axi_master.py b/cocotbext/axi/axi_master.py index 19778c6..d16abde 100644 --- a/cocotbext/axi/axi_master.py +++ b/cocotbext/axi/axi_master.py @@ -23,9 +23,10 @@ THE SOFTWARE. """ import logging -from collections import deque, namedtuple, Counter +from collections import namedtuple, Counter import cocotb +from cocotb.queue import Queue from cocotb.triggers import Event from .version import __version__ @@ -61,18 +62,15 @@ class AxiMasterWrite(Reset): self.w_channel = AxiWSource(bus.w, clock, reset, reset_active_level) self.b_channel = AxiBSink(bus.b, clock, reset, reset_active_level) - self.write_command_queue = deque() - self.write_command_sync = Event() - self.write_resp_queue = deque() - self.write_resp_sync = Event() + self.write_command_queue = Queue() + self.write_resp_queue = Queue() self.id_count = 2**len(self.aw_channel.bus.awid) self.cur_id = 0 self.active_id = Counter() - self.int_write_resp_command_queue = deque() - self.int_write_resp_command_sync = Event() - self.int_write_resp_queue_list = [deque() for k in range(self.id_count)] + self.int_write_resp_command_queue = Queue() + self.int_write_resp_queue_list = [Queue() for k in range(self.id_count)] self.in_flight_operations = 0 self._idle = Event() @@ -138,8 +136,7 @@ class AxiMasterWrite(Reset): cmd = AxiWriteCmd(address, bytearray(data), awid, burst, size, lock, cache, prot, qos, region, user, wuser, event) - self.write_command_queue.append(cmd) - self.write_command_sync.set() + self.write_command_queue.put_nowait(cmd) def idle(self): return not self.in_flight_operations @@ -149,11 +146,11 @@ class AxiMasterWrite(Reset): await self._idle.wait() def write_resp_ready(self): - return bool(self.write_resp_queue) + return not self.write_resp_queue.empty() def get_write_resp(self): - if self.write_resp_queue: - return self.write_resp_queue.popleft() + if not self.write_resp_queue.empty(): + return self.write_resp_queue.get_nowait() return None async def write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None, @@ -220,28 +217,25 @@ class AxiMasterWrite(Reset): self.w_channel.clear() self.b_channel.clear() - while self.write_command_queue: - cmd = self.write_command_queue.popleft() + while not self.write_command_queue.empty(): + cmd = self.write_command_queue.get_nowait() if cmd.event: cmd.event.set(None) - while self.int_write_resp_command_queue: - cmd = self.int_write_resp_command_queue.popleft() + while not self.int_write_resp_command_queue.empty(): + cmd = self.int_write_resp_command_queue.get_nowait() if cmd.event: cmd.event.set(None) - self.write_resp_queue.clear() + while not self.write_resp_queue.empty(): + self.write_resp_queue.get_nowait() self.in_flight_operations = 0 self._idle.set() async def _process_write(self): while True: - if not self.write_command_queue: - self.write_command_sync.clear() - await self.write_command_sync.wait() - - cmd = self.write_command_queue.popleft() + cmd = await self.write_command_queue.get() num_bytes = 2**cmd.size @@ -340,22 +334,17 @@ class AxiMasterWrite(Reset): cycle_offset = (cycle_offset + num_bytes) % self.byte_width resp_cmd = AxiWriteRespCmd(cmd.address, len(cmd.data), cmd.size, cycles, cmd.prot, burst_list, cmd.event) - self.int_write_resp_command_queue.append(resp_cmd) - self.int_write_resp_command_sync.set() + await self.int_write_resp_command_queue.put(resp_cmd) async def _process_write_resp(self): while True: - if not self.int_write_resp_command_queue: - self.int_write_resp_command_sync.clear() - await self.int_write_resp_command_sync.wait() - - cmd = self.int_write_resp_command_queue.popleft() + cmd = await self.int_write_resp_command_queue.get() resp = AxiResp.OKAY user = [] for bid, burst_length in cmd.burst_list: - while not self.int_write_resp_queue_list[bid]: + while self.int_write_resp_queue_list[bid].empty(): b = await self.b_channel.recv() i = int(b.bid) @@ -363,9 +352,9 @@ class AxiMasterWrite(Reset): if self.active_id[i] <= 0: raise Exception(f"Unexpected burst ID {bid}") - self.int_write_resp_queue_list[i].append(b) + self.int_write_resp_queue_list[i].put_nowait(b) - b = self.int_write_resp_queue_list[bid].popleft() + b = self.int_write_resp_queue_list[bid].get_nowait() burst_id = int(b.bid) burst_resp = AxiResp(b.bresp) @@ -392,8 +381,7 @@ class AxiMasterWrite(Reset): if cmd.event is not None: cmd.event.set(write_resp) else: - self.write_resp_queue.append(write_resp) - self.write_resp_sync.set() + self.write_resp_queue.put_nowait(write_resp) self.in_flight_operations -= 1 @@ -413,18 +401,15 @@ class AxiMasterRead(Reset): self.ar_channel = AxiARSource(bus.ar, clock, reset, reset_active_level) self.r_channel = AxiRSink(bus.r, clock, reset, reset_active_level) - self.read_command_queue = deque() - self.read_command_sync = Event() - self.read_data_queue = deque() - self.read_data_sync = Event() + self.read_command_queue = Queue() + self.read_data_queue = Queue() self.id_count = 2**len(self.ar_channel.bus.arid) self.cur_id = 0 self.active_id = Counter() - self.int_read_resp_command_queue = deque() - self.int_read_resp_command_sync = Event() - self.int_read_resp_queue_list = [deque() for k in range(self.id_count)] + self.int_read_resp_command_queue = Queue() + self.int_read_resp_queue_list = [Queue() for k in range(self.id_count)] self.in_flight_operations = 0 self._idle = Event() @@ -483,8 +468,7 @@ class AxiMasterRead(Reset): self._idle.clear() cmd = AxiReadCmd(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event) - self.read_command_queue.append(cmd) - self.read_command_sync.set() + self.read_command_queue.put_nowait(cmd) def idle(self): return not self.in_flight_operations @@ -494,11 +478,11 @@ class AxiMasterRead(Reset): await self._idle.wait() def read_data_ready(self): - return bool(self.read_data_queue) + return not self.read_data_queue.empty() def get_read_data(self): - if self.read_data_queue: - return self.read_data_queue.popleft() + if not self.read_data_queue.empty(): + return self.read_data_queue.get_nowait() return None async def read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None, @@ -564,28 +548,25 @@ class AxiMasterRead(Reset): self.ar_channel.clear() self.r_channel.clear() - while self.read_command_queue: - cmd = self.read_command_queue.popleft() + while not self.read_command_queue.empty(): + cmd = self.read_command_queue.get_nowait() if cmd.event: cmd.event.set(None) - while self.int_read_resp_command_queue: - cmd = self.int_read_resp_command_queue.popleft() + while not self.int_read_resp_command_queue.empty(): + cmd = self.int_read_resp_command_queue.get_nowait() if cmd.event: cmd.event.set(None) - self.read_data_queue.clear() + while not self.read_data_queue.empty(): + self.read_data_queue.get_nowait() self.in_flight_operations = 0 self._idle.set() async def _process_read(self): while True: - if not self.read_command_queue: - self.read_command_sync.clear() - await self.read_command_sync.wait() - - cmd = self.read_command_queue.popleft() + cmd = await self.read_command_queue.get() num_bytes = 2**cmd.size @@ -643,16 +624,11 @@ class AxiMasterRead(Reset): cur_addr += num_bytes resp_cmd = AxiReadRespCmd(cmd.address, cmd.length, cmd.size, cycles, cmd.prot, burst_list, cmd.event) - self.int_read_resp_command_queue.append(resp_cmd) - self.int_read_resp_command_sync.set() + await self.int_read_resp_command_queue.put(resp_cmd) async def _process_read_resp(self): while True: - if not self.int_read_resp_command_queue: - self.int_read_resp_command_sync.clear() - await self.int_read_resp_command_sync.wait() - - cmd = self.int_read_resp_command_queue.popleft() + cmd = await self.int_read_resp_command_queue.get() num_bytes = 2**cmd.size @@ -671,7 +647,7 @@ class AxiMasterRead(Reset): for rid, burst_length in cmd.burst_list: for k in range(burst_length): - while not self.int_read_resp_queue_list[rid]: + while self.int_read_resp_queue_list[rid].empty(): r = await self.r_channel.recv() i = int(r.rid) @@ -679,9 +655,9 @@ class AxiMasterRead(Reset): if self.active_id[i] <= 0: raise Exception(f"Unexpected burst ID {rid}") - self.int_read_resp_queue_list[i].append(r) + self.int_read_resp_queue_list[i].put_nowait(r) - r = self.int_read_resp_queue_list[rid].popleft() + r = self.int_read_resp_queue_list[rid].get_nowait() cycle_id = int(r.rid) cycle_data = int(r.rdata) @@ -727,8 +703,7 @@ class AxiMasterRead(Reset): if cmd.event is not None: cmd.event.set(read_resp) else: - self.read_data_queue.append(read_resp) - self.read_data_sync.set() + self.read_data_queue.put_nowait(read_resp) self.in_flight_operations -= 1 diff --git a/cocotbext/axi/axil_master.py b/cocotbext/axi/axil_master.py index f3bd219..bec22cd 100644 --- a/cocotbext/axi/axil_master.py +++ b/cocotbext/axi/axil_master.py @@ -23,9 +23,10 @@ THE SOFTWARE. """ import logging -from collections import deque, namedtuple +from collections import namedtuple import cocotb +from cocotb.queue import Queue from cocotb.triggers import Event from .version import __version__ @@ -57,13 +58,10 @@ class AxiLiteMasterWrite(Reset): self.w_channel = AxiLiteWSource(bus.w, clock, reset, reset_active_level) self.b_channel = AxiLiteBSink(bus.b, clock, reset, reset_active_level) - self.write_command_queue = deque() - self.write_command_sync = Event() - self.write_resp_queue = deque() - self.write_resp_sync = Event() + self.write_command_queue = Queue() + self.write_resp_queue = Queue() - self.int_write_resp_command_queue = deque() - self.int_write_resp_command_sync = Event() + self.int_write_resp_command_queue = Queue() self.in_flight_operations = 0 self._idle = Event() @@ -94,8 +92,7 @@ class AxiLiteMasterWrite(Reset): self.in_flight_operations += 1 self._idle.clear() - self.write_command_queue.append(AxiLiteWriteCmd(address, bytearray(data), prot, event)) - self.write_command_sync.set() + self.write_command_queue.put_nowait(AxiLiteWriteCmd(address, bytearray(data), prot, event)) def idle(self): return not self.in_flight_operations @@ -105,11 +102,11 @@ class AxiLiteMasterWrite(Reset): await self._idle.wait() def write_resp_ready(self): - return bool(self.write_resp_queue) + return not self.write_resp_queue.empty() def get_write_resp(self): - if self.write_resp_queue: - return self.write_resp_queue.popleft() + if not self.write_resp_queue.empty(): + return self.write_resp_queue.get_nowait() return None async def write(self, address, data, prot=AxiProt.NONSECURE): @@ -163,28 +160,25 @@ class AxiLiteMasterWrite(Reset): self.w_channel.clear() self.b_channel.clear() - while self.write_command_queue: - cmd = self.write_command_queue.popleft() + while not self.write_command_queue.empty(): + cmd = self.write_command_queue.get_nowait() if cmd.event: cmd.event.set(None) - while self.int_write_resp_command_queue: - cmd = self.int_write_resp_command_queue.popleft() + while not self.int_write_resp_command_queue.empty(): + cmd = self.int_write_resp_command_queue.get_nowait() if cmd.event: cmd.event.set(None) - self.write_resp_queue.clear() + while not self.write_resp_queue.empty(): + self.write_resp_queue.get_nowait() self.in_flight_operations = 0 self._idle.set() async def _process_write(self): while True: - if not self.write_command_queue: - self.write_command_sync.clear() - await self.write_command_sync.wait() - - cmd = self.write_command_queue.popleft() + cmd = await self.write_command_queue.get() word_addr = (cmd.address // self.byte_width) * self.byte_width @@ -197,8 +191,7 @@ class AxiLiteMasterWrite(Reset): cycles = (len(cmd.data) + (cmd.address % self.byte_width) + self.byte_width-1) // self.byte_width resp_cmd = AxiLiteWriteRespCmd(cmd.address, len(cmd.data), cycles, cmd.prot, cmd.event) - self.int_write_resp_command_queue.append(resp_cmd) - self.int_write_resp_command_sync.set() + await self.int_write_resp_command_queue.put(resp_cmd) offset = 0 @@ -235,11 +228,7 @@ class AxiLiteMasterWrite(Reset): async def _process_write_resp(self): while True: - if not self.int_write_resp_command_queue: - self.int_write_resp_command_sync.clear() - await self.int_write_resp_command_sync.wait() - - cmd = self.int_write_resp_command_queue.popleft() + cmd = await self.int_write_resp_command_queue.get() resp = AxiResp.OKAY @@ -259,8 +248,7 @@ class AxiLiteMasterWrite(Reset): if cmd.event is not None: cmd.event.set(write_resp) else: - self.write_resp_queue.append(write_resp) - self.write_resp_sync.set() + self.write_resp_queue.put_nowait(write_resp) self.in_flight_operations -= 1 @@ -280,13 +268,10 @@ class AxiLiteMasterRead(Reset): self.ar_channel = AxiLiteARSource(bus.ar, clock, reset, reset_active_level) self.r_channel = AxiLiteRSink(bus.r, clock, reset, reset_active_level) - self.read_command_queue = deque() - self.read_command_sync = Event() - self.read_data_queue = deque() - self.read_data_sync = Event() + self.read_command_queue = Queue() + self.read_data_queue = Queue() - self.int_read_resp_command_queue = deque() - self.int_read_resp_command_sync = Event() + self.int_read_resp_command_queue = Queue() self.in_flight_operations = 0 self._idle = Event() @@ -315,8 +300,7 @@ class AxiLiteMasterRead(Reset): self.in_flight_operations += 1 self._idle.clear() - self.read_command_queue.append(AxiLiteReadCmd(address, length, prot, event)) - self.read_command_sync.set() + self.read_command_queue.put_nowait(AxiLiteReadCmd(address, length, prot, event)) def idle(self): return not self.in_flight_operations @@ -326,11 +310,11 @@ class AxiLiteMasterRead(Reset): await self._idle.wait() def read_data_ready(self): - return bool(self.read_data_queue) + return not self.read_data_queue.empty() def get_read_data(self): - if self.read_data_queue: - return self.read_data_queue.popleft() + if not self.read_data_queue.empty(): + return self.read_data_queue.get_nowait() return None async def read(self, address, length, prot=AxiProt.NONSECURE): @@ -383,36 +367,32 @@ class AxiLiteMasterRead(Reset): self.ar_channel.clear() self.r_channel.clear() - while self.read_command_queue: - cmd = self.read_command_queue.popleft() + while not self.read_command_queue.empty(): + cmd = self.read_command_queue.get_nowait() if cmd.event: cmd.event.set(None) - while self.int_read_resp_command_queue: - cmd = self.int_read_resp_command_queue.popleft() + while not self.int_read_resp_command_queue.empty(): + cmd = self.int_read_resp_command_queue.get_nowait() if cmd.event: cmd.event.set(None) - self.read_data_queue.clear() + while not self.read_data_queue.empty(): + self.read_data_queue.get_nowait() self.in_flight_operations = 0 self._idle.set() async def _process_read(self): while True: - if not self.read_command_queue: - self.read_command_sync.clear() - await self.read_command_sync.wait() - - cmd = self.read_command_queue.popleft() + cmd = await self.read_command_queue.get() word_addr = (cmd.address // self.byte_width) * self.byte_width cycles = (cmd.length + self.byte_width-1 + (cmd.address % self.byte_width)) // self.byte_width resp_cmd = AxiLiteReadRespCmd(cmd.address, cmd.length, cycles, cmd.prot, cmd.event) - self.int_read_resp_command_queue.append(resp_cmd) - self.int_read_resp_command_sync.set() + await self.int_read_resp_command_queue.put(resp_cmd) self.log.info("Read start addr: 0x%08x prot: %s length: %d", cmd.address, cmd.prot, cmd.length) @@ -426,11 +406,7 @@ class AxiLiteMasterRead(Reset): async def _process_read_resp(self): while True: - if not self.int_read_resp_command_queue: - self.int_read_resp_command_sync.clear() - await self.int_read_resp_command_sync.wait() - - cmd = self.int_read_resp_command_queue.popleft() + cmd = await self.int_read_resp_command_queue.get() start_offset = cmd.address % self.byte_width end_offset = ((cmd.address + cmd.length - 1) % self.byte_width) + 1 @@ -467,8 +443,7 @@ class AxiLiteMasterRead(Reset): if cmd.event is not None: cmd.event.set(read_resp) else: - self.read_data_queue.append(read_resp) - self.read_data_sync.set() + self.read_data_queue.put_nowait(read_resp) self.in_flight_operations -= 1 diff --git a/cocotbext/axi/axis.py b/cocotbext/axi/axis.py index 8840800..2910d99 100644 --- a/cocotbext/axi/axis.py +++ b/cocotbext/axi/axis.py @@ -23,9 +23,9 @@ THE SOFTWARE. """ import logging -from collections import deque import cocotb +from cocotb.queue import Queue from cocotb.triggers import RisingEdge, Timer, First, Event from cocotb.utils import get_sim_time from cocotb_bus.bus import Bus @@ -276,8 +276,10 @@ class AxiStreamBase(Reset): super().__init__(*args, **kwargs) self.active = False - self.queue = deque() - self.queue_sync = Event() + self.queue = Queue() + self.idle_event = Event() + self.idle_event.set() + self.active_event = Event() self.queue_occupancy_bytes = 0 self.queue_occupancy_frames = 0 @@ -344,13 +346,16 @@ class AxiStreamBase(Reset): self._init_reset(reset, reset_active_level) def count(self): - return len(self.queue) + return self.queue.qlen() def empty(self): - return not self.queue + return self.queue.empty() def clear(self): - self.queue.clear() + while not self.queue.empty(): + self.queue.get_nowait() + self.idle_event.set() + self.active_event.clear() self.queue_occupancy_bytes = 0 self.queue_occupancy_frames = 0 @@ -408,13 +413,18 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause): _ready_init = None async def send(self, frame): - self.send_nowait(frame) + frame = AxiStreamFrame(frame) + await self.queue.put(frame) + self.idle_event.clear() + self.queue_occupancy_bytes += len(frame) + self.queue_occupancy_frames += 1 def send_nowait(self, frame): frame = AxiStreamFrame(frame) + self.queue.put_nowait(frame) + self.idle_event.clear() self.queue_occupancy_bytes += len(frame) self.queue_occupancy_frames += 1 - self.queue.append(frame) async def write(self, data): await self.send(data) @@ -426,8 +436,7 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause): return self.empty() and not self.active async def wait(self): - while not self.idle(): - await RisingEdge(self.clock) + await self.idle_event.wait() def _handle_reset(self, state): super()._handle_reset(state) @@ -458,8 +467,10 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause): tvalid_sample = (not hasattr(self.bus, "tvalid")) or self.bus.tvalid.value if (tready_sample and tvalid_sample) or not tvalid_sample: - if frame is None and self.queue: - frame = self.queue.popleft() + if frame is None and not self.queue.empty(): + frame = self.queue.get_nowait() + if self.queue.empty(): + self.idle_event.set() self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_frames -= 1 frame.sim_time_start = get_sim_time() @@ -528,14 +539,20 @@ class AxiStreamMonitor(AxiStreamBase): self.read_queue = [] async def recv(self, compact=True): - while self.empty(): - self.queue_sync.clear() - await self.queue_sync.wait() - return self.recv_nowait(compact) + frame = await self.queue.get() + if self.queue.empty(): + self.active_event.clear() + self.queue_occupancy_bytes -= len(frame) + self.queue_occupancy_frames -= 1 + if compact: + frame.compact() + return frame def recv_nowait(self, compact=True): - if self.queue: - frame = self.queue.popleft() + if not self.queue.empty(): + frame = self.queue.get_nowait() + if self.queue.empty(): + self.active_event.clear() self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_frames -= 1 if compact: @@ -565,11 +582,10 @@ class AxiStreamMonitor(AxiStreamBase): async def wait(self, timeout=0, timeout_unit='ns'): if not self.empty(): return - self.queue_sync.clear() if timeout: - await First(self.queue_sync.wait(), Timer(timeout, timeout_unit)) + await First(self.active_event.wait(), Timer(timeout, timeout_unit)) else: - await self.queue_sync.wait() + await self.active_event.wait() async def _run(self): frame = None @@ -609,8 +625,8 @@ class AxiStreamMonitor(AxiStreamBase): self.queue_occupancy_bytes += len(frame) self.queue_occupancy_frames += 1 - self.queue.append(frame) - self.queue_sync.set() + self.queue.put_nowait(frame) + self.active_event.set() frame = None @@ -684,8 +700,8 @@ class AxiStreamSink(AxiStreamMonitor, AxiStreamPause): self.queue_occupancy_bytes += len(frame) self.queue_occupancy_frames += 1 - self.queue.append(frame) - self.queue_sync.set() + self.queue.put_nowait(frame) + self.active_event.set() frame = None diff --git a/cocotbext/axi/stream.py b/cocotbext/axi/stream.py index de52873..be2e4af 100644 --- a/cocotbext/axi/stream.py +++ b/cocotbext/axi/stream.py @@ -23,9 +23,9 @@ THE SOFTWARE. """ import logging -from collections import deque import cocotb +from cocotb.queue import Queue from cocotb.triggers import RisingEdge, Event, First, Timer from cocotb_bus.bus import Bus @@ -94,8 +94,10 @@ class StreamBase(Reset): self.active = False - self.queue = deque() - self.queue_sync = Event() + self.queue = Queue() + self.idle_event = Event() + self.idle_event.set() + self.active_event = Event() self.ready = None self.valid = None @@ -124,13 +126,16 @@ class StreamBase(Reset): self._init_reset(reset, reset_active_level) def count(self): - return len(self.queue) + return self.queue.qsize() def empty(self): - return not self.queue + return self.queue.empty() def clear(self): - self.queue.clear() + while not self.queue.empty(): + self.queue.get_nowait() + self.idle_event.set() + self.active_event.clear() def _handle_reset(self, state): if state: @@ -184,17 +189,18 @@ class StreamSource(StreamBase, StreamPause): _ready_init = None async def send(self, obj): - self.send_nowait(obj) + await self.queue.put(obj) + self.idle_event.clear() def send_nowait(self, obj): - self.queue.append(obj) + self.queue.put_nowait(obj) + self.idle_event.clear() def idle(self): return self.empty() and not self.active async def wait(self): - while not self.idle(): - await RisingEdge(self.clock) + await self.idle_event.wait() def _handle_reset(self, state): super()._handle_reset(state) @@ -211,15 +217,17 @@ class StreamSource(StreamBase, StreamPause): valid_sample = self.valid is None or self.valid.value if (ready_sample and valid_sample) or (not valid_sample): - if self.queue and not self.pause: - self.bus.drive(self.queue.popleft()) + if not self.queue.empty() and not self.pause: + self.bus.drive(self.queue.get_nowait()) + if self.queue.empty(): + self.idle_event.set() if self.valid is not None: self.valid <= 1 self.active = True else: if self.valid is not None: self.valid <= 0 - self.active = bool(self.queue) + self.active = not self.queue.empty() class StreamMonitor(StreamBase): @@ -230,24 +238,26 @@ class StreamMonitor(StreamBase): _ready_init = None async def recv(self): - while self.empty(): - self.queue_sync.clear() - await self.queue_sync.wait() - return self.recv_nowait() + item = await self.queue.get() + if self.queue.empty(): + self.active_event.clear() + return item def recv_nowait(self): - if self.queue: - return self.queue.popleft() + if not self.queue.empty(): + item = self.queue.get_nowait() + if self.queue.empty(): + self.active_event.clear() + return item return None async def wait(self, timeout=0, timeout_unit=None): if not self.empty(): return - self.queue_sync.clear() if timeout: - await First(self.queue_sync.wait(), Timer(timeout, timeout_unit)) + await First(self.active_event.wait(), Timer(timeout, timeout_unit)) else: - await self.queue_sync.wait() + await self.active_event.wait() async def _run(self): while True: @@ -260,8 +270,8 @@ class StreamMonitor(StreamBase): if ready_sample and valid_sample: obj = self._transaction_obj() self.bus.sample(obj) - self.queue.append(obj) - self.queue_sync.set() + self.queue.put_nowait(obj) + self.active_event.set() class StreamSink(StreamMonitor, StreamPause): @@ -277,7 +287,7 @@ class StreamSink(StreamMonitor, StreamPause): self.queue_occupancy_limit = -1 def full(self): - if self.queue_occupancy_limit > 0 and len(self.queue) >= self.queue_occupancy_limit: + if self.queue_occupancy_limit > 0 and self.count() >= self.queue_occupancy_limit: return True else: return False @@ -299,8 +309,8 @@ class StreamSink(StreamMonitor, StreamPause): if ready_sample and valid_sample: obj = self._transaction_obj() self.bus.sample(obj) - self.queue.append(obj) - self.queue_sync.set() + self.queue.put_nowait(obj) + self.active_event.set() if self.ready is not None: self.ready <= (not self.full() and not self.pause)