6 Commits

Author SHA1 Message Date
Alex Forencich
c0ebb90cd4 Release v0.1.8 2021-03-17 18:31:54 -07:00
Alex Forencich
56caf57fa4 Fix method name 2021-03-17 18:19:30 -07:00
Alex Forencich
abb78308ff Defer idle event until completion of transfer 2021-03-17 18:02:11 -07:00
Alex Forencich
f19ca9f651 Use cocotb async queues 2021-03-17 17:34:26 -07:00
Alex Forencich
1c40b8fa58 Use cocotb-bus 2021-03-16 18:47:32 -07:00
Alex Forencich
cfd5dae6ea Bump to dev version v0.1.7 2021-03-06 18:30:19 -08:00
6 changed files with 162 additions and 185 deletions

View File

@@ -23,9 +23,10 @@ THE SOFTWARE.
""" """
import logging import logging
from collections import deque, namedtuple, Counter from collections import namedtuple, Counter
import cocotb import cocotb
from cocotb.queue import Queue
from cocotb.triggers import Event from cocotb.triggers import Event
from .version import __version__ from .version import __version__
@@ -61,18 +62,15 @@ class AxiMasterWrite(Reset):
self.w_channel = AxiWSource(bus.w, clock, reset, reset_active_level) self.w_channel = AxiWSource(bus.w, clock, reset, reset_active_level)
self.b_channel = AxiBSink(bus.b, clock, reset, reset_active_level) self.b_channel = AxiBSink(bus.b, clock, reset, reset_active_level)
self.write_command_queue = deque() self.write_command_queue = Queue()
self.write_command_sync = Event() self.write_resp_queue = Queue()
self.write_resp_queue = deque()
self.write_resp_sync = Event()
self.id_count = 2**len(self.aw_channel.bus.awid) self.id_count = 2**len(self.aw_channel.bus.awid)
self.cur_id = 0 self.cur_id = 0
self.active_id = Counter() self.active_id = Counter()
self.int_write_resp_command_queue = deque() self.int_write_resp_command_queue = Queue()
self.int_write_resp_command_sync = Event() self.int_write_resp_queue_list = [Queue() for k in range(self.id_count)]
self.int_write_resp_queue_list = [deque() for k in range(self.id_count)]
self.in_flight_operations = 0 self.in_flight_operations = 0
self._idle = Event() self._idle = Event()
@@ -138,8 +136,7 @@ class AxiMasterWrite(Reset):
cmd = AxiWriteCmd(address, bytearray(data), awid, burst, size, lock, cmd = AxiWriteCmd(address, bytearray(data), awid, burst, size, lock,
cache, prot, qos, region, user, wuser, event) cache, prot, qos, region, user, wuser, event)
self.write_command_queue.append(cmd) self.write_command_queue.put_nowait(cmd)
self.write_command_sync.set()
def idle(self): def idle(self):
return not self.in_flight_operations return not self.in_flight_operations
@@ -149,11 +146,11 @@ class AxiMasterWrite(Reset):
await self._idle.wait() await self._idle.wait()
def write_resp_ready(self): def write_resp_ready(self):
return bool(self.write_resp_queue) return not self.write_resp_queue.empty()
def get_write_resp(self): def get_write_resp(self):
if self.write_resp_queue: if not self.write_resp_queue.empty():
return self.write_resp_queue.popleft() return self.write_resp_queue.get_nowait()
return None return None
async def write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None, async def write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None,
@@ -220,28 +217,25 @@ class AxiMasterWrite(Reset):
self.w_channel.clear() self.w_channel.clear()
self.b_channel.clear() self.b_channel.clear()
while self.write_command_queue: while not self.write_command_queue.empty():
cmd = self.write_command_queue.popleft() cmd = self.write_command_queue.get_nowait()
if cmd.event: if cmd.event:
cmd.event.set(None) cmd.event.set(None)
while self.int_write_resp_command_queue: while not self.int_write_resp_command_queue.empty():
cmd = self.int_write_resp_command_queue.popleft() cmd = self.int_write_resp_command_queue.get_nowait()
if cmd.event: if cmd.event:
cmd.event.set(None) cmd.event.set(None)
self.write_resp_queue.clear() while not self.write_resp_queue.empty():
self.write_resp_queue.get_nowait()
self.in_flight_operations = 0 self.in_flight_operations = 0
self._idle.set() self._idle.set()
async def _process_write(self): async def _process_write(self):
while True: while True:
if not self.write_command_queue: cmd = await self.write_command_queue.get()
self.write_command_sync.clear()
await self.write_command_sync.wait()
cmd = self.write_command_queue.popleft()
num_bytes = 2**cmd.size num_bytes = 2**cmd.size
@@ -340,22 +334,17 @@ class AxiMasterWrite(Reset):
cycle_offset = (cycle_offset + num_bytes) % self.byte_width cycle_offset = (cycle_offset + num_bytes) % self.byte_width
resp_cmd = AxiWriteRespCmd(cmd.address, len(cmd.data), cmd.size, cycles, cmd.prot, burst_list, cmd.event) resp_cmd = AxiWriteRespCmd(cmd.address, len(cmd.data), cmd.size, cycles, cmd.prot, burst_list, cmd.event)
self.int_write_resp_command_queue.append(resp_cmd) await self.int_write_resp_command_queue.put(resp_cmd)
self.int_write_resp_command_sync.set()
async def _process_write_resp(self): async def _process_write_resp(self):
while True: while True:
if not self.int_write_resp_command_queue: cmd = await self.int_write_resp_command_queue.get()
self.int_write_resp_command_sync.clear()
await self.int_write_resp_command_sync.wait()
cmd = self.int_write_resp_command_queue.popleft()
resp = AxiResp.OKAY resp = AxiResp.OKAY
user = [] user = []
for bid, burst_length in cmd.burst_list: for bid, burst_length in cmd.burst_list:
while not self.int_write_resp_queue_list[bid]: while self.int_write_resp_queue_list[bid].empty():
b = await self.b_channel.recv() b = await self.b_channel.recv()
i = int(b.bid) i = int(b.bid)
@@ -363,9 +352,9 @@ class AxiMasterWrite(Reset):
if self.active_id[i] <= 0: if self.active_id[i] <= 0:
raise Exception(f"Unexpected burst ID {bid}") raise Exception(f"Unexpected burst ID {bid}")
self.int_write_resp_queue_list[i].append(b) self.int_write_resp_queue_list[i].put_nowait(b)
b = self.int_write_resp_queue_list[bid].popleft() b = self.int_write_resp_queue_list[bid].get_nowait()
burst_id = int(b.bid) burst_id = int(b.bid)
burst_resp = AxiResp(b.bresp) burst_resp = AxiResp(b.bresp)
@@ -392,8 +381,7 @@ class AxiMasterWrite(Reset):
if cmd.event is not None: if cmd.event is not None:
cmd.event.set(write_resp) cmd.event.set(write_resp)
else: else:
self.write_resp_queue.append(write_resp) self.write_resp_queue.put_nowait(write_resp)
self.write_resp_sync.set()
self.in_flight_operations -= 1 self.in_flight_operations -= 1
@@ -413,18 +401,15 @@ class AxiMasterRead(Reset):
self.ar_channel = AxiARSource(bus.ar, clock, reset, reset_active_level) self.ar_channel = AxiARSource(bus.ar, clock, reset, reset_active_level)
self.r_channel = AxiRSink(bus.r, clock, reset, reset_active_level) self.r_channel = AxiRSink(bus.r, clock, reset, reset_active_level)
self.read_command_queue = deque() self.read_command_queue = Queue()
self.read_command_sync = Event() self.read_data_queue = Queue()
self.read_data_queue = deque()
self.read_data_sync = Event()
self.id_count = 2**len(self.ar_channel.bus.arid) self.id_count = 2**len(self.ar_channel.bus.arid)
self.cur_id = 0 self.cur_id = 0
self.active_id = Counter() self.active_id = Counter()
self.int_read_resp_command_queue = deque() self.int_read_resp_command_queue = Queue()
self.int_read_resp_command_sync = Event() self.int_read_resp_queue_list = [Queue() for k in range(self.id_count)]
self.int_read_resp_queue_list = [deque() for k in range(self.id_count)]
self.in_flight_operations = 0 self.in_flight_operations = 0
self._idle = Event() self._idle = Event()
@@ -483,8 +468,7 @@ class AxiMasterRead(Reset):
self._idle.clear() self._idle.clear()
cmd = AxiReadCmd(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event) cmd = AxiReadCmd(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event)
self.read_command_queue.append(cmd) self.read_command_queue.put_nowait(cmd)
self.read_command_sync.set()
def idle(self): def idle(self):
return not self.in_flight_operations return not self.in_flight_operations
@@ -494,11 +478,11 @@ class AxiMasterRead(Reset):
await self._idle.wait() await self._idle.wait()
def read_data_ready(self): def read_data_ready(self):
return bool(self.read_data_queue) return not self.read_data_queue.empty()
def get_read_data(self): def get_read_data(self):
if self.read_data_queue: if not self.read_data_queue.empty():
return self.read_data_queue.popleft() return self.read_data_queue.get_nowait()
return None return None
async def read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None, async def read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
@@ -564,28 +548,25 @@ class AxiMasterRead(Reset):
self.ar_channel.clear() self.ar_channel.clear()
self.r_channel.clear() self.r_channel.clear()
while self.read_command_queue: while not self.read_command_queue.empty():
cmd = self.read_command_queue.popleft() cmd = self.read_command_queue.get_nowait()
if cmd.event: if cmd.event:
cmd.event.set(None) cmd.event.set(None)
while self.int_read_resp_command_queue: while not self.int_read_resp_command_queue.empty():
cmd = self.int_read_resp_command_queue.popleft() cmd = self.int_read_resp_command_queue.get_nowait()
if cmd.event: if cmd.event:
cmd.event.set(None) cmd.event.set(None)
self.read_data_queue.clear() while not self.read_data_queue.empty():
self.read_data_queue.get_nowait()
self.in_flight_operations = 0 self.in_flight_operations = 0
self._idle.set() self._idle.set()
async def _process_read(self): async def _process_read(self):
while True: while True:
if not self.read_command_queue: cmd = await self.read_command_queue.get()
self.read_command_sync.clear()
await self.read_command_sync.wait()
cmd = self.read_command_queue.popleft()
num_bytes = 2**cmd.size num_bytes = 2**cmd.size
@@ -643,16 +624,11 @@ class AxiMasterRead(Reset):
cur_addr += num_bytes cur_addr += num_bytes
resp_cmd = AxiReadRespCmd(cmd.address, cmd.length, cmd.size, cycles, cmd.prot, burst_list, cmd.event) resp_cmd = AxiReadRespCmd(cmd.address, cmd.length, cmd.size, cycles, cmd.prot, burst_list, cmd.event)
self.int_read_resp_command_queue.append(resp_cmd) await self.int_read_resp_command_queue.put(resp_cmd)
self.int_read_resp_command_sync.set()
async def _process_read_resp(self): async def _process_read_resp(self):
while True: while True:
if not self.int_read_resp_command_queue: cmd = await self.int_read_resp_command_queue.get()
self.int_read_resp_command_sync.clear()
await self.int_read_resp_command_sync.wait()
cmd = self.int_read_resp_command_queue.popleft()
num_bytes = 2**cmd.size num_bytes = 2**cmd.size
@@ -671,7 +647,7 @@ class AxiMasterRead(Reset):
for rid, burst_length in cmd.burst_list: for rid, burst_length in cmd.burst_list:
for k in range(burst_length): for k in range(burst_length):
while not self.int_read_resp_queue_list[rid]: while self.int_read_resp_queue_list[rid].empty():
r = await self.r_channel.recv() r = await self.r_channel.recv()
i = int(r.rid) i = int(r.rid)
@@ -679,9 +655,9 @@ class AxiMasterRead(Reset):
if self.active_id[i] <= 0: if self.active_id[i] <= 0:
raise Exception(f"Unexpected burst ID {rid}") raise Exception(f"Unexpected burst ID {rid}")
self.int_read_resp_queue_list[i].append(r) self.int_read_resp_queue_list[i].put_nowait(r)
r = self.int_read_resp_queue_list[rid].popleft() r = self.int_read_resp_queue_list[rid].get_nowait()
cycle_id = int(r.rid) cycle_id = int(r.rid)
cycle_data = int(r.rdata) cycle_data = int(r.rdata)
@@ -727,8 +703,7 @@ class AxiMasterRead(Reset):
if cmd.event is not None: if cmd.event is not None:
cmd.event.set(read_resp) cmd.event.set(read_resp)
else: else:
self.read_data_queue.append(read_resp) self.read_data_queue.put_nowait(read_resp)
self.read_data_sync.set()
self.in_flight_operations -= 1 self.in_flight_operations -= 1

View File

@@ -23,9 +23,10 @@ THE SOFTWARE.
""" """
import logging import logging
from collections import deque, namedtuple from collections import namedtuple
import cocotb import cocotb
from cocotb.queue import Queue
from cocotb.triggers import Event from cocotb.triggers import Event
from .version import __version__ from .version import __version__
@@ -57,13 +58,10 @@ class AxiLiteMasterWrite(Reset):
self.w_channel = AxiLiteWSource(bus.w, clock, reset, reset_active_level) self.w_channel = AxiLiteWSource(bus.w, clock, reset, reset_active_level)
self.b_channel = AxiLiteBSink(bus.b, clock, reset, reset_active_level) self.b_channel = AxiLiteBSink(bus.b, clock, reset, reset_active_level)
self.write_command_queue = deque() self.write_command_queue = Queue()
self.write_command_sync = Event() self.write_resp_queue = Queue()
self.write_resp_queue = deque()
self.write_resp_sync = Event()
self.int_write_resp_command_queue = deque() self.int_write_resp_command_queue = Queue()
self.int_write_resp_command_sync = Event()
self.in_flight_operations = 0 self.in_flight_operations = 0
self._idle = Event() self._idle = Event()
@@ -94,8 +92,7 @@ class AxiLiteMasterWrite(Reset):
self.in_flight_operations += 1 self.in_flight_operations += 1
self._idle.clear() self._idle.clear()
self.write_command_queue.append(AxiLiteWriteCmd(address, bytearray(data), prot, event)) self.write_command_queue.put_nowait(AxiLiteWriteCmd(address, bytearray(data), prot, event))
self.write_command_sync.set()
def idle(self): def idle(self):
return not self.in_flight_operations return not self.in_flight_operations
@@ -105,11 +102,11 @@ class AxiLiteMasterWrite(Reset):
await self._idle.wait() await self._idle.wait()
def write_resp_ready(self): def write_resp_ready(self):
return bool(self.write_resp_queue) return not self.write_resp_queue.empty()
def get_write_resp(self): def get_write_resp(self):
if self.write_resp_queue: if not self.write_resp_queue.empty():
return self.write_resp_queue.popleft() return self.write_resp_queue.get_nowait()
return None return None
async def write(self, address, data, prot=AxiProt.NONSECURE): async def write(self, address, data, prot=AxiProt.NONSECURE):
@@ -163,28 +160,25 @@ class AxiLiteMasterWrite(Reset):
self.w_channel.clear() self.w_channel.clear()
self.b_channel.clear() self.b_channel.clear()
while self.write_command_queue: while not self.write_command_queue.empty():
cmd = self.write_command_queue.popleft() cmd = self.write_command_queue.get_nowait()
if cmd.event: if cmd.event:
cmd.event.set(None) cmd.event.set(None)
while self.int_write_resp_command_queue: while not self.int_write_resp_command_queue.empty():
cmd = self.int_write_resp_command_queue.popleft() cmd = self.int_write_resp_command_queue.get_nowait()
if cmd.event: if cmd.event:
cmd.event.set(None) cmd.event.set(None)
self.write_resp_queue.clear() while not self.write_resp_queue.empty():
self.write_resp_queue.get_nowait()
self.in_flight_operations = 0 self.in_flight_operations = 0
self._idle.set() self._idle.set()
async def _process_write(self): async def _process_write(self):
while True: while True:
if not self.write_command_queue: cmd = await self.write_command_queue.get()
self.write_command_sync.clear()
await self.write_command_sync.wait()
cmd = self.write_command_queue.popleft()
word_addr = (cmd.address // self.byte_width) * self.byte_width word_addr = (cmd.address // self.byte_width) * self.byte_width
@@ -197,8 +191,7 @@ class AxiLiteMasterWrite(Reset):
cycles = (len(cmd.data) + (cmd.address % self.byte_width) + self.byte_width-1) // self.byte_width cycles = (len(cmd.data) + (cmd.address % self.byte_width) + self.byte_width-1) // self.byte_width
resp_cmd = AxiLiteWriteRespCmd(cmd.address, len(cmd.data), cycles, cmd.prot, cmd.event) resp_cmd = AxiLiteWriteRespCmd(cmd.address, len(cmd.data), cycles, cmd.prot, cmd.event)
self.int_write_resp_command_queue.append(resp_cmd) await self.int_write_resp_command_queue.put(resp_cmd)
self.int_write_resp_command_sync.set()
offset = 0 offset = 0
@@ -235,11 +228,7 @@ class AxiLiteMasterWrite(Reset):
async def _process_write_resp(self): async def _process_write_resp(self):
while True: while True:
if not self.int_write_resp_command_queue: cmd = await self.int_write_resp_command_queue.get()
self.int_write_resp_command_sync.clear()
await self.int_write_resp_command_sync.wait()
cmd = self.int_write_resp_command_queue.popleft()
resp = AxiResp.OKAY resp = AxiResp.OKAY
@@ -259,8 +248,7 @@ class AxiLiteMasterWrite(Reset):
if cmd.event is not None: if cmd.event is not None:
cmd.event.set(write_resp) cmd.event.set(write_resp)
else: else:
self.write_resp_queue.append(write_resp) self.write_resp_queue.put_nowait(write_resp)
self.write_resp_sync.set()
self.in_flight_operations -= 1 self.in_flight_operations -= 1
@@ -280,13 +268,10 @@ class AxiLiteMasterRead(Reset):
self.ar_channel = AxiLiteARSource(bus.ar, clock, reset, reset_active_level) self.ar_channel = AxiLiteARSource(bus.ar, clock, reset, reset_active_level)
self.r_channel = AxiLiteRSink(bus.r, clock, reset, reset_active_level) self.r_channel = AxiLiteRSink(bus.r, clock, reset, reset_active_level)
self.read_command_queue = deque() self.read_command_queue = Queue()
self.read_command_sync = Event() self.read_data_queue = Queue()
self.read_data_queue = deque()
self.read_data_sync = Event()
self.int_read_resp_command_queue = deque() self.int_read_resp_command_queue = Queue()
self.int_read_resp_command_sync = Event()
self.in_flight_operations = 0 self.in_flight_operations = 0
self._idle = Event() self._idle = Event()
@@ -315,8 +300,7 @@ class AxiLiteMasterRead(Reset):
self.in_flight_operations += 1 self.in_flight_operations += 1
self._idle.clear() self._idle.clear()
self.read_command_queue.append(AxiLiteReadCmd(address, length, prot, event)) self.read_command_queue.put_nowait(AxiLiteReadCmd(address, length, prot, event))
self.read_command_sync.set()
def idle(self): def idle(self):
return not self.in_flight_operations return not self.in_flight_operations
@@ -326,11 +310,11 @@ class AxiLiteMasterRead(Reset):
await self._idle.wait() await self._idle.wait()
def read_data_ready(self): def read_data_ready(self):
return bool(self.read_data_queue) return not self.read_data_queue.empty()
def get_read_data(self): def get_read_data(self):
if self.read_data_queue: if not self.read_data_queue.empty():
return self.read_data_queue.popleft() return self.read_data_queue.get_nowait()
return None return None
async def read(self, address, length, prot=AxiProt.NONSECURE): async def read(self, address, length, prot=AxiProt.NONSECURE):
@@ -383,36 +367,32 @@ class AxiLiteMasterRead(Reset):
self.ar_channel.clear() self.ar_channel.clear()
self.r_channel.clear() self.r_channel.clear()
while self.read_command_queue: while not self.read_command_queue.empty():
cmd = self.read_command_queue.popleft() cmd = self.read_command_queue.get_nowait()
if cmd.event: if cmd.event:
cmd.event.set(None) cmd.event.set(None)
while self.int_read_resp_command_queue: while not self.int_read_resp_command_queue.empty():
cmd = self.int_read_resp_command_queue.popleft() cmd = self.int_read_resp_command_queue.get_nowait()
if cmd.event: if cmd.event:
cmd.event.set(None) cmd.event.set(None)
self.read_data_queue.clear() while not self.read_data_queue.empty():
self.read_data_queue.get_nowait()
self.in_flight_operations = 0 self.in_flight_operations = 0
self._idle.set() self._idle.set()
async def _process_read(self): async def _process_read(self):
while True: while True:
if not self.read_command_queue: cmd = await self.read_command_queue.get()
self.read_command_sync.clear()
await self.read_command_sync.wait()
cmd = self.read_command_queue.popleft()
word_addr = (cmd.address // self.byte_width) * self.byte_width word_addr = (cmd.address // self.byte_width) * self.byte_width
cycles = (cmd.length + self.byte_width-1 + (cmd.address % self.byte_width)) // self.byte_width cycles = (cmd.length + self.byte_width-1 + (cmd.address % self.byte_width)) // self.byte_width
resp_cmd = AxiLiteReadRespCmd(cmd.address, cmd.length, cycles, cmd.prot, cmd.event) resp_cmd = AxiLiteReadRespCmd(cmd.address, cmd.length, cycles, cmd.prot, cmd.event)
self.int_read_resp_command_queue.append(resp_cmd) await self.int_read_resp_command_queue.put(resp_cmd)
self.int_read_resp_command_sync.set()
self.log.info("Read start addr: 0x%08x prot: %s length: %d", self.log.info("Read start addr: 0x%08x prot: %s length: %d",
cmd.address, cmd.prot, cmd.length) cmd.address, cmd.prot, cmd.length)
@@ -426,11 +406,7 @@ class AxiLiteMasterRead(Reset):
async def _process_read_resp(self): async def _process_read_resp(self):
while True: while True:
if not self.int_read_resp_command_queue: cmd = await self.int_read_resp_command_queue.get()
self.int_read_resp_command_sync.clear()
await self.int_read_resp_command_sync.wait()
cmd = self.int_read_resp_command_queue.popleft()
start_offset = cmd.address % self.byte_width start_offset = cmd.address % self.byte_width
end_offset = ((cmd.address + cmd.length - 1) % self.byte_width) + 1 end_offset = ((cmd.address + cmd.length - 1) % self.byte_width) + 1
@@ -467,8 +443,7 @@ class AxiLiteMasterRead(Reset):
if cmd.event is not None: if cmd.event is not None:
cmd.event.set(read_resp) cmd.event.set(read_resp)
else: else:
self.read_data_queue.append(read_resp) self.read_data_queue.put_nowait(read_resp)
self.read_data_sync.set()
self.in_flight_operations -= 1 self.in_flight_operations -= 1

View File

@@ -23,12 +23,12 @@ THE SOFTWARE.
""" """
import logging import logging
from collections import deque
import cocotb import cocotb
from cocotb.queue import Queue
from cocotb.triggers import RisingEdge, Timer, First, Event from cocotb.triggers import RisingEdge, Timer, First, Event
from cocotb.utils import get_sim_time from cocotb.utils import get_sim_time
from cocotb.bus import Bus from cocotb_bus.bus import Bus
from .version import __version__ from .version import __version__
from .reset import Reset from .reset import Reset
@@ -276,8 +276,10 @@ class AxiStreamBase(Reset):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.active = False self.active = False
self.queue = deque() self.queue = Queue()
self.queue_sync = Event() self.idle_event = Event()
self.idle_event.set()
self.active_event = Event()
self.queue_occupancy_bytes = 0 self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0 self.queue_occupancy_frames = 0
@@ -344,13 +346,16 @@ class AxiStreamBase(Reset):
self._init_reset(reset, reset_active_level) self._init_reset(reset, reset_active_level)
def count(self): def count(self):
return len(self.queue) return self.queue.qsize()
def empty(self): def empty(self):
return not self.queue return self.queue.empty()
def clear(self): def clear(self):
self.queue.clear() while not self.queue.empty():
self.queue.get_nowait()
self.idle_event.set()
self.active_event.clear()
self.queue_occupancy_bytes = 0 self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0 self.queue_occupancy_frames = 0
@@ -408,13 +413,18 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
_ready_init = None _ready_init = None
async def send(self, frame): async def send(self, frame):
self.send_nowait(frame) frame = AxiStreamFrame(frame)
await self.queue.put(frame)
self.idle_event.clear()
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
def send_nowait(self, frame): def send_nowait(self, frame):
frame = AxiStreamFrame(frame) frame = AxiStreamFrame(frame)
self.queue.put_nowait(frame)
self.idle_event.clear()
self.queue_occupancy_bytes += len(frame) self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1 self.queue_occupancy_frames += 1
self.queue.append(frame)
async def write(self, data): async def write(self, data):
await self.send(data) await self.send(data)
@@ -426,8 +436,7 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
return self.empty() and not self.active return self.empty() and not self.active
async def wait(self): async def wait(self):
while not self.idle(): await self.idle_event.wait()
await RisingEdge(self.clock)
def _handle_reset(self, state): def _handle_reset(self, state):
super()._handle_reset(state) super()._handle_reset(state)
@@ -458,8 +467,8 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
tvalid_sample = (not hasattr(self.bus, "tvalid")) or self.bus.tvalid.value tvalid_sample = (not hasattr(self.bus, "tvalid")) or self.bus.tvalid.value
if (tready_sample and tvalid_sample) or not tvalid_sample: if (tready_sample and tvalid_sample) or not tvalid_sample:
if frame is None and self.queue: if frame is None and not self.queue.empty():
frame = self.queue.popleft() frame = self.queue.get_nowait()
self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1 self.queue_occupancy_frames -= 1
frame.sim_time_start = get_sim_time() frame.sim_time_start = get_sim_time()
@@ -509,6 +518,8 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
if hasattr(self.bus, "tlast"): if hasattr(self.bus, "tlast"):
self.bus.tlast <= 0 self.bus.tlast <= 0
self.active = bool(frame) self.active = bool(frame)
if not frame and self.queue.empty():
self.idle_event.set()
class AxiStreamMonitor(AxiStreamBase): class AxiStreamMonitor(AxiStreamBase):
@@ -528,14 +539,20 @@ class AxiStreamMonitor(AxiStreamBase):
self.read_queue = [] self.read_queue = []
async def recv(self, compact=True): async def recv(self, compact=True):
while self.empty(): frame = await self.queue.get()
self.queue_sync.clear() if self.queue.empty():
await self.queue_sync.wait() self.active_event.clear()
return self.recv_nowait(compact) self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1
if compact:
frame.compact()
return frame
def recv_nowait(self, compact=True): def recv_nowait(self, compact=True):
if self.queue: if not self.queue.empty():
frame = self.queue.popleft() frame = self.queue.get_nowait()
if self.queue.empty():
self.active_event.clear()
self.queue_occupancy_bytes -= len(frame) self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1 self.queue_occupancy_frames -= 1
if compact: if compact:
@@ -565,11 +582,10 @@ class AxiStreamMonitor(AxiStreamBase):
async def wait(self, timeout=0, timeout_unit='ns'): async def wait(self, timeout=0, timeout_unit='ns'):
if not self.empty(): if not self.empty():
return return
self.queue_sync.clear()
if timeout: if timeout:
await First(self.queue_sync.wait(), Timer(timeout, timeout_unit)) await First(self.active_event.wait(), Timer(timeout, timeout_unit))
else: else:
await self.queue_sync.wait() await self.active_event.wait()
async def _run(self): async def _run(self):
frame = None frame = None
@@ -609,8 +625,8 @@ class AxiStreamMonitor(AxiStreamBase):
self.queue_occupancy_bytes += len(frame) self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1 self.queue_occupancy_frames += 1
self.queue.append(frame) self.queue.put_nowait(frame)
self.queue_sync.set() self.active_event.set()
frame = None frame = None
@@ -684,8 +700,8 @@ class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
self.queue_occupancy_bytes += len(frame) self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1 self.queue_occupancy_frames += 1
self.queue.append(frame) self.queue.put_nowait(frame)
self.queue_sync.set() self.active_event.set()
frame = None frame = None

View File

@@ -23,11 +23,11 @@ THE SOFTWARE.
""" """
import logging import logging
from collections import deque
import cocotb import cocotb
from cocotb.queue import Queue
from cocotb.triggers import RisingEdge, Event, First, Timer from cocotb.triggers import RisingEdge, Event, First, Timer
from cocotb.bus import Bus from cocotb_bus.bus import Bus
from .reset import Reset from .reset import Reset
@@ -94,8 +94,10 @@ class StreamBase(Reset):
self.active = False self.active = False
self.queue = deque() self.queue = Queue()
self.queue_sync = Event() self.idle_event = Event()
self.idle_event.set()
self.active_event = Event()
self.ready = None self.ready = None
self.valid = None self.valid = None
@@ -124,13 +126,16 @@ class StreamBase(Reset):
self._init_reset(reset, reset_active_level) self._init_reset(reset, reset_active_level)
def count(self): def count(self):
return len(self.queue) return self.queue.qsize()
def empty(self): def empty(self):
return not self.queue return self.queue.empty()
def clear(self): def clear(self):
self.queue.clear() while not self.queue.empty():
self.queue.get_nowait()
self.idle_event.set()
self.active_event.clear()
def _handle_reset(self, state): def _handle_reset(self, state):
if state: if state:
@@ -184,17 +189,18 @@ class StreamSource(StreamBase, StreamPause):
_ready_init = None _ready_init = None
async def send(self, obj): async def send(self, obj):
self.send_nowait(obj) await self.queue.put(obj)
self.idle_event.clear()
def send_nowait(self, obj): def send_nowait(self, obj):
self.queue.append(obj) self.queue.put_nowait(obj)
self.idle_event.clear()
def idle(self): def idle(self):
return self.empty() and not self.active return self.empty() and not self.active
async def wait(self): async def wait(self):
while not self.idle(): await self.idle_event.wait()
await RisingEdge(self.clock)
def _handle_reset(self, state): def _handle_reset(self, state):
super()._handle_reset(state) super()._handle_reset(state)
@@ -211,15 +217,17 @@ class StreamSource(StreamBase, StreamPause):
valid_sample = self.valid is None or self.valid.value valid_sample = self.valid is None or self.valid.value
if (ready_sample and valid_sample) or (not valid_sample): if (ready_sample and valid_sample) or (not valid_sample):
if self.queue and not self.pause: if not self.queue.empty() and not self.pause:
self.bus.drive(self.queue.popleft()) self.bus.drive(self.queue.get_nowait())
if self.valid is not None: if self.valid is not None:
self.valid <= 1 self.valid <= 1
self.active = True self.active = True
else: else:
if self.valid is not None: if self.valid is not None:
self.valid <= 0 self.valid <= 0
self.active = bool(self.queue) self.active = not self.queue.empty()
if self.queue.empty():
self.idle_event.set()
class StreamMonitor(StreamBase): class StreamMonitor(StreamBase):
@@ -230,24 +238,26 @@ class StreamMonitor(StreamBase):
_ready_init = None _ready_init = None
async def recv(self): async def recv(self):
while self.empty(): item = await self.queue.get()
self.queue_sync.clear() if self.queue.empty():
await self.queue_sync.wait() self.active_event.clear()
return self.recv_nowait() return item
def recv_nowait(self): def recv_nowait(self):
if self.queue: if not self.queue.empty():
return self.queue.popleft() item = self.queue.get_nowait()
if self.queue.empty():
self.active_event.clear()
return item
return None return None
async def wait(self, timeout=0, timeout_unit=None): async def wait(self, timeout=0, timeout_unit=None):
if not self.empty(): if not self.empty():
return return
self.queue_sync.clear()
if timeout: if timeout:
await First(self.queue_sync.wait(), Timer(timeout, timeout_unit)) await First(self.active_event.wait(), Timer(timeout, timeout_unit))
else: else:
await self.queue_sync.wait() await self.active_event.wait()
async def _run(self): async def _run(self):
while True: while True:
@@ -260,8 +270,8 @@ class StreamMonitor(StreamBase):
if ready_sample and valid_sample: if ready_sample and valid_sample:
obj = self._transaction_obj() obj = self._transaction_obj()
self.bus.sample(obj) self.bus.sample(obj)
self.queue.append(obj) self.queue.put_nowait(obj)
self.queue_sync.set() self.active_event.set()
class StreamSink(StreamMonitor, StreamPause): class StreamSink(StreamMonitor, StreamPause):
@@ -277,7 +287,7 @@ class StreamSink(StreamMonitor, StreamPause):
self.queue_occupancy_limit = -1 self.queue_occupancy_limit = -1
def full(self): def full(self):
if self.queue_occupancy_limit > 0 and len(self.queue) >= self.queue_occupancy_limit: if self.queue_occupancy_limit > 0 and self.count() >= self.queue_occupancy_limit:
return True return True
else: else:
return False return False
@@ -299,8 +309,8 @@ class StreamSink(StreamMonitor, StreamPause):
if ready_sample and valid_sample: if ready_sample and valid_sample:
obj = self._transaction_obj() obj = self._transaction_obj()
self.bus.sample(obj) self.bus.sample(obj)
self.queue.append(obj) self.queue.put_nowait(obj)
self.queue_sync.set() self.active_event.set()
if self.ready is not None: if self.ready is not None:
self.ready <= (not self.full() and not self.pause) self.ready <= (not self.full() and not self.pause)

View File

@@ -1 +1 @@
__version__ = "0.1.6" __version__ = "0.1.8"

View File

@@ -27,6 +27,7 @@ packages = find_namespace:
python_requires = >=3.6 python_requires = >=3.6
install_requires = install_requires =
cocotb cocotb
cocotb-bus
[options.extras_require] [options.extras_require]
test = test =