cndm: Use event queues in driver model

Signed-off-by: Alex Forencich <alex@alexforencich.com>
This commit is contained in:
Alex Forencich
2026-03-10 00:58:52 -07:00
parent c7279a1ea2
commit d7eb1b21a2

View File

@@ -14,7 +14,9 @@ import logging
import struct
from collections import deque
import cocotb
from cocotb.queue import Queue
from cocotb.triggers import RisingEdge
# Command opcodes
@@ -61,19 +63,18 @@ CNDM_CMD_PTP_FLG_OFFSET_FNS = 0x00000010
CNDM_CMD_PTP_FLG_SET_PERIOD = 0x00000080
class Cq:
class Eq:
def __init__(self, driver, port):
self.driver = driver
self.log = driver.log
self.port = port
self.irqn = None
self.log_size = 0
self.size = 0
self.size_mask = 0
self.stride = 0
self.cqn = None
self.eqn = None
self.enabled = False
self.buf_size = 0
@@ -81,10 +82,9 @@ class Cq:
self.buf_dma = 0
self.buf = None
self.eq = None
self.irqn = None
self.src_ring = None
self.handler = None
self.cq_table = {}
self.cons_ptr = None
@@ -92,11 +92,9 @@ class Cq:
self.hw_regs = self.driver.hw_regs
async def open(self, irqn, size):
if self.cqn is not None:
if self.eqn is not None:
raise Exception("Already open")
self.irqn = irqn
self.log_size = size.bit_length() - 1
self.size = 2**self.log_size
self.size_mask = self.size-1
@@ -111,13 +109,177 @@ class Cq:
self.cons_ptr = 0
self.irqn = irqn
self.cq_table = {}
rsp = await self.driver.exec_cmd(struct.pack("<HHLLLLLLLQQLLLL",
0, # rsvd
CNDM_CMD_OP_CREATE_EQ, # opcode
0x00000000, # flags
self.port.index, # port
0, # eqn
self.irqn, # irqn
0, # pd
self.log_size, # size
0, # dboffs
self.buf_dma, # base addr
0, # ptr2
0, # prod_ptr
0, # cons_ptr
0, # rsvd
0, # rsvd
))
rsp_unpacked = struct.unpack("<HHLLLLLLLQQLLLL", rsp)
print(rsp_unpacked)
self.eqn = rsp_unpacked[4]
self.db_offset = rsp_unpacked[8]
if self.db_offset == 0:
self.eqn = None
self.db_offset = None
self.log.error("Failed to allocate EQ")
return
await self.write_cons_ptr_arm()
self.log.info("Opened EQ %d", self.eqn)
self.log.info("Using doorbell at offset 0x%08x", self.db_offset)
self.enabled = True
async def close(self):
if self.eqn is None:
return
self.enabled = False
rsp = await self.driver.exec_cmd(struct.pack("<HHLLLLLLLQQLLLL",
0, # rsvd
CNDM_CMD_OP_DESTROY_EQ, # opcode
0x00000000, # flags
self.port.index, # port
self.eqn, # eqn
0, # irqn
0, # pd
0, # size
0, # dboffs
0, # base addr
0, # ptr2
0, # prod_ptr
0, # cons_ptr
0, # rsvd
0, # rsvd
))
self.eqn = None
# TODO free buffer
def attach_cq(self, cq):
self.cq_table[cq.cqn] = cq
def detach_cq(self, cq):
del self.cq_table[cq.cqn]
async def write_cons_ptr(self):
await self.hw_regs.write_dword(self.db_offset, self.cons_ptr & 0xffff)
async def write_cons_ptr_arm(self):
await self.hw_regs.write_dword(self.db_offset, (self.cons_ptr & 0xffff) | 0x80000000)
async def process_eq(self):
self.log.info("Process EQ")
eq_cons_ptr = self.cons_ptr
eq_index = eq_cons_ptr & self.size_mask
while True:
# event_data = struct.unpack_from("<HHLLLLLLL", self.buf, eq_index*self.stride)
event_data = struct.unpack_from("<HHLLL", self.buf, eq_index*self.stride)
self.log.info("EQ %d index %d data: %s", self.eqn, eq_index, repr(event_data))
if bool(event_data[-1] & 0x80000000) == bool(eq_cons_ptr & self.size):
self.log.info("EQ %d empty", self.eqn)
break
if event_data[1] == 0x0000:
# completion
self.log.info("Event from CQ %d", event_data[2])
cq = self.cq_table[event_data[2]]
await cq.handler(cq)
eq_cons_ptr += 1
eq_index = eq_cons_ptr & self.size_mask
self.cons_ptr = eq_cons_ptr
await self.write_cons_ptr_arm()
class Cq:
def __init__(self, driver, port):
self.driver = driver
self.log = driver.log
self.port = port
self.log_size = 0
self.size = 0
self.size_mask = 0
self.stride = 0
self.cqn = None
self.enabled = False
self.buf_size = 0
self.buf_region = None
self.buf_dma = 0
self.buf = None
self.eq = None
self.irqn = None
self.src_ring = None
self.handler = None
self.cons_ptr = None
self.db_offset = None
self.hw_regs = self.driver.hw_regs
async def open(self, eq, size):
if self.cqn is not None:
raise Exception("Already open")
self.log_size = size.bit_length() - 1
self.size = 2**self.log_size
self.size_mask = self.size-1
self.stride = 16
self.buf_size = self.size*self.stride
self.buf_region = self.driver.pool.alloc_region(self.buf_size)
self.buf_dma = self.buf_region.get_absolute_address(0)
self.buf = self.buf_region.mem
self.buf[0:self.buf_size] = b'\x00'*self.buf_size
self.cons_ptr = 0
if isinstance(eq, Eq):
self.eq = eq
dqn = eq.eqn
else:
self.irqn = eq
dqn = eq | 0x80000000
rsp = await self.driver.exec_cmd(struct.pack("<HHLLLLLLLQQLLLL",
0, # rsvd
CNDM_CMD_OP_CREATE_CQ, # opcode
0x00000000, # flags
self.port.index, # port
0, # cqn
self.irqn, # eqn
dqn, # eqn
0, # pd
self.log_size, # size
0, # dboffs
@@ -140,6 +302,9 @@ class Cq:
self.log.error("Failed to allocate CQ")
return
if self.eq:
self.eq.attach_cq(self)
await self.write_cons_ptr_arm()
self.log.info("Opened CQ %d", self.cqn)
@@ -575,6 +740,9 @@ class Port:
self.index = index
self.hw_regs = driver.hw_regs
self.eq_count = 1
self.eq = []
self.rxq_count = 1
self.rxq = []
@@ -584,12 +752,17 @@ class Port:
self.rx_queue = Queue()
async def init(self):
for k in range(self.eq_count):
eq = Eq(self.driver, self)
await eq.open(self.index, 256)
self.eq.append(eq)
await self.open()
async def open(self):
for k in range(self.rxq_count):
cq = Cq(self.driver, self)
await cq.open(self.index, 256)
await cq.open(self.eq[0], 256)
q = Rq(self.driver, self)
await q.open(cq, 256)
@@ -598,7 +771,7 @@ class Port:
for k in range(self.txq_count):
cq = Cq(self.driver, self)
await cq.open(self.index, 256)
await cq.open(self.eq[0], 256)
q = Sq(self.driver, self)
await q.open(cq, 256)
@@ -614,12 +787,36 @@ class Port:
async def recv_nowait(self):
return self.rx_queue.get_nowait()
async def interrupt_handler(self):
self.log.info("Interrupt")
for q in self.rxq:
await q.cq.handler(q.cq)
for q in self.txq:
await q.cq.handler(q.cq)
class Interrupt:
def __init__(self, index, handler=None):
self.index = index
self.queue = Queue()
self.handler = handler
self.signal = None
cocotb.start_soon(self._run())
@classmethod
def from_edge(cls, index, signal, handler=None):
obj = cls(index, handler)
obj.signal = signal
cocotb.start_soon(obj._run_edge())
return obj
async def interrupt(self):
self.queue.put_nowait(None)
async def _run(self):
while True:
await self.queue.get()
if self.handler:
await self.handler(self.index)
async def _run_edge(self):
while True:
await RisingEdge(self.signal)
await self.interrupt()
class Driver:
@@ -630,6 +827,8 @@ class Driver:
self.pool = None
self.hw_regs = None
self.irq_list = []
self.port_count = None
self.ports = []
@@ -685,6 +884,12 @@ class Driver:
self.hw_regs = dev.bar_window[0]
# set up MSI
for index in range(32):
irq = Interrupt(index, self.interrupt_handler)
self.dev.request_irq(index, irq.interrupt)
self.irq_list.append(irq)
await self.init_common()
async def init_common(self):
@@ -882,7 +1087,6 @@ class Driver:
for k in range(self.port_count):
port = Port(self, k)
await port.init()
self.dev.request_irq(k, port.interrupt_handler)
self.ports.append(port)
@@ -941,6 +1145,24 @@ class Driver:
a[k] = await self.hw_regs.read_dword(0x10040+k*4)
return a.tobytes()
async def interrupt_handler(self, irqn):
self.log.info("Interrupt handler start (IRQ %d)", irqn)
for p in self.ports:
if p.eq:
# using EQs
for eq in p.eq:
if eq.irqn == irqn:
await eq.process_eq()
else:
# using IRQs directly from CQs
for q in p.rxq:
if q.cq.irqn == irqn:
await q.cq.handler(q.cq)
for q in p.txq:
if q.cq.irqn == irqn:
await q.cq.handler(q.cq)
self.log.info("Interrupt handler end (IRQ %d)", irqn)
def alloc_pkt(self):
if self.free_packets:
return self.free_packets.popleft()