initial commit

This commit is contained in:
2023-09-10 13:23:07 +02:00
commit 984c188c20
13 changed files with 474 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
.idea/
__pycache__
*.pyc
*.img

12
Pipfile Normal file
View File

@@ -0,0 +1,12 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
[dev-packages]
[requires]
python_version = "3.11"
python_full_version = "3.11.3"

0
diskutil/__init__.py Normal file
View File

49
diskutil/blockdev.py Normal file
View File

@@ -0,0 +1,49 @@
import math
from typing import IO
from threading import Lock
class BlockDevice:
sector_size: int
def read_sector(self, lba: int, count: int) -> bytes:
raise NotImplementedError()
def read_bytes(self, offset: int, count: int):
lba_start = math.floor(offset/self.sector_size)
lba_end = math.ceil(offset + count / self.sector_size)
lba_count = lba_end - lba_start
sectors = self.read_sector(lba_start, lba_count)
return sectors[offset % self.sector_size:count]
class OffsetBlockDevice(BlockDevice):
def __init__(self, actual_block_device: BlockDevice, lba_offset: int):
self.sector_size = actual_block_device.sector_size
self._actual_block_device = actual_block_device
self._lba_offset = lba_offset
def read_sector(self, lba: int, count: int) -> bytes:
return self._actual_block_device.read_sector(lba + self._lba_offset, count)
class IOBlockDevice(BlockDevice):
def __init__(self, io: IO[bytes], sector_size: int):
self.sector_size = sector_size
self._io = io
self._lock = Lock()
def read_sector(self, lba: int, count: int) -> bytes:
with self._lock:
self._io.seek(self.sector_size * lba)
return self._io.read(self.sector_size * count)
def block_device_from_file(filename: str) -> IOBlockDevice:
f = open(filename, mode='rb')
return IOBlockDevice(f, 512)
__all__ = ['BlockDevice', 'IOBlockDevice', 'block_device_from_file']

56
diskutil/manager.py Normal file
View File

@@ -0,0 +1,56 @@
import string
from dataclasses import dataclass
from typing import Optional, List
from .blockdev import BlockDevice, OffsetBlockDevice
from .part import partition_schemes
from .vfs import vfs_schemes, Vfs
@dataclass
class Drive:
name: str
parent: Optional['Drive']
block_device: BlockDevice
pass
class Manager:
def __init__(self):
self.drives: List[Drive] = []
self.filesystems: List[Vfs] = []
self.last_drive_letter = 0
def _get_next_drive_name(self) -> str:
next_letter = string.ascii_lowercase[self.last_drive_letter]
self.last_drive_letter += 1
return f'sd{next_letter}'
def add_device(self, block_device: BlockDevice):
drive = Drive(self._get_next_drive_name(), None, block_device)
self.drives.append(drive)
self.scan_drive(drive)
def scan_drive(self, drive: Drive):
partition_found = False
for part_scheme in partition_schemes:
try:
part = part_scheme.from_blockdevice(drive.block_device)
except ValueError:
continue
for i, partition in enumerate(part.get_partitions()):
part_drive = Drive(f'{drive.name}{i}', drive, OffsetBlockDevice(drive.block_device, partition.offset_lba))
self.drives.append(
part_drive
)
self.scan_drive(part_drive)
partition_found = True
if not partition_found:
for vfs in vfs_schemes:
try:
fs = vfs.from_blockdevice(drive.block_device)
self.filesystems.append(fs)
except ValueError as e:
print(e)
continue

73
diskutil/offsetio.py Normal file
View File

@@ -0,0 +1,73 @@
import os
from types import TracebackType
from typing import IO, AnyStr, Type, Iterator, Iterable
class OffsetIO(IO[bytes]):
def __init__(self, actual: IO[bytes], offset: int):
self._offset = offset
self._actual = actual
def close(self) -> None:
self._actual.close()
def fileno(self) -> int:
# todo ok?
return self._actual.fileno()
def flush(self) -> None:
self._actual.flush()
def isatty(self) -> bool:
return self._actual.isatty()
def read(self, __n: int = ...) -> AnyStr:
return self._actual.read(__n)
def readable(self) -> bool:
return self._actual.readable()
def readline(self, __limit: int = ...) -> AnyStr:
return self._actual.readline(__limit)
def readlines(self, __hint: int = ...) -> list[AnyStr]:
return self._actual.readlines(__hint)
def seek(self, __offset: int, __whence: int = ...) -> int:
if __whence == os.SEEK_SET or __whence is None:
return self._actual.seek(self._offset + __offset, __whence)
elif __whence == os.SEEK_CUR:
return self._actual.seek(__offset, __whence)
else:
raise NotImplementedError(f'__whence is not supported by offset io {__whence}')
def seekable(self) -> bool:
return self._actual.seekable()
def tell(self) -> int:
return self._actual.tell() - self._offset
def truncate(self, __size: int | None = ...) -> int:
return self._actual.truncate(__size)
def writable(self) -> bool:
return self._actual.writable()
def write(self, __s: AnyStr) -> int:
return self._actual.write(__s)
def writelines(self, __lines: Iterable[AnyStr]) -> None:
return self._actual.writelines(__lines)
def __next__(self) -> AnyStr:
return self._actual.__next__()
def __iter__(self) -> Iterator[AnyStr]:
return self.__iter__()
def __enter__(self) -> IO[AnyStr]:
return self.__enter__()
def __exit__(self, __t: Type[BaseException] | None, __value: BaseException | None,
__traceback: TracebackType | None) -> None:
return self.__exit__(__t, __value, __traceback)

12
diskutil/part/__init__.py Normal file
View File

@@ -0,0 +1,12 @@
from typing import List, Type
from .generic import PartitionScheme
from .msdos import Mbr
partition_schemes: List[Type[PartitionScheme]] = [
Mbr
]
__all__ = ['partition_schemes', 'Mbr', 'PartitionScheme']

25
diskutil/part/generic.py Normal file
View File

@@ -0,0 +1,25 @@
from dataclasses import dataclass
from typing import List, Dict
from ..blockdev import BlockDevice
@dataclass
class Partition:
offset_lba: int
offset_bytes: int
count_lba: int
count_bytes: int
metadata: Dict[str, str]
class PartitionScheme:
@staticmethod
def from_blockdevice(dev: BlockDevice) -> 'PartitionScheme':
raise NotImplementedError()
def get_partitions(self) -> List[Partition]:
raise NotImplementedError()
__all__ = ['PartitionScheme', 'Partition']

76
diskutil/part/msdos.py Normal file
View File

@@ -0,0 +1,76 @@
from dataclasses import dataclass
from struct import unpack_from
from typing import List
from .generic import PartitionScheme, Partition
from ..blockdev import BlockDevice
MBR_SIZE = 512
MBR_MAGIC = b'\x55\xaa'
@dataclass
class MbrPartitionTableEntry:
drive_attributes: int
chs_address: bytes
partition_type: int
chs_address_end: bytes
lba_partition_start: int
number_of_sectors: int
@staticmethod
def from_bytes(data: bytes) -> "MbrPartitionTableEntry":
return MbrPartitionTableEntry(*unpack_from("<B3sb3sII", data))
def present(self) -> bool:
return not self.absent()
def absent(self) -> bool:
return self.drive_attributes == 0 \
and self.chs_address == b'\0\0\0' \
and self.partition_type == 0 \
and self.chs_address_end == b'\0\0\0' \
and self.lba_partition_start == 0 \
and self.number_of_sectors == 0
def get_partition(self) -> Partition:
if not self.lba_partition_start or not self.number_of_sectors:
raise NotImplementedError('CHS to LBA not implemented')
return Partition(self.lba_partition_start, 0, self.number_of_sectors, 0, {
'mbr_drive_attributes': str(self.drive_attributes),
'mbr_partition_type': str(self.partition_type),
})
@dataclass
class Mbr(PartitionScheme):
bootstrap: bytes
unique_disk_id: bytes
partition_table: List[MbrPartitionTableEntry]
magic: bytes
def get_partitions(self) -> List[Partition]:
return [part.get_partition() for part in self.partition_table if part.present()]
@staticmethod
def from_blockdevice(dev: BlockDevice):
return Mbr.from_bytes(dev.read_bytes(0, MBR_SIZE))
@staticmethod
def from_bytes(data: bytes) -> "Mbr":
bootstrap, disk_id, \
part_1, part_2, part_3, part_4, \
magic = unpack_from("<440sI2x16s16s16s16s2s", data)
if magic != MBR_MAGIC:
raise ValueError('Not a valid MBR')
return Mbr(
bootstrap, disk_id,
[MbrPartitionTableEntry.from_bytes(p) for p in [part_1, part_2, part_3, part_4]],
magic,
)
__all__ = ['Mbr']

8
diskutil/vfs/__init__.py Normal file
View File

@@ -0,0 +1,8 @@
from typing import List, Type
from .ext import ExtFs
from .generic import Vfs
vfs_schemes: List[Type[Vfs]] = [
ExtFs
]

134
diskutil/vfs/ext.py Normal file
View File

@@ -0,0 +1,134 @@
from dataclasses import dataclass
from struct import unpack_from
from .generic import Vfs
from ..blockdev import BlockDevice
EXT_HEADER_OFFSET = 1024
EXT_HEADER_SIZE = 1024
@dataclass
class ExtFs(Vfs):
no_inodes: int
no_blocks: int
no_reserved_super: int
no_unallocated_blocks: int
no_unallocated_inodes: int
super_block: int
block_size_log2_m10: int
fragment_size_log2_m10: int
no_blocks_per_group: int
no_fragments_per_group: int
no_inodes_per_group: int
last_mounted: int
last_written: int
mount_count_since_fschk: int
mount_count_since_fschk_max: int
signature: int
fs_state: int
error_behavior: int
minor_version: int
last_fschk: int
interval_fschk: int
os_id: int
major_version: int
reserved_uid: int
reserved_gid: int
first_nonreserved_inode: int
inode_size: int
bg_superblock: int
optional_features: int
required_features: int
ro_features: int
fsid: bytes
fsname: str
last_mount_path: str
compression: int
no_prealloc_file: int
no_prealloc_dir: int
journal_id: int
journal_inode: int
journal_device: int
orphan_list: int
@staticmethod
def from_blockdevice(device: BlockDevice) -> 'Vfs':
return ExtFs.from_bytes(device.read_bytes(EXT_HEADER_OFFSET, EXT_HEADER_SIZE))
@staticmethod
def from_bytes(data: bytes) -> 'Vfs':
(
no_inodes, no_blocks,
no_reserved_super,
no_unallocated_blocks, no_unallocated_inodes,
super_block,
block_size_log2_m10, fragment_size_log2_m10,
no_blocks_per_group, no_fragments_per_group, no_inodes_per_group,
last_mounted, last_written,
mount_count_since_fschk, mount_count_since_fschk_max,
signature, fs_state,
error_behavior, minor_version,
last_fschk, interval_fschk,
os_id, major_version,
reserved_uid, reserved_gid,
first_nonreserved_inode,
inode_size, bg_superblock,
optional_features, required_features, ro_features,
fsid,
fsname,
last_mount_path,
compression,
no_prealloc_file, no_prealloc_dir,
journal_id, journal_inode, journal_device,
orphan_list) = unpack_from('<II'
'I'
'II'
'I'
'II'
'III'
'II'
'HH'
'HH'
'HH'
'II'
'II'
'HH'
'I'
'HH'
'III'
'16s'
'16s'
'64s'
'I'
'BB'
'xx'
'16s'
'II'
'I'
'787x', data)
return ExtFs(
no_inodes, no_blocks,
no_reserved_super,
no_unallocated_blocks, no_unallocated_inodes,
super_block,
block_size_log2_m10, fragment_size_log2_m10,
no_blocks_per_group, no_fragments_per_group, no_inodes_per_group,
last_mounted, last_written,
mount_count_since_fschk, mount_count_since_fschk_max,
signature, fs_state,
error_behavior, minor_version,
last_fschk, interval_fschk,
os_id, major_version,
reserved_uid, reserved_gid,
first_nonreserved_inode,
inode_size, bg_superblock,
optional_features, required_features, ro_features,
fsid,
fsname,
last_mount_path,
compression,
no_prealloc_file, no_prealloc_dir,
journal_id, journal_inode, journal_device,
orphan_list
)

9
diskutil/vfs/generic.py Normal file
View File

@@ -0,0 +1,9 @@
from typing import Type, List
from ..blockdev import BlockDevice
class Vfs:
@staticmethod
def from_blockdevice(device: BlockDevice) -> 'Vfs':
raise NotImplementedError()

16
test.py Normal file
View File

@@ -0,0 +1,16 @@
from diskutil.manager import Manager
from diskutil.blockdev import block_device_from_file
def main():
bd = block_device_from_file('test.img')
m = Manager()
m.add_device(bd)
print("done")
if __name__ == '__main__':
main()