From 645ebb069ccfe623135e339e2584e1d28dd71e5a Mon Sep 17 00:00:00 2001 From: Alex Forencich Date: Tue, 1 Dec 2020 22:48:41 -0800 Subject: [PATCH] Peel off memory interface methods into separate object, add word access wrappers --- cocotbext/axi/axi_ram.py | 76 ++++------------------------ cocotbext/axi/axil_ram.py | 78 ++++------------------------ cocotbext/axi/memory.py | 104 ++++++++++++++++++++++++++++++++++++++ tests/axi/test_axi.py | 10 ++-- tests/axil/test_axil.py | 10 ++-- 5 files changed, 135 insertions(+), 143 deletions(-) create mode 100644 cocotbext/axi/memory.py diff --git a/cocotbext/axi/axi_ram.py b/cocotbext/axi/axi_ram.py index 91d13e1..9c58647 100644 --- a/cocotbext/axi/axi_ram.py +++ b/cocotbext/axi/axi_ram.py @@ -22,19 +22,17 @@ THE SOFTWARE. """ -import mmap - import cocotb from cocotb.log import SimLog from .version import __version__ from .constants import AxiBurstType, AxiProt, AxiResp from .axi_channels import AxiAWSink, AxiWSink, AxiBSource, AxiARSink, AxiRSource -from .utils import hexdump, hexdump_str +from .memory import Memory -class AxiRamWrite(object): - def __init__(self, entity, name, clock, reset=None, size=1024, mem=None): +class AxiRamWrite(Memory): + def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs): self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.log.info("AXI RAM model (write)") @@ -42,11 +40,7 @@ class AxiRamWrite(object): self.log.info("Copyright (c) 2020 Alex Forencich") self.log.info("https://github.com/alexforencich/cocotbext-axi") - if type(mem) is mmap.mmap: - self.mem = mem - else: - self.mem = mmap.mmap(-1, size) - self.size = len(self.mem) + super().__init__(size, mem, *args, **kwargs) self.reset = reset @@ -75,20 +69,6 @@ class AxiRamWrite(object): cocotb.fork(self._process_write()) - def read_mem(self, address, length): - self.mem.seek(address) - return self.mem.read(length) - - def write_mem(self, address, data): - self.mem.seek(address) - self.mem.write(bytes(data)) - - def hexdump(self, address, length, prefix=""): - hexdump(self.mem, address, length, prefix=prefix) - - def hexdump_str(self, address, length, prefix=""): - return hexdump_str(self.mem, address, length, prefix=prefix) - async def _process_write(self): while True: await self.aw_channel.wait() @@ -163,8 +143,8 @@ class AxiRamWrite(object): self.b_channel.send(b) -class AxiRamRead(object): - def __init__(self, entity, name, clock, reset=None, size=1024, mem=None): +class AxiRamRead(Memory): + def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs): self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.log.info("AXI RAM model (read)") @@ -172,11 +152,7 @@ class AxiRamRead(object): self.log.info("Copyright (c) 2020 Alex Forencich") self.log.info("https://github.com/alexforencich/cocotbext-axi") - if type(mem) is mmap.mmap: - self.mem = mem - else: - self.mem = mmap.mmap(-1, size) - self.size = len(self.mem) + super().__init__(size, mem, *args, **kwargs) self.reset = reset @@ -202,20 +178,6 @@ class AxiRamRead(object): cocotb.fork(self._process_read()) - def read_mem(self, address, length): - self.mem.seek(address) - return self.mem.read(length) - - def write_mem(self, address, data): - self.mem.seek(address) - self.mem.write(bytes(data)) - - def hexdump(self, address, length, prefix=""): - hexdump(self.mem, address, length, prefix=prefix) - - def hexdump_str(self, address, length, prefix=""): - return hexdump_str(self.mem, address, length, prefix=prefix) - async def _process_read(self): while True: await self.ar_channel.wait() @@ -275,30 +237,12 @@ class AxiRamRead(object): cur_addr = lower_wrap_boundary -class AxiRam(object): - def __init__(self, entity, name, clock, reset=None, size=1024, mem=None): +class AxiRam(Memory): + def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs): self.write_if = None self.read_if = None - if type(mem) is mmap.mmap: - self.mem = mem - else: - self.mem = mmap.mmap(-1, size) - self.size = len(self.mem) + super().__init__(size, mem, *args, **kwargs) self.write_if = AxiRamWrite(entity, name, clock, reset, mem=self.mem) self.read_if = AxiRamRead(entity, name, clock, reset, mem=self.mem) - - def read_mem(self, address, length): - self.mem.seek(address) - return self.mem.read(length) - - def write_mem(self, address, data): - self.mem.seek(address) - self.mem.write(bytes(data)) - - def hexdump(self, address, length, prefix=""): - hexdump(self.mem, address, length, prefix=prefix) - - def hexdump_str(self, address, length, prefix=""): - return hexdump_str(self.mem, address, length, prefix=prefix) diff --git a/cocotbext/axi/axil_ram.py b/cocotbext/axi/axil_ram.py index c740153..5ffbd0c 100644 --- a/cocotbext/axi/axil_ram.py +++ b/cocotbext/axi/axil_ram.py @@ -22,19 +22,17 @@ THE SOFTWARE. """ -import mmap - import cocotb from cocotb.log import SimLog from .version import __version__ from .constants import AxiProt, AxiResp from .axil_channels import AxiLiteAWSink, AxiLiteWSink, AxiLiteBSource, AxiLiteARSink, AxiLiteRSource -from .utils import hexdump, hexdump_str +from .memory import Memory -class AxiLiteRamWrite(object): - def __init__(self, entity, name, clock, reset=None, size=1024, mem=None): +class AxiLiteRamWrite(Memory): + def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs): self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.log.info("AXI lite RAM model (write)") @@ -42,11 +40,7 @@ class AxiLiteRamWrite(object): self.log.info("Copyright (c) 2020 Alex Forencich") self.log.info("https://github.com/alexforencich/cocotbext-axi") - if type(mem) is mmap.mmap: - self.mem = mem - else: - self.mem = mmap.mmap(-1, size) - self.size = len(self.mem) + super().__init__(size, mem, *args, **kwargs) self.reset = reset @@ -72,20 +66,6 @@ class AxiLiteRamWrite(object): cocotb.fork(self._process_write()) - def read_mem(self, address, length): - self.mem.seek(address) - return self.mem.read(length) - - def write_mem(self, address, data): - self.mem.seek(address) - self.mem.write(bytes(data)) - - def hexdump(self, address, length, prefix=""): - hexdump(self.mem, address, length, prefix=prefix) - - def hexdump_str(self, address, length, prefix=""): - return hexdump_str(self.mem, address, length, prefix=prefix) - async def _process_write(self): while True: await self.aw_channel.wait() @@ -121,8 +101,8 @@ class AxiLiteRamWrite(object): self.b_channel.send(b) -class AxiLiteRamRead(object): - def __init__(self, entity, name, clock, reset=None, size=1024, mem=None): +class AxiLiteRamRead(Memory): + def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs): self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.log.info("AXI lite RAM model (read)") @@ -130,17 +110,13 @@ class AxiLiteRamRead(object): self.log.info("Copyright (c) 2020 Alex Forencich") self.log.info("https://github.com/alexforencich/cocotbext-axi") + super().__init__(size, mem, *args, **kwargs) + self.reset = reset self.ar_channel = AxiLiteARSink(entity, name, clock, reset) self.r_channel = AxiLiteRSource(entity, name, clock, reset) - if type(mem) is mmap.mmap: - self.mem = mem - else: - self.mem = mmap.mmap(-1, size) - self.size = len(self.mem) - self.in_flight_operations = 0 self.width = len(self.r_channel.bus.rdata) @@ -157,20 +133,6 @@ class AxiLiteRamRead(object): cocotb.fork(self._process_read()) - def read_mem(self, address, length): - self.mem.seek(address) - return self.mem.read(length) - - def write_mem(self, address, data): - self.mem.seek(address) - self.mem.write(bytes(data)) - - def hexdump(self, address, length, prefix=""): - hexdump(self.mem, address, length, prefix=prefix) - - def hexdump_str(self, address, length, prefix=""): - return hexdump_str(self.mem, address, length, prefix=prefix) - async def _process_read(self): while True: await self.ar_channel.wait() @@ -195,30 +157,12 @@ class AxiLiteRamRead(object): addr, prot, ' '.join((f'{c:02x}' for c in data))) -class AxiLiteRam(object): - def __init__(self, entity, name, clock, reset=None, size=1024, mem=None): +class AxiLiteRam(Memory): + def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs): self.write_if = None self.read_if = None - if type(mem) is mmap.mmap: - self.mem = mem - else: - self.mem = mmap.mmap(-1, size) - self.size = len(self.mem) + super().__init__(size, mem, *args, **kwargs) self.write_if = AxiLiteRamWrite(entity, name, clock, reset, mem=self.mem) self.read_if = AxiLiteRamRead(entity, name, clock, reset, mem=self.mem) - - def read_mem(self, address, length): - self.mem.seek(address) - return self.mem.read(length) - - def write_mem(self, address, data): - self.mem.seek(address) - self.mem.write(bytes(data)) - - def hexdump(self, address, length, prefix=""): - hexdump(self.mem, address, length, prefix=prefix) - - def hexdump_str(self, address, length, prefix=""): - return hexdump_str(self.mem, address, length, prefix=prefix) diff --git a/cocotbext/axi/memory.py b/cocotbext/axi/memory.py new file mode 100644 index 0000000..c2c26f3 --- /dev/null +++ b/cocotbext/axi/memory.py @@ -0,0 +1,104 @@ +""" + +Copyright (c) 2020 Alex Forencich + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. + +""" + +import mmap + +from .utils import hexdump, hexdump_lines, hexdump_str + + +class Memory(object): + def __init__(self, size=1024, mem=None, *args, **kwargs): + if mem is not None: + self.mem = mem + else: + self.mem = mmap.mmap(-1, size) + self.size = len(self.mem) + super().__init__(*args, **kwargs) + + def read(self, address, length): + self.mem.seek(address) + return self.mem.read(length) + + def write(self, address, data): + self.mem.seek(address) + self.mem.write(bytes(data)) + + def write_words(self, address, data, byteorder='little', ws=2): + words = data + data = bytearray() + for w in words: + data.extend(w.to_bytes(ws, byteorder)) + self.write(address, data) + + def write_dwords(self, address, data, byteorder='little'): + self.write_words(address, data, byteorder, 4) + + def write_qwords(self, address, data, byteorder='little'): + self.write_words(address, data, byteorder, 8) + + def write_byte(self, address, data): + self.write(address, [data]) + + def write_word(self, address, data, byteorder='little', ws=2): + self.write_words(address, [data], byteorder, ws) + + def write_dword(self, address, data, byteorder='little'): + self.write_dwords(address, [data], byteorder) + + def write_qword(self, address, data, byteorder='little'): + self.write_qwords(address, [data], byteorder) + + def read_words(self, address, count, byteorder='little', ws=2): + data = self.read(address, count*ws) + words = [] + for k in range(count): + words.append(int.from_bytes(data[ws*k:ws*(k+1)], byteorder)) + return words + + def read_dwords(self, address, count, byteorder='little'): + return self.read_words(address, count, byteorder, 4) + + def read_qwords(self, address, count, byteorder='little'): + return self.read_words(address, count, byteorder, 8) + + def read_byte(self, address): + return self.read(address, 1)[0] + + def read_word(self, address, byteorder='little', ws=2): + return self.read_words(address, 1, byteorder, ws)[0] + + def read_dword(self, address, byteorder='little'): + return self.read_dwords(address, 1, byteorder)[0] + + def read_qword(self, address, byteorder='little'): + return self.read_qwords(address, 1, byteorder)[0] + + def hexdump(self, address, length, prefix=""): + hexdump(self.mem, address, length, prefix=prefix) + + def hexdump_lines(self, address, length, prefix=""): + return hexdump_lines(self.mem, address, length, prefix=prefix) + + def hexdump_str(self, address, length, prefix=""): + return hexdump_str(self.mem, address, length, prefix=prefix) diff --git a/tests/axi/test_axi.py b/tests/axi/test_axi.py index 1b46a77..b23d160 100644 --- a/tests/axi/test_axi.py +++ b/tests/axi/test_axi.py @@ -102,15 +102,15 @@ async def run_test_write(dut, idle_inserter=None, backpressure_inserter=None, si addr = offset+0x1000 test_data = bytearray([x % 256 for x in range(length)]) - tb.axi_ram.write_mem(addr-128, b'\xaa'*(length+256)) + tb.axi_ram.write(addr-128, b'\xaa'*(length+256)) await tb.axi_master.write(addr, test_data, size=size) tb.log.debug("%s", tb.axi_ram.hexdump_str((addr & 0xfffffff0)-16, (((addr & 0xf)+length-1) & 0xfffffff0)+48)) - assert tb.axi_ram.read_mem(addr, length) == test_data - assert tb.axi_ram.read_mem(addr-1, 1) == b'\xaa' - assert tb.axi_ram.read_mem(addr+length, 1) == b'\xaa' + assert tb.axi_ram.read(addr, length) == test_data + assert tb.axi_ram.read(addr-1, 1) == b'\xaa' + assert tb.axi_ram.read(addr+length, 1) == b'\xaa' await RisingEdge(dut.clk) await RisingEdge(dut.clk) @@ -137,7 +137,7 @@ async def run_test_read(dut, idle_inserter=None, backpressure_inserter=None, siz addr = offset+0x1000 test_data = bytearray([x % 256 for x in range(length)]) - tb.axi_ram.write_mem(addr, test_data) + tb.axi_ram.write(addr, test_data) data = await tb.axi_master.read(addr, length, size=size) diff --git a/tests/axil/test_axil.py b/tests/axil/test_axil.py index 343ca42..e44377b 100644 --- a/tests/axil/test_axil.py +++ b/tests/axil/test_axil.py @@ -95,15 +95,15 @@ async def run_test_write(dut, data_in=None, idle_inserter=None, backpressure_ins addr = offset+0x1000 test_data = bytearray([x % 256 for x in range(length)]) - tb.axil_ram.write_mem(addr-128, b'\xaa'*(length+256)) + tb.axil_ram.write(addr-128, b'\xaa'*(length+256)) await tb.axil_master.write(addr, test_data) tb.log.debug("%s", tb.axil_ram.hexdump_str((addr & 0xfffffff0)-16, (((addr & 0xf)+length-1) & 0xfffffff0)+48)) - assert tb.axil_ram.read_mem(addr, length) == test_data - assert tb.axil_ram.read_mem(addr-1, 1) == b'\xaa' - assert tb.axil_ram.read_mem(addr+length, 1) == b'\xaa' + assert tb.axil_ram.read(addr, length) == test_data + assert tb.axil_ram.read(addr-1, 1) == b'\xaa' + assert tb.axil_ram.read(addr+length, 1) == b'\xaa' await RisingEdge(dut.clk) await RisingEdge(dut.clk) @@ -126,7 +126,7 @@ async def run_test_read(dut, data_in=None, idle_inserter=None, backpressure_inse addr = offset+0x1000 test_data = bytearray([x % 256 for x in range(length)]) - tb.axil_ram.write_mem(addr, test_data) + tb.axil_ram.write(addr, test_data) data = await tb.axil_master.read(addr, length)