Put source to sleep when there is no data to send

Signed-off-by: Alex Forencich <alex@alexforencich.com>
This commit is contained in:
Alex Forencich
2023-01-20 15:49:16 -08:00
parent cd1a8b47a5
commit ede6270ed7
2 changed files with 10 additions and 0 deletions

View File

@@ -425,6 +425,7 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
frame = AxiStreamFrame(frame)
await self.queue.put(frame)
self.idle_event.clear()
self.active_event.set()
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
@@ -434,6 +435,7 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
frame = AxiStreamFrame(frame)
self.queue.put_nowait(frame)
self.idle_event.clear()
self.active_event.set()
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
@@ -561,6 +563,9 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
self.active = bool(frame)
if not frame and self.queue.empty():
self.idle_event.set()
self.active_event.clear()
await self.active_event.wait()
class AxiStreamMonitor(AxiStreamBase):

View File

@@ -206,12 +206,14 @@ class StreamSource(StreamBase, StreamPause):
await self.dequeue_event.wait()
await self.queue.put(obj)
self.idle_event.clear()
self.active_event.set()
def send_nowait(self, obj):
if self.full():
raise QueueFull()
self.queue.put_nowait(obj)
self.idle_event.clear()
self.active_event.set()
def full(self):
if self.queue_occupancy_limit > 0 and self.count() >= self.queue_occupancy_limit:
@@ -255,6 +257,9 @@ class StreamSource(StreamBase, StreamPause):
self.active = not self.queue.empty()
if self.queue.empty():
self.idle_event.set()
self.active_event.clear()
await self.active_event.wait()
class StreamMonitor(StreamBase):