Source code for boa.code.module

from boa.code import pyop
from boa.code.method import method as BoaMethod
from boa.code.action import action as BoaAction
from boa.code.appcall import appcall as BoaAppcall

from boa.util import BlockType, get_block_type
from bytecode import UNSET, Bytecode, BasicBlock, ControlFlowGraph, dump_bytecode, Label
from boa.interop import VMOp, Neo
import importlib
from logzero import logger
from collections import OrderedDict
import os
import sys
import hashlib
import zipfile 
from boa import __version__
import json


[docs]class Module(object): bc = None cfg = None blocks = None methods = None actions = None app_call_registrations = None path = None all_vm_tokens = OrderedDict() module_name = '' to_import = None _extra_instr = None _local_methods = None abi_methods = {} abi_entry_point = None @property def extra_instructions(self): return self._extra_instr @property def local_methods(self): return self._local_methods
[docs] @staticmethod def ImportFromBlock(block: BasicBlock, current_file_path): mpath = None mnames = [] filepath = os.path.dirname(os.path.abspath(current_file_path)) sys.path.append(filepath) for index, instr in enumerate(block): if instr.opcode == pyop.IMPORT_NAME: mpath = instr.arg elif instr.opcode == pyop.STORE_NAME: if instr.arg not in mnames: mnames.append(instr.arg) elif instr.opcode == pyop.IMPORT_STAR: mnames = ['*'] # Don't load the abi module when imported if 'boa.abi' in mpath: return pymodule = importlib.import_module(mpath, mpath) filename = pymodule.__file__ return Module(filename, mpath, mnames)
@property def main(self): """ Return the default method in this module. :return: the default method in this module :rtype: ``boa.code.method.Method`` """ for m in self.methods: if m.name in ['Main', 'main']: return m if len(self.methods): return self.methods[0] return None @property def orderered_methods(self): """ An ordered list of methods :return: A list of ordered methods is this module :rtype: list """ oms = [] self.methods.reverse() if self.main: oms = [self.main] for m in self.methods: if m == self.main: continue oms.append(m) return oms
[docs] def method_by_name(self, method_name): """ Look up a method by its name from the module ``methods`` list. :param method_name: the name of the method to look up :type method_name: str :return: the method ( if it is found) :rtype: ``boa.code.method.Method`` """ # print("METHODS FOR MODULE: %s " % self.path) for m in self.methods: if m.full_name == method_name or m.name == method_name: return m return None
[docs] def has_method(self, full_name): for m in self.methods: if m.full_name == full_name: return True return False
def __init__(self, path: str, module_name='', to_import=['*']): self.path = path self.to_import = to_import self.module_name = module_name self._local_methods = [] self.abi_methods = {} self.abi_entry_point = None source = open(path, 'rb') compiled_source = compile(source.read(), path, 'exec') self.bc = Bytecode.from_code(compiled_source) self.cfg = ControlFlowGraph.from_bytecode(self.bc) source.close() self.build()
[docs] def build(self): self.blocks = [] self.methods = [] self.actions = [] self.app_call_registrations = [] for block in self.cfg: start_ln = block[0].lineno for index, instr in enumerate(block): if instr.lineno != start_ln: self.cfg.split_block(block, index) self._extra_instr = [] new_method_blks = [] for blk in self.cfg: type = get_block_type(blk) if type == BlockType.MAKE_FUNCTION: new_method_blks.append(blk) elif type == BlockType.IMPORT_ITEM: new_module = Module.ImportFromBlock(blk, self.path) if new_module: for method in new_module.methods: if not self.has_method(method.full_name): self.methods.append(method) for method in new_module.local_methods: self.methods.append(method) self._extra_instr = self._extra_instr + new_module.extra_instructions elif type == BlockType.UNKNOWN: self._extra_instr.append(blk) elif type == BlockType.CALL_FUNCTION: self._extra_instr.append(blk) elif type == BlockType.ACTION_REG: self.actions.append(BoaAction(blk)) elif type == BlockType.MAKE_CLASS: pass elif type == BlockType.APPCALL_REG: self.app_call_registrations.append(BoaAppcall(blk)) for m in new_method_blks: new_method = BoaMethod(self, m, self.module_name, self._extra_instr) if not self.has_method(new_method.full_name): self.methods.append(new_method)
[docs] def write(self): """ Write the current module to a byte string. Note that if you are using the ``Compiler.load('path/to/file.py')``, you must call ``module.write()`` before any inspection of the module is possible. :return: A bytestring of representing the current module :rtype: bytes """ self.link_methods() return self.write_methods()
[docs] def write_methods(self): """ Write all methods in the current module to a byte string. :return: A bytestring of all current methods in this module :rtype: bytes """ b_array = bytearray() for key, vm_token in self.all_vm_tokens.items(): b_array.append(vm_token.out_op) if vm_token.data is not None and vm_token.vm_op != VMOp.NOP: b_array = b_array + vm_token.data # self.to_s() return b_array
[docs] def to_s(self): """ this method is used to print the output of the executable in a readable/ tokenized format. sample usage: >>> from boa.compiler import Compiler >>> module = Compiler.load('./boa/tests/src/LambdaTest.py').default >>> module.write() >>> print(module.to_s()) 12 3 LOAD_CONST 9 [data] 4 STORE_FAST j [data] 22 11 LOAD_FAST j [data] 17 CALL_FUNCTION Main.<locals>.q_1 \ [<boa.code.pytoken.PyToken object at 0x10cb53c50>] [data] 22 20 STORE_FAST m [data] 24 27 243 b'\x03\x00' [data] 3 30 LOAD_FAST m [data] 35 NOP [data] 36 241 [data] 37 242 [data] 38 RETURN_VALUE [data] 20 49 243 b'\x03\x00' [data] 3 52 LOAD_FAST x [data] 57 LOAD_CONST 1 [data] 58 BINARY_ADD [data] 59 NOP [data] 60 241 [data] 61 242 [data] 62 RETURN_VALUE [data] """ # Initialize if needed if self.all_vm_tokens is None: self.link_methods() lineno = 0 output = [] pstart = True for i, (key, value) in enumerate(self.all_vm_tokens.items()): if value.pytoken: pt = value.pytoken do_print_line_no = False to_label = None from_label = ' ' if pt.lineno != lineno: output.append("\n") lineno = pt.lineno do_print_line_no = True ds = '' if value.data is not None: try: ds = int.from_bytes(value.data, 'little', signed=True) except Exception as e: pass if type(ds) is not int and len(ds) < 1: try: ds = value.data.decode('utf-8') except Exception as e: pass lno = "{:<10}".format( pt.lineno if do_print_line_no or pstart else '') addr = "{:<5}".format(key) op = "{:<20}".format(pt.instruction.name) # If this is a number, it is likely a custom python opcode, get the name if str(pt.pyop).isnumeric(): opname = pyop.to_name(int(str(pt.pyop))).replace('HAVE_ARGUMENT', 'STORE_NAME').replace('YIELD_VALUE', 'REVERSE') if opname is not None: op = "{:<20}".format(opname) arg = "{:<50}".format(pt.arg_str) data = "[data] {:<20}".format(ds) output.append("%s%s%s%s%s%s" % (lno, from_label, addr, op, arg, data)) pstart = False return "\n".join(output)
[docs] def include_abi_method(self, method, types): num_methods = len(method.args) num_types = len(types) args_types = {} # params and return types if num_types == num_methods + 1: for index, arg in enumerate(method.args): args_types[arg] = types[index] args_types['return'] = types[num_types - 1] else: raise Exception("Number of arguments for the abi is incompatible with the function '%s'" % method.full_name) self.abi_methods[method.full_name] = args_types
[docs] def set_abi_entry_point(self, method, types): if self.abi_entry_point is None: self.include_abi_method(method, types) self.abi_entry_point = method.full_name else: raise Exception("Only one method should be entry point")
[docs] def export_debug(self, output_path): """ this method is used to generate a debug map for NEO debugger """ file = open(output_path, 'rb') file_hash = hashlib.md5(file.read()).hexdigest() file.close() avm_name = os.path.splitext(os.path.basename(output_path))[0] debug_info = self.generate_avmdbgnfo(avm_name, file_hash) debug_json_filename = os.path.basename(output_path.replace('.avm', '.debug.json')) avmdbgnfo_filename = output_path.replace('.avm', '.avmdbgnfo') with zipfile.ZipFile(avmdbgnfo_filename, 'w', zipfile.ZIP_DEFLATED) as avmdbgnfo: avmdbgnfo.writestr(debug_json_filename, debug_info)
[docs] def generate_avmdbgnfo(self, avm_name, file_hash): if self.all_vm_tokens is None: self.link_methods() data = {} files = [] methods = [] events = [] for m in self.methods: if m.is_interop: continue method = {} method['id'] = m.id.urn method['name'] = "{0},{1}".format(m.module.module_name, m.name) (_, start) = next(x for x in m.vm_tokens.items()) (_, end) = next(x for x in reversed(m.vm_tokens.items())) method['range'] = '{}-{}'.format(start.addr, end.addr) method['params'] = ["{},".format(a) for a in m.args] method['return'] = "" argCount = len(m.args) method['variables'] = ["{},".format(a[0]) for a in m.scope.items() if a[1] >= argCount] tokens = [] last_lineno = None method['sequence-points'] = tokens for _, (_, value) in enumerate(m.vm_tokens.items()): if value.pytoken: pt = value.pytoken if pt.file not in files: files.append(pt.file) fileIndex = files.index(pt.file) lineno = pt.method_lineno + pt.lineno if last_lineno != lineno: tokens.append("{}[{}]{}:0-{}:0".format(value.addr, fileIndex, lineno, lineno)) last_lineno = lineno methods.append(method) data['entrypoint'] = self.main.id.urn data['documents'] = files data['methods'] = methods data['events'] = events json_data = json.dumps(data, indent=4) return json_data
[docs] def export_abi_json(self, output_path): """ this method is used to generate a debug map for NEO debugger """ with open(output_path, 'rb') as file: script = Neo.to_script_hash(file.read()) file_hash = Neo.to_hex_str(script) avm_name = os.path.splitext(os.path.basename(output_path))[0] abi_info = self.generate_abi_json(avm_name, file_hash) abi_json_filename = output_path.replace('.avm', '.abi.json') with open(abi_json_filename, 'w+') as out_file: out_file.write(abi_info)
[docs] def generate_abi_json(self, avm_name, file_hash): # Initialize if needed if self.all_vm_tokens is None: self.link_methods() data = {} functions = [] events = [] data['hash'] = file_hash if self.abi_entry_point is not None: data['entrypoint'] = self.abi_entry_point elif 'main' in self.abi_methods: data['entrypoint'] = 'main' elif 'Main' in self.abi_methods: data['entrypoint'] = 'Main' elif len(self.abi_methods) > 0: data['entrypoint'] = self.abi_methods.get(0) else: data['entrypoint'] = self.main.name data['functions'] = functions data['events'] = events for method in self.abi_methods: types = self.abi_methods[method] params = [] for t in types: if t != 'return': params.append({'name': t, 'type': types[t]}) function = { 'name': method, 'parameters': params, 'returntype': types['return'] } functions.append(function) print() json_data = json.dumps(data, indent=4) return json_data
[docs] def generate_debug_json(self, avm_name, file_hash): # Initialize if needed if self.all_vm_tokens is None: self.link_methods() lineno = 0 data = {} data['avm'] = {'name': avm_name, 'hash': file_hash} data['compiler'] = {'name': 'neo-boa', 'version': __version__} files = {} breakpoints = [] data['files'] = files map = [] start_ofs = -1 last_ofs = 0 fileid = 0 for i, (key, value) in enumerate(self.all_vm_tokens.items()): if value.pytoken: pt = value.pytoken if pt.file: if pt.file not in files.keys(): fileid = len(files.values()) + 1 files[pt.file] = fileid else: fileid = files[pt.file] if pt.lineno != lineno: if start_ofs >= 0: map.append({'start': start_ofs, 'end': key - 1, 'file': fileid, 'method': pt.method_name, 'line': lineno, 'file_line_no': pt.method_lineno + lineno}) start_ofs = key lineno = pt.lineno if pt.is_breakpoint: breakpoints.append(start_ofs) last_ofs = key if last_ofs >= 0: map.append({'start': start_ofs, 'end': last_ofs, 'file': fileid, 'line': lineno}) data['map'] = map data['breakpoints'] = breakpoints data['files'] = [{'id': val, 'url': os.path.abspath(key)} for key, val in files.items()] json_data = json.dumps(data, indent=4) return json_data