27 Commits

Author SHA1 Message Date
Alex Forencich
c4873ad14c Release v0.1.10 2021-03-24 21:03:16 -07:00
Alex Forencich
77a40bdc8f Limit channel queue depth 2021-03-24 17:55:07 -07:00
Alex Forencich
f991096272 Separate processing coroutines for each ID 2021-03-24 17:07:16 -07:00
Alex Forencich
9e28bd7fbb Revert back to cocotb.fork 2021-03-24 16:20:08 -07:00
Alex Forencich
8c74f747a4 Update readme 2021-03-22 23:19:14 -07:00
Alex Forencich
a285f008ca Refactor reset handling code 2021-03-22 22:02:53 -07:00
Alex Forencich
c677ab245c Reset more internal state 2021-03-22 22:02:06 -07:00
Alex Forencich
11f9db8b06 Add test cases for init_read and init_write 2021-03-22 21:22:13 -07:00
Alex Forencich
344ec8d4ce Return event object from init_read and init_write; remove get_write_resp and get_read_data 2021-03-22 21:21:34 -07:00
Alex Forencich
4ff390481e Extract parameter values from cocotb.top 2021-03-22 15:51:07 -07:00
Alex Forencich
4bee96ea9a Enforce max queue depth on streaming sources 2021-03-21 22:24:59 -07:00
Alex Forencich
a66dfea6f7 Factor out common recv code; throw QueueEmpty exception in get_nowait 2021-03-21 21:02:28 -07:00
Alex Forencich
f1a89e6c12 Trigger transmit complete events when flushing queue to prevent deadlocks 2021-03-21 18:46:30 -07:00
Alex Forencich
11205bde46 Handle dropped transmit frames during reset 2021-03-21 18:39:35 -07:00
Alex Forencich
bce364eef5 Ensure idle event is set when queue is empty 2021-03-21 18:39:10 -07:00
Alex Forencich
e934b69776 Use start_soon instead of fork 2021-03-21 12:22:22 -07:00
Alex Forencich
7fb8c4e28b Reset processing on assert edge only to permit operations to be queued while reset is asserted 2021-03-21 12:13:19 -07:00
Alex Forencich
156fada616 Store commands currently being processed so they can be released when the processing coroutines are killed 2021-03-21 12:04:30 -07:00
Alex Forencich
d88ba7caf3 Warn when operations are dropped during reset 2021-03-21 11:41:25 -07:00
Alex Forencich
6c66776518 Use start_soon instead of fork 2021-03-21 11:40:25 -07:00
Alex Forencich
a71678c7e3 Bump to dev version 2021-03-17 18:32:42 -07:00
Alex Forencich
c0ebb90cd4 Release v0.1.8 2021-03-17 18:31:54 -07:00
Alex Forencich
56caf57fa4 Fix method name 2021-03-17 18:19:30 -07:00
Alex Forencich
abb78308ff Defer idle event until completion of transfer 2021-03-17 18:02:11 -07:00
Alex Forencich
f19ca9f651 Use cocotb async queues 2021-03-17 17:34:26 -07:00
Alex Forencich
1c40b8fa58 Use cocotb-bus 2021-03-16 18:47:32 -07:00
Alex Forencich
cfd5dae6ea Bump to dev version v0.1.7 2021-03-06 18:30:19 -08:00
12 changed files with 524 additions and 411 deletions

View File

@@ -42,36 +42,27 @@ To use these modules, import the one you need and connect it to the DUT:
axi_master = AxiMaster(AxiBus.from_prefix(dut, "s_axi"), dut.clk, dut.rst)
The first argument to the constructor accepts an `AxiBus` or `AxiLiteBus` object. These objects are containers for the interface signals and include class methods to automate connections.
The first argument to the constructor accepts an `AxiBus` or `AxiLiteBus` object, as appropriate. These objects are containers for the interface signals and include class methods to automate connections.
Once the module is instantiated, read and write operations can be initiated in a few different ways.
Once the module is instantiated, read and write operations can be initiated in a couple of different ways.
First, non-blocking operations can be started with `init_read()` and `init_write()`. These methods will queue up a read or write operation to be carried out over the interface. The result of the operation can be retrieved with `get_read_data()` and `get_write_resp()`. To monitor the status of the module, `idle()`, `wait()`, `wait_read()`, and `wait_write()` can be used. For example:
axi_master.init_write(0x0000, b'test')
await axi_master.wait()
resp = axi_master.get_write_resp()
axi_master.init_read(0x0000, 4)
await axi_master.wait()
data = axi_master.get_read_data()
Alternatively, an event object can be provided as an argument to `init_read()` and `init_write()`, and the result can be retrieved from `Event.data`. For example:
event = Event()
axi_master.init_write(0x0000, b'test', event=event)
await event.wait()
resp = event.data
event = Event()
axi_master.init_read(0x0000, 4, event=event)
await event.wait()
resp = event.data
Second, blocking operations can be carried out with `read()` and `write()` and their associated word-access wrappers. Multiple concurrent operations started from different coroutines are handled correctly. For example:
First, blocking operations can be carried out with `read()` and `write()` and their associated word-access wrappers. Multiple concurrent operations started from different coroutines are handled correctly. For example:
await axi_master.write(0x0000, b'test')
data = await axi_master.read(0x0000, 4)
`read()`, `write()`, `get_read_data()`, and `get_write_resp()` return `namedtuple` objects containing _address_, _data_ or _length_, and _resp_.
`read()` and `write()` return `namedtuple` objects containing _address_, _data_ or _length_, and _resp_. This is the preferred style, and this is the only style supported by the word-access wrappers.
Alternatively, operations can be initiated with non-blocking `init_read()` and `init_write()`. These functions return `Event` objects which are triggered when the operation completes, and the result can be retrieved from `Event.data`. For example:
write_op = axi_master.init_write(0x0000, b'test')
await write_op.wait()
resp = write_op.data
read_op = axi_master.init_read(0x0000, 4)
await read_op.wait()
resp = read_op.data
With this method, it is possible to start multiple concurrent operations from the same coroutine. It is also possible to use the events with `Combine`, `First`, and `with_timeout`.
#### `AxiMaster` and `AxiLiteMaster` constructor parameters
@@ -86,16 +77,12 @@ Second, blocking operations can be carried out with `read()` and `write()` and t
#### Methods
* `init_read(address, length, ...)`: initiate reading _length_ bytes, starting at _address_
* `init_write(address, data, ...)`: initiate writing _data_ (bytes), starting from _address_
* `init_read(address, length, ...)`: initiate reading _length_ bytes, starting at _address_. Returns an `Event` object.
* `init_write(address, data, ...)`: initiate writing _data_ (bytes), starting from _address_. Returns an `Event` object.
* `idle()`: returns _True_ when there are no outstanding operations in progress
* `wait()`: blocking wait until all outstanding operations complete
* `wait_read()`: wait until all outstanding read operations complete
* `wait_write()`: wait until all outstanding write operations complete
* `read_data_ready()`: determine if any read read data is available
* `get_read_data()`: fetch first available read data
* `write_resp_ready()`: determine if any write response is available
* `get_write_resp()`: fetch first available write response
* `read(address, length, ...)`: read _length_ bytes, starting at _address_
* `read_words(address, count, byteorder='little', ws=2, ...)`: read _count_ _ws_-byte words, starting at _address_
* `read_dwords(address, count, byteorder='little', ...)`: read _count_ 4-byte dwords, starting at _address_
@@ -125,16 +112,16 @@ Second, blocking operations can be carried out with `read()` and `write()` and t
* _region_: AXI region field, default `0`
* _user_: AXI user signal (awuser/aruser), default `0`
* _wuser_: AXI wuser signal, default `0` (write-related methods only)
* _event_: `Event` object used to wait on and retrieve result for specific operation, default `None` (`init_read()` and `init_write()` only). If provided, the event will be triggered when the operation completes and the result returned via `Event.data` instead of `get_read_data()` or `get_write_resp()`.
* _event_: `Event` object used to wait on and retrieve result for specific operation, default `None`. The event will be triggered when the operation completes and the result returned via `Event.data`. (`init_read()` and `init_write()` only)
#### Additional optional arguments for `AxiLiteMaster`
* _prot_: AXI protection flags, default `AxiProt.NONSECURE`
* _event_: `Event` object used to wait on and retrieve result for specific operation, default `None` (`init_read()` and `init_write()` only). If provided, the event will be triggered when the operation completes and the result returned via `Event.data` instead of `get_read_data()` or `get_write_resp()`.
* _event_: `Event` object used to wait on and retrieve result for specific operation, default `None`. The event will be triggered when the operation completes and the result returned via `Event.data`. (`init_read()` and `init_write()` only)
#### `AxiBus` and `AxiLiteBus` objects
The `AxiBus`, `AxiLiteBus`, and related objects are containers for the interface signals. These hold instances of bus objects for the individual channels, which are extensions of `cocotb.bus.Bus`. Class methods `from_entity` and `from_prefix` are provided to facilitate signal name matching. For AXI interfaces use `AxiBus`, `AxiReadBus`, or `AxiWriteBus`, as appropriate. For AXI lite interfaces, use `AxiLiteBus`, `AxiLiteReadBus`, or `AxiLiteWriteBus`, as appropriate.
The `AxiBus`, `AxiLiteBus`, and related objects are containers for the interface signals. These hold instances of bus objects for the individual channels, which are currently extensions of `cocotb_bus.bus.Bus`. Class methods `from_entity` and `from_prefix` are provided to facilitate signal name matching. For AXI interfaces use `AxiBus`, `AxiReadBus`, or `AxiWriteBus`, as appropriate. For AXI lite interfaces, use `AxiLiteBus`, `AxiLiteReadBus`, or `AxiLiteWriteBus`, as appropriate.
### AXI and AXI lite RAM
@@ -270,7 +257,7 @@ Note: _byte_size_, _byte_lanes_, `len(tdata)`, and `len(tkeep)` are all related,
* `read_nowait(count)`: read _count_ bytes from buffer (non-blocking) (sink/monitor)
* `count()`: returns the number of items in the queue (all)
* `empty()`: returns _True_ if the queue is empty (all)
* `full()`: returns _True_ if the queue occupancy limits are met (sink)
* `full()`: returns _True_ if the queue occupancy limits are met (source/sink)
* `idle()`: returns _True_ if no transfer is in progress (all) or if the queue is not empty (source)
* `clear()`: drop all data in queue (all)
* `wait()`: wait for idle (source)

View File

@@ -23,9 +23,10 @@ THE SOFTWARE.
"""
import logging
from collections import deque, namedtuple, Counter
from collections import namedtuple, Counter
import cocotb
from cocotb.queue import Queue
from cocotb.triggers import Event
from .version import __version__
@@ -58,21 +59,22 @@ class AxiMasterWrite(Reset):
self.log.info("https://github.com/alexforencich/cocotbext-axi")
self.aw_channel = AxiAWSource(bus.aw, clock, reset, reset_active_level)
self.aw_channel.queue_occupancy_limit = 2
self.w_channel = AxiWSource(bus.w, clock, reset, reset_active_level)
self.w_channel.queue_occupancy_limit = 2
self.b_channel = AxiBSink(bus.b, clock, reset, reset_active_level)
self.b_channel.queue_occupancy_limit = 2
self.write_command_queue = deque()
self.write_command_sync = Event()
self.write_resp_queue = deque()
self.write_resp_sync = Event()
self.write_command_queue = Queue()
self.current_write_command = None
self.id_count = 2**len(self.aw_channel.bus.awid)
self.cur_id = 0
self.active_id = Counter()
self.int_write_resp_command_queue = deque()
self.int_write_resp_command_sync = Event()
self.int_write_resp_queue_list = [deque() for k in range(self.id_count)]
self.int_write_resp_command_queue = [Queue() for k in range(self.id_count)]
self.current_write_resp_command = [None for k in range(self.id_count)]
self.int_write_resp_queue_list = [Queue() for k in range(self.id_count)]
self.in_flight_operations = 0
self._idle = Event()
@@ -102,13 +104,17 @@ class AxiMasterWrite(Reset):
self._process_write_cr = None
self._process_write_resp_cr = None
self._process_write_resp_id_cr = None
self._init_reset(reset, reset_active_level)
def init_write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL,
cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0, event=None):
if event is not None and not isinstance(event, Event):
if event is None:
event = Event()
if not isinstance(event, Event):
raise ValueError("Expected event object")
if awid is None or awid < 0:
@@ -138,8 +144,9 @@ class AxiMasterWrite(Reset):
cmd = AxiWriteCmd(address, bytearray(data), awid, burst, size, lock,
cache, prot, qos, region, user, wuser, event)
self.write_command_queue.append(cmd)
self.write_command_sync.set()
self.write_command_queue.put_nowait(cmd)
return event
def idle(self):
return not self.in_flight_operations
@@ -148,18 +155,9 @@ class AxiMasterWrite(Reset):
while not self.idle():
await self._idle.wait()
def write_resp_ready(self):
return bool(self.write_resp_queue)
def get_write_resp(self):
if self.write_resp_queue:
return self.write_resp_queue.popleft()
return None
async def write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0):
event = Event()
self.init_write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser, event)
event = self.init_write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser)
await event.wait()
return event.data
@@ -209,39 +207,62 @@ class AxiMasterWrite(Reset):
if self._process_write_resp_cr is not None:
self._process_write_resp_cr.kill()
self._process_write_resp_cr = None
if self._process_write_resp_id_cr is not None:
for cr in self._process_write_resp_id_cr:
cr.kill()
self._process_write_resp_id_cr = None
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
def flush_cmd(cmd):
self.log.warning("Flushed write operation during reset: %s", cmd)
if cmd.event:
cmd.event.set(None)
while not self.write_command_queue.empty():
cmd = self.write_command_queue.get_nowait()
flush_cmd(cmd)
if self.current_write_command:
cmd = self.current_write_command
self.current_write_command = None
flush_cmd(cmd)
for q in self.int_write_resp_command_queue:
while not q.empty():
cmd = q.get_nowait()
flush_cmd(cmd)
for k in range(len(self.current_write_resp_command)):
if self.current_write_resp_command[k]:
cmd = self.current_write_resp_command[k]
self.current_write_resp_command[k] = None
flush_cmd(cmd)
for q in self.int_write_resp_queue_list:
while not q.empty():
q.get_nowait()
self.cur_id = 0
self.active_id = Counter()
self.in_flight_operations = 0
self._idle.set()
else:
self.log.info("Reset de-asserted")
if self._process_write_cr is None:
self._process_write_cr = cocotb.fork(self._process_write())
if self._process_write_resp_cr is None:
self._process_write_resp_cr = cocotb.fork(self._process_write_resp())
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
while self.write_command_queue:
cmd = self.write_command_queue.popleft()
if cmd.event:
cmd.event.set(None)
while self.int_write_resp_command_queue:
cmd = self.int_write_resp_command_queue.popleft()
if cmd.event:
cmd.event.set(None)
self.write_resp_queue.clear()
self.in_flight_operations = 0
self._idle.set()
if self._process_write_resp_id_cr is None:
self._process_write_resp_id_cr = [cocotb.fork(self._process_write_resp_id(i)) for i in range(self.id_count)]
async def _process_write(self):
while True:
if not self.write_command_queue:
self.write_command_sync.clear()
await self.write_command_sync.wait()
cmd = self.write_command_queue.popleft()
cmd = await self.write_command_queue.get()
self.current_write_command = cmd
num_bytes = 2**cmd.size
@@ -298,7 +319,7 @@ class AxiMasterWrite(Reset):
# split on 4k address boundary
burst_length = (min(burst_length*num_bytes, 0x1000-(cur_addr & 0xfff))+num_bytes-1)//num_bytes
burst_list.append((awid, burst_length))
burst_list.append(burst_length)
aw = self.aw_channel._transaction_obj()
aw.awid = awid
@@ -340,32 +361,31 @@ class AxiMasterWrite(Reset):
cycle_offset = (cycle_offset + num_bytes) % self.byte_width
resp_cmd = AxiWriteRespCmd(cmd.address, len(cmd.data), cmd.size, cycles, cmd.prot, burst_list, cmd.event)
self.int_write_resp_command_queue.append(resp_cmd)
self.int_write_resp_command_sync.set()
await self.int_write_resp_command_queue[awid].put(resp_cmd)
self.current_write_command = None
async def _process_write_resp(self):
while True:
if not self.int_write_resp_command_queue:
self.int_write_resp_command_sync.clear()
await self.int_write_resp_command_sync.wait()
b = await self.b_channel.recv()
cmd = self.int_write_resp_command_queue.popleft()
bid = int(b.bid)
if self.active_id[bid] <= 0:
raise Exception(f"Unexpected burst ID {bid}")
await self.int_write_resp_queue_list[bid].put(b)
async def _process_write_resp_id(self, bid):
while True:
cmd = await self.int_write_resp_command_queue[bid].get()
self.current_write_resp_command[bid] = cmd
resp = AxiResp.OKAY
user = []
for bid, burst_length in cmd.burst_list:
while not self.int_write_resp_queue_list[bid]:
b = await self.b_channel.recv()
i = int(b.bid)
if self.active_id[i] <= 0:
raise Exception(f"Unexpected burst ID {bid}")
self.int_write_resp_queue_list[i].append(b)
b = self.int_write_resp_queue_list[bid].popleft()
for burst_length in cmd.burst_list:
b = await self.int_write_resp_queue_list[bid].get()
burst_id = int(b.bid)
burst_resp = AxiResp(b.bresp)
@@ -389,11 +409,9 @@ class AxiMasterWrite(Reset):
write_resp = AxiWriteResp(cmd.address, cmd.length, resp, user)
if cmd.event is not None:
cmd.event.set(write_resp)
else:
self.write_resp_queue.append(write_resp)
self.write_resp_sync.set()
cmd.event.set(write_resp)
self.current_write_resp_command[bid] = None
self.in_flight_operations -= 1
@@ -411,20 +429,20 @@ class AxiMasterRead(Reset):
self.log.info("https://github.com/alexforencich/cocotbext-axi")
self.ar_channel = AxiARSource(bus.ar, clock, reset, reset_active_level)
self.ar_channel.queue_occupancy_limit = 2
self.r_channel = AxiRSink(bus.r, clock, reset, reset_active_level)
self.r_channel.queue_occupancy_limit = 2
self.read_command_queue = deque()
self.read_command_sync = Event()
self.read_data_queue = deque()
self.read_data_sync = Event()
self.read_command_queue = Queue()
self.current_read_command = None
self.id_count = 2**len(self.ar_channel.bus.arid)
self.cur_id = 0
self.active_id = Counter()
self.int_read_resp_command_queue = deque()
self.int_read_resp_command_sync = Event()
self.int_read_resp_queue_list = [deque() for k in range(self.id_count)]
self.int_read_resp_command_queue = [Queue() for k in range(self.id_count)]
self.current_read_resp_command = [None for k in range(self.id_count)]
self.int_read_resp_queue_list = [Queue() for k in range(self.id_count)]
self.in_flight_operations = 0
self._idle = Event()
@@ -452,13 +470,17 @@ class AxiMasterRead(Reset):
self._process_read_cr = None
self._process_read_resp_cr = None
self._process_read_resp_id_cr = None
self._init_reset(reset, reset_active_level)
def init_read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, event=None):
if event is not None and not isinstance(event, Event):
if event is None:
event = Event()
if not isinstance(event, Event):
raise ValueError("Expected event object")
if length < 0:
@@ -483,8 +505,9 @@ class AxiMasterRead(Reset):
self._idle.clear()
cmd = AxiReadCmd(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event)
self.read_command_queue.append(cmd)
self.read_command_sync.set()
self.read_command_queue.put_nowait(cmd)
return event
def idle(self):
return not self.in_flight_operations
@@ -493,18 +516,9 @@ class AxiMasterRead(Reset):
while not self.idle():
await self._idle.wait()
def read_data_ready(self):
return bool(self.read_data_queue)
def get_read_data(self):
if self.read_data_queue:
return self.read_data_queue.popleft()
return None
async def read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
event = Event()
self.init_read(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event)
event = self.init_read(address, length, arid, burst, size, lock, cache, prot, qos, region, user)
await event.wait()
return event.data
@@ -554,38 +568,61 @@ class AxiMasterRead(Reset):
if self._process_read_resp_cr is not None:
self._process_read_resp_cr.kill()
self._process_read_resp_cr = None
if self._process_read_resp_id_cr is not None:
for cr in self._process_read_resp_id_cr:
cr.kill()
self._process_read_resp_id_cr = None
self.ar_channel.clear()
self.r_channel.clear()
def flush_cmd(cmd):
self.log.warning("Flushed read operation during reset: %s", cmd)
if cmd.event:
cmd.event.set(None)
while not self.read_command_queue.empty():
cmd = self.read_command_queue.get_nowait()
flush_cmd(cmd)
if self.current_read_command:
cmd = self.current_read_command
self.current_read_command = None
flush_cmd(cmd)
for q in self.int_read_resp_command_queue:
while not q.empty():
cmd = q.get_nowait()
flush_cmd(cmd)
for k in range(len(self.current_read_resp_command)):
if self.current_read_resp_command[k]:
cmd = self.current_read_resp_command[k]
self.current_read_resp_command[k] = None
flush_cmd(cmd)
for q in self.int_read_resp_queue_list:
while not q.empty():
q.get_nowait()
self.cur_id = 0
self.active_id = Counter()
self.in_flight_operations = 0
self._idle.set()
else:
self.log.info("Reset de-asserted")
if self._process_read_cr is None:
self._process_read_cr = cocotb.fork(self._process_read())
if self._process_read_resp_cr is None:
self._process_read_resp_cr = cocotb.fork(self._process_read_resp())
self.ar_channel.clear()
self.r_channel.clear()
while self.read_command_queue:
cmd = self.read_command_queue.popleft()
if cmd.event:
cmd.event.set(None)
while self.int_read_resp_command_queue:
cmd = self.int_read_resp_command_queue.popleft()
if cmd.event:
cmd.event.set(None)
self.read_data_queue.clear()
self.in_flight_operations = 0
self._idle.set()
if self._process_read_resp_id_cr is None:
self._process_read_resp_id_cr = [cocotb.fork(self._process_read_resp_id(i)) for i in range(self.id_count)]
async def _process_read(self):
while True:
if not self.read_command_queue:
self.read_command_sync.clear()
await self.read_command_sync.wait()
cmd = self.read_command_queue.popleft()
cmd = await self.read_command_queue.get()
self.current_read_command = cmd
num_bytes = 2**cmd.size
@@ -619,7 +656,7 @@ class AxiMasterRead(Reset):
# split on 4k address boundary
burst_length = (min(burst_length*num_bytes, 0x1000-(cur_addr & 0xfff))+num_bytes-1)//num_bytes
burst_list.append((arid, burst_length))
burst_list.append(burst_length)
ar = self.r_channel._transaction_obj()
ar.arid = arid
@@ -643,16 +680,25 @@ class AxiMasterRead(Reset):
cur_addr += num_bytes
resp_cmd = AxiReadRespCmd(cmd.address, cmd.length, cmd.size, cycles, cmd.prot, burst_list, cmd.event)
self.int_read_resp_command_queue.append(resp_cmd)
self.int_read_resp_command_sync.set()
await self.int_read_resp_command_queue[arid].put(resp_cmd)
self.current_read_command = None
async def _process_read_resp(self):
while True:
if not self.int_read_resp_command_queue:
self.int_read_resp_command_sync.clear()
await self.int_read_resp_command_sync.wait()
r = await self.r_channel.recv()
cmd = self.int_read_resp_command_queue.popleft()
rid = int(r.rid)
if self.active_id[rid] <= 0:
raise Exception(f"Unexpected burst ID {rid}")
await self.int_read_resp_queue_list[rid].put(r)
async def _process_read_resp_id(self, rid):
while True:
cmd = await self.int_read_resp_command_queue[rid].get()
self.current_read_resp_command[rid] = cmd
num_bytes = 2**cmd.size
@@ -669,19 +715,9 @@ class AxiMasterRead(Reset):
first = True
for rid, burst_length in cmd.burst_list:
for burst_length in cmd.burst_list:
for k in range(burst_length):
while not self.int_read_resp_queue_list[rid]:
r = await self.r_channel.recv()
i = int(r.rid)
if self.active_id[i] <= 0:
raise Exception(f"Unexpected burst ID {rid}")
self.int_read_resp_queue_list[i].append(r)
r = self.int_read_resp_queue_list[rid].popleft()
r = await self.int_read_resp_queue_list[rid].get()
cycle_id = int(r.rid)
cycle_data = int(r.rdata)
@@ -724,11 +760,9 @@ class AxiMasterRead(Reset):
read_resp = AxiReadResp(cmd.address, data, resp, user)
if cmd.event is not None:
cmd.event.set(read_resp)
else:
self.read_data_queue.append(read_resp)
self.read_data_sync.set()
cmd.event.set(read_resp)
self.current_read_resp_command[rid] = None
self.in_flight_operations -= 1
@@ -746,11 +780,11 @@ class AxiMaster:
def init_read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, event=None):
self.read_if.init_read(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event)
return self.read_if.init_read(address, length, arid, burst, size, lock, cache, prot, qos, region, user, event)
def init_write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL,
cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0, event=None):
self.write_if.init_write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser, event)
return self.write_if.init_write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser, event)
def idle(self):
return (not self.read_if or self.read_if.idle()) and (not self.write_if or self.write_if.idle())
@@ -766,18 +800,6 @@ class AxiMaster:
async def wait_write(self):
await self.write_if.wait()
def read_data_ready(self):
return self.read_if.read_data_ready()
def get_read_data(self):
return self.read_if.get_read_data()
def write_resp_ready(self):
return self.write_if.write_resp_ready()
def get_write_resp(self):
return self.write_if.get_write_resp()
async def read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
return await self.read_if.read(address, length, arid,

View File

@@ -45,8 +45,11 @@ class AxiRamWrite(Memory, Reset):
super().__init__(size, mem, *args, **kwargs)
self.aw_channel = AxiAWSink(bus.aw, clock, reset, reset_active_level)
self.aw_channel.queue_occupancy_limit = 2
self.w_channel = AxiWSink(bus.w, clock, reset, reset_active_level)
self.w_channel.queue_occupancy_limit = 2
self.b_channel = AxiBSource(bus.b, clock, reset, reset_active_level)
self.b_channel.queue_occupancy_limit = 2
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
@@ -75,15 +78,15 @@ class AxiRamWrite(Memory, Reset):
if self._process_write_cr is not None:
self._process_write_cr.kill()
self._process_write_cr = None
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
else:
self.log.info("Reset de-asserted")
if self._process_write_cr is None:
self._process_write_cr = cocotb.fork(self._process_write())
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
async def _process_write(self):
while True:
aw = await self.aw_channel.recv()
@@ -168,7 +171,9 @@ class AxiRamRead(Memory, Reset):
super().__init__(size, mem, *args, **kwargs)
self.ar_channel = AxiARSink(bus.ar, clock, reset, reset_active_level)
self.ar_channel.queue_occupancy_limit = 2
self.r_channel = AxiRSource(bus.r, clock, reset, reset_active_level)
self.r_channel.queue_occupancy_limit = 2
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
@@ -195,14 +200,14 @@ class AxiRamRead(Memory, Reset):
if self._process_read_cr is not None:
self._process_read_cr.kill()
self._process_read_cr = None
self.ar_channel.clear()
self.r_channel.clear()
else:
self.log.info("Reset de-asserted")
if self._process_read_cr is None:
self._process_read_cr = cocotb.fork(self._process_read())
self.ar_channel.clear()
self.r_channel.clear()
async def _process_read(self):
while True:
ar = await self.ar_channel.recv()

View File

@@ -23,9 +23,10 @@ THE SOFTWARE.
"""
import logging
from collections import deque, namedtuple
from collections import namedtuple
import cocotb
from cocotb.queue import Queue
from cocotb.triggers import Event
from .version import __version__
@@ -54,16 +55,17 @@ class AxiLiteMasterWrite(Reset):
self.log.info("https://github.com/alexforencich/cocotbext-axi")
self.aw_channel = AxiLiteAWSource(bus.aw, clock, reset, reset_active_level)
self.aw_channel.queue_occupancy_limit = 2
self.w_channel = AxiLiteWSource(bus.w, clock, reset, reset_active_level)
self.w_channel.queue_occupancy_limit = 2
self.b_channel = AxiLiteBSink(bus.b, clock, reset, reset_active_level)
self.b_channel.queue_occupancy_limit = 2
self.write_command_queue = deque()
self.write_command_sync = Event()
self.write_resp_queue = deque()
self.write_resp_sync = Event()
self.write_command_queue = Queue()
self.current_write_command = None
self.int_write_resp_command_queue = deque()
self.int_write_resp_command_sync = Event()
self.int_write_resp_command_queue = Queue()
self.current_write_resp_command = None
self.in_flight_operations = 0
self._idle = Event()
@@ -88,14 +90,18 @@ class AxiLiteMasterWrite(Reset):
self._init_reset(reset, reset_active_level)
def init_write(self, address, data, prot=AxiProt.NONSECURE, event=None):
if event is not None and not isinstance(event, Event):
if event is None:
event = Event()
if not isinstance(event, Event):
raise ValueError("Expected event object")
self.in_flight_operations += 1
self._idle.clear()
self.write_command_queue.append(AxiLiteWriteCmd(address, bytearray(data), prot, event))
self.write_command_sync.set()
self.write_command_queue.put_nowait(AxiLiteWriteCmd(address, bytearray(data), prot, event))
return event
def idle(self):
return not self.in_flight_operations
@@ -104,17 +110,8 @@ class AxiLiteMasterWrite(Reset):
while not self.idle():
await self._idle.wait()
def write_resp_ready(self):
return bool(self.write_resp_queue)
def get_write_resp(self):
if self.write_resp_queue:
return self.write_resp_queue.popleft()
return None
async def write(self, address, data, prot=AxiProt.NONSECURE):
event = Event()
self.init_write(address, data, prot, event)
event = self.init_write(address, data, prot)
await event.wait()
return event.data
@@ -152,6 +149,36 @@ class AxiLiteMasterWrite(Reset):
if self._process_write_resp_cr is not None:
self._process_write_resp_cr.kill()
self._process_write_resp_cr = None
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
def flush_cmd(cmd):
self.log.warning("Flushed write operation during reset: %s", cmd)
if cmd.event:
cmd.event.set(None)
while not self.write_command_queue.empty():
cmd = self.write_command_queue.get_nowait()
flush_cmd(cmd)
if self.current_write_command:
cmd = self.current_write_command
self.current_write_command = None
flush_cmd(cmd)
while not self.int_write_resp_command_queue.empty():
cmd = self.int_write_resp_command_queue.get_nowait()
flush_cmd(cmd)
if self.current_write_resp_command:
cmd = self.current_write_resp_command
self.current_write_resp_command = None
flush_cmd(cmd)
self.in_flight_operations = 0
self._idle.set()
else:
self.log.info("Reset de-asserted")
if self._process_write_cr is None:
@@ -159,32 +186,10 @@ class AxiLiteMasterWrite(Reset):
if self._process_write_resp_cr is None:
self._process_write_resp_cr = cocotb.fork(self._process_write_resp())
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
while self.write_command_queue:
cmd = self.write_command_queue.popleft()
if cmd.event:
cmd.event.set(None)
while self.int_write_resp_command_queue:
cmd = self.int_write_resp_command_queue.popleft()
if cmd.event:
cmd.event.set(None)
self.write_resp_queue.clear()
self.in_flight_operations = 0
self._idle.set()
async def _process_write(self):
while True:
if not self.write_command_queue:
self.write_command_sync.clear()
await self.write_command_sync.wait()
cmd = self.write_command_queue.popleft()
cmd = await self.write_command_queue.get()
self.current_write_command = cmd
word_addr = (cmd.address // self.byte_width) * self.byte_width
@@ -197,8 +202,7 @@ class AxiLiteMasterWrite(Reset):
cycles = (len(cmd.data) + (cmd.address % self.byte_width) + self.byte_width-1) // self.byte_width
resp_cmd = AxiLiteWriteRespCmd(cmd.address, len(cmd.data), cycles, cmd.prot, cmd.event)
self.int_write_resp_command_queue.append(resp_cmd)
self.int_write_resp_command_sync.set()
await self.int_write_resp_command_queue.put(resp_cmd)
offset = 0
@@ -233,13 +237,12 @@ class AxiLiteMasterWrite(Reset):
await self.aw_channel.send(aw)
await self.w_channel.send(w)
self.current_write_command = None
async def _process_write_resp(self):
while True:
if not self.int_write_resp_command_queue:
self.int_write_resp_command_sync.clear()
await self.int_write_resp_command_sync.wait()
cmd = self.int_write_resp_command_queue.popleft()
cmd = await self.int_write_resp_command_queue.get()
self.current_write_resp_command = cmd
resp = AxiResp.OKAY
@@ -256,11 +259,9 @@ class AxiLiteMasterWrite(Reset):
write_resp = AxiLiteWriteResp(cmd.address, cmd.length, resp)
if cmd.event is not None:
cmd.event.set(write_resp)
else:
self.write_resp_queue.append(write_resp)
self.write_resp_sync.set()
cmd.event.set(write_resp)
self.current_write_resp_command = None
self.in_flight_operations -= 1
@@ -278,15 +279,15 @@ class AxiLiteMasterRead(Reset):
self.log.info("https://github.com/alexforencich/cocotbext-axi")
self.ar_channel = AxiLiteARSource(bus.ar, clock, reset, reset_active_level)
self.ar_channel.queue_occupancy_limit = 2
self.r_channel = AxiLiteRSink(bus.r, clock, reset, reset_active_level)
self.r_channel.queue_occupancy_limit = 2
self.read_command_queue = deque()
self.read_command_sync = Event()
self.read_data_queue = deque()
self.read_data_sync = Event()
self.read_command_queue = Queue()
self.current_read_command = None
self.int_read_resp_command_queue = deque()
self.int_read_resp_command_sync = Event()
self.int_read_resp_command_queue = Queue()
self.current_read_resp_command = None
self.in_flight_operations = 0
self._idle = Event()
@@ -309,14 +310,18 @@ class AxiLiteMasterRead(Reset):
self._init_reset(reset, reset_active_level)
def init_read(self, address, length, prot=AxiProt.NONSECURE, event=None):
if event is not None and not isinstance(event, Event):
if event is None:
event = Event()
if not isinstance(event, Event):
raise ValueError("Expected event object")
self.in_flight_operations += 1
self._idle.clear()
self.read_command_queue.append(AxiLiteReadCmd(address, length, prot, event))
self.read_command_sync.set()
self.read_command_queue.put_nowait(AxiLiteReadCmd(address, length, prot, event))
return event
def idle(self):
return not self.in_flight_operations
@@ -325,17 +330,8 @@ class AxiLiteMasterRead(Reset):
while not self.idle():
await self._idle.wait()
def read_data_ready(self):
return bool(self.read_data_queue)
def get_read_data(self):
if self.read_data_queue:
return self.read_data_queue.popleft()
return None
async def read(self, address, length, prot=AxiProt.NONSECURE):
event = Event()
self.init_read(address, length, prot, event)
event = self.init_read(address, length, prot)
await event.wait()
return event.data
@@ -373,6 +369,35 @@ class AxiLiteMasterRead(Reset):
if self._process_read_resp_cr is not None:
self._process_read_resp_cr.kill()
self._process_read_resp_cr = None
self.ar_channel.clear()
self.r_channel.clear()
def flush_cmd(cmd):
self.log.warning("Flushed read operation during reset: %s", cmd)
if cmd.event:
cmd.event.set(None)
while not self.read_command_queue.empty():
cmd = self.read_command_queue.get_nowait()
flush_cmd(cmd)
if self.current_read_command:
cmd = self.current_read_command
self.current_read_command = None
flush_cmd(cmd)
while not self.int_read_resp_command_queue.empty():
cmd = self.int_read_resp_command_queue.get_nowait()
flush_cmd(cmd)
if self.current_read_resp_command:
cmd = self.current_read_resp_command
self.current_read_resp_command = None
flush_cmd(cmd)
self.in_flight_operations = 0
self._idle.set()
else:
self.log.info("Reset de-asserted")
if self._process_read_cr is None:
@@ -380,39 +405,17 @@ class AxiLiteMasterRead(Reset):
if self._process_read_resp_cr is None:
self._process_read_resp_cr = cocotb.fork(self._process_read_resp())
self.ar_channel.clear()
self.r_channel.clear()
while self.read_command_queue:
cmd = self.read_command_queue.popleft()
if cmd.event:
cmd.event.set(None)
while self.int_read_resp_command_queue:
cmd = self.int_read_resp_command_queue.popleft()
if cmd.event:
cmd.event.set(None)
self.read_data_queue.clear()
self.in_flight_operations = 0
self._idle.set()
async def _process_read(self):
while True:
if not self.read_command_queue:
self.read_command_sync.clear()
await self.read_command_sync.wait()
cmd = self.read_command_queue.popleft()
cmd = await self.read_command_queue.get()
self.current_read_command = cmd
word_addr = (cmd.address // self.byte_width) * self.byte_width
cycles = (cmd.length + self.byte_width-1 + (cmd.address % self.byte_width)) // self.byte_width
resp_cmd = AxiLiteReadRespCmd(cmd.address, cmd.length, cycles, cmd.prot, cmd.event)
self.int_read_resp_command_queue.append(resp_cmd)
self.int_read_resp_command_sync.set()
await self.int_read_resp_command_queue.put(resp_cmd)
self.log.info("Read start addr: 0x%08x prot: %s length: %d",
cmd.address, cmd.prot, cmd.length)
@@ -424,13 +427,12 @@ class AxiLiteMasterRead(Reset):
await self.ar_channel.send(ar)
self.current_read_command = None
async def _process_read_resp(self):
while True:
if not self.int_read_resp_command_queue:
self.int_read_resp_command_sync.clear()
await self.int_read_resp_command_sync.wait()
cmd = self.int_read_resp_command_queue.popleft()
cmd = await self.int_read_resp_command_queue.get()
self.current_read_resp_command = cmd
start_offset = cmd.address % self.byte_width
end_offset = ((cmd.address + cmd.length - 1) % self.byte_width) + 1
@@ -464,11 +466,9 @@ class AxiLiteMasterRead(Reset):
read_resp = AxiLiteReadResp(cmd.address, data, resp)
if cmd.event is not None:
cmd.event.set(read_resp)
else:
self.read_data_queue.append(read_resp)
self.read_data_sync.set()
cmd.event.set(read_resp)
self.current_read_resp_command = None
self.in_flight_operations -= 1
@@ -485,10 +485,10 @@ class AxiLiteMaster:
self.read_if = AxiLiteMasterRead(bus.read, clock, reset, reset_active_level)
def init_read(self, address, length, prot=AxiProt.NONSECURE, event=None):
self.read_if.init_read(address, length, prot, event)
return self.read_if.init_read(address, length, prot, event)
def init_write(self, address, data, prot=AxiProt.NONSECURE, event=None):
self.write_if.init_write(address, data, prot, event)
return self.write_if.init_write(address, data, prot, event)
def idle(self):
return (not self.read_if or self.read_if.idle()) and (not self.write_if or self.write_if.idle())
@@ -504,18 +504,6 @@ class AxiLiteMaster:
async def wait_write(self):
await self.write_if.wait()
def read_data_ready(self):
return self.read_if.read_data_ready()
def get_read_data(self):
return self.read_if.get_read_data()
def write_resp_ready(self):
return self.write_if.write_resp_ready()
def get_write_resp(self):
return self.write_if.get_write_resp()
async def read(self, address, length, prot=AxiProt.NONSECURE):
return await self.read_if.read(address, length, prot)

View File

@@ -45,8 +45,11 @@ class AxiLiteRamWrite(Memory, Reset):
super().__init__(size, mem, *args, **kwargs)
self.aw_channel = AxiLiteAWSink(bus.aw, clock, reset, reset_active_level)
self.aw_channel.queue_occupancy_limit = 2
self.w_channel = AxiLiteWSink(bus.w, clock, reset, reset_active_level)
self.w_channel.queue_occupancy_limit = 2
self.b_channel = AxiLiteBSource(bus.b, clock, reset, reset_active_level)
self.b_channel.queue_occupancy_limit = 2
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
@@ -72,15 +75,15 @@ class AxiLiteRamWrite(Memory, Reset):
if self._process_write_cr is not None:
self._process_write_cr.kill()
self._process_write_cr = None
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
else:
self.log.info("Reset de-asserted")
if self._process_write_cr is None:
self._process_write_cr = cocotb.fork(self._process_write())
self.aw_channel.clear()
self.w_channel.clear()
self.b_channel.clear()
async def _process_write(self):
while True:
aw = await self.aw_channel.recv()
@@ -126,7 +129,9 @@ class AxiLiteRamRead(Memory, Reset):
super().__init__(size, mem, *args, **kwargs)
self.ar_channel = AxiLiteARSink(bus.ar, clock, reset, reset_active_level)
self.ar_channel.queue_occupancy_limit = 2
self.r_channel = AxiLiteRSource(bus.r, clock, reset, reset_active_level)
self.r_channel.queue_occupancy_limit = 2
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
@@ -150,14 +155,14 @@ class AxiLiteRamRead(Memory, Reset):
if self._process_read_cr is not None:
self._process_read_cr.kill()
self._process_read_cr = None
self.ar_channel.clear()
self.r_channel.clear()
else:
self.log.info("Reset de-asserted")
if self._process_read_cr is None:
self._process_read_cr = cocotb.fork(self._process_read())
self.ar_channel.clear()
self.r_channel.clear()
async def _process_read(self):
while True:
ar = await self.ar_channel.recv()

View File

@@ -23,12 +23,12 @@ THE SOFTWARE.
"""
import logging
from collections import deque
import cocotb
from cocotb.queue import Queue, QueueFull
from cocotb.triggers import RisingEdge, Timer, First, Event
from cocotb.utils import get_sim_time
from cocotb.bus import Bus
from cocotb_bus.bus import Bus
from .version import __version__
from .reset import Reset
@@ -276,8 +276,12 @@ class AxiStreamBase(Reset):
super().__init__(*args, **kwargs)
self.active = False
self.queue = deque()
self.queue_sync = Event()
self.queue = Queue()
self.dequeue_event = Event()
self.current_frame = None
self.idle_event = Event()
self.idle_event.set()
self.active_event = Event()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
@@ -344,13 +348,19 @@ class AxiStreamBase(Reset):
self._init_reset(reset, reset_active_level)
def count(self):
return len(self.queue)
return self.queue.qsize()
def empty(self):
return not self.queue
return self.queue.empty()
def clear(self):
self.queue.clear()
while not self.queue.empty():
frame = self.queue.get_nowait()
frame.sim_time_end = None
frame.handle_tx_complete()
self.dequeue_event.set()
self.idle_event.set()
self.active_event.clear()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
@@ -360,13 +370,16 @@ class AxiStreamBase(Reset):
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
self.active = False
if self.queue.empty():
self.idle_event.set()
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
self.active = False
async def _run(self):
raise NotImplementedError()
@@ -407,14 +420,32 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
_valid_init = 0
_ready_init = None
async def send(self, frame):
self.send_nowait(frame)
def __init__(self, bus, clock, reset=None, reset_active_level=True,
byte_size=None, byte_lanes=None, *args, **kwargs):
def send_nowait(self, frame):
super().__init__(bus, clock, reset, reset_active_level, byte_size, byte_lanes, *args, **kwargs)
self.queue_occupancy_limit_bytes = -1
self.queue_occupancy_limit_frames = -1
async def send(self, frame):
while self.full():
self.dequeue_event.clear()
await self.dequeue_event.wait()
frame = AxiStreamFrame(frame)
await self.queue.put(frame)
self.idle_event.clear()
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
def send_nowait(self, frame):
if self.full():
raise QueueFull()
frame = AxiStreamFrame(frame)
self.queue.put_nowait(frame)
self.idle_event.clear()
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
self.queue.append(frame)
async def write(self, data):
await self.send(data)
@@ -422,29 +453,42 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
def write_nowait(self, data):
self.send_nowait(data)
def full(self):
if self.queue_occupancy_limit_bytes > 0 and self.queue_occupancy_bytes > self.queue_occupancy_limit_bytes:
return True
elif self.queue_occupancy_limit_frames > 0 and self.queue_occupancy_frames > self.queue_occupancy_limit_frames:
return True
else:
return False
def idle(self):
return self.empty() and not self.active
async def wait(self):
while not self.idle():
await RisingEdge(self.clock)
await self.idle_event.wait()
def _handle_reset(self, state):
super()._handle_reset(state)
self.bus.tdata <= 0
if hasattr(self.bus, "tvalid"):
self.bus.tvalid <= 0
if hasattr(self.bus, "tlast"):
self.bus.tlast <= 0
if hasattr(self.bus, "tkeep"):
self.bus.tkeep <= 0
if hasattr(self.bus, "tid"):
self.bus.tid <= 0
if hasattr(self.bus, "tdest"):
self.bus.tdest <= 0
if hasattr(self.bus, "tuser"):
self.bus.tuser <= 0
if state:
self.bus.tdata <= 0
if hasattr(self.bus, "tvalid"):
self.bus.tvalid <= 0
if hasattr(self.bus, "tlast"):
self.bus.tlast <= 0
if hasattr(self.bus, "tkeep"):
self.bus.tkeep <= 0
if hasattr(self.bus, "tid"):
self.bus.tid <= 0
if hasattr(self.bus, "tdest"):
self.bus.tdest <= 0
if hasattr(self.bus, "tuser"):
self.bus.tuser <= 0
if self.current_frame:
self.log.warning("Flushed transmit frame during reset: %s", self.current_frame)
self.current_frame.handle_tx_complete()
self.current_frame = None
async def _run(self):
frame = None
@@ -458,10 +502,12 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
tvalid_sample = (not hasattr(self.bus, "tvalid")) or self.bus.tvalid.value
if (tready_sample and tvalid_sample) or not tvalid_sample:
if frame is None and self.queue:
frame = self.queue.popleft()
if frame is None and not self.queue.empty():
frame = self.queue.get_nowait()
self.dequeue_event.set()
self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1
self.current_frame = frame
frame.sim_time_start = get_sim_time()
frame.sim_time_end = None
self.log.info("TX frame: %s", frame)
@@ -488,6 +534,7 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
frame.sim_time_end = get_sim_time()
frame.handle_tx_complete()
frame = None
self.current_frame = None
break
self.bus.tdata <= tdata_val
@@ -509,6 +556,8 @@ class AxiStreamSource(AxiStreamBase, AxiStreamPause):
if hasattr(self.bus, "tlast"):
self.bus.tlast <= 0
self.active = bool(frame)
if not frame and self.queue.empty():
self.idle_event.set()
class AxiStreamMonitor(AxiStreamBase):
@@ -527,21 +576,22 @@ class AxiStreamMonitor(AxiStreamBase):
self.read_queue = []
def _recv(self, frame, compact=True):
if self.queue.empty():
self.active_event.clear()
self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1
if compact:
frame.compact()
return frame
async def recv(self, compact=True):
while self.empty():
self.queue_sync.clear()
await self.queue_sync.wait()
return self.recv_nowait(compact)
frame = await self.queue.get()
return self._recv(frame, compact)
def recv_nowait(self, compact=True):
if self.queue:
frame = self.queue.popleft()
self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1
if compact:
frame.compact()
return frame
return None
frame = self.queue.get_nowait()
return self._recv(frame, compact)
async def read(self, count=-1):
while not self.read_queue:
@@ -565,11 +615,10 @@ class AxiStreamMonitor(AxiStreamBase):
async def wait(self, timeout=0, timeout_unit='ns'):
if not self.empty():
return
self.queue_sync.clear()
if timeout:
await First(self.queue_sync.wait(), Timer(timeout, timeout_unit))
await First(self.active_event.wait(), Timer(timeout, timeout_unit))
else:
await self.queue_sync.wait()
await self.active_event.wait()
async def _run(self):
frame = None
@@ -609,8 +658,8 @@ class AxiStreamMonitor(AxiStreamBase):
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
self.queue.append(frame)
self.queue_sync.set()
self.queue.put_nowait(frame)
self.active_event.set()
frame = None
@@ -643,8 +692,9 @@ class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
def _handle_reset(self, state):
super()._handle_reset(state)
if hasattr(self.bus, "tready"):
self.bus.tready <= 0
if state:
if hasattr(self.bus, "tready"):
self.bus.tready <= 0
async def _run(self):
frame = None
@@ -684,8 +734,8 @@ class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
self.queue.append(frame)
self.queue_sync.set()
self.queue.put_nowait(frame)
self.active_event.set()
frame = None

View File

@@ -23,11 +23,11 @@ THE SOFTWARE.
"""
import logging
from collections import deque
import cocotb
from cocotb.queue import Queue, QueueFull
from cocotb.triggers import RisingEdge, Event, First, Timer
from cocotb.bus import Bus
from cocotb_bus.bus import Bus
from .reset import Reset
@@ -94,8 +94,11 @@ class StreamBase(Reset):
self.active = False
self.queue = deque()
self.queue_sync = Event()
self.queue = Queue()
self.dequeue_event = Event()
self.idle_event = Event()
self.idle_event.set()
self.active_event = Event()
self.ready = None
self.valid = None
@@ -124,13 +127,17 @@ class StreamBase(Reset):
self._init_reset(reset, reset_active_level)
def count(self):
return len(self.queue)
return self.queue.qsize()
def empty(self):
return not self.queue
return self.queue.empty()
def clear(self):
self.queue.clear()
while not self.queue.empty():
self.queue.get_nowait()
self.dequeue_event.set()
self.idle_event.set()
self.active_event.clear()
def _handle_reset(self, state):
if state:
@@ -138,13 +145,16 @@ class StreamBase(Reset):
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
self.active = False
if self.queue.empty():
self.idle_event.set()
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
self.active = False
async def _run(self):
raise NotImplementedError()
@@ -183,24 +193,42 @@ class StreamSource(StreamBase, StreamPause):
_valid_init = 0
_ready_init = None
def __init__(self, bus, clock, reset=None, reset_active_level=True, *args, **kwargs):
super().__init__(bus, clock, reset, reset_active_level, *args, **kwargs)
self.queue_occupancy_limit = -1
async def send(self, obj):
self.send_nowait(obj)
while self.full():
self.dequeue_event.clear()
await self.dequeue_event.wait()
await self.queue.put(obj)
self.idle_event.clear()
def send_nowait(self, obj):
self.queue.append(obj)
if self.full():
raise QueueFull()
self.queue.put_nowait(obj)
self.idle_event.clear()
def full(self):
if self.queue_occupancy_limit > 0 and self.count() >= self.queue_occupancy_limit:
return True
else:
return False
def idle(self):
return self.empty() and not self.active
async def wait(self):
while not self.idle():
await RisingEdge(self.clock)
await self.idle_event.wait()
def _handle_reset(self, state):
super()._handle_reset(state)
if self.valid is not None:
self.valid <= 0
if state:
if self.valid is not None:
self.valid <= 0
async def _run(self):
while True:
@@ -211,15 +239,18 @@ class StreamSource(StreamBase, StreamPause):
valid_sample = self.valid is None or self.valid.value
if (ready_sample and valid_sample) or (not valid_sample):
if self.queue and not self.pause:
self.bus.drive(self.queue.popleft())
if not self.queue.empty() and not self.pause:
self.bus.drive(self.queue.get_nowait())
self.dequeue_event.set()
if self.valid is not None:
self.valid <= 1
self.active = True
else:
if self.valid is not None:
self.valid <= 0
self.active = bool(self.queue)
self.active = not self.queue.empty()
if self.queue.empty():
self.idle_event.set()
class StreamMonitor(StreamBase):
@@ -229,25 +260,26 @@ class StreamMonitor(StreamBase):
_valid_init = None
_ready_init = None
def _recv(self, item):
if self.queue.empty():
self.active_event.clear()
return item
async def recv(self):
while self.empty():
self.queue_sync.clear()
await self.queue_sync.wait()
return self.recv_nowait()
item = await self.queue.get()
return self._recv(item)
def recv_nowait(self):
if self.queue:
return self.queue.popleft()
return None
item = self.queue.get_nowait()
return self._recv(item)
async def wait(self, timeout=0, timeout_unit=None):
if not self.empty():
return
self.queue_sync.clear()
if timeout:
await First(self.queue_sync.wait(), Timer(timeout, timeout_unit))
await First(self.active_event.wait(), Timer(timeout, timeout_unit))
else:
await self.queue_sync.wait()
await self.active_event.wait()
async def _run(self):
while True:
@@ -260,8 +292,8 @@ class StreamMonitor(StreamBase):
if ready_sample and valid_sample:
obj = self._transaction_obj()
self.bus.sample(obj)
self.queue.append(obj)
self.queue_sync.set()
self.queue.put_nowait(obj)
self.active_event.set()
class StreamSink(StreamMonitor, StreamPause):
@@ -277,7 +309,7 @@ class StreamSink(StreamMonitor, StreamPause):
self.queue_occupancy_limit = -1
def full(self):
if self.queue_occupancy_limit > 0 and len(self.queue) >= self.queue_occupancy_limit:
if self.queue_occupancy_limit > 0 and self.count() >= self.queue_occupancy_limit:
return True
else:
return False
@@ -285,8 +317,9 @@ class StreamSink(StreamMonitor, StreamPause):
def _handle_reset(self, state):
super()._handle_reset(state)
if self.ready is not None:
self.ready <= 0
if state:
if self.ready is not None:
self.ready <= 0
async def _run(self):
while True:
@@ -299,8 +332,8 @@ class StreamSink(StreamMonitor, StreamPause):
if ready_sample and valid_sample:
obj = self._transaction_obj()
self.bus.sample(obj)
self.queue.append(obj)
self.queue_sync.set()
self.queue.put_nowait(obj)
self.active_event.set()
if self.ready is not None:
self.ready <= (not self.full() and not self.pause)

View File

@@ -1 +1 @@
__version__ = "0.1.6"
__version__ = "0.1.10"

View File

@@ -27,6 +27,7 @@ packages = find_namespace:
python_requires = >=3.6
install_requires =
cocotb
cocotb-bus
[options.extras_require]
test =

View File

@@ -160,6 +160,11 @@ async def run_test_write_words(dut):
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
event = tb.axi_master.init_write(addr, test_data)
await event.wait()
assert tb.axi_ram.read(addr, length) == test_data
test_data = bytearray([x % 256 for x in range(length)])
await tb.axi_master.write(addr, test_data)
assert tb.axi_ram.read(addr, length) == test_data
@@ -209,6 +214,12 @@ async def run_test_read_words(dut):
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
tb.axi_ram.write(addr, test_data)
event = tb.axi_master.init_read(addr, length)
await event.wait()
assert event.data.data == test_data
test_data = bytearray([x % 256 for x in range(length)])
tb.axi_ram.write(addr, test_data)
assert (await tb.axi_master.read(addr, length)).data == test_data
@@ -287,7 +298,7 @@ def cycle_pause():
if cocotb.SIM_NAME:
data_width = int(os.getenv("PARAM_DATA_WIDTH"))
data_width = len(cocotb.top.axi_wdata)
byte_width = data_width // 8
max_burst_size = (byte_width-1).bit_length()

View File

@@ -149,6 +149,11 @@ async def run_test_write_words(dut):
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
event = tb.axil_master.init_write(addr, test_data)
await event.wait()
assert tb.axil_ram.read(addr, length) == test_data
test_data = bytearray([x % 256 for x in range(length)])
await tb.axil_master.write(addr, test_data)
assert tb.axil_ram.read(addr, length) == test_data
@@ -198,6 +203,12 @@ async def run_test_read_words(dut):
tb.log.info("length %d, offset %d", length, offset)
addr = offset+0x1000
test_data = bytearray([x % 256 for x in range(length)])
tb.axil_ram.write(addr, test_data)
event = tb.axil_master.init_read(addr, length)
await event.wait()
assert event.data.data == test_data
test_data = bytearray([x % 256 for x in range(length)])
tb.axil_ram.write(addr, test_data)
assert (await tb.axil_master.read(addr, length)).data == test_data

View File

@@ -126,7 +126,7 @@ def cycle_pause():
def size_list():
data_width = int(os.getenv("PARAM_DATA_WIDTH"))
data_width = len(cocotb.top.axis_tdata)
byte_width = data_width // 8
return list(range(1, byte_width*4+1)) + [512] + [1]*64