Add address space abstraction

This commit is contained in:
Alex Forencich
2021-11-16 16:55:53 -08:00
parent f7660e9038
commit b9b9a2da72
2 changed files with 324 additions and 0 deletions

View File

@@ -0,0 +1,320 @@
"""
Copyright (c) 2021 Alex Forencich
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import mmap
from .buddy_allocator import BuddyAllocator
from .utils import hexdump, hexdump_lines, hexdump_str
class MemoryInterface:
def __init__(self, size, base=0, parent=None, **kwargs):
self._parent = parent
self._size = size
self._base = base
super().__init__(**kwargs)
@property
def parent(self):
return self._parent
@property
def size(self):
return self._size
@property
def base(self):
return self._base
def check_range(self, address, length=0):
if address < 0 or address >= self.size:
raise ValueError("address out of range")
if length < 0:
raise ValueError("invalid length")
if address+length > self.size:
raise ValueError("operation out of range")
def get_absolute_address(self, address):
if self.base is None:
return None
self.check_range(address)
return address+self.base
async def _read(self, address, length, **kwargs):
raise NotImplementedError()
async def read(self, address, length, **kwargs):
self.check_range(address, length)
return await self._read(address, length, **kwargs)
async def read_words(self, address, count, byteorder='little', ws=2, **kwargs):
data = bytes(await self.read(address, count*ws, **kwargs))
words = []
for k in range(count):
words.append(int.from_bytes(data[ws*k:ws*(k+1)], byteorder))
return words
async def read_dwords(self, address, count, byteorder='little', **kwargs):
return await self.read_words(address, count, byteorder, 4, **kwargs)
async def read_qwords(self, address, count, byteorder='little', **kwargs):
return await self.read_words(address, count, byteorder, 8, **kwargs)
async def read_byte(self, address, **kwargs):
return (await self.read(address, 1, **kwargs)).data[0]
async def read_word(self, address, byteorder='little', ws=2, **kwargs):
return (await self.read_words(address, 1, byteorder, ws, **kwargs))[0]
async def read_dword(self, address, byteorder='little', **kwargs):
return (await self.read_dwords(address, 1, byteorder, **kwargs))[0]
async def read_qword(self, address, byteorder='little', **kwargs):
return (await self.read_qwords(address, 1, byteorder, **kwargs))[0]
async def _write(self, address, data, **kwargs):
raise NotImplementedError()
async def write(self, address, data, **kwargs):
self.check_range(address, len(data))
await self._write(address, data, **kwargs)
async def write_words(self, address, data, byteorder='little', ws=2, **kwargs):
words = data
data = bytearray()
for w in words:
data.extend(w.to_bytes(ws, byteorder))
await self.write(address, data, **kwargs)
async def write_dwords(self, address, data, byteorder='little', **kwargs):
await self.write_words(address, data, byteorder, 4, **kwargs)
async def write_qwords(self, address, data, byteorder='little', **kwargs):
await self.write_words(address, data, byteorder, 8, **kwargs)
async def write_byte(self, address, data, **kwargs):
await self.write(address, [data], **kwargs)
async def write_word(self, address, data, byteorder='little', ws=2, **kwargs):
await self.write_words(address, [data], byteorder, ws, **kwargs)
async def write_dword(self, address, data, byteorder='little', **kwargs):
await self.write_dwords(address, [data], byteorder, **kwargs)
async def write_qword(self, address, data, byteorder='little', **kwargs):
await self.write_qwords(address, [data], byteorder, **kwargs)
def create_window(self, offset, size):
self.check_range(offset, size)
return Window(self, offset, size, base=self.get_absolute_address(offset))
def create_window_pool(self, offset=None, size=None):
if offset is None:
offset = 0
if size is None:
size = self.size - offset
self.check_range(offset, size)
return WindowPool(self, offset, size, base=self.get_absolute_address(offset))
def __len__(self):
return self._size
class Window(MemoryInterface):
def __init__(self, parent, offset, size, base=0, **kwargs):
super().__init__(size, base=base, parent=parent, **kwargs)
self._offset = offset
@property
def offset(self):
return self._offset
def get_parent_address(self, address):
if address < 0 or address >= self.size:
raise ValueError("address out of range")
return address+self.offset
async def _read(self, address, length, **kwargs):
return await self.parent.read(self.get_parent_address(address), length, **kwargs)
async def _write(self, address, data, **kwargs):
await self.parent.write(self.get_parent_address(address), data, **kwargs)
class WindowPool(Window):
def __init__(self, parent, offset, size, base=None, **kwargs):
super().__init__(parent, offset, size, base=base, **kwargs)
self.allocator = BuddyAllocator(size)
def alloc_window(self, size):
return self.create_window(self.allocator.alloc(size), size)
class Region(MemoryInterface):
def __init__(self, size, **kwargs):
super().__init__(size, **kwargs)
class MemoryRegion(Region):
def __init__(self, size=4096, mem=None, **kwargs):
super().__init__(size, **kwargs)
if mem is None:
mem = mmap.mmap(-1, size)
self.mem = mem
async def _read(self, address, length, **kwargs):
return self.mem[address:address+length]
async def _write(self, address, data, **kwargs):
self.mem[address:address+len(data)] = data
def hexdump(self, address, length, prefix=""):
hexdump(self.mem[address:address+length], prefix=prefix, offset=address)
def hexdump_lines(self, address, length, prefix=""):
return hexdump_lines(self.mem[address:address+length], prefix=prefix, offset=address)
def hexdump_str(self, address, length, prefix=""):
return hexdump_str(self.mem[address:address+length], prefix=prefix, offset=address)
def __getitem__(self, key):
return self.mem[key]
def __setitem__(self, key, value):
self.mem[key] = value
def __bytes__(self):
return bytes(self.mem)
class PeripheralRegion(Region):
def __init__(self, obj, size, **kwargs):
super().__init__(size, **kwargs)
self.obj = obj
async def _read(self, address, length, **kwargs):
try:
return await self.obj.read(address, length, **kwargs)
except TypeError:
return self.obj.read(address, length, **kwargs)
async def _write(self, address, data, **kwargs):
try:
await self.obj.write(address, data, **kwargs)
except TypeError:
self.obj.write(address, data, **kwargs)
class AddressSpace(Region):
def __init__(self, size=2**64, base=0, parent=None, **kwargs):
super().__init__(size=size, base=base, parent=parent, **kwargs)
self.regions = []
def find_regions(self, address, length=1):
regions = []
if address < 0 or address >= self.size:
raise ValueError("address out of range")
if length < 0:
raise ValueError("invalid length")
length = max(length, 1)
for (base, size, translate, region) in self.regions:
if address < base+size and base < address+length:
regions.append((base, size, translate, region))
regions.sort()
return regions
def register_region(self, region, base, size=None, offset=0):
if size is None:
size = region.size
if self.find_regions(base, size):
raise ValueError("overlaps existing region")
region._parent = self
if offset == 0:
region._base = self.get_absolute_address(base)
else:
region._base = None
self.regions.append((base, size, offset, region))
async def read(self, address, length, **kwargs):
regions = self.find_regions(address, length)
data = bytearray()
if not regions:
raise Exception("Invalid address")
for base, size, offset, region in regions:
if base > address:
raise Exception("Invalid address")
seg_addr = address - base
seg_len = min(size-seg_addr, length)
if offset is None:
seg_addr = address
offset = 0
data.extend(bytes(await region.read(seg_addr+offset, seg_len, **kwargs)))
address += seg_len
length -= seg_len
if length > 0:
raise Exception("Invalid address")
return bytes(data)
async def write(self, address, data, **kwargs):
start = 0
length = len(data)
regions = self.find_regions(address, length)
if not regions:
raise Exception("Invalid address")
for base, size, offset, region in regions:
if base > address:
raise Exception("Invalid address")
seg_addr = address - base
seg_len = min(size-seg_addr, length)
if offset is None:
seg_addr = address
offset = 0
await region.write(seg_addr+offset, data[start:start+seg_len], **kwargs)
address += seg_len
start += seg_len
length -= seg_len
if length > 0:
raise Exception("Invalid address")
def create_pool(self, base=None, size=None):
if base is None:
base = 0
if size is None:
size = self.size - base
self.check_range(base, size)
pool = Pool(self, base, size)
self.register_region(pool, base, size)
return pool
class Pool(AddressSpace):
def __init__(self, parent, base, size, **kwargs):
super().__init__(parent=parent, base=base, size=size, **kwargs)
self.allocator = BuddyAllocator(size)
def alloc_region(self, size):
base = self.allocator.alloc(size)
region = MemoryRegion(size)
self.register_region(region, base)
return region