From e6e8a06dfe078dfa8bafe458bd643da31d50d19b Mon Sep 17 00:00:00 2001 From: Alex Forencich Date: Mon, 7 Dec 2020 01:58:49 -0800 Subject: [PATCH] Accept byte_size or byte_lanes arguments in AXI stream constructors if tkeep is not used --- cocotbext/axi/axis.py | 50 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 4 deletions(-) diff --git a/cocotbext/axi/axis.py b/cocotbext/axi/axis.py index b1d1e76..e51e7f2 100644 --- a/cocotbext/axi/axis.py +++ b/cocotbext/axi/axis.py @@ -214,7 +214,7 @@ class AxiStreamSource(object): _signals = ["tdata"] _optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"] - def __init__(self, entity, name, clock, reset=None, *args, **kwargs): + def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs): self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.entity = entity self.clock = clock @@ -249,7 +249,6 @@ class AxiStreamSource(object): if hasattr(self.bus, "tlast"): self.bus.tlast.setimmediatevalue(0) if hasattr(self.bus, "tkeep"): - self.byte_lanes = len(self.bus.tkeep) self.bus.tkeep.setimmediatevalue(0) if hasattr(self.bus, "tid"): self.bus.tid.setimmediatevalue(0) @@ -258,6 +257,18 @@ class AxiStreamSource(object): if hasattr(self.bus, "tuser"): self.bus.tuser.setimmediatevalue(0) + if hasattr(self.bus, "tkeep"): + self.byte_lanes = len(self.bus.tkeep) + if byte_size is not None or byte_lanes is not None: + raise ValueError("Cannot specify byte_size or byte_lanes if tkeep is connected") + else: + if byte_lanes is not None: + self.byte_lanes = byte_lanes + if byte_size is not None: + raise ValueError("Cannot specify both byte_size and byte_lanes") + elif byte_size is not None: + self.byte_lanes = self.width // byte_size + self.byte_size = self.width // self.byte_lanes self.byte_mask = 2**self.byte_size-1 @@ -284,6 +295,10 @@ class AxiStreamSource(object): else: self.log.info(" tuser: not present") + if self.byte_lanes * self.byte_size != self.width: + raise ValueError(f"Bus does not evenly divide into byte lanes " + f"({self.byte_lanes} * {self.byte_size} != {self.width})") + cocotb.fork(self._run()) def send(self, frame): @@ -413,7 +428,7 @@ class AxiStreamSink(object): _signals = ["tdata"] _optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"] - def __init__(self, entity, name, clock, reset=None, *args, **kwargs): + def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs): self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.entity = entity self.clock = clock @@ -448,8 +463,18 @@ class AxiStreamSink(object): if hasattr(self.bus, "tready"): self.bus.tready.setimmediatevalue(0) + if hasattr(self.bus, "tkeep"): self.byte_lanes = len(self.bus.tkeep) + if byte_size is not None or byte_lanes is not None: + raise ValueError("Cannot specify byte_size or byte_lanes if tkeep is connected") + else: + if byte_lanes is not None: + self.byte_lanes = byte_lanes + if byte_size is not None: + raise ValueError("Cannot specify both byte_size and byte_lanes") + elif byte_size is not None: + self.byte_lanes = self.width // byte_size self.byte_size = self.width // self.byte_lanes self.byte_mask = 2**self.byte_size-1 @@ -477,6 +502,10 @@ class AxiStreamSink(object): else: self.log.info(" tuser: not present") + if self.byte_lanes * self.byte_size != self.width: + raise ValueError(f"Bus does not evenly divide into byte lanes " + f"({self.byte_lanes} * {self.byte_size} != {self.width})") + cocotb.fork(self._run()) def recv(self, compact=True): @@ -602,7 +631,7 @@ class AxiStreamMonitor(object): _signals = ["tdata"] _optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"] - def __init__(self, entity, name, clock, reset=None, *args, **kwargs): + def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs): self.log = SimLog("cocotb.%s.%s" % (entity._name, name)) self.entity = entity self.clock = clock @@ -631,6 +660,15 @@ class AxiStreamMonitor(object): if hasattr(self.bus, "tkeep"): self.byte_lanes = len(self.bus.tkeep) + if byte_size is not None or byte_lanes is not None: + raise ValueError("Cannot specify byte_size or byte_lanes if tkeep is connected") + else: + if byte_lanes is not None: + self.byte_lanes = byte_lanes + if byte_size is not None: + raise ValueError("Cannot specify both byte_size and byte_lanes") + elif byte_size is not None: + self.byte_lanes = self.width // byte_size self.byte_size = self.width // self.byte_lanes self.byte_mask = 2**self.byte_size-1 @@ -658,6 +696,10 @@ class AxiStreamMonitor(object): else: self.log.info(" tuser: not present") + if self.byte_lanes * self.byte_size != self.width: + raise ValueError(f"Bus does not evenly divide into byte lanes " + f"({self.byte_lanes} * {self.byte_size} != {self.width})") + cocotb.fork(self._run()) def recv(self, compact=True):