Peel off memory interface methods into separate object, add word access wrappers
This commit is contained in:
@@ -22,19 +22,17 @@ THE SOFTWARE.
|
||||
|
||||
"""
|
||||
|
||||
import mmap
|
||||
|
||||
import cocotb
|
||||
from cocotb.log import SimLog
|
||||
|
||||
from .version import __version__
|
||||
from .constants import AxiProt, AxiResp
|
||||
from .axil_channels import AxiLiteAWSink, AxiLiteWSink, AxiLiteBSource, AxiLiteARSink, AxiLiteRSource
|
||||
from .utils import hexdump, hexdump_str
|
||||
from .memory import Memory
|
||||
|
||||
|
||||
class AxiLiteRamWrite(object):
|
||||
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None):
|
||||
class AxiLiteRamWrite(Memory):
|
||||
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
|
||||
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
|
||||
|
||||
self.log.info("AXI lite RAM model (write)")
|
||||
@@ -42,11 +40,7 @@ class AxiLiteRamWrite(object):
|
||||
self.log.info("Copyright (c) 2020 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
if type(mem) is mmap.mmap:
|
||||
self.mem = mem
|
||||
else:
|
||||
self.mem = mmap.mmap(-1, size)
|
||||
self.size = len(self.mem)
|
||||
super().__init__(size, mem, *args, **kwargs)
|
||||
|
||||
self.reset = reset
|
||||
|
||||
@@ -72,20 +66,6 @@ class AxiLiteRamWrite(object):
|
||||
|
||||
cocotb.fork(self._process_write())
|
||||
|
||||
def read_mem(self, address, length):
|
||||
self.mem.seek(address)
|
||||
return self.mem.read(length)
|
||||
|
||||
def write_mem(self, address, data):
|
||||
self.mem.seek(address)
|
||||
self.mem.write(bytes(data))
|
||||
|
||||
def hexdump(self, address, length, prefix=""):
|
||||
hexdump(self.mem, address, length, prefix=prefix)
|
||||
|
||||
def hexdump_str(self, address, length, prefix=""):
|
||||
return hexdump_str(self.mem, address, length, prefix=prefix)
|
||||
|
||||
async def _process_write(self):
|
||||
while True:
|
||||
await self.aw_channel.wait()
|
||||
@@ -121,8 +101,8 @@ class AxiLiteRamWrite(object):
|
||||
self.b_channel.send(b)
|
||||
|
||||
|
||||
class AxiLiteRamRead(object):
|
||||
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None):
|
||||
class AxiLiteRamRead(Memory):
|
||||
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
|
||||
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
|
||||
|
||||
self.log.info("AXI lite RAM model (read)")
|
||||
@@ -130,17 +110,13 @@ class AxiLiteRamRead(object):
|
||||
self.log.info("Copyright (c) 2020 Alex Forencich")
|
||||
self.log.info("https://github.com/alexforencich/cocotbext-axi")
|
||||
|
||||
super().__init__(size, mem, *args, **kwargs)
|
||||
|
||||
self.reset = reset
|
||||
|
||||
self.ar_channel = AxiLiteARSink(entity, name, clock, reset)
|
||||
self.r_channel = AxiLiteRSource(entity, name, clock, reset)
|
||||
|
||||
if type(mem) is mmap.mmap:
|
||||
self.mem = mem
|
||||
else:
|
||||
self.mem = mmap.mmap(-1, size)
|
||||
self.size = len(self.mem)
|
||||
|
||||
self.in_flight_operations = 0
|
||||
|
||||
self.width = len(self.r_channel.bus.rdata)
|
||||
@@ -157,20 +133,6 @@ class AxiLiteRamRead(object):
|
||||
|
||||
cocotb.fork(self._process_read())
|
||||
|
||||
def read_mem(self, address, length):
|
||||
self.mem.seek(address)
|
||||
return self.mem.read(length)
|
||||
|
||||
def write_mem(self, address, data):
|
||||
self.mem.seek(address)
|
||||
self.mem.write(bytes(data))
|
||||
|
||||
def hexdump(self, address, length, prefix=""):
|
||||
hexdump(self.mem, address, length, prefix=prefix)
|
||||
|
||||
def hexdump_str(self, address, length, prefix=""):
|
||||
return hexdump_str(self.mem, address, length, prefix=prefix)
|
||||
|
||||
async def _process_read(self):
|
||||
while True:
|
||||
await self.ar_channel.wait()
|
||||
@@ -195,30 +157,12 @@ class AxiLiteRamRead(object):
|
||||
addr, prot, ' '.join((f'{c:02x}' for c in data)))
|
||||
|
||||
|
||||
class AxiLiteRam(object):
|
||||
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None):
|
||||
class AxiLiteRam(Memory):
|
||||
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
|
||||
self.write_if = None
|
||||
self.read_if = None
|
||||
|
||||
if type(mem) is mmap.mmap:
|
||||
self.mem = mem
|
||||
else:
|
||||
self.mem = mmap.mmap(-1, size)
|
||||
self.size = len(self.mem)
|
||||
super().__init__(size, mem, *args, **kwargs)
|
||||
|
||||
self.write_if = AxiLiteRamWrite(entity, name, clock, reset, mem=self.mem)
|
||||
self.read_if = AxiLiteRamRead(entity, name, clock, reset, mem=self.mem)
|
||||
|
||||
def read_mem(self, address, length):
|
||||
self.mem.seek(address)
|
||||
return self.mem.read(length)
|
||||
|
||||
def write_mem(self, address, data):
|
||||
self.mem.seek(address)
|
||||
self.mem.write(bytes(data))
|
||||
|
||||
def hexdump(self, address, length, prefix=""):
|
||||
hexdump(self.mem, address, length, prefix=prefix)
|
||||
|
||||
def hexdump_str(self, address, length, prefix=""):
|
||||
return hexdump_str(self.mem, address, length, prefix=prefix)
|
||||
|
||||
Reference in New Issue
Block a user