Source code for boa.code.expression

from bytecode import Instr, Compare, Label
from boa.code import pyop
from boa.code.pytoken import PyToken
from bytecode import Bytecode
import ast
import astor


[docs]class Expression(object): updated_blocklist = None block = None container_method = None tokenizer = None next = None methodnames = None ops = None def __init__(self, block, tokenizer, container_method): self.block = block self.tokenizer = tokenizer self.container_method = container_method self.methodnames = [] self.next = None self.ops = [] for item in self.block: if isinstance(item, Instr): self.ops.append(item.opcode)
[docs] def add_method(self, pytoken): self.methodnames.append(pytoken.instruction.arg)
def _reverselists(self): indices = [] for index, instr in enumerate(self.updated_blocklist): if not isinstance(instr, Label) and instr.opcode == pyop.BUILD_LIST: indices.append(index) if len(indices): output = [] for index, instr in enumerate(self.updated_blocklist): output.append(instr) if index in indices: output.append(Instr("DUP_TOP", lineno=instr.lineno)) output.append(Instr('YIELD_VALUE', lineno=instr.lineno)) self.updated_blocklist = output def _checkbytearray(self): to_remove = [] for index, instr in enumerate(self.block): if not isinstance(instr, Label) and instr.opcode == pyop.LOAD_GLOBAL and instr.arg == 'bytearray': self.methodnames.append('bytearray') to_remove.append(instr) to_remove.append(self.block[index + 2]) if len(to_remove): self._remove_instructions(to_remove) def _check_load_attr(self): replaceable_attr_calls = ['append', 'remove', 'reverse', 'keys', 'values', 'has_key', 'IterKey', 'IterValue', 'IterNext', 'next', ] needs_call_func = [] for index, instr in enumerate(self.updated_blocklist): if not isinstance(instr, Label) and instr.opcode in [pyop.LOAD_ATTR, pyop.LOAD_METHOD]: # if instr.opcode == pyop.LOAD_METHOD: # self.methodnames.append(instr.arg) if instr.arg in replaceable_attr_calls: instr.opcode = pyop.LOAD_GLOBAL else: attr_name = 'Get%s' % instr.arg module_methods = self.container_method.module.methods matches = [] for n in module_methods: if attr_name in n.full_name: matches.append(n) # these two are special cases for name collisions between # block.Hash, tx.Hash, and input.Hash if len(matches) == 0: attr_name = 'GetTX%s' % instr.arg for n in module_methods: if attr_name in n.full_name: matches.append(n) if len(matches) == 0: attr_name = 'GetInput%s' % instr.arg for n in module_methods: if attr_name in n.full_name: matches.append(n) # Special case for enumerators/iterator if len(matches) == 0: attr_name = 'Enumerator%s' % instr.arg for n in module_methods: if attr_name in n.full_name: matches.append(n) if len(matches) == 0: attr_name = 'Iter%s' % instr.arg for n in module_methods: if attr_name == n.full_name.split('.')[-1]: matches.append(n) if len(matches) == 0: raise Exception("Could not load attribute %s " % instr.arg) elif len(matches) > 1: raise Exception( "Could not determine attribute to load. Options: %s " % [n.full_name for n in matches]) else: instr.opcode = pyop.LOAD_GLOBAL instr.arg = attr_name needs_call_func.append(instr) blocklist = [] for index, instr in enumerate(self.updated_blocklist): blocklist.append(instr) if instr in needs_call_func: blocklist.append(Instr('CALL_FUNCTION', 1, lineno=instr.lineno)) self.updated_blocklist = blocklist def _check_function_kwargs(self): to_remove = [] for index, instr in enumerate(self.updated_blocklist): if not isinstance(instr, Label) and instr.opcode == pyop.CALL_FUNCTION_KW: instr.opcode = pyop.CALL_FUNCTION to_remove.append(self.updated_blocklist[index - 1]) if len(to_remove): self._remove_instructions(to_remove) def _remove_instructions(self, to_remove): updated = [] for item in self.block: if item not in to_remove: updated.append(item) self.updated_blocklist = updated def _checkslice(self): last = None to_del_index = -1 for index, instr in enumerate(self.block): if isinstance(instr, Instr): if last == pyop.BUILD_SLICE and instr.opcode == pyop.BINARY_SUBSCR: to_del_index = index last = instr.opcode if to_del_index > -1: self.block.pop(to_del_index) self.updated_blocklist = self.block self._checkslice() def _ast_to_instr(self, astobj, lineno): src = astor.to_source(astobj).replace('"""', "'") cp = compile(src, filename='<ast>', mode='eval') bc = Bytecode.from_code(cp) instructions = bc[:-1] for instr in instructions: instr.lineno = lineno # if its calling a method, we don't want it to do `LOAD_NAME` if instructions[-1].opcode == pyop.CALL_FUNCTION and instructions[0].opcode == pyop.LOAD_NAME: instructions[0] = Instr("LOAD_GLOBAL", arg=instructions[0].arg, lineno=instructions[0].lineno) return instructions def _check_dictionary_defs(self): new_instr = [] new_instr_ind = -1 for index, instr in enumerate(self.block): if isinstance(instr, Instr) and instr.opcode == pyop.STORE_FAST: for dictionary in self.container_method.dictionary_defs: if dictionary.name == instr.arg: new_instr_ind = index for index, item in enumerate(dictionary.keys): # load the value new_instr += self._ast_to_instr(dictionary.values[index], instr.lineno) # load the dict new_instr.append(Instr("LOAD_FAST", arg=instr.arg, lineno=instr.lineno)) # load the key new_instr += self._ast_to_instr(dictionary.keys[index], instr.lineno) new_instr.append(Instr("STORE_SUBSCR", lineno=instr.lineno)) self.container_method.dictionary_defs.remove(dictionary) if new_instr_ind > -1: self.updated_blocklist = self.updated_blocklist[0:new_instr_ind + 1] + new_instr + self.updated_blocklist[new_instr_ind + 1:] def _checkloops(self): if pyop.SETUP_LOOP in self.ops and pyop.GET_ITER in self.ops: counter = self.container_method.forloop_counter loopcounter_name = 'forloop_counter_%s' % counter looplength_name = 'forloop_length_%s' % counter iterable = self.block[1].arg iterable_name = self.block[-1].arg ln = self.block[0].lineno if iterable in ['range', 'keys', 'values'] or self.block[2].opcode in [pyop.LOAD_ATTR, pyop.LOAD_METHOD, pyop.CALL_FUNCTION]: dynamic_iterable_name = 'dynamic_iterable_%s' % counter self.container_method.add_to_scope(dynamic_iterable_name) get_iter_index = self.ops.index(pyop.GET_ITER) load_range_ops = self.block[1:get_iter_index] load_range_ops.append(Instr("STORE_FAST", dynamic_iterable_name, lineno=ln)) loop_exit = self.block[0].arg loop_done = self.block[get_iter_index + 2] loop_start = self.block[get_iter_index + 1] if loop_done.opcode == pyop.EXTENDED_ARG: loop_done = self.block[get_iter_index + 3] loop_done = loop_done.arg instructions = [ # Instr("SETUP_LOOP",loop_exit,lineno=ln), Instr("LOAD_CONST", 0, lineno=ln), Instr("STORE_FAST", arg=loopcounter_name, lineno=ln) ] + load_range_ops + [ Instr("LOAD_FAST", arg=dynamic_iterable_name, lineno=ln), Instr("LOAD_GLOBAL", arg="len", lineno=ln), Instr("CALL_FUNCTION", arg=1, lineno=ln), Instr("STORE_FAST", arg=looplength_name, lineno=ln), loop_start, # Instr("FOR_ITER",loop_done,lineno=ln), Instr("LOAD_FAST", arg=loopcounter_name, lineno=ln), Instr("LOAD_FAST", arg=looplength_name, lineno=ln), Instr("COMPARE_OP", arg=Compare.LT, lineno=ln), Instr("POP_JUMP_IF_FALSE", arg=loop_done, lineno=ln), Instr("LOAD_FAST", arg=dynamic_iterable_name, lineno=ln), Instr("LOAD_FAST", arg=loopcounter_name, lineno=ln), Instr("BINARY_SUBSCR", lineno=ln), Instr("STORE_FAST", arg=iterable_name, lineno=ln), Instr("LOAD_FAST", loopcounter_name, lineno=ln), Instr("LOAD_CONST", 1, lineno=ln), Instr("INPLACE_ADD", lineno=ln), Instr("STORE_FAST", loopcounter_name, lineno=ln), ] else: loop_exit = self.block[0].arg loop_done = self.block[4].arg loop_start = self.block[3] instructions = [ # Instr("SETUP_LOOP",loop_exit,lineno=ln), Instr("LOAD_CONST", 0, lineno=ln), Instr("STORE_FAST", arg=loopcounter_name, lineno=ln), Instr("LOAD_FAST", arg=iterable, lineno=ln), Instr("LOAD_GLOBAL", arg="len", lineno=ln), Instr("CALL_FUNCTION", arg=1, lineno=ln), Instr("STORE_FAST", arg=looplength_name, lineno=ln), loop_start, # Instr("FOR_ITER",loop_done,lineno=ln), Instr("LOAD_FAST", arg=loopcounter_name, lineno=ln), Instr("LOAD_FAST", arg=looplength_name, lineno=ln), Instr("COMPARE_OP", arg=Compare.LT, lineno=ln), Instr("POP_JUMP_IF_FALSE", arg=loop_done, lineno=ln), Instr("LOAD_FAST", arg=iterable, lineno=ln), Instr("LOAD_FAST", arg=loopcounter_name, lineno=ln), Instr("BINARY_SUBSCR", lineno=ln), Instr("STORE_FAST", arg=iterable_name, lineno=ln), Instr("LOAD_FAST", loopcounter_name, lineno=ln), Instr("LOAD_CONST", 1, lineno=ln), Instr("INPLACE_ADD", lineno=ln), Instr("STORE_FAST", loopcounter_name, lineno=ln), ] self.container_method.add_to_scope(loopcounter_name) self.container_method.add_to_scope(looplength_name) self.container_method.add_to_scope(iterable_name) self.block = self.updated_blocklist = instructions
[docs] def tokenize(self): self.updated_blocklist = self.block self._check_dictionary_defs() self._checkslice() self._checkbytearray() self._checkloops() self._check_load_attr() self._check_function_kwargs() self._reverselists() ln = None last_token = None for index, instr in enumerate(self.updated_blocklist): if isinstance(instr, Instr): ln = instr.lineno try: token = PyToken(instr, self, index, ln) token.to_vm(self.tokenizer, last_token) except Exception as e: cm = self.container_method print("ERROR: %s:%s in %s() with msg - %s" % (cm.module.path, cm.start_line_no + ln - 1, cm.name, str(e))) raise e last_token = token
[docs] def lookup_method_name(self, index): return self.methodnames.pop()