Consolidate AXI stream implementation to remove duplicate code

This commit is contained in:
Alex Forencich
2021-01-08 16:08:36 -08:00
parent 222af15437
commit c18fdd6e22

View File

@@ -231,11 +231,18 @@ class AxiStreamFrame:
return bytes(self.tdata)
class AxiStreamSource(Reset):
class AxiStreamBase(Reset):
_signals = ["tdata"]
_optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"]
_type = "base"
_init_x = False
_valid_init = None
_ready_init = None
def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.entity = entity
@@ -243,7 +250,7 @@ class AxiStreamSource(Reset):
self.reset = reset
self.bus = Bus(self.entity, name, self._signals, optional_signals=self._optional_signals, **kwargs)
self.log.info("AXI stream source")
self.log.info("AXI stream %s", self._type)
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2020 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
@@ -252,10 +259,7 @@ class AxiStreamSource(Reset):
self.active = False
self.queue = deque()
self.pause = False
self._pause_generator = None
self._pause_cr = None
self.queue_sync = Event()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
@@ -263,19 +267,17 @@ class AxiStreamSource(Reset):
self.width = len(self.bus.tdata)
self.byte_lanes = 1
self.bus.tdata.setimmediatevalue(0)
if hasattr(self.bus, "tvalid"):
self.bus.tvalid.setimmediatevalue(0)
if hasattr(self.bus, "tlast"):
self.bus.tlast.setimmediatevalue(0)
if hasattr(self.bus, "tkeep"):
self.bus.tkeep.setimmediatevalue(0)
if hasattr(self.bus, "tid"):
self.bus.tid.setimmediatevalue(0)
if hasattr(self.bus, "tdest"):
self.bus.tdest.setimmediatevalue(0)
if hasattr(self.bus, "tuser"):
self.bus.tuser.setimmediatevalue(0)
if self._valid_init is not None and hasattr(self.bus, "tvalid"):
self.bus.tvalid.setimmediatevalue(self._valid_init)
if self._ready_init is not None and hasattr(self.bus, "tready"):
self.bus.tready.setimmediatevalue(self._ready_init)
for sig in self._signals+self._optional_signals:
if hasattr(self.bus, sig):
if self._init_x and sig not in ("tvalid", "tready"):
v = getattr(self.bus, sig).value
v.binstr = 'x'*len(v)
getattr(self.bus, sig).setimmediatevalue(v)
if hasattr(self.bus, "tkeep"):
self.byte_lanes = len(self.bus.tkeep)
@@ -292,7 +294,7 @@ class AxiStreamSource(Reset):
self.byte_size = self.width // self.byte_lanes
self.byte_mask = 2**self.byte_size-1
self.log.info("AXI stream source configuration:")
self.log.info("AXI stream %s configuration:", self._type)
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present")
@@ -323,6 +325,70 @@ class AxiStreamSource(Reset):
self._init_reset(reset)
def count(self):
return len(self.queue)
def empty(self):
return not self.queue
def clear(self):
self.queue.clear()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
self.active = False
async def _run(self):
raise NotImplementedError()
class AxiStreamPause:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.pause = False
self._pause_generator = None
self._pause_cr = None
def set_pause_generator(self, generator=None):
if self._pause_cr is not None:
self._pause_cr.kill()
self._pause_cr = None
self._pause_generator = generator
if self._pause_generator is not None:
self._pause_cr = cocotb.fork(self._run_pause())
def clear_pause_generator(self):
self.set_pause_generator(None)
async def _run_pause(self):
for val in self._pause_generator:
self.pause = val
await RisingEdge(self.clock)
class AxiStreamSource(AxiStreamBase, AxiStreamPause):
_type = "source"
_init_x = True
_valid_init = 0
_ready_init = None
async def send(self, frame):
self.send_nowait(frame)
@@ -338,49 +404,16 @@ class AxiStreamSource(Reset):
def write_nowait(self, data):
self.send_nowait(data)
def count(self):
return len(self.queue)
def empty(self):
return not self.queue
def idle(self):
return self.empty() and not self.active
def clear(self):
self.queue.clear()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
async def wait(self):
while not self.idle():
await RisingEdge(self.clock)
def set_pause_generator(self, generator=None):
if self._pause_cr is not None:
self._pause_cr.kill()
self._pause_cr = None
self._pause_generator = generator
if self._pause_generator is not None:
self._pause_cr = cocotb.fork(self._run_pause())
def clear_pause_generator(self):
self.set_pause_generator(None)
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
super()._handle_reset(state)
self.active = False
self.bus.tdata <= 0
if hasattr(self.bus, "tvalid"):
self.bus.tvalid <= 0
@@ -459,101 +492,25 @@ class AxiStreamSource(Reset):
self.bus.tlast <= 0
self.active = bool(frame)
async def _run_pause(self):
for val in self._pause_generator:
self.pause = val
await RisingEdge(self.clock)
class AxiStreamMonitor(AxiStreamBase):
class AxiStreamSink(Reset):
_type = "monitor"
_signals = ["tdata"]
_optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"]
_init_x = False
_valid_init = None
_ready_init = None
def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.entity = entity
self.clock = clock
self.reset = reset
self.bus = Bus(self.entity, name, self._signals, optional_signals=self._optional_signals, **kwargs)
super().__init__(entity, name, clock, reset, byte_size, byte_lanes, *args, **kwargs)
self.log.info("AXI stream sink")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2020 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
super().__init__(*args, **kwargs)
self.active = False
self.queue = deque()
self.sync = Event()
self.read_queue = []
self.pause = False
self._pause_generator = None
self._pause_cr = None
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
self.queue_occupancy_limit_bytes = None
self.queue_occupancy_limit_frames = None
self.width = len(self.bus.tdata)
self.byte_lanes = 1
if hasattr(self.bus, "tready"):
self.bus.tready.setimmediatevalue(0)
if hasattr(self.bus, "tkeep"):
self.byte_lanes = len(self.bus.tkeep)
if byte_size is not None or byte_lanes is not None:
raise ValueError("Cannot specify byte_size or byte_lanes if tkeep is connected")
else:
if byte_lanes is not None:
self.byte_lanes = byte_lanes
if byte_size is not None:
raise ValueError("Cannot specify both byte_size and byte_lanes")
elif byte_size is not None:
self.byte_lanes = self.width // byte_size
self.byte_size = self.width // self.byte_lanes
self.byte_mask = 2**self.byte_size-1
self.log.info("AXI stream sink configuration:")
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present")
self.log.info(" tready: %s", "present" if hasattr(self.bus, "tready") else "not present")
self.log.info(" tlast: %s", "present" if hasattr(self.bus, "tlast") else "not present")
if hasattr(self.bus, "tkeep"):
self.log.info(" tkeep width: %d bits", len(self.bus.tkeep))
else:
self.log.info(" tkeep: not present")
if hasattr(self.bus, "tid"):
self.log.info(" tid width: %d bits", len(self.bus.tid))
else:
self.log.info(" tid: not present")
if hasattr(self.bus, "tdest"):
self.log.info(" tdest width: %d bits", len(self.bus.tdest))
else:
self.log.info(" tdest: not present")
if hasattr(self.bus, "tuser"):
self.log.info(" tuser width: %d bits", len(self.bus.tuser))
else:
self.log.info(" tuser: not present")
if self.byte_lanes * self.byte_size != self.width:
raise ValueError(f"Bus does not evenly divide into byte lanes "
f"({self.byte_lanes} * {self.byte_size} != {self.width})")
self._run_cr = None
self._init_reset(reset)
async def recv(self, compact=True):
while self.empty():
self.sync.clear()
await self.sync.wait()
self.queue_sync.clear()
await self.queue_sync.wait()
return self.recv_nowait(compact)
def recv_nowait(self, compact=True):
@@ -582,62 +539,88 @@ class AxiStreamSink(Reset):
del self.read_queue[:count]
return data
def count(self):
return len(self.queue)
def empty(self):
return not self.queue
def full(self):
if self.queue_occupancy_limit_bytes and self.queue_occupancy_bytes > self.queue_occupancy_limit_bytes:
return True
elif self.queue_occupancy_limit_frames and self.queue_occupancy_frames > self.queue_occupancy_limit_frames:
return True
else:
return False
def idle(self):
return not self.active
def clear(self):
self.queue.clear()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
async def wait(self, timeout=0, timeout_unit='ns'):
if not self.empty():
return
self.sync.clear()
self.queue_sync.clear()
if timeout:
await First(self.sync.wait(), Timer(timeout, timeout_unit))
await First(self.queue_sync.wait(), Timer(timeout, timeout_unit))
else:
await self.sync.wait()
await self.queue_sync.wait()
def set_pause_generator(self, generator=None):
if self._pause_cr is not None:
self._pause_cr.kill()
self._pause_cr = None
async def _run(self):
frame = None
self.active = False
self._pause_generator = generator
while True:
await RisingEdge(self.clock)
if self._pause_generator is not None:
self._pause_cr = cocotb.fork(self._run_pause())
# read handshake signals
tready_sample = (not hasattr(self.bus, "tready")) or self.bus.tready.value
tvalid_sample = (not hasattr(self.bus, "tvalid")) or self.bus.tvalid.value
def clear_pause_generator(self):
self.set_pause_generator(None)
if tready_sample and tvalid_sample:
if frame is None:
if self.byte_size == 8:
frame = AxiStreamFrame(bytearray(), [], [], [], [])
else:
frame = AxiStreamFrame([], [], [], [], [])
frame.sim_time_start = get_sim_time()
for offset in range(self.byte_lanes):
frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask)
if hasattr(self.bus, "tkeep"):
frame.tkeep.append((self.bus.tkeep.value.integer >> offset) & 1)
if hasattr(self.bus, "tid"):
frame.tid.append(self.bus.tid.value.integer)
if hasattr(self.bus, "tdest"):
frame.tdest.append(self.bus.tdest.value.integer)
if hasattr(self.bus, "tuser"):
frame.tuser.append(self.bus.tuser.value.integer)
if not hasattr(self.bus, "tlast") or self.bus.tlast.value:
frame.sim_time_end = get_sim_time()
self.log.info("RX frame: %s", frame)
self.queue_occupancy_bytes += len(frame)
self.queue_occupancy_frames += 1
self.queue.append(frame)
self.queue_sync.set()
frame = None
class AxiStreamSink(AxiStreamMonitor, AxiStreamPause):
_type = "sink"
_init_x = False
_valid_init = None
_ready_init = 0
def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
super().__init__(entity, name, clock, reset, byte_size, byte_lanes, *args, **kwargs)
self.queue_occupancy_limit_bytes = -1
self.queue_occupancy_limit_frames = -1
def full(self):
if self.queue_occupancy_limit_bytes > 0 and self.queue_occupancy_bytes > self.queue_occupancy_limit_bytes:
return True
elif self.queue_occupancy_limit_frames > 0 and self.queue_occupancy_frames > self.queue_occupancy_limit_frames:
return True
else:
return False
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
super()._handle_reset(state)
self.active = False
if hasattr(self.bus, "tready"):
self.bus.tready <= 0
@@ -680,199 +663,9 @@ class AxiStreamSink(Reset):
self.queue_occupancy_frames += 1
self.queue.append(frame)
self.sync.set()
self.queue_sync.set()
frame = None
if hasattr(self.bus, "tready"):
self.bus.tready <= (not self.full() and not self.pause)
async def _run_pause(self):
for val in self._pause_generator:
self.pause = val
await RisingEdge(self.clock)
class AxiStreamMonitor(Reset):
_signals = ["tdata"]
_optional_signals = ["tvalid", "tready", "tlast", "tkeep", "tid", "tdest", "tuser"]
def __init__(self, entity, name, clock, reset=None, byte_size=None, byte_lanes=None, *args, **kwargs):
self.log = logging.getLogger(f"cocotb.{entity._name}.{name}")
self.entity = entity
self.clock = clock
self.reset = reset
self.bus = Bus(self.entity, name, self._signals, optional_signals=self._optional_signals, **kwargs)
self.log.info("AXI stream monitor")
self.log.info("cocotbext-axi version %s", __version__)
self.log.info("Copyright (c) 2020 Alex Forencich")
self.log.info("https://github.com/alexforencich/cocotbext-axi")
super().__init__(*args, **kwargs)
self.active = False
self.queue = deque()
self.sync = Event()
self.read_queue = []
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
self.width = len(self.bus.tdata)
self.byte_lanes = 1
if hasattr(self.bus, "tkeep"):
self.byte_lanes = len(self.bus.tkeep)
if byte_size is not None or byte_lanes is not None:
raise ValueError("Cannot specify byte_size or byte_lanes if tkeep is connected")
else:
if byte_lanes is not None:
self.byte_lanes = byte_lanes
if byte_size is not None:
raise ValueError("Cannot specify both byte_size and byte_lanes")
elif byte_size is not None:
self.byte_lanes = self.width // byte_size
self.byte_size = self.width // self.byte_lanes
self.byte_mask = 2**self.byte_size-1
self.log.info("AXI stream monitor configuration:")
self.log.info(" Byte size: %d bits", self.byte_size)
self.log.info(" Data width: %d bits (%d bytes)", self.width, self.byte_lanes)
self.log.info(" tvalid: %s", "present" if hasattr(self.bus, "tvalid") else "not present")
self.log.info(" tready: %s", "present" if hasattr(self.bus, "tready") else "not present")
self.log.info(" tlast: %s", "present" if hasattr(self.bus, "tlast") else "not present")
if hasattr(self.bus, "tkeep"):
self.log.info(" tkeep width: %d bits", len(self.bus.tkeep))
else:
self.log.info(" tkeep: not present")
if hasattr(self.bus, "tid"):
self.log.info(" tid width: %d bits", len(self.bus.tid))
else:
self.log.info(" tid: not present")
if hasattr(self.bus, "tdest"):
self.log.info(" tdest width: %d bits", len(self.bus.tdest))
else:
self.log.info(" tdest: not present")
if hasattr(self.bus, "tuser"):
self.log.info(" tuser width: %d bits", len(self.bus.tuser))
else:
self.log.info(" tuser: not present")
if self.byte_lanes * self.byte_size != self.width:
raise ValueError(f"Bus does not evenly divide into byte lanes "
f"({self.byte_lanes} * {self.byte_size} != {self.width})")
self._run_cr = None
self._init_reset(reset)
async def recv(self, compact=True):
while self.empty():
self.sync.clear()
await self.sync.wait()
return self.recv_nowait(compact)
def recv_nowait(self, compact=True):
if self.queue:
frame = self.queue.popleft()
self.queue_occupancy_bytes -= len(frame)
self.queue_occupancy_frames -= 1
if compact:
frame.compact()
return frame
return None
async def read(self, count=-1):
while not self.read_queue:
frame = await self.recv(compact=True)
self.read_queue.extend(frame.tdata)
return self.read_nowait(count)
def read_nowait(self, count=-1):
while not self.empty():
frame = self.recv_nowait(compact=True)
self.read_queue.extend(frame.tdata)
if count < 0:
count = len(self.read_queue)
data = self.read_queue[:count]
del self.read_queue[:count]
return data
def count(self):
return len(self.queue)
def empty(self):
return not self.queue
def idle(self):
return not self.active
def clear(self):
self.queue.clear()
self.queue_occupancy_bytes = 0
self.queue_occupancy_frames = 0
async def wait(self, timeout=0, timeout_unit='ns'):
if not self.empty():
return
self.sync.clear()
if timeout:
await First(self.sync.wait(), Timer(timeout, timeout_unit))
else:
await self.sync.wait()
def _handle_reset(self, state):
if state:
self.log.info("Reset asserted")
if self._run_cr is not None:
self._run_cr.kill()
self._run_cr = None
else:
self.log.info("Reset de-asserted")
if self._run_cr is None:
self._run_cr = cocotb.fork(self._run())
self.active = False
async def _run(self):
frame = None
self.active = False
while True:
await RisingEdge(self.clock)
# read handshake signals
tready_sample = (not hasattr(self.bus, "tready")) or self.bus.tready.value
tvalid_sample = (not hasattr(self.bus, "tvalid")) or self.bus.tvalid.value
if tready_sample and tvalid_sample:
if frame is None:
if self.byte_size == 8:
frame = AxiStreamFrame(bytearray(), [], [], [], [])
else:
frame = AxiStreamFrame([], [], [], [], [])
frame.sim_time_start = get_sim_time()
for offset in range(self.byte_lanes):
frame.tdata.append((self.bus.tdata.value.integer >> (offset * self.byte_size)) & self.byte_mask)
if hasattr(self.bus, "tkeep"):
frame.tkeep.append((self.bus.tkeep.value.integer >> offset) & 1)
if hasattr(self.bus, "tid"):
frame.tid.append(self.bus.tid.value.integer)
if hasattr(self.bus, "tdest"):
frame.tdest.append(self.bus.tdest.value.integer)
if hasattr(self.bus, "tuser"):
frame.tuser.append(self.bus.tuser.value.integer)
if not hasattr(self.bus, "tlast") or self.bus.tlast.value:
frame.sim_time_end = get_sim_time()
self.log.info("RX frame: %s", frame)
self.queue.append(frame)
self.sync.set()
frame = None