From 079f4009b30e433e9c66c57b1c7cd996fbf0c7cc Mon Sep 17 00:00:00 2001 From: Alex Forencich Date: Tue, 16 Nov 2021 17:00:48 -0800 Subject: [PATCH] Rewrite RAM modules to use common slave implementation --- cocotbext/axi/axi_ram.py | 284 ++------------------------------------ cocotbext/axi/axil_ram.py | 198 ++------------------------ 2 files changed, 28 insertions(+), 454 deletions(-) diff --git a/cocotbext/axi/axi_ram.py b/cocotbext/axi/axi_ram.py index 901c274..c636e73 100644 --- a/cocotbext/axi/axi_ram.py +++ b/cocotbext/axi/axi_ram.py @@ -1,6 +1,6 @@ """ -Copyright (c) 2020 Alex Forencich +Copyright (c) 2021 Alex Forencich Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -22,288 +22,32 @@ THE SOFTWARE. """ -import logging - -import cocotb - -from .version import __version__ -from .constants import AxiBurstType, AxiProt, AxiResp -from .axi_channels import AxiAWSink, AxiWSink, AxiBSource, AxiARSink, AxiRSource +from .axi_slave import AxiSlaveWrite, AxiSlaveRead from .memory import Memory -from .reset import Reset -class AxiRamWrite(Memory, Reset): - def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs): - self.bus = bus - self.clock = clock - self.reset = reset - self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}") +class AxiRamWrite(AxiSlaveWrite, Memory): + def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs): + super().__init__(bus, clock, reset, reset_active_level=reset_active_level, size=size, mem=mem, **kwargs) - self.log.info("AXI RAM model (write)") - self.log.info("cocotbext-axi version %s", __version__) - self.log.info("Copyright (c) 2020 Alex Forencich") - self.log.info("https://github.com/alexforencich/cocotbext-axi") + async def _write(self, address, data): + self.write(address, data) - super().__init__(size, mem, *args, **kwargs) - self.aw_channel = AxiAWSink(bus.aw, clock, reset, reset_active_level) - self.aw_channel.queue_occupancy_limit = 2 - self.w_channel = AxiWSink(bus.w, clock, reset, reset_active_level) - self.w_channel.queue_occupancy_limit = 2 - self.b_channel = AxiBSource(bus.b, clock, reset, reset_active_level) - self.b_channel.queue_occupancy_limit = 2 +class AxiRamRead(AxiSlaveRead, Memory): + def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs): + super().__init__(bus, clock, reset, reset_active_level=reset_active_level, size=size, mem=mem, **kwargs) - self.address_width = len(self.aw_channel.bus.awaddr) - self.id_width = len(self.aw_channel.bus.awid) - self.width = len(self.w_channel.bus.wdata) - self.byte_size = 8 - self.byte_lanes = self.width // self.byte_size - self.strb_mask = 2**self.byte_lanes-1 - - self.max_burst_size = (self.byte_lanes-1).bit_length() - - self.log.info("AXI RAM model configuration:") - self.log.info(" Memory size: %d bytes", len(self.mem)) - self.log.info(" Address width: %d bits", self.address_width) - self.log.info(" ID width: %d bits", self.id_width) - self.log.info(" Byte size: %d bits", self.byte_size) - self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes) - - self.log.info("AXI RAM model signals:") - for bus in (self.bus.aw, self.bus.w, self.bus.b): - for sig in sorted(list(set().union(bus._signals, bus._optional_signals))): - if hasattr(bus, sig): - self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig))) - else: - self.log.info(" %s: not present", sig) - - assert self.byte_lanes == len(self.w_channel.bus.wstrb) - assert self.byte_lanes * self.byte_size == self.width - - assert len(self.b_channel.bus.bid) == len(self.aw_channel.bus.awid) - - self._process_write_cr = None - - self._init_reset(reset, reset_active_level) - - def _handle_reset(self, state): - if state: - self.log.info("Reset asserted") - if self._process_write_cr is not None: - self._process_write_cr.kill() - self._process_write_cr = None - - self.aw_channel.clear() - self.w_channel.clear() - self.b_channel.clear() - else: - self.log.info("Reset de-asserted") - if self._process_write_cr is None: - self._process_write_cr = cocotb.fork(self._process_write()) - - async def _process_write(self): - while True: - aw = await self.aw_channel.recv() - - awid = int(getattr(aw, 'awid', 0)) - addr = int(aw.awaddr) - length = int(getattr(aw, 'awlen', 0)) - size = int(getattr(aw, 'awsize', self.max_burst_size)) - burst = AxiBurstType(getattr(aw, 'awburst', AxiBurstType.INCR)) - prot = AxiProt(getattr(aw, 'awprot', AxiProt.NONSECURE)) - - self.log.info("Write burst awid: 0x%x awaddr: 0x%08x awlen: %d awsize: %d awprot: %s", - awid, addr, length, size, prot) - - num_bytes = 2**size - assert 0 < num_bytes <= self.byte_lanes - - aligned_addr = (addr // num_bytes) * num_bytes - length += 1 - - transfer_size = num_bytes*length - - if burst == AxiBurstType.WRAP: - lower_wrap_boundary = (addr // transfer_size) * transfer_size - upper_wrap_boundary = lower_wrap_boundary + transfer_size - - if burst == AxiBurstType.INCR: - # check 4k boundary crossing - assert 0x1000-(aligned_addr & 0xfff) >= transfer_size - - cur_addr = aligned_addr - - for n in range(length): - cur_word_addr = (cur_addr // self.byte_lanes) * self.byte_lanes - - w = await self.w_channel.recv() - - data = int(w.wdata) - strb = int(getattr(w, 'wstrb', self.strb_mask)) - last = int(w.wlast) - - # todo latency - - self.mem.seek(cur_word_addr % self.size) - - data = data.to_bytes(self.byte_lanes, 'little') - - if self.log.isEnabledFor(logging.DEBUG): - self.log.debug("Write word awid: 0x%x addr: 0x%08x wstrb: 0x%02x data: %s", - awid, cur_addr, strb, ' '.join((f'{c:02x}' for c in data))) - - for i in range(self.byte_lanes): - if strb & (1 << i): - self.mem.write(data[i:i+1]) - else: - self.mem.seek(1, 1) - - assert last == (n == length-1) - - if burst != AxiBurstType.FIXED: - cur_addr += num_bytes - - if burst == AxiBurstType.WRAP: - if cur_addr == upper_wrap_boundary: - cur_addr = lower_wrap_boundary - - b = self.b_channel._transaction_obj() - b.bid = awid - b.bresp = AxiResp.OKAY - - await self.b_channel.send(b) - - -class AxiRamRead(Memory, Reset): - def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs): - self.bus = bus - self.clock = clock - self.reset = reset - self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}") - - self.log.info("AXI RAM model (read)") - self.log.info("cocotbext-axi version %s", __version__) - self.log.info("Copyright (c) 2020 Alex Forencich") - self.log.info("https://github.com/alexforencich/cocotbext-axi") - - super().__init__(size, mem, *args, **kwargs) - - self.ar_channel = AxiARSink(bus.ar, clock, reset, reset_active_level) - self.ar_channel.queue_occupancy_limit = 2 - self.r_channel = AxiRSource(bus.r, clock, reset, reset_active_level) - self.r_channel.queue_occupancy_limit = 2 - - self.address_width = len(self.ar_channel.bus.araddr) - self.id_width = len(self.ar_channel.bus.arid) - self.width = len(self.r_channel.bus.rdata) - self.byte_size = 8 - self.byte_lanes = self.width // self.byte_size - - self.max_burst_size = (self.byte_lanes-1).bit_length() - - self.log.info("AXI RAM model configuration:") - self.log.info(" Memory size: %d bytes", len(self.mem)) - self.log.info(" Address width: %d bits", self.address_width) - self.log.info(" ID width: %d bits", self.id_width) - self.log.info(" Byte size: %d bits", self.byte_size) - self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes) - - self.log.info("AXI RAM model signals:") - for bus in (self.bus.ar, self.bus.r): - for sig in sorted(list(set().union(bus._signals, bus._optional_signals))): - if hasattr(bus, sig): - self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig))) - else: - self.log.info(" %s: not present", sig) - - assert self.byte_lanes * self.byte_size == self.width - - assert len(self.r_channel.bus.rid) == len(self.ar_channel.bus.arid) - - self._process_read_cr = None - - self._init_reset(reset, reset_active_level) - - def _handle_reset(self, state): - if state: - self.log.info("Reset asserted") - if self._process_read_cr is not None: - self._process_read_cr.kill() - self._process_read_cr = None - - self.ar_channel.clear() - self.r_channel.clear() - else: - self.log.info("Reset de-asserted") - if self._process_read_cr is None: - self._process_read_cr = cocotb.fork(self._process_read()) - - async def _process_read(self): - while True: - ar = await self.ar_channel.recv() - - arid = int(getattr(ar, 'arid', 0)) - addr = int(ar.araddr) - length = int(getattr(ar, 'arlen', 0)) - size = int(getattr(ar, 'arsize', self.max_burst_size)) - burst = AxiBurstType(getattr(ar, 'arburst', AxiBurstType.INCR)) - prot = AxiProt(getattr(ar, 'arprot', AxiProt.NONSECURE)) - - self.log.info("Read burst arid: 0x%x araddr: 0x%08x arlen: %d arsize: %d arprot: %s", - arid, addr, length, size, prot) - - num_bytes = 2**size - assert 0 < num_bytes <= self.byte_lanes - - aligned_addr = (addr // num_bytes) * num_bytes - length += 1 - - transfer_size = num_bytes*length - - if burst == AxiBurstType.WRAP: - lower_wrap_boundary = (addr // transfer_size) * transfer_size - upper_wrap_boundary = lower_wrap_boundary + transfer_size - - if burst == AxiBurstType.INCR: - # check 4k boundary crossing - assert 0x1000-(aligned_addr & 0xfff) >= transfer_size - - cur_addr = aligned_addr - - for n in range(length): - cur_word_addr = (cur_addr // self.byte_lanes) * self.byte_lanes - - self.mem.seek(cur_word_addr % self.size) - - data = self.mem.read(self.byte_lanes) - - r = self.r_channel._transaction_obj() - r.rid = arid - r.rdata = int.from_bytes(data, 'little') - r.rlast = n == length-1 - r.rresp = AxiResp.OKAY - - await self.r_channel.send(r) - - if self.log.isEnabledFor(logging.DEBUG): - self.log.debug("Read word awid: 0x%x addr: 0x%08x data: %s", - arid, cur_addr, ' '.join((f'{c:02x}' for c in data))) - - if burst != AxiBurstType.FIXED: - cur_addr += num_bytes - - if burst == AxiBurstType.WRAP: - if cur_addr == upper_wrap_boundary: - cur_addr = lower_wrap_boundary + async def _read(self, address, length): + return self.read(address, length) class AxiRam(Memory): - def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs): + def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs): self.write_if = None self.read_if = None - super().__init__(size, mem, *args, **kwargs) + super().__init__(size, mem, **kwargs) self.write_if = AxiRamWrite(bus.write, clock, reset, reset_active_level, mem=self.mem) self.read_if = AxiRamRead(bus.read, clock, reset, reset_active_level, mem=self.mem) diff --git a/cocotbext/axi/axil_ram.py b/cocotbext/axi/axil_ram.py index aa70479..2acfe17 100644 --- a/cocotbext/axi/axil_ram.py +++ b/cocotbext/axi/axil_ram.py @@ -1,6 +1,6 @@ """ -Copyright (c) 2020 Alex Forencich +Copyright (c) 2021 Alex Forencich Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -22,202 +22,32 @@ THE SOFTWARE. """ -import logging - -import cocotb - -from .version import __version__ -from .constants import AxiProt, AxiResp -from .axil_channels import AxiLiteAWSink, AxiLiteWSink, AxiLiteBSource, AxiLiteARSink, AxiLiteRSource +from .axil_slave import AxiLiteSlaveWrite, AxiLiteSlaveRead from .memory import Memory -from .reset import Reset -class AxiLiteRamWrite(Memory, Reset): - def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs): - self.bus = bus - self.clock = clock - self.reset = reset - self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}") +class AxiLiteRamWrite(AxiLiteSlaveWrite, Memory): + def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs): + super().__init__(bus, clock, reset, reset_active_level=reset_active_level, size=size, mem=mem, **kwargs) - self.log.info("AXI lite RAM model (write)") - self.log.info("cocotbext-axi version %s", __version__) - self.log.info("Copyright (c) 2020 Alex Forencich") - self.log.info("https://github.com/alexforencich/cocotbext-axi") - - super().__init__(size, mem, *args, **kwargs) - - self.aw_channel = AxiLiteAWSink(bus.aw, clock, reset, reset_active_level) - self.aw_channel.queue_occupancy_limit = 2 - self.w_channel = AxiLiteWSink(bus.w, clock, reset, reset_active_level) - self.w_channel.queue_occupancy_limit = 2 - self.b_channel = AxiLiteBSource(bus.b, clock, reset, reset_active_level) - self.b_channel.queue_occupancy_limit = 2 - - self.address_width = len(self.aw_channel.bus.awaddr) - self.width = len(self.w_channel.bus.wdata) - self.byte_size = 8 - self.byte_lanes = self.width // self.byte_size - self.strb_mask = 2**self.byte_lanes-1 - - self.log.info("AXI lite RAM model configuration:") - self.log.info(" Memory size: %d bytes", len(self.mem)) - self.log.info(" Address width: %d bits", self.address_width) - self.log.info(" Byte size: %d bits", self.byte_size) - self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes) - - self.log.info("AXI lite RAM model signals:") - for bus in (self.bus.aw, self.bus.w, self.bus.b): - for sig in sorted(list(set().union(bus._signals, bus._optional_signals))): - if hasattr(bus, sig): - self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig))) - else: - self.log.info(" %s: not present", sig) - - assert self.byte_lanes == len(self.w_channel.bus.wstrb) - assert self.byte_lanes * self.byte_size == self.width - - self._process_write_cr = None - - self._init_reset(reset, reset_active_level) - - def _handle_reset(self, state): - if state: - self.log.info("Reset asserted") - if self._process_write_cr is not None: - self._process_write_cr.kill() - self._process_write_cr = None - - self.aw_channel.clear() - self.w_channel.clear() - self.b_channel.clear() - else: - self.log.info("Reset de-asserted") - if self._process_write_cr is None: - self._process_write_cr = cocotb.fork(self._process_write()) - - async def _process_write(self): - while True: - aw = await self.aw_channel.recv() - - addr = (int(aw.awaddr) // self.byte_lanes) * self.byte_lanes - prot = AxiProt(getattr(aw, 'awprot', AxiProt.NONSECURE)) - - w = await self.w_channel.recv() - - data = int(w.wdata) - strb = int(getattr(w, 'wstrb', self.strb_mask)) - - # todo latency - - self.mem.seek(addr % self.size) - - data = data.to_bytes(self.byte_lanes, 'little') - - if self.log.isEnabledFor(logging.INFO): - self.log.info("Write data awaddr: 0x%08x awprot: %s wstrb: 0x%02x data: %s", - addr, prot, strb, ' '.join((f'{c:02x}' for c in data))) - - for i in range(self.byte_lanes): - if strb & (1 << i): - self.mem.write(data[i:i+1]) - else: - self.mem.seek(1, 1) - - b = self.b_channel._transaction_obj() - b.bresp = AxiResp.OKAY - - await self.b_channel.send(b) + async def _write(self, address, data): + self.write(address, data) -class AxiLiteRamRead(Memory, Reset): - def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs): - self.bus = bus - self.clock = clock - self.reset = reset - self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}") +class AxiLiteRamRead(AxiLiteSlaveRead, Memory): + def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs): + super().__init__(bus, clock, reset, reset_active_level=reset_active_level, size=size, mem=mem, **kwargs) - self.log.info("AXI lite RAM model (read)") - self.log.info("cocotbext-axi version %s", __version__) - self.log.info("Copyright (c) 2020 Alex Forencich") - self.log.info("https://github.com/alexforencich/cocotbext-axi") - - super().__init__(size, mem, *args, **kwargs) - - self.ar_channel = AxiLiteARSink(bus.ar, clock, reset, reset_active_level) - self.ar_channel.queue_occupancy_limit = 2 - self.r_channel = AxiLiteRSource(bus.r, clock, reset, reset_active_level) - self.r_channel.queue_occupancy_limit = 2 - - self.address_width = len(self.ar_channel.bus.araddr) - self.width = len(self.r_channel.bus.rdata) - self.byte_size = 8 - self.byte_lanes = self.width // self.byte_size - - self.log.info("AXI lite RAM model configuration:") - self.log.info(" Memory size: %d bytes", len(self.mem)) - self.log.info(" Address width: %d bits", self.address_width) - self.log.info(" Byte size: %d bits", self.byte_size) - self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes) - - self.log.info("AXI lite RAM model signals:") - for bus in (self.bus.ar, self.bus.r): - for sig in sorted(list(set().union(bus._signals, bus._optional_signals))): - if hasattr(bus, sig): - self.log.info(" %s width: %d bits", sig, len(getattr(bus, sig))) - else: - self.log.info(" %s: not present", sig) - - assert self.byte_lanes * self.byte_size == self.width - - self._process_read_cr = None - - self._init_reset(reset, reset_active_level) - - def _handle_reset(self, state): - if state: - self.log.info("Reset asserted") - if self._process_read_cr is not None: - self._process_read_cr.kill() - self._process_read_cr = None - - self.ar_channel.clear() - self.r_channel.clear() - else: - self.log.info("Reset de-asserted") - if self._process_read_cr is None: - self._process_read_cr = cocotb.fork(self._process_read()) - - async def _process_read(self): - while True: - ar = await self.ar_channel.recv() - - addr = (int(ar.araddr) // self.byte_lanes) * self.byte_lanes - prot = AxiProt(getattr(ar, 'arprot', AxiProt.NONSECURE)) - - # todo latency - - self.mem.seek(addr % self.size) - - data = self.mem.read(self.byte_lanes) - - r = self.r_channel._transaction_obj() - r.rdata = int.from_bytes(data, 'little') - r.rresp = AxiResp.OKAY - - await self.r_channel.send(r) - - if self.log.isEnabledFor(logging.INFO): - self.log.info("Read data araddr: 0x%08x arprot: %s data: %s", - addr, prot, ' '.join((f'{c:02x}' for c in data))) + async def _read(self, address, length): + return self.read(address, length) class AxiLiteRam(Memory): - def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, *args, **kwargs): + def __init__(self, bus, clock, reset=None, reset_active_level=True, size=1024, mem=None, **kwargs): self.write_if = None self.read_if = None - super().__init__(size, mem, *args, **kwargs) + super().__init__(size, mem, **kwargs) self.write_if = AxiLiteRamWrite(bus.write, clock, reset, reset_active_level, mem=self.mem) self.read_if = AxiLiteRamRead(bus.read, clock, reset, reset_active_level, mem=self.mem)