diff --git a/cocotbext/axi/axi_master.py b/cocotbext/axi/axi_master.py index 6e8b780..ef95f1b 100644 --- a/cocotbext/axi/axi_master.py +++ b/cocotbext/axi/axi_master.py @@ -33,6 +33,7 @@ from cocotb.triggers import Event from .version import __version__ from .constants import AxiBurstType, AxiLockType, AxiProt, AxiResp from .axi_channels import AxiAWSource, AxiWSource, AxiBSink, AxiARSource, AxiRSink +from .address_space import Region from .reset import Reset @@ -192,8 +193,8 @@ class TagContextManager: return flushed_cmds -class AxiMasterWrite(Reset): - def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256): +class AxiMasterWrite(Region, Reset): + def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256, **kwargs): self.bus = bus self.clock = clock self.reset = reset @@ -243,6 +244,8 @@ class AxiMasterWrite(Reset): self.wuser_present = hasattr(self.bus.w, "wuser") self.buser_present = hasattr(self.bus.b, "buser") + super().__init__(2**self.address_width, **kwargs) + self.log.info("AXI master configuration:") self.log.info(" Address width: %d bits", self.address_width) self.log.info(" ID width: %d bits", self.id_width) @@ -350,43 +353,6 @@ class AxiMasterWrite(Reset): await event.wait() return event.data - async def write_words(self, address, data, byteorder='little', ws=2, awid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0): - words = data - data = bytearray() - for w in words: - data.extend(w.to_bytes(ws, byteorder)) - await self.write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser) - - async def write_dwords(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0): - await self.write_words(address, data, byteorder, 4, awid, burst, size, - lock, cache, prot, qos, region, user, wuser) - - async def write_qwords(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0): - await self.write_words(address, data, byteorder, 8, awid, burst, size, - lock, cache, prot, qos, region, user, wuser) - - async def write_byte(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0): - await self.write(address, [data], awid, burst, size, lock, cache, prot, qos, region, user, wuser) - - async def write_word(self, address, data, byteorder='little', ws=2, awid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0): - await self.write_words(address, [data], byteorder, ws, awid, burst, size, - lock, cache, prot, qos, region, user, wuser) - - async def write_dword(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0): - await self.write_dwords(address, [data], byteorder, awid, burst, size, - lock, cache, prot, qos, region, user, wuser) - - async def write_qword(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0): - await self.write_qwords(address, [data], byteorder, awid, burst, size, - lock, cache, prot, qos, region, user, wuser) - def _handle_reset(self, state): if state: self.log.info("Reset asserted") @@ -589,8 +555,8 @@ class AxiMasterWrite(Reset): self._idle.set() -class AxiMasterRead(Reset): - def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256): +class AxiMasterRead(Region, Reset): + def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256, **kwargs): self.bus = bus self.clock = clock self.reset = reset @@ -636,6 +602,8 @@ class AxiMasterRead(Reset): self.aruser_present = hasattr(self.bus.ar, "aruser") self.ruser_present = hasattr(self.bus.r, "ruser") + super().__init__(2**self.address_width, **kwargs) + self.log.info("AXI master configuration:") self.log.info(" Address width: %d bits", self.address_width) self.log.info(" ID width: %d bits", self.id_width) @@ -731,43 +699,6 @@ class AxiMasterRead(Reset): await event.wait() return event.data - async def read_words(self, address, count, byteorder='little', ws=2, arid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): - data = await self.read(address, count*ws, arid, burst, size, lock, cache, prot, qos, region, user) - words = [] - for k in range(count): - words.append(int.from_bytes(data.data[ws*k:ws*(k+1)], byteorder)) - return words - - async def read_dwords(self, address, count, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): - return await self.read_words(address, count, byteorder, 4, arid, burst, size, - lock, cache, prot, qos, region, user) - - async def read_qwords(self, address, count, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): - return await self.read_words(address, count, byteorder, 8, arid, burst, size, - lock, cache, prot, qos, region, user) - - async def read_byte(self, address, arid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): - return (await self.read(address, 1, arid, burst, size, lock, cache, prot, qos, region, user)).data[0] - - async def read_word(self, address, byteorder='little', ws=2, arid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): - return (await self.read_words(address, 1, byteorder, ws, arid, burst, size, - lock, cache, prot, qos, region, user))[0] - - async def read_dword(self, address, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): - return (await self.read_dwords(address, 1, byteorder, arid, burst, size, - lock, cache, prot, qos, region, user))[0] - - async def read_qword(self, address, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): - return (await self.read_qwords(address, 1, byteorder, arid, burst, size, - lock, cache, prot, qos, region, user))[0] - def _handle_reset(self, state): if state: self.log.info("Reset asserted") @@ -969,13 +900,15 @@ class AxiMasterRead(Reset): self._idle.set() -class AxiMaster: - def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256): +class AxiMaster(Region): + def __init__(self, bus, clock, reset=None, reset_active_level=True, max_burst_len=256, **kwargs): self.write_if = None self.read_if = None - self.write_if = AxiMasterWrite(bus.write, clock, reset, reset_active_level, max_burst_len) - self.read_if = AxiMasterRead(bus.read, clock, reset, reset_active_level, max_burst_len) + self.write_if = AxiMasterWrite(bus.write, clock, reset, reset_active_level, max_burst_len, **kwargs) + self.read_if = AxiMasterRead(bus.read, clock, reset, reset_active_level, max_burst_len, **kwargs) + + super().__init__(max(self.write_if.size, self.read_if.size), **kwargs) def init_read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, event=None): @@ -1004,77 +937,7 @@ class AxiMaster: return await self.read_if.read(address, length, arid, burst, size, lock, cache, prot, qos, region, user) - async def read_words(self, address, count, byteorder='little', ws=2, arid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): - return await self.read_if.read_words(address, count, byteorder, ws, arid, - burst, size, lock, cache, prot, qos, region, user) - - async def read_dwords(self, address, count, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): - return await self.read_if.read_dwords(address, count, byteorder, arid, - burst, size, lock, cache, prot, qos, region, user) - - async def read_qwords(self, address, count, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): - return await self.read_if.read_qwords(address, count, byteorder, arid, - burst, size, lock, cache, prot, qos, region, user) - - async def read_byte(self, address, arid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): - return await self.read_if.read_byte(address, arid, - burst, size, lock, cache, prot, qos, region, user) - - async def read_word(self, address, byteorder='little', ws=2, arid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): - return await self.read_if.read_word(address, byteorder, ws, arid, - burst, size, lock, cache, prot, qos, region, user) - - async def read_dword(self, address, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): - return await self.read_if.read_dword(address, byteorder, arid, - burst, size, lock, cache, prot, qos, region, user) - - async def read_qword(self, address, byteorder='little', arid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0): - return await self.read_if.read_qword(address, byteorder, arid, - burst, size, lock, cache, prot, qos, region, user) - async def write(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None, lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0): return await self.write_if.write(address, data, awid, burst, size, lock, cache, prot, qos, region, user, wuser) - - async def write_words(self, address, data, byteorder='little', ws=2, awid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0): - return await self.write_if.write_words(address, data, byteorder, ws, awid, - burst, size, lock, cache, prot, qos, region, user, wuser) - - async def write_dwords(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0): - return await self.write_if.write_dwords(address, data, byteorder, awid, - burst, size, lock, cache, prot, qos, region, user, wuser) - - async def write_qwords(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0): - return await self.write_if.write_qwords(address, data, byteorder, awid, - burst, size, lock, cache, prot, qos, region, user, wuser) - - async def write_byte(self, address, data, awid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0): - return await self.write_if.write_byte(address, data, awid, - burst, size, lock, cache, prot, qos, region, user, wuser) - - async def write_word(self, address, data, byteorder='little', ws=2, awid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0): - return await self.write_if.write_word(address, data, byteorder, ws, awid, - burst, size, lock, cache, prot, qos, region, user, wuser) - - async def write_dword(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0): - return await self.write_if.write_dword(address, data, byteorder, awid, - burst, size, lock, cache, prot, qos, region, user, wuser) - - async def write_qword(self, address, data, byteorder='little', awid=None, burst=AxiBurstType.INCR, size=None, - lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, wuser=0): - return await self.write_if.write_qword(address, data, byteorder, awid, - burst, size, lock, cache, prot, qos, region, user, wuser) diff --git a/cocotbext/axi/axil_master.py b/cocotbext/axi/axil_master.py index 2fbfa79..3c5e7de 100644 --- a/cocotbext/axi/axil_master.py +++ b/cocotbext/axi/axil_master.py @@ -32,6 +32,7 @@ from cocotb.triggers import Event from .version import __version__ from .constants import AxiProt, AxiResp from .axil_channels import AxiLiteAWSource, AxiLiteWSource, AxiLiteBSink, AxiLiteARSource, AxiLiteRSink +from .address_space import Region from .reset import Reset @@ -82,8 +83,8 @@ class AxiLiteReadResp(NamedTuple): return self.data -class AxiLiteMasterWrite(Reset): - def __init__(self, bus, clock, reset=None, reset_active_level=True): +class AxiLiteMasterWrite(Region, Reset): + def __init__(self, bus, clock, reset=None, reset_active_level=True, **kwargs): self.bus = bus self.clock = clock self.reset = reset @@ -119,6 +120,8 @@ class AxiLiteMasterWrite(Reset): self.awprot_present = hasattr(self.bus.aw, "awprot") + super().__init__(2**self.address_width, **kwargs) + self.log.info("AXI lite master configuration:") self.log.info(" Address width: %d bits", self.address_width) self.log.info(" Byte size: %d bits", self.byte_size) @@ -175,31 +178,6 @@ class AxiLiteMasterWrite(Reset): await event.wait() return event.data - async def write_words(self, address, data, byteorder='little', ws=2, prot=AxiProt.NONSECURE): - words = data - data = bytearray() - for w in words: - data.extend(w.to_bytes(ws, byteorder)) - await self.write(address, data, prot) - - async def write_dwords(self, address, data, byteorder='little', prot=AxiProt.NONSECURE): - await self.write_words(address, data, byteorder, 4, prot) - - async def write_qwords(self, address, data, byteorder='little', prot=AxiProt.NONSECURE): - await self.write_words(address, data, byteorder, 8, prot) - - async def write_byte(self, address, data, prot=AxiProt.NONSECURE): - await self.write(address, [data], prot) - - async def write_word(self, address, data, byteorder='little', ws=2, prot=AxiProt.NONSECURE): - await self.write_words(address, [data], byteorder, ws, prot) - - async def write_dword(self, address, data, byteorder='little', prot=AxiProt.NONSECURE): - await self.write_dwords(address, [data], byteorder, prot) - - async def write_qword(self, address, data, byteorder='little', prot=AxiProt.NONSECURE): - await self.write_qwords(address, [data], byteorder, prot) - def _handle_reset(self, state): if state: self.log.info("Reset asserted") @@ -330,8 +308,8 @@ class AxiLiteMasterWrite(Reset): self._idle.set() -class AxiLiteMasterRead(Reset): - def __init__(self, bus, clock, reset=None, reset_active_level=True): +class AxiLiteMasterRead(Region, Reset): + def __init__(self, bus, clock, reset=None, reset_active_level=True, **kwargs): self.bus = bus self.clock = clock self.reset = reset @@ -364,6 +342,8 @@ class AxiLiteMasterRead(Reset): self.arprot_present = hasattr(self.bus.ar, "arprot") + super().__init__(2**self.address_width, **kwargs) + self.log.info("AXI lite master configuration:") self.log.info(" Address width: %d bits", self.address_width) self.log.info(" Byte size: %d bits", self.byte_size) @@ -416,31 +396,6 @@ class AxiLiteMasterRead(Reset): await event.wait() return event.data - async def read_words(self, address, count, byteorder='little', ws=2, prot=AxiProt.NONSECURE): - data = await self.read(address, count*ws, prot) - words = [] - for k in range(count): - words.append(int.from_bytes(data.data[ws*k:ws*(k+1)], byteorder)) - return words - - async def read_dwords(self, address, count, byteorder='little', prot=AxiProt.NONSECURE): - return await self.read_words(address, count, byteorder, 4, prot) - - async def read_qwords(self, address, count, byteorder='little', prot=AxiProt.NONSECURE): - return await self.read_words(address, count, byteorder, 8, prot) - - async def read_byte(self, address, prot=AxiProt.NONSECURE): - return (await self.read(address, 1, prot)).data[0] - - async def read_word(self, address, byteorder='little', ws=2, prot=AxiProt.NONSECURE): - return (await self.read_words(address, 1, byteorder, ws, prot))[0] - - async def read_dword(self, address, byteorder='little', prot=AxiProt.NONSECURE): - return (await self.read_dwords(address, 1, byteorder, prot))[0] - - async def read_qword(self, address, byteorder='little', prot=AxiProt.NONSECURE): - return (await self.read_qwords(address, 1, byteorder, prot))[0] - def _handle_reset(self, state): if state: self.log.info("Reset asserted") @@ -558,13 +513,15 @@ class AxiLiteMasterRead(Reset): self._idle.set() -class AxiLiteMaster: - def __init__(self, bus, clock, reset=None, reset_active_level=True): +class AxiLiteMaster(Region): + def __init__(self, bus, clock, reset=None, reset_active_level=True, **kwargs): self.write_if = None self.read_if = None - self.write_if = AxiLiteMasterWrite(bus.write, clock, reset, reset_active_level) - self.read_if = AxiLiteMasterRead(bus.read, clock, reset, reset_active_level) + self.write_if = AxiLiteMasterWrite(bus.write, clock, reset, reset_active_level, **kwargs) + self.read_if = AxiLiteMasterRead(bus.read, clock, reset, reset_active_level, **kwargs) + + super().__init__(max(self.write_if.size, self.read_if.size), **kwargs) def init_read(self, address, length, prot=AxiProt.NONSECURE, event=None): return self.read_if.init_read(address, length, prot, event) @@ -589,47 +546,5 @@ class AxiLiteMaster: async def read(self, address, length, prot=AxiProt.NONSECURE): return await self.read_if.read(address, length, prot) - async def read_words(self, address, count, byteorder='little', ws=2, prot=AxiProt.NONSECURE): - return await self.read_if.read_words(address, count, byteorder, ws, prot) - - async def read_dwords(self, address, count, byteorder='little', prot=AxiProt.NONSECURE): - return await self.read_if.read_dwords(address, count, byteorder, prot) - - async def read_qwords(self, address, count, byteorder='little', prot=AxiProt.NONSECURE): - return await self.read_if.read_qwords(address, count, byteorder, prot) - - async def read_byte(self, address, prot=AxiProt.NONSECURE): - return await self.read_if.read_byte(address, prot) - - async def read_word(self, address, byteorder='little', ws=2, prot=AxiProt.NONSECURE): - return await self.read_if.read_word(address, byteorder, ws, prot) - - async def read_dword(self, address, byteorder='little', prot=AxiProt.NONSECURE): - return await self.read_if.read_dword(address, byteorder, prot) - - async def read_qword(self, address, byteorder='little', prot=AxiProt.NONSECURE): - return await self.read_if.read_qword(address, byteorder, prot) - async def write(self, address, data, prot=AxiProt.NONSECURE): return await self.write_if.write(address, data, prot) - - async def write_words(self, address, data, byteorder='little', ws=2, prot=AxiProt.NONSECURE): - return await self.write_if.write_words(address, data, byteorder, ws, prot) - - async def write_dwords(self, address, data, byteorder='little', prot=AxiProt.NONSECURE): - return await self.write_if.write_dwords(address, data, byteorder, prot) - - async def write_qwords(self, address, data, byteorder='little', prot=AxiProt.NONSECURE): - return await self.write_if.write_qwords(address, data, byteorder, prot) - - async def write_byte(self, address, data, prot=AxiProt.NONSECURE): - return await self.write_if.write_byte(address, data, prot) - - async def write_word(self, address, data, byteorder='little', ws=2, prot=AxiProt.NONSECURE): - return await self.write_if.write_word(address, data, byteorder, ws, prot) - - async def write_dword(self, address, data, byteorder='little', prot=AxiProt.NONSECURE): - return await self.write_if.write_dword(address, data, byteorder, prot) - - async def write_qword(self, address, data, byteorder='little', prot=AxiProt.NONSECURE): - return await self.write_if.write_qword(address, data, byteorder, prot)