Separate processing coroutines for each ID

This commit is contained in:
Alex Forencich
2021-03-24 17:07:16 -07:00
parent 9e28bd7fbb
commit f991096272

View File

@@ -69,8 +69,8 @@ class AxiMasterWrite(Reset):
self.cur_id = 0
self.active_id = Counter()
self.int_write_resp_command_queue = Queue()
self.current_write_resp_command = None
self.int_write_resp_command_queue = [Queue() for k in range(self.id_count)]
self.current_write_resp_command = [None for k in range(self.id_count)]
self.int_write_resp_queue_list = [Queue() for k in range(self.id_count)]
self.in_flight_operations = 0
@@ -101,6 +101,7 @@ class AxiMasterWrite(Reset):
self._process_write_cr = None
self._process_write_resp_cr = None
self._process_write_resp_id_cr = None
self._init_reset(reset, reset_active_level)
@@ -203,6 +204,10 @@ class AxiMasterWrite(Reset):
if self._process_write_resp_cr is not None:
self._process_write_resp_cr.kill()
self._process_write_resp_cr = None
if self._process_write_resp_id_cr is not None:
for cr in self._process_write_resp_id_cr:
cr.kill()
self._process_write_resp_id_cr = None
self.aw_channel.clear()
self.w_channel.clear()
@@ -222,13 +227,15 @@ class AxiMasterWrite(Reset):
self.current_write_command = None
flush_cmd(cmd)
while not self.int_write_resp_command_queue.empty():
cmd = self.int_write_resp_command_queue.get_nowait()
for q in self.int_write_resp_command_queue:
while not q.empty():
cmd = q.get_nowait()
flush_cmd(cmd)
if self.current_write_resp_command:
cmd = self.current_write_resp_command
self.current_write_resp_command = None
for k in range(len(self.current_write_resp_command)):
if self.current_write_resp_command[k]:
cmd = self.current_write_resp_command[k]
self.current_write_resp_command[k] = None
flush_cmd(cmd)
for q in self.int_write_resp_queue_list:
@@ -246,6 +253,8 @@ class AxiMasterWrite(Reset):
self._process_write_cr = cocotb.fork(self._process_write())
if self._process_write_resp_cr is None:
self._process_write_resp_cr = cocotb.fork(self._process_write_resp())
if self._process_write_resp_id_cr is None:
self._process_write_resp_id_cr = [cocotb.fork(self._process_write_resp_id(i)) for i in range(self.id_count)]
async def _process_write(self):
while True:
@@ -307,7 +316,7 @@ class AxiMasterWrite(Reset):
# split on 4k address boundary
burst_length = (min(burst_length*num_bytes, 0x1000-(cur_addr & 0xfff))+num_bytes-1)//num_bytes
burst_list.append((awid, burst_length))
burst_list.append(burst_length)
aw = self.aw_channel._transaction_obj()
aw.awid = awid
@@ -349,30 +358,31 @@ class AxiMasterWrite(Reset):
cycle_offset = (cycle_offset + num_bytes) % self.byte_width
resp_cmd = AxiWriteRespCmd(cmd.address, len(cmd.data), cmd.size, cycles, cmd.prot, burst_list, cmd.event)
await self.int_write_resp_command_queue.put(resp_cmd)
await self.int_write_resp_command_queue[awid].put(resp_cmd)
self.current_write_command = None
async def _process_write_resp(self):
while True:
cmd = await self.int_write_resp_command_queue.get()
self.current_write_resp_command = cmd
b = await self.b_channel.recv()
bid = int(b.bid)
if self.active_id[bid] <= 0:
raise Exception(f"Unexpected burst ID {bid}")
await self.int_write_resp_queue_list[bid].put(b)
async def _process_write_resp_id(self, bid):
while True:
cmd = await self.int_write_resp_command_queue[bid].get()
self.current_write_resp_command[bid] = cmd
resp = AxiResp.OKAY
user = []
for bid, burst_length in cmd.burst_list:
while self.int_write_resp_queue_list[bid].empty():
b = await self.b_channel.recv()
i = int(b.bid)
if self.active_id[i] <= 0:
raise Exception(f"Unexpected burst ID {bid}")
self.int_write_resp_queue_list[i].put_nowait(b)
b = self.int_write_resp_queue_list[bid].get_nowait()
for burst_length in cmd.burst_list:
b = await self.int_write_resp_queue_list[bid].get()
burst_id = int(b.bid)
burst_resp = AxiResp(b.bresp)
@@ -398,7 +408,7 @@ class AxiMasterWrite(Reset):
cmd.event.set(write_resp)
self.current_write_resp_command = None
self.current_write_resp_command[bid] = None
self.in_flight_operations -= 1
@@ -425,8 +435,8 @@ class AxiMasterRead(Reset):
self.cur_id = 0
self.active_id = Counter()
self.int_read_resp_command_queue = Queue()
self.current_read_resp_command = None
self.int_read_resp_command_queue = [Queue() for k in range(self.id_count)]
self.current_read_resp_command = [None for k in range(self.id_count)]
self.int_read_resp_queue_list = [Queue() for k in range(self.id_count)]
self.in_flight_operations = 0
@@ -455,6 +465,7 @@ class AxiMasterRead(Reset):
self._process_read_cr = None
self._process_read_resp_cr = None
self._process_read_resp_id_cr = None
self._init_reset(reset, reset_active_level)
@@ -552,6 +563,10 @@ class AxiMasterRead(Reset):
if self._process_read_resp_cr is not None:
self._process_read_resp_cr.kill()
self._process_read_resp_cr = None
if self._process_read_resp_id_cr is not None:
for cr in self._process_read_resp_id_cr:
cr.kill()
self._process_read_resp_id_cr = None
self.ar_channel.clear()
self.r_channel.clear()
@@ -570,13 +585,15 @@ class AxiMasterRead(Reset):
self.current_read_command = None
flush_cmd(cmd)
while not self.int_read_resp_command_queue.empty():
cmd = self.int_read_resp_command_queue.get_nowait()
for q in self.int_read_resp_command_queue:
while not q.empty():
cmd = q.get_nowait()
flush_cmd(cmd)
if self.current_read_resp_command:
cmd = self.current_read_resp_command
self.current_read_resp_command = None
for k in range(len(self.current_read_resp_command)):
if self.current_read_resp_command[k]:
cmd = self.current_read_resp_command[k]
self.current_read_resp_command[k] = None
flush_cmd(cmd)
for q in self.int_read_resp_queue_list:
@@ -594,6 +611,8 @@ class AxiMasterRead(Reset):
self._process_read_cr = cocotb.fork(self._process_read())
if self._process_read_resp_cr is None:
self._process_read_resp_cr = cocotb.fork(self._process_read_resp())
if self._process_read_resp_id_cr is None:
self._process_read_resp_id_cr = [cocotb.fork(self._process_read_resp_id(i)) for i in range(self.id_count)]
async def _process_read(self):
while True:
@@ -632,7 +651,7 @@ class AxiMasterRead(Reset):
# split on 4k address boundary
burst_length = (min(burst_length*num_bytes, 0x1000-(cur_addr & 0xfff))+num_bytes-1)//num_bytes
burst_list.append((arid, burst_length))
burst_list.append(burst_length)
ar = self.r_channel._transaction_obj()
ar.arid = arid
@@ -656,14 +675,25 @@ class AxiMasterRead(Reset):
cur_addr += num_bytes
resp_cmd = AxiReadRespCmd(cmd.address, cmd.length, cmd.size, cycles, cmd.prot, burst_list, cmd.event)
await self.int_read_resp_command_queue.put(resp_cmd)
await self.int_read_resp_command_queue[arid].put(resp_cmd)
self.current_read_command = None
async def _process_read_resp(self):
while True:
cmd = await self.int_read_resp_command_queue.get()
self.current_read_resp_command = cmd
r = await self.r_channel.recv()
rid = int(r.rid)
if self.active_id[rid] <= 0:
raise Exception(f"Unexpected burst ID {rid}")
await self.int_read_resp_queue_list[rid].put(r)
async def _process_read_resp_id(self, rid):
while True:
cmd = await self.int_read_resp_command_queue[rid].get()
self.current_read_resp_command[rid] = cmd
num_bytes = 2**cmd.size
@@ -680,19 +710,9 @@ class AxiMasterRead(Reset):
first = True
for rid, burst_length in cmd.burst_list:
for burst_length in cmd.burst_list:
for k in range(burst_length):
while self.int_read_resp_queue_list[rid].empty():
r = await self.r_channel.recv()
i = int(r.rid)
if self.active_id[i] <= 0:
raise Exception(f"Unexpected burst ID {rid}")
self.int_read_resp_queue_list[i].put_nowait(r)
r = self.int_read_resp_queue_list[rid].get_nowait()
r = await self.int_read_resp_queue_list[rid].get()
cycle_id = int(r.rid)
cycle_data = int(r.rdata)
@@ -737,7 +757,7 @@ class AxiMasterRead(Reset):
cmd.event.set(read_resp)
self.current_read_resp_command = None
self.current_read_resp_command[rid] = None
self.in_flight_operations -= 1