Factor out common recv code; throw QueueEmpty exception in get_nowait
This commit is contained in:
@@ -552,8 +552,7 @@ class AxiStreamMonitor(AxiStreamBase):
|
|||||||
|
|
||||||
self.read_queue = []
|
self.read_queue = []
|
||||||
|
|
||||||
async def recv(self, compact=True):
|
def _recv(self, frame, compact=True):
|
||||||
frame = await self.queue.get()
|
|
||||||
if self.queue.empty():
|
if self.queue.empty():
|
||||||
self.active_event.clear()
|
self.active_event.clear()
|
||||||
self.queue_occupancy_bytes -= len(frame)
|
self.queue_occupancy_bytes -= len(frame)
|
||||||
@@ -562,17 +561,13 @@ class AxiStreamMonitor(AxiStreamBase):
|
|||||||
frame.compact()
|
frame.compact()
|
||||||
return frame
|
return frame
|
||||||
|
|
||||||
|
async def recv(self, compact=True):
|
||||||
|
frame = await self.queue.get()
|
||||||
|
return self._recv(frame, compact)
|
||||||
|
|
||||||
def recv_nowait(self, compact=True):
|
def recv_nowait(self, compact=True):
|
||||||
if not self.queue.empty():
|
|
||||||
frame = self.queue.get_nowait()
|
frame = self.queue.get_nowait()
|
||||||
if self.queue.empty():
|
return self._recv(frame, compact)
|
||||||
self.active_event.clear()
|
|
||||||
self.queue_occupancy_bytes -= len(frame)
|
|
||||||
self.queue_occupancy_frames -= 1
|
|
||||||
if compact:
|
|
||||||
frame.compact()
|
|
||||||
return frame
|
|
||||||
return None
|
|
||||||
|
|
||||||
async def read(self, count=-1):
|
async def read(self, count=-1):
|
||||||
while not self.read_queue:
|
while not self.read_queue:
|
||||||
|
|||||||
@@ -241,19 +241,18 @@ class StreamMonitor(StreamBase):
|
|||||||
_valid_init = None
|
_valid_init = None
|
||||||
_ready_init = None
|
_ready_init = None
|
||||||
|
|
||||||
async def recv(self):
|
def _recv(self, item):
|
||||||
item = await self.queue.get()
|
|
||||||
if self.queue.empty():
|
if self.queue.empty():
|
||||||
self.active_event.clear()
|
self.active_event.clear()
|
||||||
return item
|
return item
|
||||||
|
|
||||||
|
async def recv(self):
|
||||||
|
item = await self.queue.get()
|
||||||
|
return self._recv(item)
|
||||||
|
|
||||||
def recv_nowait(self):
|
def recv_nowait(self):
|
||||||
if not self.queue.empty():
|
|
||||||
item = self.queue.get_nowait()
|
item = self.queue.get_nowait()
|
||||||
if self.queue.empty():
|
return self._recv(item)
|
||||||
self.active_event.clear()
|
|
||||||
return item
|
|
||||||
return None
|
|
||||||
|
|
||||||
async def wait(self, timeout=0, timeout_unit=None):
|
async def wait(self, timeout=0, timeout_unit=None):
|
||||||
if not self.empty():
|
if not self.empty():
|
||||||
|
|||||||
Reference in New Issue
Block a user