Files
cocotbext-axi/cocotbext/axi/apb.py
Byron Lathi 80b4095d73
Some checks failed
build / Build distributions (push) Successful in 7s
build / deploy (push) Has been skipped
build / Build distributions (release) Successful in 7s
build / deploy (release) Successful in 7s
Regression Tests / Python 3.10 (push) Has been cancelled
Regression Tests / Python 3.11 (push) Has been cancelled
Regression Tests / Python 3.12 (push) Has been cancelled
Regression Tests / Python 3.13 (push) Has been cancelled
Regression Tests / Python 3.8 (push) Has been cancelled
Regression Tests / Python 3.9 (push) Has been cancelled
Change kill to cancel
2026-02-08 13:53:59 -08:00

624 lines
21 KiB
Python

"""
Copyright (c) 2025 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import logging
from typing import NamedTuple
import cocotb
from cocotb.queue import Queue
from cocotb.triggers import RisingEdge, Event
from cocotb_bus.bus import Bus
from .version import __version__
from .constants import AxiResp, AxiProt
from .address_space import Region
from .reset import Reset
from .memory import Memory
# APB master write helper objects
class ApbWriteCmd(NamedTuple):
address: int
data: bytes
prot: AxiProt
event: Event
class ApbWriteResp(NamedTuple):
address: int
length: int
resp: AxiResp
# APB master read helper objects
class ApbReadCmd(NamedTuple):
address: int
length: int
prot: AxiProt
event: Event
class ApbReadResp(NamedTuple):
address: int
data: bytes
resp: AxiResp
def __bytes__(self):
return self.data
class ApbBus(Bus):
_signals = ["paddr", "psel", "penable", "pwrite", "pwdata", "pstrb", "pready", "prdata"]
_optional_signals = ["pprot", "pslverr"]
def __init__(self, entity=None, prefix=None, **kwargs):
super().__init__(entity, prefix, self._signals, optional_signals=self._optional_signals, **kwargs)
@classmethod
def from_entity(cls, entity, **kwargs):
return cls(entity, **kwargs)
@classmethod
def from_prefix(cls, entity, prefix, **kwargs):
return cls(entity, prefix, **kwargs)
class ApbPause:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._pause = False
self._pause_generator = None
self._pause_cr = None
def _pause_update(self, val):
pass
@property
def pause(self):
return self._pause
@pause.setter
def pause(self, val):
if self._pause != val:
self._pause_update(val)
self._pause = val
def set_pause_generator(self, generator=None):
if self._pause_cr is not None:
self._pause_cr.cancel()
self._pause_cr = None
self._pause_generator = generator
if self._pause_generator is not None:
self._pause_cr = cocotb.start_soon(self._run_pause())
def clear_pause_generator(self):
self.set_pause_generator(None)
async def _run_pause(self):
clock_edge_event = RisingEdge(self.clock)
for val in self._pause_generator:
self.pause = val
await clock_edge_event
class ApbMaster(ApbPause, Region, Reset):
def __init__(self, bus, clock, reset=None, reset_active_level=True, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
if bus._name:
self.log = logging.getLogger(f"cocotb.{bus._entity._name}.{bus._name}")
else:
self.log = logging.getLogger(f"cocotb.{bus._entity._name}")
self.log.info("APB master")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2025 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
self.command_queue = Queue()
self.command_queue.queue_occupancy_limit = 2
self.current_command = None
self.read_resp = None
self.write_resp = None
self.in_flight_operations = 0
self._idle = Event()
self._idle.set()
self.address_width = len(self.bus.paddr)
self.width = len(self.bus.pwdata)
self.byte_size = 8
self.byte_lanes = self.width // self.byte_size
self.strb_mask = 2**self.byte_lanes-1
self.pprot_present = hasattr(self.bus, "pprot")
self.pstrb_present = hasattr(self.bus, "pstrb")
self.pslverr_present = hasattr(self.bus, "pslverr")
super().__init__(2**self.address_width, **kwargs)
self.log.info("APB master configuration:")
self.log.info(" Address width: %d bits", self.address_width)
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info("APB master signals:")
for sig in sorted(list(set().union(self.bus._signals, self.bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
if self.pstrb_present:
assert self.byte_lanes == len(self.bus.pstrb)
assert self.byte_lanes * self.byte_size == self.width
self.bus.paddr.setimmediatevalue(0)
if self.pprot_present:
self.bus.pprot.setimmediatevalue(0)
self.bus.psel.setimmediatevalue(False)
self.bus.penable.setimmediatevalue(False)
self.bus.pwrite.setimmediatevalue(False)
self.bus.pwdata.setimmediatevalue(0)
if self.pstrb_present:
self.bus.pstrb.setimmediatevalue(0)
self._run_cr = None
self._init_reset(reset, reset_active_level)
def init_write(self, address, data, prot=AxiProt.NONSECURE, event=None):
if event is None:
event = Event()
if not isinstance(event, Event):
raise ValueError("Expected event object")
if address < 0 or address >= 2**self.address_width:
raise ValueError("Address out of range")
if isinstance(data, int):
raise ValueError("Expected bytes or bytearray for data")
if address+len(data) > 2**self.address_width:
raise ValueError("Requested transfer overruns end of address space")
if not self.pprot_present and prot != AxiProt.NONSECURE:
raise ValueError("pprot sideband signal value specified, but signal is not connected")
data = bytes(data)
cocotb.start_soon(self._write_wrapper(address, bytes(data), prot, event))
return event
def init_read(self, address, length, prot=AxiProt.NONSECURE, event=None):
if event is None:
event = Event()
if not isinstance(event, Event):
raise ValueError("Expected event object")
if address < 0 or address >= 2**self.address_width:
raise ValueError("Address out of range")
if length < 0:
raise ValueError("Read length must be positive")
if address+length > 2**self.address_width:
raise ValueError("Requested transfer overruns end of address space")
if not self.pprot_present and prot != AxiProt.NONSECURE:
raise ValueError("arprot sideband signal value specified, but signal is not connected")
cocotb.start_soon(self._read_wrapper(address, length, prot, event))
return event
def idle(self):
return not self.in_flight_operations
async def wait(self):
while not self.idle():
await self._idle.wait()
async def write(self, address, data, prot=AxiProt.NONSECURE):
if address < 0 or address >= 2**self.address_width:
raise ValueError("Address out of range")
if isinstance(data, int):
raise ValueError("Expected bytes or bytearray for data")
if address+len(data) > 2**self.address_width:
raise ValueError("Requested transfer overruns end of address space")
if not self.pprot_present and prot != AxiProt.NONSECURE:
raise ValueError("pprot sideband signal value specified, but signal is not connected")
event = Event()
data = bytes(data)
self.in_flight_operations += 1
self._idle.clear()
await self.command_queue.put(ApbWriteCmd(address, data, prot, event))
await event.wait()
return self.write_resp
async def _write_wrapper(self, address, data, prot, event):
event.set(await self.write(address, data, prot))
async def read(self, address, length, prot=AxiProt.NONSECURE):
if address < 0 or address >= 2**self.address_width:
raise ValueError("Address out of range")
if length < 0:
raise ValueError("Read length must be positive")
if address+length > 2**self.address_width:
raise ValueError("Requested transfer overruns end of address space")
if not self.pprot_present and prot != AxiProt.NONSECURE:
raise ValueError("arprot sideband signal value specified, but signal is not connected")
event = Event()
self.in_flight_operations += 1
self._idle.clear()
await self.command_queue.put(ApbReadCmd(address, length, prot, event))
await event.wait()
return self.read_resp
async def _read_wrapper(self, address, length, prot, event):
event.set(await self.read(address, length, prot))
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
self.bus.psel.value = False
self.bus.penable.value = False
if self._run_cr is not None:
self._run_cr.cancel()
self._run_cr = None
def flush_cmd(cmd):
self.log.warning("Flushed write operation during reset: %s", cmd)
if cmd.event:
cmd.event.set(None)
while not self.command_queue.empty():
cmd = self.command_queue.get_nowait()
flush_cmd(cmd)
if self.current_command:
cmd = self.current_command
self.current_command = None
flush_cmd(cmd)
self.in_flight_operations = 0
self._idle.set()
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.start_soon(self._run())
async def _run(self):
clock_edge_event = RisingEdge(self.clock)
while True:
cmd = await self.command_queue.get()
self.current_command = cmd
length = 0
pwrite = False
if isinstance(cmd, ApbWriteCmd):
length = len(cmd.data)
pwrite = True
else:
length = cmd.length
pwrite = False
word_addr = (cmd.address // self.byte_lanes) * self.byte_lanes
start_offset = cmd.address % self.byte_lanes
end_offset = ((cmd.address + length - 1) % self.byte_lanes) + 1
strb_start = (self.strb_mask << start_offset) & self.strb_mask
strb_end = self.strb_mask >> (self.byte_lanes - end_offset)
cycles = (length + (cmd.address % self.byte_lanes) + self.byte_lanes-1) // self.byte_lanes
offset = 0
read_data = bytearray()
resp = AxiResp.OKAY
if self.log.isEnabledFor(logging.INFO):
if pwrite:
self.log.info("Write start addr: 0x%08x prot: %s data: %s",
cmd.address, cmd.prot, ' '.join((f'{c:02x}' for c in cmd.data)))
else:
self.log.info("Read start addr: 0x%08x prot: %s length: %d",
cmd.address, cmd.prot, cmd.length)
await clock_edge_event
self.bus.psel.value = True
for k in range(cycles):
start = 0
stop = self.byte_lanes
strb = self.strb_mask
if k == 0:
start = start_offset
strb &= strb_start
if k == cycles-1:
stop = end_offset
strb &= strb_end
val = 0
if pwrite:
for j in range(start, stop):
val |= cmd.data[offset] << j*8
offset += 1
if not self.pstrb_present and strb != self.strb_mask:
self.log.warning("Partial operation requested with pstrb not connected, write will be zero-padded (0x%x != 0x%x)", strb, self.strb_mask)
else:
strb = 0
while self.pause:
await clock_edge_event
if k == 0:
self.bus.paddr.value = cmd.address
else:
self.bus.paddr.value = word_addr + k*self.byte_lanes
self.bus.pprot.value = cmd.prot
self.bus.penable.value = True
self.bus.pwrite.value = pwrite
self.bus.pwdata.value = val
self.bus.pstrb.value = strb
await clock_edge_event
while not int(self.bus.pready.value):
await clock_edge_event
self.bus.penable.value = False
cycle_data = int(self.bus.prdata.value)
if self.pslverr_present and int(self.bus.pslverr.value):
resp = AxiResp.SLVERR
start = 0
stop = self.byte_lanes
if k == 0:
start = start_offset
if k == cycles-1:
stop = end_offset
for j in range(start, stop):
read_data.append((cycle_data >> j*8) & 0xff)
self.bus.psel.value = False
if pwrite:
self.log.info("Write complete addr: 0x%08x prot: %s resp: %s length: %d",
cmd.address, cmd.prot, resp, length)
self.write_resp = ApbWriteResp(cmd.address, length, resp)
cmd.event.set()
else:
if self.log.isEnabledFor(logging.INFO):
self.log.info("Read complete addr: 0x%08x prot: %s resp: %s data: %s",
cmd.address, cmd.prot, resp, ' '.join((f'{c:02x}' for c in read_data)))
self.read_resp = ApbReadResp(cmd.address, bytes(read_data), resp)
cmd.event.set()
self.current_write_command = None
self.in_flight_operations -= 1
if self.in_flight_operations == 0:
self._idle.set()
class ApbSlave(ApbPause, Reset):
def __init__(self, bus, clock, reset=None, target=None, reset_active_level=True, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
self.target = target
if bus._name:
self.log = logging.getLogger(f"cocotb.{bus._entity._name}.{bus._name}")
else:
self.log = logging.getLogger(f"cocotb.{bus._entity._name}")
self.log.info("APB slave model")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2025 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
super().__init__(**kwargs)
self.address_width = len(self.bus.paddr)
self.width = len(self.bus.pwdata)
self.byte_size = 8
self.byte_lanes = self.width // self.byte_size
self.strb_mask = 2**self.byte_lanes-1
self.pprot_present = hasattr(self.bus, "pprot")
self.pstrb_present = hasattr(self.bus, "pstrb")
self.pslverr_present = hasattr(self.bus, "pslverr")
self.log.info("APB slave model configuration:")
self.log.info(" Address width: %d bits", self.address_width)
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info("APB slave model signals:")
for sig in sorted(list(set().union(self.bus._signals, self.bus._optional_signals))):
if hasattr(bus, sig):
self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig)))
else:
self.log.info(" %s: not present", sig)
if self.pstrb_present:
assert self.byte_lanes == len(self.bus.pstrb)
assert self.byte_lanes * self.byte_size == self.width
self.bus.pready.setimmediatevalue(False)
self.bus.prdata.setimmediatevalue(0)
if self.pslverr_present:
self.bus.pslverr.setimmediatevalue(0)
self._run_cr = None
self._init_reset(reset, reset_active_level)
async def _write(self, address, data):
await self.target.write(address, data)
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
self.bus.pready.value = False
if self._run_cr is not None:
self._run_cr.cancel()
self._run_cr = None
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.start_soon(self._run())
async def _run(self):
clock_edge_event = RisingEdge(self.clock)
self.bus.pready.value = False
while True:
await clock_edge_event
if self.pause:
continue
if not int(self.bus.psel.value) or not int(self.bus.penable.value):
continue
addr = (int(self.bus.paddr.value) // self.byte_lanes) * self.byte_lanes
if self.pprot_present:
prot = AxiProt(int(self.bus.pprot.value))
else:
prot = AxiProt.NONSECURE
pslverr = False
if (int(self.bus.pwrite.value)):
data = int(self.bus.pwdata.value)
if self.pstrb_present:
strb = int(self.bus.pstrb.value)
else:
strb = self.strb_mask
# generate operation list
offset = 0
start_offset = None
write_ops = []
data = data.to_bytes(self.byte_lanes, 'little')
if self.log.isEnabledFor(logging.INFO):
self.log.info("Write data paddr: 0x%08x pprot: %s pstrb: 0x%02x data: %s",
addr, prot, strb, ' '.join((f'{c:02x}' for c in data)))
for i in range(self.byte_lanes):
if strb & (1 << i):
if start_offset is None:
start_offset = offset
else:
if start_offset is not None and offset != start_offset:
write_ops.append((addr+start_offset, data[start_offset:offset]))
start_offset = None
offset += 1
if start_offset is not None and offset != start_offset:
write_ops.append((addr+start_offset, data[start_offset:offset]))
# perform writes
try:
for addr, data in write_ops:
await self._write(addr, data)
except Exception:
self.log.warning("Write operation failed")
pslverr = True
else:
try:
data = await self._read(addr, self.byte_lanes)
except Exception:
self.log.warning("Read operation failed")
data = bytes(self.byte_lanes)
pslverr = True
if self.log.isEnabledFor(logging.INFO):
self.log.info("Read data paddr: 0x%08x pprot: %s data: %s",
addr, prot, ' '.join((f'{c:02x}' for c in data)))
self.bus.prdata.value = int.from_bytes(data, 'little')
await clock_edge_event
if self.pslverr_present:
self.bus.pslverr.value = pslverr
self.bus.pready.value = True
await clock_edge_event
self.bus.pready.value = False
if self.pslverr_present:
self.bus.pslverr.value = False
class ApbRam(ApbSlave, Memory):
def __init__(self, bus, clock, reset=None, reset_active_level=True, size=2**64, mem=None, **kwargs):
super().__init__(bus, clock, reset, reset_active_level=reset_active_level, size=size, mem=mem, **kwargs)
async def _write(self, address, data):
self.write(address % self.size, data)
async def _read(self, address, length):
return self.read(address % self.size, length)