"""In-memory model of a loaded Java class: the class itself, its fields, methods and code."""
import collections
import enum
import logging
import struct
from typing import Dict, List, Optional, Tuple, Union

from pjvm.unpacker import Unpacker

from .classloader import ClassLoader
from .expressions import METHOD_SIGNATURE

LOGGER = logging.getLogger(__name__)

# One row of the exception table stored in a Code attribute.
exception_table_tuple = collections.namedtuple('exception_table', ['start_pc', 'end_pc', 'handler_pc', 'catch_type'])


class ClassAccessFlags(enum.IntFlag):
    """Bit flags stored in a class's ``access_flags``."""

    ACC_PUBLIC = 0x0001  # Declared public; may be accessed from outside its package.
    ACC_FINAL = 0x0010  # Declared final; no subclasses allowed.
    ACC_SUPER = 0x0020  # Treat superclass methods specially when invoked by the invokespecial instruction.
    ACC_INTERFACE = 0x0200  # Is an interface, not a class.
    ACC_ABSTRACT = 0x0400  # Declared abstract; must not be instantiated.
    ACC_SYNTHETIC = 0x1000  # Declared synthetic; not present in the source code.
    ACC_ANNOTATION = 0x2000  # Declared as an annotation type.
    ACC_ENUM = 0x4000  # Declared as an enum type.


class FieldAccessFlags(enum.IntFlag):
    """Bit flags stored in a field's ``access_flags``."""

    ACC_PUBLIC = 0x0001  # Declared public; may be accessed from outside its package.
    ACC_PRIVATE = 0x0002  # Declared private; usable only within the defining class.
    ACC_PROTECTED = 0x0004  # Declared protected; may be accessed within subclasses.
    ACC_STATIC = 0x0008  # Declared static.
    ACC_FINAL = 0x0010  # Declared final; never directly assigned to after object construction (JLS §17.5).
    ACC_VOLATILE = 0x0040  # Declared volatile; cannot be cached.
    ACC_TRANSIENT = 0x0080  # Declared transient; not written or read by a persistent object manager.
    ACC_SYNTHETIC = 0x1000  # Declared synthetic; not present in the source code.
    ACC_ENUM = 0x4000  # Declared as an element of an enum.


class MethodAccessFlags(enum.IntFlag):
    """Bit flags stored in a method's ``access_flags``."""

    ACC_PUBLIC = 0x0001  # Declared public; may be accessed from outside its package.
    ACC_PRIVATE = 0x0002  # Declared private; accessible only within the defining class.
    ACC_PROTECTED = 0x0004  # Declared protected; may be accessed within subclasses.
    ACC_STATIC = 0x0008  # Declared static.
    ACC_FINAL = 0x0010  # Declared final; must not be overridden (§5.4.5).
    ACC_SYNCHRONIZED = 0x0020  # Declared synchronized; invocation is wrapped by a monitor use.
    ACC_BRIDGE = 0x0040  # A bridge method, generated by the compiler.
    ACC_VARARGS = 0x0080  # Declared with variable number of arguments.
    ACC_NATIVE = 0x0100  # Declared native; implemented in a language other than Java.
    ACC_ABSTRACT = 0x0400  # Declared abstract; no implementation is provided.
    ACC_STRICT = 0x0800  # Declared strictfp; floating-point mode is FP-strict.
    ACC_SYNTHETIC = 0x1000  # Declared synthetic; not present in the source code.


# Attribute names that Class.initialize_attributes dispatches to a ``_attr_<name>`` handler.
KNOWN_ATTRIBUTES = ['SourceFile', 'Code']
# Attribute names that are recognised but only reported, not parsed.
NOT_IMPLEMENTED_ATTRIBUTES = ['ConstantValue', 'StackMapTable', 'Exceptions', 'InnerClasses', 'EnclosingMethod',
                              'Synthetic', 'Signature', 'SourceDebugExtension', 'LineNumberTable',
                              'LocalVariableTable', 'LocalVariableTypeTable', 'Deprecated',
                              'RuntimeVisibleAnnotations', 'RuntimeInvisibleAnnotations',
                              'RuntimeVisibleParameterAnnotations', 'RuntimeInvisibleParameterAnnotations',
                              'AnnotationDefault', 'BootstrapMethods']


class Class:
    """A class built from the raw structures exposed by a ``ClassLoader``.

    The constructor immediately resolves the class names, fields, methods and
    class-level attributes through the loader's constant pool.
    """

    methods: Dict[str, "Method"]
    fields: Dict[str, "Field"]

    def __init__(self, loader: ClassLoader):
        """Store ``loader`` and populate this object from it via ``initialize``."""
        self.loader = loader
        self.methods = {}
        self.attributes = {}
        self.fields = {}
        self.access_flags: ClassAccessFlags = ClassAccessFlags(0)
        self.this_class: str = ""
        self.super_class: Optional[str] = None
        # todo other info
        self.source_file: Optional[str] = None
        self.initialize()

    def initialize(self):
        """Resolve access flags and class names, then load fields, methods and attributes."""
        self.access_flags = ClassAccessFlags(self.loader.access_flags)
        self.this_class = self.cp_get_classname(self.loader.this_class)
        # A super_class index of 0 means "no superclass" (java.lang.Object); it must not be
        # looked up, because index 0 is not a valid constant pool entry.
        super_index = self.loader.super_class
        self.super_class = self.cp_get_classname(super_index) if super_index else None
        LOGGER.info("Parsing info for %s %s : %s", self.access_flags, self.this_class, self.super_class)
        self.initialize_fields()
        self.initialize_methods()
        self.initialize_attributes()

    def initialize_fields(self):
        """Create a ``Field`` for every raw field of the loader, keyed by field name."""
        field: Dict[str, Union[int, list]]
        for field in self.loader.fields:
            acc = FieldAccessFlags(field['access_flags'])
            name = self.cp_get_utf8(field['name_index'])
            descriptor = self.cp_get_utf8(field['descriptor_index'])
            attributes = field['attributes']
            self.fields[name] = Field(acc, name, descriptor, attributes, self)

    def initialize_methods(self):
        """Create a ``Method`` for every raw method of the loader, keyed by method name.

        NOTE(review): the key is the bare name, so a later method with the same name
        (an overload) replaces the earlier entry.
        """
        method: Dict[str, Union[int, list]]
        for method in self.loader.methods:
            acc = MethodAccessFlags(method['access_flags'])
            name = self.cp_get_utf8(method['name_index'])
            descriptor = self.cp_get_utf8(method['descriptor_index'])
            attributes = method['attributes']
            self.methods[name] = Method(acc, name, descriptor, attributes, self)

    def initialize_attributes(self):
        """Dispatch each class-level attribute to its ``_attr_<name>`` handler.

        Attributes listed in ``NOT_IMPLEMENTED_ATTRIBUTES`` are only reported, and
        names in neither list are ignored.
        """
        for attr in self.loader.attributes:
            attr_name = self.cp_get_utf8(attr['attribute_name_index'])
            if attr_name in NOT_IMPLEMENTED_ATTRIBUTES:
                LOGGER.warning("Attribute %s found but not implemented. %s", attr_name, attr['info'][:10])
            elif attr_name in KNOWN_ATTRIBUTES:
                # Not every known attribute has a handler on this class ('Code' is handled
                # by Method), so a missing handler is reported instead of raising.
                handler = getattr(self, f'_attr_{attr_name}', None)
                if handler is None:
                    LOGGER.warning("Attribute %s has no class-level handler; skipped", attr_name)
                else:
                    handler(attr['info'])
            # Unknown attributes are ignored.

    # noinspection PyPep8Naming
    def _attr_SourceFile(self, info: bytes):
        """Read the SourceFile attribute: a big-endian unsigned 16-bit constant pool index."""
        # 'H' (unsigned): a signed read would turn indexes above 32767 negative.
        self.source_file = self.cp_get_utf8(struct.unpack('>H', info)[0])
        LOGGER.info("Found source file %s for %s", self.source_file, self.this_class)

    def cp_get(self, index: int) -> dict:
        """Return the constant pool entry for the 1-based ``index``.

        Raises:
            IndexError: if ``index`` is below 1. Without this check such an index would
                silently wrap around to the end of the pool.
        """
        if index < 1:
            raise IndexError(f"Invalid constant pool index {index}")
        return self.loader.constant_pool[index - 1]

    def cp_get_utf8(self, index: int) -> str:
        """Return the ``'value'`` of the constant pool entry at ``index``."""
        return self.cp_get(index)['value']

    def cp_get_classname(self, index: int) -> str:
        """Return the name referenced by the ``'name_index'`` of the entry at ``index``."""
        return self.cp_get_utf8(self.cp_get(index)['name_index'])

    def cp_get_name_type(self, index: int) -> Tuple[str, str]:
        """Return the ``(name, descriptor)`` pair of the name-and-type entry at ``index``."""
        nt = self.cp_get(index)
        return self.cp_get_utf8(nt['name_index']), self.cp_get_utf8(nt['descriptor_index'])

    def cp_get_fieldref(self, index: int) -> Tuple[str, str, str]:
        """Return ``(class name, field name, descriptor)`` for the entry at ``index``."""
        # Exactly the same as methodref, only name differs
        return self.cp_get_methodref(index)

    def cp_get_methodref(self, index: int) -> Tuple[str, str, str]:
        """
        Get the class name, method name and descriptor

        Args:
            index: the index

        Returns:
            A ``(class name, method name, descriptor)`` tuple.
        """
        mr = self.cp_get(index)
        name, typename = self.cp_get_name_type(mr['name_and_type_index'])
        return self.cp_get_classname(mr['class_index']), name, typename


class Code:
    """The parsed body of a Code attribute: limits, bytecode, exception table, sub-attributes."""

    def __init__(self, code: bytes):
        """Unpack the raw attribute bytes in ``code`` into this object's fields.

        NOTE(review): the 'h' and 'i' formats are signed if ``Unpacker`` hands them to
        ``struct`` unchanged, while the class-file fields are unsigned — confirm before
        relying on large values.
        """
        self.exception_handlers = []
        self.attributes = []
        unpacker = Unpacker.from_bytes(code)
        self.max_stack, = unpacker['h']
        self.max_locals, = unpacker['h']
        code_len, = unpacker['i']
        self.code, = unpacker[f'{code_len}s']
        exception_table_length, = unpacker['h']
        for _ in range(exception_table_length):
            self.exception_handlers.append(exception_table_tuple(*unpacker['hhhh']))
        attributes_count, = unpacker['h']
        for _ in range(attributes_count):
            attribute_name_index, attribute_length = unpacker['hi']
            data, = unpacker[f'{attribute_length}s']
            self.attributes.append({
                'attribute_name_index': attribute_name_index,
                'attribute_length': attribute_length,
                'info': data,
            })


class Field:
    """A field of a ``Class``: access flags, name, descriptor and raw attributes."""

    def __init__(self, acc, name, descriptor, attributes, clazz: Class):
        """Store the field description; ``clazz`` is the owning class."""
        self.acc = acc
        self.name = name
        self.descriptor = descriptor
        self.attributes = attributes
        self.clazz = clazz
        self.parse_attributes()
        LOGGER.info("New field %s %s", name, descriptor)

    # todo initialize
    def parse_attributes(self):
        """Placeholder: field attributes are not parsed yet."""
        LOGGER.info("TODO parse field attributes")


class Method:
    """A method of a ``Class``: access flags, name, parsed descriptor and optional ``Code``."""

    def __init__(self, acc, name, descriptor, attributes, clazz: Class):
        """Store the method description, then parse its descriptor and attributes."""
        self.acc = acc
        self.name = name
        self.descriptor = descriptor
        self.attributes = attributes
        self.clazz = clazz
        self.code: Optional[Code] = None
        self.return_value: str = ''
        self.args: List[str] = []
        self.parse_descriptor()
        self.parse_attributes()
        LOGGER.info("New method %s %s(%s)", self.return_value, name, ', '.join(self.args))

    def parse_descriptor(self):
        """Fill ``return_value`` and ``args`` from the method descriptor.

        Raises:
            ValueError: if the descriptor does not match ``METHOD_SIGNATURE``.
        """
        res = METHOD_SIGNATURE.search(self.descriptor)
        if res is None:
            raise ValueError(f"Invalid method descriptor {self.descriptor!r} for method {self.name}")
        self.return_value = res.group('RET')
        # NOTE(review): splitting on ';' only separates object types; consecutive primitive
        # types stay glued together in a single entry — confirm this is what callers expect.
        self.args = [arg for arg in (res.group('ARGS') or '').split(';') if arg]

    # todo initialize
    def parse_attributes(self):
        """Parse the attributes of this method; only 'Code' is handled so far."""
        # todo Exceptions, Synthetic, Signature, Deprecated, RuntimeVisibleAnnotations,
        #  RuntimeInvisibleAnnotations, RuntimeVisibleParameterAnnotations,
        #  RuntimeInvisibleParameterAnnotations, AnnotationDefault
        for attr in self.attributes:
            name = self.clazz.cp_get_utf8(attr['attribute_name_index'])
            if name == 'Code':
                self.code = Code(attr['info'])