Use new channel objects

This commit is contained in:
Alex Forencich
2020-10-22 11:31:06 -07:00
parent 44eb2971db
commit db5c34816f
4 changed files with 232 additions and 979 deletions

View File

@@ -23,36 +23,24 @@ THE SOFTWARE.
"""
import cocotb
from cocotb.triggers import RisingEdge, ReadOnly, Event
from cocotb.drivers import BusDriver
from cocotb.triggers import RisingEdge, Event
from cocotb.log import SimLog
from collections import deque
from .constants import *
from .axi_channels import *
class AxiMasterWrite(BusDriver):
_signals = [
# Write address channel
"awid", "awaddr", "awlen", "awsize", "awburst", "awprot", "awvalid", "awready",
# Write data channel
"wdata", "wstrb", "wlast", "wvalid", "wready",
# Write response channel
"bid", "bresp", "bvalid", "bready",
]
_optional_signals = [
# Write address channel
"awlock", "awcache", "awqos", "awregion", "awuser",
# Write data channel
"wuser",
# Write response channel
"buser",
]
class AxiMasterWrite(object):
def __init__(self, entity, name, clock, reset=None):
super().__init__(entity, name, clock)
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.reset = reset
self.aw_channel = AxiAWSource(entity, name, clock, reset)
self.w_channel = AxiWSource(entity, name, clock, reset)
self.b_channel = AxiBSink(entity, name, clock, reset)
self.active_tokens = set()
@@ -62,80 +50,30 @@ class AxiMasterWrite(BusDriver):
self.write_resp_sync = Event()
self.write_resp_set = set()
self.id_queue = deque(range(2**len(self.bus.awid)))
self.id_queue = deque(range(2**len(self.aw_channel.bus.awid)))
self.id_sync = Event()
self.int_write_addr_queue = deque()
self.int_write_data_queue = deque()
self.int_write_resp_command_queue = deque()
self.int_write_resp_command_sync = Event()
self.int_write_resp_queue_list = {}
self.int_write_resp_sync_list = {}
self.in_flight_operations = 0
self.width = len(self.bus.wdata)
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
self.byte_width = self.width // self.byte_size
self.strb_mask = 2**len(self.bus.wstrb)-1
self.strb_mask = 2**self.byte_width-1
self.max_burst_len = 256
self.max_burst_size = (self.byte_width-1).bit_length()
assert self.byte_width == len(self.bus.wstrb)
assert self.byte_width == len(self.w_channel.bus.wstrb)
assert self.byte_width * self.byte_size == self.width
self.reset = reset
self.bus.awid.setimmediatevalue(0)
self.bus.awaddr.setimmediatevalue(0)
assert len(self.bus.awlen) == 8
self.bus.awlen.setimmediatevalue(0)
assert len(self.bus.awsize) == 3
self.bus.awsize.setimmediatevalue(0)
assert len(self.bus.awburst) == 2
self.bus.awburst.setimmediatevalue(0)
if hasattr(self.bus, "awlock"):
assert len(self.bus.awlock) == 1
self.bus.awlock.setimmediatevalue(0)
if hasattr(self.bus, "awcache"):
assert len(self.bus.awcache) == 4
self.bus.awcache.setimmediatevalue(0)
assert len(self.bus.awprot) == 3
self.bus.awprot.setimmediatevalue(0)
if hasattr(self.bus, "awqos"):
assert len(self.bus.awqos) == 4
self.bus.awqos.setimmediatevalue(0)
if hasattr(self.bus, "awregion"):
assert len(self.bus.awregion) == 4
self.bus.awregion.setimmediatevalue(0)
if hasattr(self.bus, "awuser"):
self.bus.awuser.setimmediatevalue(0)
assert len(self.bus.awvalid) == 1
self.bus.awvalid.setimmediatevalue(0)
assert len(self.bus.awready) == 1
self.bus.wdata.setimmediatevalue(0)
self.bus.wstrb.setimmediatevalue(0)
assert len(self.bus.wlast) == 1
self.bus.wlast.setimmediatevalue(0)
if hasattr(self.bus, "wuser"):
self.bus.wuser.setimmediatevalue(0)
assert len(self.bus.wvalid) == 1
self.bus.wvalid.setimmediatevalue(0)
assert len(self.bus.wready) == 1
assert len(self.bus.bid) == len(self.bus.awid)
assert len(self.bus.bresp) == 2
assert len(self.bus.bvalid) == 1
assert len(self.bus.bready) == 1
self.bus.bready.setimmediatevalue(0)
assert len(self.b_channel.bus.bid) == len(self.aw_channel.bus.awid)
cocotb.fork(self._process_write())
cocotb.fork(self._process_write_resp())
cocotb.fork(self._process_write_addr_if())
cocotb.fork(self._process_write_data_if())
cocotb.fork(self._process_write_resp_if())
def init_write(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None):
if token is not None:
@@ -257,12 +195,32 @@ class AxiMasterWrite(BusDriver):
burst_length = (min(burst_length*num_bytes, 0x1000-(cur_addr&0xfff))+num_bytes-1)//num_bytes # 4k align
burst_list.append((awid, burst_length))
self.int_write_addr_queue.append((cur_addr, awid, burst_length-1, size, burst, lock, cache, prot, qos, region, user))
aw = self.aw_channel._transaction_obj()
aw.awid = awid
aw.awaddr = cur_addr
aw.awlen = burst_length-1
aw.awsize = size
aw.awburst = burst
aw.awlock = lock
aw.awcache = cache
aw.awprot = prot
aw.awqos = qos
aw.awregion = region
aw.awuser = user
self.aw_channel.send(aw)
self.log.info(f"Write burst start awid {awid:#x} awaddr: {cur_addr:#010x} awlen: {burst_length-1} awsize: {size}")
n += 1
self.int_write_data_queue.append((val, strb, n >= burst_length, 0))
w = self.w_channel._transaction_obj()
w.wdata = val
w.wstrb = strb
w.wlast = n >= burst_length
self.w_channel.send(w)
cur_addr += num_bytes
cycle_offset = (cycle_offset + num_bytes) % self.byte_width
@@ -283,13 +241,20 @@ class AxiMasterWrite(BusDriver):
for bid, burst_length in burst_list:
self.int_write_resp_queue_list.setdefault(bid, deque())
self.int_write_resp_sync_list.setdefault(bid, Event())
if not self.int_write_resp_queue_list[bid]:
self.int_write_resp_sync_list[bid].clear()
await self.int_write_resp_sync_list[bid].wait()
while True:
if self.int_write_resp_queue_list[bid]:
break
burst_id, burst_resp, burst_user = self.int_write_resp_queue_list[bid].popleft()
burst_resp = AxiResp(burst_resp)
await self.b_channel.wait()
b = self.b_channel.recv()
self.int_write_resp_queue_list[int(b.bid)].append(b)
b = self.int_write_resp_queue_list[bid].popleft()
burst_id = int(b.bid)
burst_resp = AxiResp(b.bresp)
burst_user = int(b.buser)
if burst_resp != AxiResp.OKAY:
resp = burst_resp
@@ -312,115 +277,15 @@ class AxiMasterWrite(BusDriver):
self.write_resp_set.add(token)
self.in_flight_operations -= 1
async def _process_write_addr_if(self):
while True:
await ReadOnly()
# read handshake signals
awready_sample = self.bus.awready.value
awvalid_sample = self.bus.awvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.awvalid <= 0
continue
await RisingEdge(self.clock)
if (awready_sample and awvalid_sample) or (not awvalid_sample):
if self.int_write_addr_queue:
addr, awid, length, size, burst, lock, cache, prot, qos, region, user = self.int_write_addr_queue.popleft()
self.bus.awaddr <= addr
self.bus.awid <= awid
self.bus.awlen <= length
self.bus.awsize <= size
self.bus.awburst <= burst
if hasattr(self.bus, "awlock"):
self.bus.awlock <= lock
if hasattr(self.bus, "awcache"):
self.bus.awcache <= cache
self.bus.awprot <= prot
if hasattr(self.bus, "awqos"):
self.bus.awqos <= qos
if hasattr(self.bus, "awregion"):
self.bus.awregion <= region
if hasattr(self.bus, "awuser"):
self.bus.awuser <= user
self.bus.awvalid <= 1
else:
self.bus.awvalid <= 0
async def _process_write_data_if(self):
while True:
await ReadOnly()
# read handshake signals
wready_sample = self.bus.wready.value
wvalid_sample = self.bus.wvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.wvalid <= 0
continue
await RisingEdge(self.clock)
if (wready_sample and wvalid_sample) or (not wvalid_sample):
if self.int_write_data_queue:
data, strb, last, user = self.int_write_data_queue.popleft()
self.bus.wdata <= data
self.bus.wstrb <= strb
self.bus.wlast <= last
if hasattr(self.bus, "awuser"):
self.bus.awuser <= user
self.bus.wvalid <= 1
else:
self.bus.wvalid <= 0
async def _process_write_resp_if(self):
while True:
await ReadOnly()
# read handshake signals
bready_sample = self.bus.bready.value
bvalid_sample = self.bus.bvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.bready <= 0
continue
if bready_sample and bvalid_sample:
bid = self.bus.bid.value.integer
bresp = self.bus.bresp.value.integer
buser = self.bus.buser.value.integer if hasattr(self.bus, "buser") else None
self.int_write_resp_queue_list.setdefault(bid, deque())
self.int_write_resp_queue_list[bid].append((bid, bresp, buser))
self.int_write_resp_sync_list.setdefault(bid, Event())
self.int_write_resp_sync_list[bid].set()
await RisingEdge(self.clock)
self.bus.bready <= 1
class AxiMasterRead(BusDriver):
_signals = [
# Read address channel
"arid", "araddr", "arlen", "arsize", "arburst", "arprot", "arvalid", "arready",
# Read data channel
"rid", "rdata", "rresp", "rlast", "rvalid", "rready",
]
_optional_signals = [
# Read address channel
"arlock", "arcache", "arqos", "arregion", "aruser",
# Read data channel
"ruser",
]
class AxiMasterRead(object):
def __init__(self, entity, name, clock, reset=None):
super().__init__(entity, name, clock)
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.reset = reset
self.ar_channel = AxiARSource(entity, name, clock, reset)
self.r_channel = AxiRSink(entity, name, clock, reset)
self.active_tokens = set()
@@ -430,18 +295,16 @@ class AxiMasterRead(BusDriver):
self.read_data_sync = Event()
self.read_data_set = set()
self.id_queue = deque(range(2**len(self.bus.arid)))
self.id_queue = deque(range(2**len(self.ar_channel.bus.arid)))
self.id_sync = Event()
self.int_read_addr_queue = deque()
self.int_read_resp_command_queue = deque()
self.int_read_resp_command_sync = Event()
self.int_read_resp_queue_list = {}
self.int_read_resp_sync_list = {}
self.in_flight_operations = 0
self.width = len(self.bus.rdata)
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
self.byte_width = self.width // self.byte_size
@@ -450,47 +313,10 @@ class AxiMasterRead(BusDriver):
assert self.byte_width * self.byte_size == self.width
self.reset = reset
self.bus.arid.setimmediatevalue(0)
self.bus.araddr.setimmediatevalue(0)
assert len(self.bus.arlen) == 8
self.bus.arlen.setimmediatevalue(0)
assert len(self.bus.arsize) == 3
self.bus.arsize.setimmediatevalue(0)
assert len(self.bus.arburst) == 2
self.bus.arburst.setimmediatevalue(0)
if hasattr(self.bus, "arlock"):
assert len(self.bus.arlock) == 1
self.bus.arlock.setimmediatevalue(0)
if hasattr(self.bus, "arcache"):
assert len(self.bus.arcache) == 4
self.bus.arcache.setimmediatevalue(0)
assert len(self.bus.arprot) == 3
self.bus.arprot.setimmediatevalue(0)
if hasattr(self.bus, "arqos"):
assert len(self.bus.arqos) == 4
self.bus.arqos.setimmediatevalue(0)
if hasattr(self.bus, "arregion"):
assert len(self.bus.arregion) == 4
self.bus.arregion.setimmediatevalue(0)
if hasattr(self.bus, "aruser"):
self.bus.aruser.setimmediatevalue(0)
assert len(self.bus.arvalid) == 1
self.bus.arvalid.setimmediatevalue(0)
assert len(self.bus.arready) == 1
assert len(self.bus.rid) == len(self.bus.arid)
assert len(self.bus.rresp) == 2
assert len(self.bus.rlast) == 1
assert len(self.bus.rvalid) == 1
assert len(self.bus.rready) == 1
self.bus.rready.setimmediatevalue(0)
assert len(self.r_channel.bus.rid) == len(self.ar_channel.bus.arid)
cocotb.fork(self._process_read())
cocotb.fork(self._process_read_resp())
cocotb.fork(self._process_read_addr_if())
cocotb.fork(self._process_read_resp_if())
def init_read(self, address, length, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, token=None):
if token is not None:
@@ -591,7 +417,21 @@ class AxiMasterRead(BusDriver):
burst_length = (min(burst_length*num_bytes, 0x1000-(cur_addr&0xfff))+num_bytes-1)//num_bytes # 4k align
burst_list.append((arid, burst_length))
self.int_read_addr_queue.append((cur_addr, arid, burst_length-1, size, burst, lock, cache, prot, qos, region, user))
ar = self.r_channel._transaction_obj()
ar.arid = arid
ar.araddr = cur_addr
ar.arlen = burst_length-1
ar.arsize = size
ar.arburst = burst
ar.arlock = lock
ar.arcache = cache
ar.arprot = prot
ar.arqos = qos
ar.arregion = region
ar.aruser = user
self.ar_channel.send(ar)
self.log.info(f"Read burst start arid {arid:#x} araddr: {cur_addr:#010x} arlen: {burst_length-1} arsize: {size}")
@@ -600,7 +440,6 @@ class AxiMasterRead(BusDriver):
self.int_read_resp_command_queue.append((address, length, size, cycles, prot, burst_list, token))
self.int_read_resp_command_sync.set()
async def _process_read_resp(self):
while True:
if not self.int_read_resp_command_queue:
@@ -628,13 +467,22 @@ class AxiMasterRead(BusDriver):
for rid, burst_length in burst_list:
for k in range(burst_length):
self.int_read_resp_queue_list.setdefault(rid, deque())
self.int_read_resp_sync_list.setdefault(rid, Event())
if not self.int_read_resp_queue_list[rid]:
self.int_read_resp_sync_list[rid].clear()
await self.int_read_resp_sync_list[rid].wait()
while True:
if self.int_read_resp_queue_list[rid]:
break
cycle_id, cycle_data, cycle_resp, cycle_last, cycle_user = self.int_read_resp_queue_list[rid].popleft()
cycle_resp = AxiResp(cycle_resp)
await self.r_channel.wait()
r = self.r_channel.recv()
self.int_read_resp_queue_list[int(r.rid)].append(r)
r = self.int_read_resp_queue_list[rid].popleft()
cycle_id = int(r.rid)
cycle_data = int(r.rdata)
cycle_resp = AxiResp(r.rresp)
cycle_last = int(r.rlast)
cycle_user = int(r.ruser)
if cycle_resp != AxiResp.OKAY:
resp = cycle_resp
@@ -674,71 +522,6 @@ class AxiMasterRead(BusDriver):
self.read_data_set.add(token)
self.in_flight_operations -= 1
async def _process_read_addr_if(self):
while True:
await ReadOnly()
# read handshake signals
arready_sample = self.bus.arready.value
arvalid_sample = self.bus.arvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.arvalid <= 0
continue
await RisingEdge(self.clock)
if (arready_sample and arvalid_sample) or (not arvalid_sample):
if self.int_read_addr_queue:
addr, arid, length, size, burst, lock, cache, prot, qos, region, user = self.int_read_addr_queue.popleft()
self.bus.araddr <= addr
self.bus.arid <= arid
self.bus.arlen <= length
self.bus.arsize <= size
self.bus.arburst <= burst
if hasattr(self.bus, "arlock"):
self.bus.arlock <= lock
if hasattr(self.bus, "arcache"):
self.bus.arcache <= cache
self.bus.arprot <= prot
if hasattr(self.bus, "arqos"):
self.bus.arqos <= qos
if hasattr(self.bus, "arregion"):
self.bus.arregion <= region
if hasattr(self.bus, "aruser"):
self.bus.aruser <= user
self.bus.arvalid <= 1
else:
self.bus.arvalid <= 0
async def _process_read_resp_if(self):
while True:
await ReadOnly()
# read handshake signals
rready_sample = self.bus.rready.value
rvalid_sample = self.bus.rvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.rready <= 0
continue
if rready_sample and rvalid_sample:
rid = self.bus.rid.value.integer
rdata = self.bus.rdata.value.integer
rresp = self.bus.rresp.value.integer
rlast = self.bus.rlast.value.integer
ruser = self.bus.ruser.value.integer if hasattr(self.bus, "ruser") else None
self.int_read_resp_queue_list.setdefault(rid, deque())
self.int_read_resp_queue_list[rid].append((rid, rdata, rresp, rlast, ruser))
self.int_read_resp_sync_list.setdefault(rid, Event())
self.int_read_resp_sync_list[rid].set()
await RisingEdge(self.clock)
self.bus.rready <= 1
class AxiMaster(object):
def __init__(self, entity, name, clock, reset=None):
@@ -786,3 +569,4 @@ class AxiMaster(object):
async def write(self, address, data, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0):
return await self.write_if.write(address, data, burst, size, lock, cache, prot, qos, region, user)

View File

@@ -23,37 +23,19 @@ THE SOFTWARE.
"""
import cocotb
from cocotb.triggers import RisingEdge, ReadOnly, Event
from cocotb.drivers import BusDriver
from cocotb.triggers import Event
from cocotb.log import SimLog
import mmap
from collections import deque
from .constants import *
from .axi_channels import *
class AxiRamWrite(BusDriver):
_signals = [
# Write address channel
"awid", "awaddr", "awlen", "awsize", "awburst", "awprot", "awvalid", "awready",
# Write data channel
"wdata", "wstrb", "wlast", "wvalid", "wready",
# Write response channel
"bid", "bresp", "bvalid", "bready",
]
_optional_signals = [
# Write address channel
"awlock", "awcache", "awqos", "awregion", "awuser",
# Write data channel
"wuser",
# Write response channel
"buser",
]
class AxiRamWrite(object):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None):
super().__init__(entity, name, clock)
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
if type(mem) is mmap.mmap:
self.mem = mem
@@ -61,60 +43,25 @@ class AxiRamWrite(BusDriver):
self.mem = mmap.mmap(-1, size)
self.size = len(self.mem)
self.int_write_addr_queue = deque()
self.int_write_addr_sync = Event()
self.int_write_data_queue = deque()
self.int_write_data_sync = Event()
self.int_write_resp_queue = deque()
self.int_write_resp_sync = Event()
self.reset = reset
self.aw_channel = AxiAWSink(entity, name, clock, reset)
self.w_channel = AxiWSink(entity, name, clock, reset)
self.b_channel = AxiBSource(entity, name, clock, reset)
self.in_flight_operations = 0
self.width = len(self.bus.wdata)
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
self.byte_width = self.width // self.byte_size
self.strb_mask = 2**len(self.bus.wstrb)-1
self.strb_mask = 2**self.byte_width-1
assert self.byte_width == len(self.bus.wstrb)
assert self.byte_width == len(self.w_channel.bus.wstrb)
assert self.byte_width * self.byte_size == self.width
self.reset = reset
assert len(self.bus.awlen) == 8
assert len(self.bus.awsize) == 3
assert len(self.bus.awburst) == 2
if hasattr(self.bus, "awlock"):
assert len(self.bus.awlock) == 1
if hasattr(self.bus, "awcache"):
assert len(self.bus.awcache) == 4
assert len(self.bus.awprot) == 3
if hasattr(self.bus, "awqos"):
assert len(self.bus.awqos) == 4
if hasattr(self.bus, "awregion"):
assert len(self.bus.awregion) == 4
assert len(self.bus.awvalid) == 1
assert len(self.bus.awready) == 1
self.bus.awready.setimmediatevalue(0)
assert len(self.bus.wlast) == 1
assert len(self.bus.wvalid) == 1
assert len(self.bus.wready) == 1
self.bus.wready.setimmediatevalue(0)
assert len(self.bus.bid) == len(self.bus.awid)
self.bus.bid.setimmediatevalue(0)
assert len(self.bus.bresp) == 2
self.bus.bresp.setimmediatevalue(0)
assert len(self.bus.bvalid) == 1
if hasattr(self.bus, "buser"):
self.bus.buser.setimmediatevalue(0)
self.bus.bvalid.setimmediatevalue(0)
assert len(self.bus.bready) == 1
assert len(self.b_channel.bus.bid) == len(self.aw_channel.bus.awid)
cocotb.fork(self._process_write())
cocotb.fork(self._process_write_addr_if())
cocotb.fork(self._process_write_data_if())
cocotb.fork(self._process_write_resp_if())
def read_mem(self, address, length):
self.mem.seek(address)
@@ -126,12 +73,15 @@ class AxiRamWrite(BusDriver):
async def _process_write(self):
while True:
if not self.int_write_addr_queue:
self.int_write_addr_sync.clear()
await self.int_write_addr_sync.wait()
await self.aw_channel.wait()
aw = self.aw_channel.recv()
awid, addr, length, size, burst, prot = self.int_write_addr_queue.popleft()
prot = AxiProt(prot)
awid = int(aw.awid)
addr = int(aw.awaddr)
length = int(aw.awlen)
size = int(aw.awsize)
burst = int(aw.awburst)
prot = AxiProt(int(aw.awprot))
self.log.info(f"Write burst awid: {awid:#x} awaddr: {addr:#010x} awlen: {length} awsize: {size} awprot: {prot}")
@@ -156,11 +106,12 @@ class AxiRamWrite(BusDriver):
for n in range(length):
cur_word_addr = (cur_addr // self.byte_width) * self.byte_width
if not self.int_write_data_queue:
self.int_write_data_sync.clear()
await self.int_write_data_sync.wait()
await self.w_channel.wait()
w = self.w_channel.recv()
data, strb, last = self.int_write_data_queue.popleft()
data = int(w.wdata)
strb = int(w.wstrb)
last = int(w.wlast)
# todo latency
@@ -185,101 +136,16 @@ class AxiRamWrite(BusDriver):
if cur_addr == upper_wrap_boundary:
cur_addr = lower_wrap_boundary
self.int_write_resp_queue.append((awid, AxiResp.OKAY))
self.int_write_resp_sync.set()
b = self.b_channel._transaction_obj()
b.bid = awid
b.bresp = AxiResp.OKAY
async def _process_write_addr_if(self):
while True:
await ReadOnly()
# read handshake signals
awready_sample = self.bus.awready.value
awvalid_sample = self.bus.awvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.awready <= 0
continue
if awready_sample and awvalid_sample:
awid = self.bus.awid.value.integer
awaddr = self.bus.awaddr.value.integer
awlen = self.bus.awlen.value.integer
awsize = self.bus.awsize.value.integer
awburst = self.bus.awburst.value.integer
awprot = self.bus.awprot.value.integer
self.int_write_addr_queue.append((awid, awaddr, awlen, awsize, awburst, awprot))
self.int_write_addr_sync.set()
await RisingEdge(self.clock)
self.bus.awready <= 1
async def _process_write_data_if(self):
while True:
await ReadOnly()
# read handshake signals
wready_sample = self.bus.wready.value
wvalid_sample = self.bus.wvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.wready <= 0
continue
if wready_sample and wvalid_sample:
wdata = self.bus.wdata.value.integer
wstrb = self.bus.wstrb.value.integer
wlast = self.bus.wlast.value.integer
self.int_write_data_queue.append((wdata, wstrb, wlast))
self.int_write_data_sync.set()
await RisingEdge(self.clock)
self.bus.wready <= 1
async def _process_write_resp_if(self):
while True:
await ReadOnly()
# read handshake signals
bready_sample = self.bus.bready.value
bvalid_sample = self.bus.bvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.bvalid <= 0
continue
await RisingEdge(self.clock)
if (bready_sample and bvalid_sample) or (not bvalid_sample):
if self.int_write_resp_queue:
bid, bresp = self.int_write_resp_queue.popleft()
self.bus.bid <= bid
self.bus.bresp <= bresp
self.bus.bvalid <= 1
else:
self.bus.bvalid <= 0
self.b_channel.send(b)
class AxiRamRead(BusDriver):
_signals = [
# Read address channel
"arid", "araddr", "arlen", "arsize", "arburst", "arprot", "arvalid", "arready",
# Read data channel
"rid", "rdata", "rresp", "rlast", "rvalid", "rready",
]
_optional_signals = [
# Read address channel
"arlock", "arcache", "arqos", "arregion", "aruser",
# Read data channel
"ruser",
]
class AxiRamRead(object):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None):
super().__init__(entity, name, clock)
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
if type(mem) is mmap.mmap:
self.mem = mem
@@ -287,55 +153,25 @@ class AxiRamRead(BusDriver):
self.mem = mmap.mmap(-1, size)
self.size = len(self.mem)
self.int_read_addr_queue = deque()
self.int_read_addr_sync = Event()
self.reset = reset
self.ar_channel = AxiARSink(entity, name, clock, reset)
self.r_channel = AxiRSource(entity, name, clock, reset)
self.int_read_resp_command_queue = deque()
self.int_read_resp_command_sync = Event()
self.int_read_resp_queue = deque()
self.int_read_resp_sync = Event()
self.in_flight_operations = 0
self.width = len(self.bus.rdata)
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
self.byte_width = self.width // self.byte_size
assert self.byte_width * self.byte_size == self.width
self.reset = reset
assert len(self.bus.arlen) == 8
assert len(self.bus.arsize) == 3
assert len(self.bus.arburst) == 2
if hasattr(self.bus, "arlock"):
assert len(self.bus.arlock) == 1
if hasattr(self.bus, "arcache"):
assert len(self.bus.arcache) == 4
assert len(self.bus.arprot) == 3
if hasattr(self.bus, "arqos"):
assert len(self.bus.arqos) == 4
if hasattr(self.bus, "arregion"):
assert len(self.bus.arregion) == 4
assert len(self.bus.arvalid) == 1
assert len(self.bus.arready) == 1
self.bus.arready.setimmediatevalue(0)
assert len(self.bus.rid) == len(self.bus.arid)
self.bus.rid.setimmediatevalue(0)
self.bus.rdata.setimmediatevalue(0)
assert len(self.bus.rresp) == 2
self.bus.rresp.setimmediatevalue(0)
assert len(self.bus.rlast) == 1
self.bus.rlast.setimmediatevalue(0)
if hasattr(self.bus, "ruser"):
self.bus.ruser.setimmediatevalue(0)
assert len(self.bus.rvalid) == 1
self.bus.rvalid.setimmediatevalue(0)
assert len(self.bus.rready) == 1
assert len(self.r_channel.bus.rid) == len(self.ar_channel.bus.arid)
cocotb.fork(self._process_read())
cocotb.fork(self._process_read_addr_if())
cocotb.fork(self._process_read_resp_if())
def read_mem(self, address, length):
self.mem.seek(address)
@@ -347,12 +183,15 @@ class AxiRamRead(BusDriver):
async def _process_read(self):
while True:
if not self.int_read_addr_queue:
self.int_read_addr_sync.clear()
await self.int_read_addr_sync.wait()
await self.ar_channel.wait()
ar = self.ar_channel.recv()
arid, addr, length, size, burst, prot = self.int_read_addr_queue.popleft()
prot = AxiProt(prot)
arid = int(ar.arid)
addr = int(ar.araddr)
length = int(ar.arlen)
size = int(ar.arsize)
burst = int(ar.arburst)
prot = AxiProt(ar.arprot)
self.log.info(f"Read burst arid: {arid:#x} araddr: {addr:#010x} arlen: {length} arsize: {size} arprot: {prot}")
@@ -381,8 +220,13 @@ class AxiRamRead(BusDriver):
data = self.mem.read(self.byte_width)
self.int_read_resp_queue.append((arid, int.from_bytes(data, 'little'), AxiResp.OKAY, n == length-1))
self.int_read_resp_sync.set()
r = self.r_channel._transaction_obj()
r.rid = arid
r.rdata = int.from_bytes(data, 'little')
r.rlast = n == length-1
r.rresp = AxiResp.OKAY
self.r_channel.send(r)
self.log.info(f"Read word arid: {arid:#x} addr: {cur_addr:#010x} data: {' '.join((f'{c:02x}' for c in data))}")
@@ -393,58 +237,6 @@ class AxiRamRead(BusDriver):
if cur_addr == upper_wrap_boundary:
cur_addr = lower_wrap_boundary
async def _process_read_addr_if(self):
while True:
await ReadOnly()
# read handshake signals
arready_sample = self.bus.arready.value
arvalid_sample = self.bus.arvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.arready <= 0
continue
if arready_sample and arvalid_sample:
arid = self.bus.arid.value.integer
araddr = self.bus.araddr.value.integer
arlen = self.bus.arlen.value.integer
arsize = self.bus.arsize.value.integer
arburst = self.bus.arburst.value.integer
arprot = self.bus.arprot.value.integer
self.int_read_addr_queue.append((arid, araddr, arlen, arsize, arburst, arprot))
self.int_read_addr_sync.set()
await RisingEdge(self.clock)
self.bus.arready <= 1
async def _process_read_resp_if(self):
while True:
await ReadOnly()
# read handshake signals
rready_sample = self.bus.rready.value
rvalid_sample = self.bus.rvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.rvalid <= 0
continue
await RisingEdge(self.clock)
if (rready_sample and rvalid_sample) or (not rvalid_sample):
if self.int_read_resp_queue:
rid, rdata, rresp, rlast = self.int_read_resp_queue.popleft()
self.bus.rid <= rid
self.bus.rdata <= rdata
self.bus.rresp <= rresp
self.bus.rlast <= rlast
self.bus.rvalid <= 1
else:
self.bus.rvalid <= 0
class AxiRam(object):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None):

View File

@@ -23,27 +23,24 @@ THE SOFTWARE.
"""
import cocotb
from cocotb.triggers import RisingEdge, ReadOnly, Event
from cocotb.drivers import BusDriver
from cocotb.triggers import RisingEdge, Event
from cocotb.log import SimLog
from collections import deque
from .constants import *
from .axil_channels import *
class AxiLiteMasterWrite(BusDriver):
_signals = [
# Write address channel
"awaddr", "awprot", "awvalid", "awready",
# Write data channel
"wdata", "wstrb", "wvalid", "wready",
# Write response channel
"bresp", "bvalid", "bready",
]
class AxiLiteMasterWrite(object):
def __init__(self, entity, name, clock, reset=None):
super().__init__(entity, name, clock)
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.reset = reset
self.aw_channel = AxiLiteAWSource(entity, name, clock, reset)
self.w_channel = AxiLiteWSource(entity, name, clock, reset)
self.b_channel = AxiLiteBSink(entity, name, clock, reset)
self.active_tokens = set()
@@ -51,47 +48,20 @@ class AxiLiteMasterWrite(BusDriver):
self.write_resp_sync = Event()
self.write_resp_set = set()
self.int_write_addr_queue = deque()
self.int_write_data_queue = deque()
self.int_write_resp_command_queue = deque()
self.int_write_resp_command_sync = Event()
self.int_write_resp_queue = deque()
self.int_write_resp_sync = Event()
self.in_flight_operations = 0
self.width = len(self.bus.wdata)
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
self.byte_width = self.width // self.byte_size
self.strb_mask = 2**len(self.bus.wstrb)-1
self.strb_mask = 2**self.byte_width-1
assert self.byte_width == len(self.bus.wstrb)
assert self.byte_width == len(self.w_channel.bus.wstrb)
assert self.byte_width * self.byte_size == self.width
self.reset = reset
self.bus.awaddr.setimmediatevalue(0)
assert len(self.bus.awprot) == 3
self.bus.awprot.setimmediatevalue(0)
assert len(self.bus.awvalid) == 1
self.bus.awvalid.setimmediatevalue(0)
assert len(self.bus.awready) == 1
self.bus.wdata.setimmediatevalue(0)
self.bus.wstrb.setimmediatevalue(0)
assert len(self.bus.wvalid) == 1
self.bus.wvalid.setimmediatevalue(0)
assert len(self.bus.wready) == 1
assert len(self.bus.bresp) == 2
assert len(self.bus.bvalid) == 1
assert len(self.bus.bready) == 1
self.bus.bready.setimmediatevalue(0)
cocotb.fork(self._process_write_resp())
cocotb.fork(self._process_write_addr_if())
cocotb.fork(self._process_write_data_if())
cocotb.fork(self._process_write_resp_if())
def init_write(self, address, data, prot=AxiProt.NONSECURE, token=None):
if token is not None:
@@ -135,8 +105,16 @@ class AxiLiteMasterWrite(BusDriver):
val |= bytearray(data)[offset] << j*8
offset += 1
self.int_write_addr_queue.append((word_addr + start + k*self.byte_width, prot))
self.int_write_data_queue.append((val, strb))
aw = self.aw_channel._transaction_obj()
aw.awaddr = word_addr + k*self.byte_width
aw.awprot = prot
w = self.w_channel._transaction_obj()
w.wdata = val
w.wstrb = strb
self.aw_channel.send(aw)
self.w_channel.send(w)
def idle(self):
return not self.in_flight_operations
@@ -193,12 +171,10 @@ class AxiLiteMasterWrite(BusDriver):
resp = AxiResp.OKAY
for k in range(cycles):
if not self.int_write_resp_queue:
self.int_write_resp_sync.clear()
await self.int_write_resp_sync.wait()
await self.b_channel.wait()
b = self.b_channel.recv()
cycle_resp = self.int_write_resp_queue.popleft()
cycle_resp = AxiResp(cycle_resp)
cycle_resp = AxiResp(b.bresp)
if cycle_resp != AxiResp.OKAY:
resp = cycle_resp
@@ -211,87 +187,15 @@ class AxiLiteMasterWrite(BusDriver):
self.write_resp_set.add(token)
self.in_flight_operations -= 1
async def _process_write_addr_if(self):
while True:
await ReadOnly()
# read handshake signals
awready_sample = self.bus.awready.value
awvalid_sample = self.bus.awvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.awvalid <= 0
continue
await RisingEdge(self.clock)
if (awready_sample and awvalid_sample) or (not awvalid_sample):
if self.int_write_addr_queue:
addr, prot = self.int_write_addr_queue.popleft()
self.bus.awaddr <= addr
self.bus.awprot <= prot
self.bus.awvalid <= 1
else:
self.bus.awvalid <= 0
async def _process_write_data_if(self):
while True:
await ReadOnly()
# read handshake signals
wready_sample = self.bus.wready.value
wvalid_sample = self.bus.wvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.wvalid <= 0
continue
await RisingEdge(self.clock)
if (wready_sample and wvalid_sample) or (not wvalid_sample):
if self.int_write_data_queue:
data, strb = self.int_write_data_queue.popleft()
self.bus.wdata <= data
self.bus.wstrb <= strb
self.bus.wvalid <= 1
else:
self.bus.wvalid <= 0
async def _process_write_resp_if(self):
while True:
await ReadOnly()
# read handshake signals
bready_sample = self.bus.bready.value
bvalid_sample = self.bus.bvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.bready <= 0
continue
if bready_sample and bvalid_sample:
bresp = self.bus.bresp.value.integer
self.int_write_resp_queue.append(bresp)
self.int_write_resp_sync.set()
await RisingEdge(self.clock)
self.bus.bready <= 1
class AxiLiteMasterRead(BusDriver):
_signals = [
# Read address channel
"araddr", "arprot", "arvalid", "arready",
# Read data channel
"rdata", "rresp", "rvalid", "rready",
]
class AxiLiteMasterRead(object):
def __init__(self, entity, name, clock, reset=None):
super().__init__(entity, name, clock)
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.reset = reset
self.ar_channel = AxiLiteARSource(entity, name, clock, reset)
self.r_channel = AxiLiteRSink(entity, name, clock, reset)
self.active_tokens = set()
@@ -299,37 +203,18 @@ class AxiLiteMasterRead(BusDriver):
self.read_data_sync = Event()
self.read_data_set = set()
self.int_read_addr_queue = deque()
self.int_read_resp_command_queue = deque()
self.int_read_resp_command_sync = Event()
self.int_read_resp_queue = deque()
self.int_read_resp_sync = Event()
self.in_flight_operations = 0
self.width = len(self.bus.rdata)
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
self.byte_width = self.width // self.byte_size
assert self.byte_width * self.byte_size == self.width
self.reset = reset
self.bus.araddr.setimmediatevalue(0)
assert len(self.bus.arprot) == 3
self.bus.arprot.setimmediatevalue(0)
assert len(self.bus.arvalid) == 1
self.bus.arvalid.setimmediatevalue(0)
assert len(self.bus.arready) == 1
assert len(self.bus.rresp) == 2
assert len(self.bus.rvalid) == 1
assert len(self.bus.rready) == 1
self.bus.rready.setimmediatevalue(0)
cocotb.fork(self._process_read_resp())
cocotb.fork(self._process_read_addr_if())
cocotb.fork(self._process_read_resp_if())
def init_read(self, address, length, prot=AxiProt.NONSECURE, token=None):
if token is not None:
@@ -349,7 +234,11 @@ class AxiLiteMasterRead(BusDriver):
self.log.info(f"Read start addr: {address:#010x} prot: {prot} length: {length}")
for k in range(cycles):
self.int_read_addr_queue.append((word_addr + k*self.byte_width, prot))
ar = self.ar_channel._transaction_obj()
ar.araddr = word_addr + k*self.byte_width
ar.arprot = prot
self.ar_channel.send(ar)
def idle(self):
return not self.in_flight_operations
@@ -413,12 +302,11 @@ class AxiLiteMasterRead(BusDriver):
resp = AxiResp.OKAY
for k in range(cycles):
if not self.int_read_resp_queue:
self.int_read_resp_sync.clear()
await self.int_read_resp_sync.wait()
await self.r_channel.wait()
r = self.r_channel.recv()
cycle_data, cycle_resp = self.int_read_resp_queue.popleft()
cycle_resp = AxiResp(cycle_resp)
cycle_data = int(r.rdata)
cycle_resp = AxiResp(r.rresp)
if cycle_resp != AxiResp.OKAY:
resp = cycle_resp
@@ -442,52 +330,6 @@ class AxiLiteMasterRead(BusDriver):
self.read_data_set.add(token)
self.in_flight_operations -= 1
async def _process_read_addr_if(self):
while True:
await ReadOnly()
# read handshake signals
arready_sample = self.bus.arready.value
arvalid_sample = self.bus.arvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.arvalid <= 0
continue
await RisingEdge(self.clock)
if (arready_sample and arvalid_sample) or (not arvalid_sample):
if self.int_read_addr_queue:
addr, prot = self.int_read_addr_queue.popleft()
self.bus.araddr <= addr
self.bus.arprot <= prot
self.bus.arvalid <= 1
else:
self.bus.arvalid <= 0
async def _process_read_resp_if(self):
while True:
await ReadOnly()
# read handshake signals
rready_sample = self.bus.rready.value
rvalid_sample = self.bus.rvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.rready <= 0
continue
if rready_sample and rvalid_sample:
rdata = self.bus.rdata.value.integer
rresp = self.bus.rresp.value.integer
self.int_read_resp_queue.append((rdata, rresp))
self.int_read_resp_sync.set()
await RisingEdge(self.clock)
self.bus.rready <= 1
class AxiLiteMaster(object):
def __init__(self, entity, name, clock, reset=None):

View File

@@ -23,29 +23,20 @@ THE SOFTWARE.
"""
import cocotb
from cocotb.triggers import RisingEdge, ReadOnly, Event
from cocotb.drivers import BusDriver
from cocotb.triggers import Event
from cocotb.log import SimLog
import mmap
import queue
from collections import deque
from .constants import *
from .axil_channels import *
class AxiLiteRamWrite(BusDriver):
_signals = [
# Write address channel
"awaddr", "awprot", "awvalid", "awready",
# Write data channel
"wdata", "wstrb", "wvalid", "wready",
# Write response channel
"bresp", "bvalid", "bready",
]
class AxiLiteRamWrite(object):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None):
super().__init__(entity, name, clock)
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
if type(mem) is mmap.mmap:
self.mem = mem
@@ -53,44 +44,23 @@ class AxiLiteRamWrite(BusDriver):
self.mem = mmap.mmap(-1, size)
self.size = len(self.mem)
self.int_write_addr_queue = deque()
self.int_write_addr_sync = Event()
self.int_write_data_queue = deque()
self.int_write_data_sync = Event()
self.int_write_resp_queue = deque()
self.int_write_resp_sync = Event()
self.reset = reset
self.aw_channel = AxiLiteAWSink(entity, name, clock, reset)
self.w_channel = AxiLiteWSink(entity, name, clock, reset)
self.b_channel = AxiLiteBSource(entity, name, clock, reset)
self.in_flight_operations = 0
self.width = len(self.bus.wdata)
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
self.byte_width = self.width // self.byte_size
self.strb_mask = 2**len(self.bus.wstrb)-1
self.strb_mask = 2**self.byte_width-1
assert self.byte_width == len(self.bus.wstrb)
assert self.byte_width == len(self.w_channel.bus.wstrb)
assert self.byte_width * self.byte_size == self.width
self.reset = reset
assert len(self.bus.awprot) == 3
assert len(self.bus.awvalid) == 1
assert len(self.bus.awready) == 1
self.bus.awready.setimmediatevalue(0)
assert len(self.bus.wvalid) == 1
assert len(self.bus.wready) == 1
self.bus.wready.setimmediatevalue(0)
assert len(self.bus.bresp) == 2
self.bus.bresp.setimmediatevalue(0)
assert len(self.bus.bvalid) == 1
self.bus.bvalid.setimmediatevalue(0)
assert len(self.bus.bready) == 1
cocotb.fork(self._process_write())
cocotb.fork(self._process_write_addr_if())
cocotb.fork(self._process_write_data_if())
cocotb.fork(self._process_write_resp_if())
def read_mem(self, address, length):
self.mem.seek(address)
@@ -102,19 +72,17 @@ class AxiLiteRamWrite(BusDriver):
async def _process_write(self):
while True:
if not self.int_write_addr_queue:
self.int_write_addr_sync.clear()
await self.int_write_addr_sync.wait()
await self.aw_channel.wait()
aw = self.aw_channel.recv()
addr, prot = self.int_write_addr_queue.popleft()
addr = (addr // self.byte_width) * self.byte_width
prot = AxiProt(prot)
addr = (int(aw.awaddr) // self.byte_width) * self.byte_width
prot = AxiProt(aw.awprot)
if not self.int_write_data_queue:
self.int_write_data_sync.clear()
await self.int_write_data_sync.wait()
await self.w_channel.wait()
w = self.w_channel.recv()
data, strb = self.int_write_data_queue.popleft()
data = int(w.wdata)
strb = int(w.wstrb)
# todo latency
@@ -130,88 +98,20 @@ class AxiLiteRamWrite(BusDriver):
else:
self.mem.seek(1, 1)
self.int_write_resp_queue.append(AxiResp.OKAY)
self.int_write_resp_sync.set()
b = self.b_channel._transaction_obj()
b.bresp = AxiResp.OKAY
async def _process_write_addr_if(self):
while True:
await ReadOnly()
# read handshake signals
awready_sample = self.bus.awready.value
awvalid_sample = self.bus.awvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.awready <= 0
continue
if awready_sample and awvalid_sample:
awaddr = self.bus.awaddr.value.integer
awprot = self.bus.awprot.value.integer
self.int_write_addr_queue.append((awaddr, awprot))
self.int_write_addr_sync.set()
await RisingEdge(self.clock)
self.bus.awready <= 1
async def _process_write_data_if(self):
while True:
await ReadOnly()
# read handshake signals
wready_sample = self.bus.wready.value
wvalid_sample = self.bus.wvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.wready <= 0
continue
if wready_sample and wvalid_sample:
wdata = self.bus.wdata.value.integer
wstrb = self.bus.wstrb.value.integer
self.int_write_data_queue.append((wdata, wstrb))
self.int_write_data_sync.set()
await RisingEdge(self.clock)
self.bus.wready <= 1
async def _process_write_resp_if(self):
while True:
await ReadOnly()
# read handshake signals
bready_sample = self.bus.bready.value
bvalid_sample = self.bus.bvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.bvalid <= 0
continue
await RisingEdge(self.clock)
if (bready_sample and bvalid_sample) or (not bvalid_sample):
if self.int_write_resp_queue:
bresp = self.int_write_resp_queue.popleft()
self.bus.bresp <= bresp
self.bus.bvalid <= 1
else:
self.bus.bvalid <= 0
self.b_channel.send(b)
class AxiLiteRamRead(BusDriver):
_signals = [
# Read address channel
"araddr", "arprot", "arvalid", "arready",
# Read data channel
"rdata", "rresp", "rvalid", "rready",
]
class AxiLiteRamRead(object):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None):
super().__init__(entity, name, clock)
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.reset = reset
self.ar_channel = AxiLiteARSink(entity, name, clock, reset)
self.r_channel = AxiLiteRSource(entity, name, clock, reset)
if type(mem) is mmap.mmap:
self.mem = mem
@@ -219,38 +119,18 @@ class AxiLiteRamRead(BusDriver):
self.mem = mmap.mmap(-1, size)
self.size = len(self.mem)
self.int_read_addr_queue = deque()
self.int_read_addr_sync = Event()
self.int_read_resp_command_queue = deque()
self.int_read_resp_command_sync = Event()
self.int_read_resp_queue = deque()
self.int_read_resp_sync = Event()
self.in_flight_operations = 0
self.width = len(self.bus.rdata)
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
self.byte_width = self.width // self.byte_size
assert self.byte_width * self.byte_size == self.width
self.reset = reset
assert len(self.bus.arprot) == 3
assert len(self.bus.arvalid) == 1
assert len(self.bus.arready) == 1
self.bus.arready.setimmediatevalue(0)
self.bus.rdata.setimmediatevalue(0)
assert len(self.bus.rresp) == 2
self.bus.rresp.setimmediatevalue(0)
assert len(self.bus.rvalid) == 1
self.bus.rvalid.setimmediatevalue(0)
assert len(self.bus.rready) == 1
cocotb.fork(self._process_read())
cocotb.fork(self._process_read_addr_if())
cocotb.fork(self._process_read_resp_if())
def read_mem(self, address, length):
self.mem.seek(address)
@@ -262,13 +142,11 @@ class AxiLiteRamRead(BusDriver):
async def _process_read(self):
while True:
if not self.int_read_addr_queue:
self.int_read_addr_sync.clear()
await self.int_read_addr_sync.wait()
await self.ar_channel.wait()
ar = self.ar_channel.recv()
addr, prot = self.int_read_addr_queue.popleft()
addr = (addr // self.byte_width) * self.byte_width
prot = AxiProt(prot)
addr = (int(ar.araddr) // self.byte_width) * self.byte_width
prot = AxiProt(ar.arprot)
# todo latency
@@ -276,57 +154,14 @@ class AxiLiteRamRead(BusDriver):
data = self.mem.read(self.byte_width)
self.int_read_resp_queue.append((int.from_bytes(data, 'little'), AxiResp.OKAY))
self.int_read_resp_sync.set()
r = self.r_channel._transaction_obj()
r.rdata = int.from_bytes(data, 'little')
r.rresp = AxiResp.OKAY
self.r_channel.send(r)
self.log.info(f"Read data addr: {addr:#010x} prot: {prot} data: {' '.join((f'{c:02x}' for c in data))}")
async def _process_read_addr_if(self):
while True:
await ReadOnly()
# read handshake signals
arready_sample = self.bus.arready.value
arvalid_sample = self.bus.arvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.arready <= 0
continue
if arready_sample and arvalid_sample:
araddr = self.bus.araddr.value.integer
arprot = self.bus.arprot.value.integer
self.int_read_addr_queue.append((araddr, arprot))
self.int_read_addr_sync.set()
await RisingEdge(self.clock)
self.bus.arready <= 1
async def _process_read_resp_if(self):
while True:
await ReadOnly()
# read handshake signals
rready_sample = self.bus.rready.value
rvalid_sample = self.bus.rvalid.value
if self.reset is not None and self.reset.value:
await RisingEdge(self.clock)
self.bus.rvalid <= 0
continue
await RisingEdge(self.clock)
if (rready_sample and rvalid_sample) or (not rvalid_sample):
if self.int_read_resp_queue:
rdata, rresp = self.int_read_resp_queue.popleft()
self.bus.rdata <= rdata
self.bus.rresp <= rresp
self.bus.rvalid <= 1
else:
self.bus.rvalid <= 0
class AxiLiteRam(object):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None):