Accept byte_size or byte_lanes arguments in AXI stream constructors if tkeep is not used

This commit is contained in:
Alex Forencich
2020-12-07 01:58:49 -08:00
parent 3dd8114c05
commit e6e8a06dfe

View File

@@ -214,7 +214,7 @@ class AxiStreamSource(object):
_signals = ["tdata"]
_optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"]
def __init__(self, entity, name, clock, reset=None, *args, **kwargs):
def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.entity = entity
self.clock = clock
@@ -249,7 +249,6 @@ class AxiStreamSource(object):
if hasattr(self.bus, "tlast"):
self.bus.tlast.setimmediatevalue(0)
if hasattr(self.bus, "tkeep"):
self.byte_lanes = len(self.bus.tkeep)
self.bus.tkeep.setimmediatevalue(0)
if hasattr(self.bus, "tid"):
self.bus.tid.setimmediatevalue(0)
@@ -258,6 +257,18 @@ class AxiStreamSource(object):
if hasattr(self.bus, "tuser"):
self.bus.tuser.setimmediatevalue(0)
if hasattr(self.bus, "tkeep"):
self.byte_lanes = len(self.bus.tkeep)
if byte_size is not None or byte_lanes is not None:
raise ValueError("Cannot specify byte_size or byte_lanes if tkeep is connected")
else:
if byte_lanes is not None:
self.byte_lanes = byte_lanes
if byte_size is not None:
raise ValueError("Cannot specify both byte_size and byte_lanes")
elif byte_size is not None:
self.byte_lanes = self.width // byte_size
self.byte_size = self.width // self.byte_lanes
self.byte_mask = 2**self.byte_size-1
@@ -284,6 +295,10 @@ class AxiStreamSource(object):
else:
self.log.info(" tuser: not present")
if self.byte_lanes * self.byte_size != self.width:
raise ValueError(f"Bus does not evenly divide into byte lanes "
f"({self.byte_lanes} * {self.byte_size} != {self.width})")
cocotb.fork(self._run())
def send(self, frame):
@@ -413,7 +428,7 @@ class AxiStreamSink(object):
_signals = ["tdata"]
_optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"]
def __init__(self, entity, name, clock, reset=None, *args, **kwargs):
def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.entity = entity
self.clock = clock
@@ -448,8 +463,18 @@ class AxiStreamSink(object):
if hasattr(self.bus, "tready"):
self.bus.tready.setimmediatevalue(0)
if hasattr(self.bus, "tkeep"):
self.byte_lanes = len(self.bus.tkeep)
if byte_size is not None or byte_lanes is not None:
raise ValueError("Cannot specify byte_size or byte_lanes if tkeep is connected")
else:
if byte_lanes is not None:
self.byte_lanes = byte_lanes
if byte_size is not None:
raise ValueError("Cannot specify both byte_size and byte_lanes")
elif byte_size is not None:
self.byte_lanes = self.width // byte_size
self.byte_size = self.width // self.byte_lanes
self.byte_mask = 2**self.byte_size-1
@@ -477,6 +502,10 @@ class AxiStreamSink(object):
else:
self.log.info(" tuser: not present")
if self.byte_lanes * self.byte_size != self.width:
raise ValueError(f"Bus does not evenly divide into byte lanes "
f"({self.byte_lanes} * {self.byte_size} != {self.width})")
cocotb.fork(self._run())
def recv(self, compact=True):
@@ -602,7 +631,7 @@ class AxiStreamMonitor(object):
_signals = ["tdata"]
_optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"]
def __init__(self, entity, name, clock, reset=None, *args, **kwargs):
def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
self.log = SimLog("cocotb.%s.%s" % (entity._name, name))
self.entity = entity
self.clock = clock
@@ -631,6 +660,15 @@ class AxiStreamMonitor(object):
if hasattr(self.bus, "tkeep"):
self.byte_lanes = len(self.bus.tkeep)
if byte_size is not None or byte_lanes is not None:
raise ValueError("Cannot specify byte_size or byte_lanes if tkeep is connected")
else:
if byte_lanes is not None:
self.byte_lanes = byte_lanes
if byte_size is not None:
raise ValueError("Cannot specify both byte_size and byte_lanes")
elif byte_size is not None:
self.byte_lanes = self.width // byte_size
self.byte_size = self.width // self.byte_lanes
self.byte_mask = 2**self.byte_size-1
@@ -658,6 +696,10 @@ class AxiStreamMonitor(object):
else:
self.log.info(" tuser: not present")
if self.byte_lanes * self.byte_size != self.width:
raise ValueError(f"Bus does not evenly divide into byte lanes "
f"({self.byte_lanes} * {self.byte_size} != {self.width})")
cocotb.fork(self._run())
def recv(self, compact=True):