Add AXI bus objects

This commit is contained in:
Alex Forencich
2021-03-06 16:26:51 -08:00
parent c18fdd6e22
commit 69717c1698
12 changed files with 269 additions and 100 deletions

View File

@@ -26,10 +26,14 @@ from .version import __version__
from .constants import AxiBurstType, AxiBurstSize, AxiLockType, AxiCacheBit, AxiProt, AxiResp
from .axis import AxiStreamFrame, AxiStreamSource, AxiStreamSink, AxiStreamMonitor
from .axis import AxiStreamFrame, AxiStreamBus, AxiStreamSource, AxiStreamSink, AxiStreamMonitor
from .axil_channels import AxiLiteAWBus, AxiLiteWBus, AxiLiteBBus, AxiLiteARBus, AxiLiteRBus
from .axil_channels import AxiLiteWriteBus, AxiLiteReadBus, AxiLiteBus
from .axil_master import AxiLiteMasterWrite, AxiLiteMasterRead, AxiLiteMaster
from .axil_ram import AxiLiteRamWrite, AxiLiteRamRead, AxiLiteRam
from .axi_channels import AxiAWBus, AxiWBus, AxiBBus, AxiARBus, AxiRBus
from .axi_channels import AxiWriteBus, AxiReadBus, AxiBus
from .axi_master import AxiMasterWrite, AxiMasterRead, AxiMaster
from .axi_ram import AxiRamWrite, AxiRamRead, AxiRam

View File

@@ -25,7 +25,7 @@ THE SOFTWARE.
from .stream import define_stream
# Write address channel
AxiAWTransaction, AxiAWSource, AxiAWSink, AxiAWMonitor = define_stream("AxiAW",
AxiAWBus, AxiAWTransaction, AxiAWSource, AxiAWSink, AxiAWMonitor = define_stream("AxiAW",
signals=["awid", "awaddr", "awlen", "awsize", "awburst", "awprot", "awvalid", "awready"],
optional_signals=["awlock", "awcache", "awqos", "awregion", "awuser"],
signal_widths={"awlen": 8, "awsize": 3, "awburst": 2, "awlock": 1,
@@ -33,21 +33,21 @@ AxiAWTransaction, AxiAWSource, AxiAWSink, AxiAWMonitor = define_stream("AxiAW",
)
# Write data channel
AxiWTransaction, AxiWSource, AxiWSink, AxiWMonitor = define_stream("AxiW",
AxiWBus, AxiWTransaction, AxiWSource, AxiWSink, AxiWMonitor = define_stream("AxiW",
signals=["wdata", "wstrb", "wlast", "wvalid", "wready"],
optional_signals=["wuser"],
signal_widths={"wlast": 1}
)
# Write response channel
AxiBTransaction, AxiBSource, AxiBSink, AxiBMonitor = define_stream("AxiB",
AxiBBus, AxiBTransaction, AxiBSource, AxiBSink, AxiBMonitor = define_stream("AxiB",
signals=["bid", "bresp", "bvalid", "bready"],
optional_signals=["buser"],
signal_widths={"bresp": 2}
)
# Read address channel
AxiARTransaction, AxiARSource, AxiARSink, AxiARMonitor = define_stream("AxiAR",
AxiARBus, AxiARTransaction, AxiARSource, AxiARSink, AxiARMonitor = define_stream("AxiAR",
signals=["arid", "araddr", "arlen", "arsize", "arburst", "arprot", "arvalid", "arready"],
optional_signals=["arlock", "arcache", "arqos", "arregion", "aruser"],
signal_widths={"arlen": 8, "arsize": 3, "arburst": 2, "arlock": 1,
@@ -55,8 +55,79 @@ AxiARTransaction, AxiARSource, AxiARSink, AxiARMonitor = define_stream("AxiAR",
)
# Read data channel
AxiRTransaction, AxiRSource, AxiRSink, AxiRMonitor = define_stream("AxiR",
AxiRBus, AxiRTransaction, AxiRSource, AxiRSink, AxiRMonitor = define_stream("AxiR",
signals=["rid", "rdata", "rresp", "rlast", "rvalid", "rready"],
optional_signals=["ruser"],
signal_widths={"rresp": 2, "rlast": 1}
)
class AxiWriteBus:
def __init__(self, aw=None, w=None, b=None):
self.aw = aw
self.w = w
self.b = b
@classmethod
def from_entity(cls, entity, **kwargs):
aw = AxiAWBus.from_entity(entity, **kwargs)
w = AxiWBus.from_entity(entity, **kwargs)
b = AxiBBus.from_entity(entity, **kwargs)
return cls(aw, w, b)
@classmethod
def from_prefix(cls, entity, prefix, **kwargs):
aw = AxiAWBus.from_prefix(entity, prefix, **kwargs)
w = AxiWBus.from_prefix(entity, prefix, **kwargs)
b = AxiBBus.from_prefix(entity, prefix, **kwargs)
return cls(aw, w, b)
@classmethod
def from_channels(cls, aw, w, b):
return cls(aw, w, b)
class AxiReadBus:
def __init__(self, ar=None, r=None):
self.ar = ar
self.r = r
@classmethod
def from_entity(cls, entity, **kwargs):
ar = AxiARBus.from_entity(entity, **kwargs)
r = AxiRBus.from_entity(entity, **kwargs)
return cls(ar, r)
@classmethod
def from_prefix(cls, entity, prefix, **kwargs):
ar = AxiARBus.from_prefix(entity, prefix, **kwargs)
r = AxiRBus.from_prefix(entity, prefix, **kwargs)
return cls(ar, r)
@classmethod
def from_channels(cls, ar, r):
return cls(ar, r)
class AxiBus:
def __init__(self, write=None, read=None, **kwargs):
self.write = write
self.read = read
@classmethod
def from_entity(cls, entity, **kwargs):
write = AxiWriteBus.from_entity(entity, **kwargs)
read = AxiReadBus.from_entity(entity, **kwargs)
return cls(write, read)
@classmethod
def from_prefix(cls, entity, prefix, **kwargs):
write = AxiWriteBus.from_prefix(entity, prefix, **kwargs)
read = AxiReadBus.from_prefix(entity, prefix, **kwargs)
return cls(write, read)
@classmethod
def from_channels(cls, aw, w, b, ar, r):
write = AxiWriteBus.from_channels(aw, w, b)
read = AxiReadBus.from_channels(ar, r)
return cls(write, read)

View File

@@ -49,19 +49,17 @@ AxiReadResp = namedtuple("AxiReadResp", ["address", "data", "resp", "user"])
class AxiMasterWrite(Reset):
def __init__(self, entity, name, clock, reset=None, max_burst_len=256):
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
def __init__(self, bus, clock, reset=None, max_burst_len=256):
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
self.log.info("AXI master (write)")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2020 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
self.reset = reset
self.aw_channel = AxiAWSource(entity, name, clock, reset)
self.w_channel = AxiWSource(entity, name, clock, reset)
self.b_channel = AxiBSink(entity, name, clock, reset)
self.aw_channel = AxiAWSource(bus.aw, clock, reset)
self.w_channel = AxiWSource(bus.w, clock, reset)
self.b_channel = AxiBSink(bus.b, clock, reset)
self.write_command_queue = deque()
self.write_command_sync = Event()
@@ -404,18 +402,16 @@ class AxiMasterWrite(Reset):
class AxiMasterRead(Reset):
def __init__(self, entity, name, clock, reset=None, max_burst_len=256):
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
def __init__(self, bus, clock, reset=None, max_burst_len=256):
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
self.log.info("AXI master (read)")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2020 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
self.reset = reset
self.ar_channel = AxiARSource(entity, name, clock, reset)
self.r_channel = AxiRSink(entity, name, clock, reset)
self.ar_channel = AxiARSource(bus.ar, clock, reset)
self.r_channel = AxiRSink(bus.r, clock, reset)
self.read_command_queue = deque()
self.read_command_sync = Event()
@@ -741,12 +737,12 @@ class AxiMasterRead(Reset):
class AxiMaster:
def __init__(self, entity, name, clock, reset=None, max_burst_len=256):
def __init__(self, bus, clock, reset=None, max_burst_len=256):
self.write_if = None
self.read_if = None
self.write_if = AxiMasterWrite(entity, name, clock, reset, max_burst_len)
self.read_if = AxiMasterRead(entity, name, clock, reset, max_burst_len)
self.write_if = AxiMasterWrite(bus.write, clock, reset, max_burst_len)
self.read_if = AxiMasterRead(bus.read, clock, reset, max_burst_len)
def init_read(self, address, length, arid=None, burst=AxiBurstType.INCR, size=None,
lock=AxiLockType.NORMAL, cache=0b0011, prot=AxiProt.NONSECURE, qos=0, region=0, user=0, event=None):

View File

@@ -34,8 +34,8 @@ from .reset import Reset
class AxiRamWrite(Memory, Reset):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
def __init__(self, bus, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
self.log.info("AXI RAM model (write)")
self.log.info("cocotbext-axi version %s", __version__)
@@ -44,11 +44,9 @@ class AxiRamWrite(Memory, Reset):
super().__init__(size, mem, *args, **kwargs)
self.reset = reset
self.aw_channel = AxiAWSink(entity, name, clock, reset)
self.w_channel = AxiWSink(entity, name, clock, reset)
self.b_channel = AxiBSource(entity, name, clock, reset)
self.aw_channel = AxiAWSink(bus.aw, clock, reset)
self.w_channel = AxiWSink(bus.w, clock, reset)
self.b_channel = AxiBSource(bus.b, clock, reset)
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
@@ -159,8 +157,8 @@ class AxiRamWrite(Memory, Reset):
class AxiRamRead(Memory, Reset):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
def __init__(self, bus, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
self.log.info("AXI RAM model (read)")
self.log.info("cocotbext-axi version %s", __version__)
@@ -169,10 +167,8 @@ class AxiRamRead(Memory, Reset):
super().__init__(size, mem, *args, **kwargs)
self.reset = reset
self.ar_channel = AxiARSink(entity, name, clock, reset)
self.r_channel = AxiRSource(entity, name, clock, reset)
self.ar_channel = AxiARSink(bus.ar, clock, reset)
self.r_channel = AxiRSource(bus.r, clock, reset)
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
@@ -266,11 +262,11 @@ class AxiRamRead(Memory, Reset):
class AxiRam(Memory):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
def __init__(self, bus, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.write_if = None
self.read_if = None
super().__init__(size, mem, *args, **kwargs)
self.write_if = AxiRamWrite(entity, name, clock, reset, mem=self.mem)
self.read_if = AxiRamRead(entity, name, clock, reset, mem=self.mem)
self.write_if = AxiRamWrite(bus.write, clock, reset, mem=self.mem)
self.read_if = AxiRamRead(bus.read, clock, reset, mem=self.mem)

View File

@@ -25,30 +25,101 @@ THE SOFTWARE.
from .stream import define_stream
# Write address channel
AxiLiteAWTransaction, AxiLiteAWSource, AxiLiteAWSink, AxiLiteAWMonitor = define_stream("AxiLiteAW",
AxiLiteAWBus, AxiLiteAWTransaction, AxiLiteAWSource, AxiLiteAWSink, AxiLiteAWMonitor = define_stream("AxiLiteAW",
signals=["awaddr", "awprot", "awvalid", "awready"],
signal_widths={"awprot": 3}
)
# Write data channel
AxiLiteWTransaction, AxiLiteWSource, AxiLiteWSink, AxiLiteWMonitor = define_stream("AxiLiteW",
AxiLiteWBus, AxiLiteWTransaction, AxiLiteWSource, AxiLiteWSink, AxiLiteWMonitor = define_stream("AxiLiteW",
signals=["wdata", "wstrb", "wvalid", "wready"]
)
# Write response channel
AxiLiteBTransaction, AxiLiteBSource, AxiLiteBSink, AxiLiteBMonitor = define_stream("AxiLiteB",
AxiLiteBBus, AxiLiteBTransaction, AxiLiteBSource, AxiLiteBSink, AxiLiteBMonitor = define_stream("AxiLiteB",
signals=["bresp", "bvalid", "bready"],
signal_widths={"bresp": 2}
)
# Read address channel
AxiLiteARTransaction, AxiLiteARSource, AxiLiteARSink, AxiLiteARMonitor = define_stream("AxiLiteAR",
AxiLiteARBus, AxiLiteARTransaction, AxiLiteARSource, AxiLiteARSink, AxiLiteARMonitor = define_stream("AxiLiteAR",
signals=["araddr", "arprot", "arvalid", "arready"],
signal_widths={"arprot": 3}
)
# Read data channel
AxiLiteRTransaction, AxiLiteRSource, AxiLiteRSink, AxiLiteRMonitor = define_stream("AxiLiteR",
AxiLiteRBus, AxiLiteRTransaction, AxiLiteRSource, AxiLiteRSink, AxiLiteRMonitor = define_stream("AxiLiteR",
signals=["rdata", "rresp", "rvalid", "rready"],
signal_widths={"rresp": 2}
)
class AxiLiteWriteBus:
def __init__(self, aw=None, w=None, b=None):
self.aw = aw
self.w = w
self.b = b
@classmethod
def from_entity(cls, entity, **kwargs):
aw = AxiLiteAWBus.from_entity(entity, **kwargs)
w = AxiLiteWBus.from_entity(entity, **kwargs)
b = AxiLiteBBus.from_entity(entity, **kwargs)
return cls(aw, w, b)
@classmethod
def from_prefix(cls, entity, prefix, **kwargs):
aw = AxiLiteAWBus.from_prefix(entity, prefix, **kwargs)
w = AxiLiteWBus.from_prefix(entity, prefix, **kwargs)
b = AxiLiteBBus.from_prefix(entity, prefix, **kwargs)
return cls(aw, w, b)
@classmethod
def from_channels(cls, aw, w, b):
return cls(aw, w, b)
class AxiLiteReadBus:
def __init__(self, ar=None, r=None):
self.ar = ar
self.r = r
@classmethod
def from_entity(cls, entity, **kwargs):
ar = AxiLiteARBus.from_entity(entity, **kwargs)
r = AxiLiteRBus.from_entity(entity, **kwargs)
return cls(ar, r)
@classmethod
def from_prefix(cls, entity, prefix, **kwargs):
ar = AxiLiteARBus.from_prefix(entity, prefix, **kwargs)
r = AxiLiteRBus.from_prefix(entity, prefix, **kwargs)
return cls(ar, r)
@classmethod
def from_channels(cls, ar, r):
return cls(ar, r)
class AxiLiteBus:
def __init__(self, write=None, read=None, **kwargs):
self.write = write
self.read = read
@classmethod
def from_entity(cls, entity, **kwargs):
write = AxiLiteWriteBus.from_entity(entity, **kwargs)
read = AxiLiteReadBus.from_entity(entity, **kwargs)
return cls(write, read)
@classmethod
def from_prefix(cls, entity, prefix, **kwargs):
write = AxiLiteWriteBus.from_prefix(entity, prefix, **kwargs)
read = AxiLiteReadBus.from_prefix(entity, prefix, **kwargs)
return cls(write, read)
@classmethod
def from_channels(cls, aw, w, b, ar, r):
write = AxiLiteWriteBus.from_channels(aw, w, b)
read = AxiLiteReadBus.from_channels(ar, r)
return cls(write, read)

View File

@@ -45,19 +45,17 @@ AxiLiteReadResp = namedtuple("AxiLiteReadResp", ["address", "data", "resp"])
class AxiLiteMasterWrite(Reset):
def __init__(self, entity, name, clock, reset=None):
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
def __init__(self, bus, clock, reset=None):
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
self.log.info("AXI lite master (write)")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2020 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
self.reset = reset
self.aw_channel = AxiLiteAWSource(entity, name, clock, reset)
self.w_channel = AxiLiteWSource(entity, name, clock, reset)
self.b_channel = AxiLiteBSink(entity, name, clock, reset)
self.aw_channel = AxiLiteAWSource(bus.aw, clock, reset)
self.w_channel = AxiLiteWSource(bus.w, clock, reset)
self.b_channel = AxiLiteBSink(bus.b, clock, reset)
self.write_command_queue = deque()
self.write_command_sync = Event()
@@ -271,18 +269,16 @@ class AxiLiteMasterWrite(Reset):
class AxiLiteMasterRead(Reset):
def __init__(self, entity, name, clock, reset=None):
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
def __init__(self, bus, clock, reset=None):
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
self.log.info("AXI lite master (read)")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2020 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
self.reset = reset
self.ar_channel = AxiLiteARSource(entity, name, clock, reset)
self.r_channel = AxiLiteRSink(entity, name, clock, reset)
self.ar_channel = AxiLiteARSource(bus.ar, clock, reset)
self.r_channel = AxiLiteRSink(bus.r, clock, reset)
self.read_command_queue = deque()
self.read_command_sync = Event()
@@ -481,12 +477,12 @@ class AxiLiteMasterRead(Reset):
class AxiLiteMaster:
def __init__(self, entity, name, clock, reset=None):
def __init__(self, bus, clock, reset=None):
self.write_if = None
self.read_if = None
self.write_if = AxiLiteMasterWrite(entity, name, clock, reset)
self.read_if = AxiLiteMasterRead(entity, name, clock, reset)
self.write_if = AxiLiteMasterWrite(bus.write, clock, reset)
self.read_if = AxiLiteMasterRead(bus.read, clock, reset)
def init_read(self, address, length, prot=AxiProt.NONSECURE, event=None):
self.read_if.init_read(address, length, prot, event)

View File

@@ -34,8 +34,8 @@ from .reset import Reset
class AxiLiteRamWrite(Memory, Reset):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
def __init__(self, bus, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{bus.aw._entity._name}.{bus.aw._name}")
self.log.info("AXI lite RAM model (write)")
self.log.info("cocotbext-axi version %s", __version__)
@@ -44,11 +44,9 @@ class AxiLiteRamWrite(Memory, Reset):
super().__init__(size, mem, *args, **kwargs)
self.reset = reset
self.aw_channel = AxiLiteAWSink(entity, name, clock, reset)
self.w_channel = AxiLiteWSink(entity, name, clock, reset)
self.b_channel = AxiLiteBSource(entity, name, clock, reset)
self.aw_channel = AxiLiteAWSink(bus.aw, clock, reset)
self.w_channel = AxiLiteWSink(bus.w, clock, reset)
self.b_channel = AxiLiteBSource(bus.b, clock, reset)
self.width = len(self.w_channel.bus.wdata)
self.byte_size = 8
@@ -117,8 +115,8 @@ class AxiLiteRamWrite(Memory, Reset):
class AxiLiteRamRead(Memory, Reset):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
def __init__(self, bus, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{bus.ar._entity._name}.{bus.ar._name}")
self.log.info("AXI lite RAM model (read)")
self.log.info("cocotbext-axi version %s", __version__)
@@ -127,10 +125,8 @@ class AxiLiteRamRead(Memory, Reset):
super().__init__(size, mem, *args, **kwargs)
self.reset = reset
self.ar_channel = AxiLiteARSink(entity, name, clock, reset)
self.r_channel = AxiLiteRSource(entity, name, clock, reset)
self.ar_channel = AxiLiteARSink(bus.ar, clock, reset)
self.r_channel = AxiLiteRSource(bus.r, clock, reset)
self.width = len(self.r_channel.bus.rdata)
self.byte_size = 8
@@ -186,11 +182,11 @@ class AxiLiteRamRead(Memory, Reset):
class AxiLiteRam(Memory):
def __init__(self, entity, name, clock, reset=None, size=1024, mem=None, *args, **kwargs):
def __init__(self, bus, clock, reset=None, size=1024, mem=None, *args, **kwargs):
self.write_if = None
self.read_if = None
super().__init__(size, mem, *args, **kwargs)
self.write_if = AxiLiteRamWrite(entity, name, clock, reset, mem=self.mem)
self.read_if = AxiLiteRamRead(entity, name, clock, reset, mem=self.mem)
self.write_if = AxiLiteRamWrite(bus.write, clock, reset, mem=self.mem)
self.read_if = AxiLiteRamRead(bus.read, clock, reset, mem=self.mem)

View File

@@ -231,6 +231,23 @@ class AxiStreamFrame:
return bytes(self.tdata)
class AxiStreamBus(Bus):
_signals = ["tdata"]
_optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"]
def __init__(self, entity=None, prefix=None, **kwargs):
super().__init__(entity, prefix, self._signals, optional_signals=self._optional_signals, **kwargs)
@classmethod
def from_entity(cls, entity, **kwargs):
return cls(entity, **kwargs)
@classmethod
def from_prefix(cls, entity, prefix, **kwargs):
return cls(entity, prefix, **kwargs)
class AxiStreamBase(Reset):
_signals = ["tdata"]
@@ -243,12 +260,11 @@ class AxiStreamBase(Reset):
_valid_init = None
_ready_init = None
def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.entity = entity
def __init__(self, bus, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
self.bus = Bus(self.entity, name, self._signals, optional_signals=self._optional_signals, **kwargs)
self.log = logging.getLogger(f"cocotb.{bus._entity._name}.{bus._name}")
self.log.info("AXI stream %s", self._type)
self.log.info("cocotbext-axi version %s", __version__)
@@ -502,8 +518,8 @@ class AxiStreamMonitor(AxiStreamBase):
_valid_init = None
_ready_init = None
def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
super().__init__(entity, name, clock, reset, byte_size, byte_lanes, *args, **kwargs)
def __init__(self, bus, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
super().__init__(bus, clock, reset, byte_size, byte_lanes, *args, **kwargs)
self.read_queue = []
@@ -604,8 +620,8 @@ class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
_valid_init = None
_ready_init = 0
def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
super().__init__(entity, name, clock, reset, byte_size, byte_lanes, *args, **kwargs)
def __init__(self, bus, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
super().__init__(bus, clock, reset, byte_size, byte_lanes, *args, **kwargs)
self.queue_occupancy_limit_bytes = -1
self.queue_occupancy_limit_frames = -1

View File

@@ -32,6 +32,23 @@ from cocotb.bus import Bus
from .reset import Reset
class StreamBus(Bus):
_signals = ["data"]
_optional_signals = []
def __init__(self, entity=None, prefix=None, **kwargs):
super().__init__(entity, prefix, self._signals, optional_signals=self._optional_signals, **kwargs)
@classmethod
def from_entity(cls, entity, **kwargs):
return cls(entity, **kwargs)
@classmethod
def from_prefix(cls, entity, prefix, **kwargs):
return cls(entity, prefix, **kwargs)
class StreamTransaction:
_signals = ["data"]
@@ -65,13 +82,13 @@ class StreamBase(Reset):
_ready_init = None
_transaction_obj = StreamTransaction
_bus_obj = StreamBus
def __init__(self, entity, name, clock, reset=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.entity = entity
def __init__(self, bus, clock, reset=None, *args, **kwargs):
self.bus = bus
self.clock = clock
self.reset = reset
self.bus = Bus(self.entity, name, self._signals, optional_signals=self._optional_signals, **kwargs)
self.log = logging.getLogger(f"cocotb.{bus._entity._name}.{bus._name}")
super().__init__(*args, **kwargs)
@@ -254,8 +271,8 @@ class StreamSink(StreamMonitor, StreamPause):
_valid_init = None
_ready_init = 0
def __init__(self, entity, name, clock, reset=None, *args, **kwargs):
super().__init__(entity, name, clock, reset, *args, **kwargs)
def __init__(self, bus, clock, reset=None, *args, **kwargs):
super().__init__(bus, clock, reset, *args, **kwargs)
self.queue_occupancy_limit = None
@@ -327,6 +344,11 @@ def define_stream(name, signals, optional_signals=None, valid_signal=None, ready
if s not in (ready_signal, valid_signal):
filtered_signals.append(s)
attrib = {}
attrib['_signals'] = signals
attrib['_optional_signals'] = optional_signals
bus = type(name+"Bus", (StreamBus,), attrib)
attrib = {s: 0 for s in filtered_signals}
attrib['_signals'] = filtered_signals
@@ -339,9 +361,10 @@ def define_stream(name, signals, optional_signals=None, valid_signal=None, ready
attrib['_ready_signal'] = ready_signal
attrib['_valid_signal'] = valid_signal
attrib['_transaction_obj'] = transaction
attrib['_bus_obj'] = bus
source = type(name+"Source", (StreamSource,), attrib)
sink = type(name+"Sink", (StreamSink,), attrib)
monitor = type(name+"Monitor", (StreamMonitor,), attrib)
return transaction, source, sink, monitor
return bus, transaction, source, sink, monitor