from collections import OrderedDict
from boa.interop import VMOp
from boa.interop.BigInteger import BigInteger
from bytecode import Label
from boa.code.pyop import *
NEO_SC_FRAMEWORK = 'boa.interop.'
[docs]class VMToken(object):
"""
"""
addr = None
pytoken = None
data = None
vm_op = None
src_method = None
target_method = None
is_annotation = None
updatable_data = None
@property
def out_op(self):
"""
:return:
"""
if type(self.vm_op) is int:
return self.vm_op
elif type(self.vm_op) is bytes:
return ord(self.vm_op)
else:
raise Exception('Invalid op: %s ' % self.vm_op)
def __init__(self, vm_op=None, pytoken=None, addr=None, data=None):
self.vm_op = vm_op
self.pytoken = pytoken
self.addr = addr
self.data = data
self.src_method = None
self.target_method = None
self.is_annotation = False
class VMTokenizer(object):
"""
"""
method = None
_address = None
vm_tokens = None
total_param_and_body_count_token = None
def __init__(self, method):
self.method = method
self._address = 0
self.vm_tokens = OrderedDict()
self.method_begin_items()
def method_begin_items(self):
# we just need to inssert the total number of arguments + body variables
# which is the length of the method `scope` dictionary
# then create a new array for the vm to store
self.convert_push_integer(self.method.stacksize)
self.convert1(VMOp.NEWARRAY)
self.convert1(VMOp.TOALTSTACK)
for index, arg in enumerate(self.method.args):
self.convert_load_parameter(arg, index)
def method_end_items(self):
self.insert1(VMOp.FROMALTSTACK)
self.insert1(VMOp.DROP)
def insert_vm_token_at(self, vm_token, index):
"""
:param vm_token:
:param index:
"""
self.vm_tokens[index] = vm_token
def insert1(self, vm_op, data=None):
"""
:param vm_op:
:param data:
:return:
"""
start_addr = self._address
vmtoken = VMToken(vm_op=vm_op, addr=start_addr, data=data)
self._address += 1
if vmtoken.data is not None:
self._address += len(vmtoken.data)
self.insert_vm_token_at(vmtoken, vmtoken.addr)
return vmtoken
def insert_push_data(self, data):
"""
:param data:
:return:
"""
dlen = len(data)
if dlen == 0:
return self.insert1(VMOp.PUSH0)
elif dlen <= 75:
return self.insert1(dlen, data)
if dlen < 0x100:
prefixlen = 1
code = VMOp.PUSHDATA1
elif dlen < 0x1000:
prefixlen = 2
code = VMOp.PUSHDATA2
else:
prefixlen = 4
code = VMOp.PUSHDATA4
byts = bytearray(dlen.to_bytes(prefixlen, 'little')) + data
return self.insert1(code, byts)
def insert_push_integer(self, i):
"""
:param i:
:return:
"""
if i == 0:
return self.insert1(VMOp.PUSH0)
elif i == -1:
return self.insert1(VMOp.PUSHM1)
elif 0 < i <= 16:
out = 0x50 + i
return self.insert1(out)
bigint = BigInteger(i)
outdata = bigint.ToByteArray(signed=False)
print("big int %s %s" % (bigint, outdata))
return self.insert_push_data(outdata)
def convert1(self, vm_op, py_token=None, data=None):
"""
:param vm_op:
:param py_token:
:param data:
:return:
"""
start_addr = self._address
vmtoken = VMToken(vm_op=vm_op, addr=start_addr,
pytoken=py_token, data=data)
self._address += 1
if vmtoken.data is not None and type(vmtoken.data) is not Label:
self._address += len(data)
self.insert_vm_token_at(vmtoken, start_addr)
return vmtoken
def convert_new_array(self, py_token=None):
# push the length of the array
"""
:param py_token:
"""
if type(py_token.args) is int:
self.insert_push_integer(py_token.args)
else:
self.convert_load_local(py_token, py_token.args)
self.convert1(VMOp.PACK, py_token)
def convert_dup_top_two(self, py_token=None):
# a, b, c, d
# <- SWAP
# b, a, c, d
# <- DUP
# b, b, a, c, d
# <- ROT
# b, a, b, c, d
# <- OVER
# a, b, a, b, c, d
self.convert1(VMOp.SWAP, py_token)
self.convert1(VMOp.DUP, py_token)
self.convert1(VMOp.ROT, py_token)
self.convert1(VMOp.OVER, py_token)
def convert_pop_jmp_if(self, pytoken):
# token = tokenizer.convert1(VMOp.JMPIF, self, data=bytearray(2))
token = self.convert1(VMOp.JMPIF, pytoken, data=bytearray(2))
# self.insert1(VMOp.DROP)
return token
def convert_load_const(self, pytoken):
if type(pytoken.args) is int:
token = self.convert_push_integer(pytoken.args, pytoken)
elif type(pytoken.args) is str:
str_bytes = pytoken.args.encode('utf-8')
# pytoken.args = str_bytes
token = self.convert_push_data(str_bytes, pytoken)
elif type(pytoken.args) is bytes:
token = self.convert_push_data(pytoken.args, pytoken)
elif type(pytoken.args) is bytearray:
token = self.convert_push_data(bytes(pytoken.args), pytoken)
elif type(pytoken.args) is bool:
token = self.convert_push_integer(pytoken.args, pytoken)
elif isinstance(pytoken.args, type(None)):
token = self.convert_push_data(bytearray(0))
# TODO - process tuple
# elif type(pytoken.args) == Code:
# pass
else:
raise Exception("Could not load type %s for item %s " % (
type(pytoken.args), pytoken.args))
return token
def convert_push_data(self, data, py_token=None):
"""
:param data:
:param py_token:
:return:
"""
dlen = len(data)
if dlen == 0:
return self.convert1(VMOp.PUSH0, py_token=py_token)
elif dlen <= 75:
return self.convert1(len(data), py_token=py_token, data=data)
if dlen < 0x100:
prefixlen = 1
code = VMOp.PUSHDATA1
elif dlen < 0x1000:
prefixlen = 2
code = VMOp.PUSHDATA2
else:
prefixlen = 4
code = VMOp.PUSHDATA4
byts = bytearray(dlen.to_bytes(prefixlen, 'little')) + data
return self.convert1(code, py_token=py_token, data=byts)
def convert_push_integer(self, i, py_token=None):
"""
:param i:
:param py_token:
:return:
"""
if i == 0:
return self.convert1(VMOp.PUSH0, py_token=py_token)
elif i == -1:
return self.convert1(VMOp.PUSHM1, py_token=py_token)
elif 0 < i <= 16:
out = 0x50 + i
return self.convert1(out, py_token=py_token)
bigint = BigInteger(i)
outdata = bigint.ToByteArray()
return self.convert_push_data(outdata, py_token=py_token)
def convert_store_local(self, py_token):
# set array
"""
:param py_token:
"""
self.convert1(VMOp.DUPFROMALTSTACK, py_token=py_token)
local_name = py_token.args
if local_name in self.method.scope.keys():
position = self.method.scope[local_name]
self.convert_push_integer(position)
# set item
self.convert_push_integer(2)
self.convert1(VMOp.ROLL)
self.convert1(VMOp.SETITEM)
else:
raise Exception("local name '%s' not found in method scope '%s'" % (local_name, self.method.full_name))
def convert_load_local(self, py_token, name=None):
"""
:param py_token:
:param name:
"""
local_name = py_token.args
# check to see if this local is a variable
if local_name in self.method.scope:
position = self.method.scope[local_name]
# get array
self.convert1(VMOp.DUPFROMALTSTACK, py_token=py_token)
# get i
self.convert_push_integer(position)
self.convert1(VMOp.PICKITEM)
else:
raise Exception("CANNOT LOAD LOCAL! %s " % local_name)
def convert_store_subscr(self, pytoken):
self.convert1(VMOp.ROT)
self.convert1(VMOp.SETITEM, pytoken)
def convert_load_parameter(self, arg, position):
"""
:param arg:
:param position:
"""
# get array
self.convert1(VMOp.DUPFROMALTSTACK)
self.insert_push_integer(position)
self.insert_push_integer(2)
self.insert1(VMOp.ROLL)
self.insert1(VMOp.SETITEM)
def convert_built_in_list(self, pytoken):
"""
:param pytoken:
"""
lenfound = False
self.convert1(VMOp.NEWARRAY, pytoken)
def convert_build_slice(self, pytoken):
# this was fun!
# rotate so list is on the top, then move it to alt stack
self.convert1(VMOp.ROT)
self.convert1(VMOp.TOALTSTACK, py_token=pytoken)
# swap the end index and the start index, duplicate start index to alt stack
self.convert1(VMOp.SWAP)
self.convert1(VMOp.DUP)
self.convert1(VMOp.TOALTSTACK)
# subtract end index from start index, this is placed on the stack
self.convert1(VMOp.SUB)
# get the start index and list from alt stack
self.convert1(VMOp.FROMALTSTACK)
self.convert1(VMOp.FROMALTSTACK)
# swap the list and the start index
self.convert_push_integer(2)
self.convert1(VMOp.XSWAP)
# and now perform substr. whew.
self.convert1(VMOp.SUBSTR)
def convert_method_call(self, pytoken):
# special case for list initialization
"""
:param pytoken:
:return:
"""
if pytoken.func_name == 'list':
return self.convert_built_in_list(pytoken)
elif pytoken.func_name == 'bytearray':
return self.convert_push_data(bytes(pytoken.instruction.arg), pytoken)
elif pytoken.func_name == 'bytes':
return self.convert_push_data(pytoken.func_params[0].args, pytoken)
param_len = pytoken.num_params
if param_len <= 1:
pass
elif param_len == 2:
# if we are using concat or take, we don't want to swap
if pytoken.func_name != 'concat' and pytoken.func_name != 'take' and pytoken.func_name != 'has_key':
self.insert1(VMOp.SWAP)
elif param_len == 3:
if pytoken.func_name != 'substr':
self.insert_push_integer(2)
self.insert1(VMOp.XSWAP)
else:
half_p = int(param_len / 2)
for i in range(0, half_p):
save_to = param_len - 1 - i
self.insert_push_integer(save_to)
self.insert1(VMOp.PICK)
self.insert_push_integer(i + 1)
self.insert1(VMOp.PICK)
self.insert_push_integer(save_to + 2)
self.insert1(VMOp.XSWAP)
self.insert1(VMOp.DROP)
self.insert_push_integer(i + 1)
self.insert1(VMOp.XSWAP)
self.insert1(VMOp.DROP)
# self.insert1(VMOp.NOP)
fname = pytoken.func_name
full_name = None
for m in self.method.module.methods:
if fname == m.name:
full_name = m.full_name
# operational call like len(items) or abs(value)
if self.is_op_call(fname):
vmtoken = self.convert_op_call(fname, pytoken)
# runtime.notify event
elif self.is_notify_event(pytoken):
vmtoken = self.convert_notify_event(pytoken)
# app call ( for calling other contracts on blockchain )
elif self.is_smart_contract_call(pytoken):
vmtoken = self.convert_smart_contract_call(pytoken, param_len)
elif self.is_sys_call(full_name):
vmtoken = self.convert_sys_call(full_name, pytoken)
# used for python specific built in methods like `enumerate` or `tuple`
elif self.is_built_in(fname):
vmtoken = self.convert_built_in(fname, pytoken)
# otherwise we assume the method is defined by the module
else:
vmtoken = self.convert_default_call(pytoken, param_len)
return vmtoken
def convert_default_call(self, pytoken, param_len):
vmtoken = self.convert1(
VMOp.CALL, py_token=pytoken, data=bytearray(b'\x05\x00'))
vmtoken.src_method = self.method
vmtoken.target_method = pytoken.func_name
return vmtoken
@staticmethod
def is_op_call(op):
"""
:param op:
:return:
"""
if op in ['len', 'abs', 'min', 'max', 'concat', 'take', 'substr',
'reverse', 'append', 'remove', 'keys', 'values', 'has_key',
'sha1', 'sha256', 'hash160', 'hash256', 'breakpoint',
'verify_signature',
'Exception', 'throw_if_null', ]:
return True
return False
def convert_op_call(self, op, pytoken=None):
"""
:param op:
:param pytoken:
:return:
"""
if op == 'len':
return self.convert1(VMOp.ARRAYSIZE, pytoken)
elif op == 'abs':
return self.convert1(VMOp.ABS, pytoken)
elif op == 'min':
return self.convert1(VMOp.MIN, pytoken)
elif op == 'max':
return self.convert1(VMOp.MAX, pytoken)
elif op == 'concat':
return self.convert1(VMOp.CAT, pytoken)
elif op == 'take':
return self.convert1(VMOp.LEFT, pytoken)
elif op == 'substr':
return self.convert1(VMOp.SUBSTR, pytoken)
elif op == 'keys':
return self.convert1(VMOp.KEYS, pytoken)
elif op == 'values':
return self.convert1(VMOp.VALUES, pytoken)
elif op == 'has_key':
return self.convert1(VMOp.HASKEY, pytoken)
elif op == 'sha1':
return self.convert1(VMOp.SHA1, pytoken)
elif op == 'sha256':
return self.convert1(VMOp.SHA256, pytoken)
elif op == 'hash160':
return self.convert1(VMOp.HASH160, pytoken)
elif op == 'hash256':
return self.convert1(VMOp.HASH256, pytoken)
elif op == 'verify_signature':
return self.convert1(VMOp.VERIFY, pytoken)
elif op == 'reverse':
return self.convert1(VMOp.REVERSE, pytoken)
elif op == 'append':
return self.convert1(VMOp.APPEND, pytoken)
elif op == 'remove':
return self.convert1(VMOp.REMOVE, pytoken)
elif op == 'Exception':
return self.convert1(VMOp.THROW, pytoken)
elif op == 'throw_if_null':
return self.convert1(VMOp.THROWIFNOT, pytoken)
elif op == 'breakpoint':
pytoken.is_breakpoint = True
return self.convert1(VMOp.NOP, pytoken)
return None
@staticmethod
def is_sys_call(op):
"""
:param op:
:return:
"""
if op is not None and NEO_SC_FRAMEWORK in op:
return True
return False
def convert_sys_call(self, op, pytoken=None):
"""
:param op:
:param pytoken:
:return:
"""
if 'TriggerType.ApplicationR' in op:
return self.convert_push_data(bytearray(b'\x11'), pytoken)
elif 'TriggerType.VerificationR' in op:
return self.convert_push_data(bytearray(b'\x01'), pytoken)
elif 'TriggerType.Application' in op:
return self.convert_push_data(bytearray(b'\x10'), pytoken)
elif 'TriggerType.Verification' in op:
return self.convert_push_data(bytearray(b'\x00'), pytoken)
elif 'TransactionType' in op:
return self.convert_tx_type(op, pytoken)
elif 'GetTXHash' in op:
op = op.replace('GetTXHash', 'GetHash')
elif 'GetInputHash' in op:
op = op.replace('GetInputHash', 'GetHash')
elif 'Iterator.Iter' in op:
op = op.replace('Iterator.Iter', 'Iterator.')
elif 'Enumerator.Enumerator' in op:
op = op.replace('Enumerator.Enumerator', 'Enumerator.')
if op == 'Neo.Enumerator':
op = 'Neo.Enumerator.Create'
elif 'GetIsPayable' in op:
op = op.replace('GetIsPayable', 'IsPayable')
syscall_name = op.replace(NEO_SC_FRAMEWORK, '').encode('utf-8')
length = len(syscall_name)
ba = bytearray([length]) + bytearray(syscall_name)
pytoken.is_sys_call = False
vmtoken = self.convert1(VMOp.SYSCALL, pytoken, data=ba)
self.insert1(VMOp.NOP)
return vmtoken
def convert_tx_type(self, op, pytoken=None):
if 'MinerTransaction' in op:
return self.convert_push_data(bytearray(b'\x00'), pytoken)
elif 'IssueTransaction' in op:
return self.convert_push_data(bytearray(b'\x01'), pytoken)
elif 'ClaimTransaction' in op:
return self.convert_push_data(bytearray(b'\x02'), pytoken)
elif 'EnrollmentTransaction' in op:
return self.convert_push_data(bytearray(b'\x20'), pytoken)
elif 'VotingTransaction' in op:
return self.convert_push_data(bytearray(b'\x24'), pytoken)
elif 'RegisterTransaction' in op:
return self.convert_push_data(bytearray(b'\x40'), pytoken)
elif 'ContractTransaction' in op:
return self.convert_push_data(bytearray(b'\x80'), pytoken)
elif 'AgencyTransaction' in op:
return self.convert_push_data(bytearray(b'\xb0'), pytoken)
elif 'PublishTransaction' in op:
return self.convert_push_data(bytearray(b'\xd0'), pytoken)
elif 'InvocationTransaction' in op:
return self.convert_push_data(bytearray(b'\xd1'), pytoken)
elif 'StateTransaction' in op:
return self.convert_push_data(bytearray(b'\x90'), pytoken)
@staticmethod
def is_built_in(op):
"""
:param op:
:return:
"""
if op in ['zip', 'type', 'tuple', 'super', 'str', 'slice',
'set', 'reversed', 'property', 'memoryview',
'map', 'list', 'frozenset', 'float', 'filter',
'enumerate', 'dict', 'divmod', 'complex', 'bytes', 'bytearray', 'bool',
'int', 'vars', 'sum', 'sorted', 'round', 'setattr', 'getattr',
'rep', 'quit', 'print', 'pow', 'ord', 'oct', 'next', 'locals', 'license',
'iter', 'isinstance', 'issubclass', 'input', 'id', 'hex',
'help', 'hash', 'hasattr', 'globals', 'format', 'exit',
'exec', 'eval', 'dir', 'deleteattr', 'credits', 'copyright',
'compile', 'chr', 'callable', 'bin', 'ascii', 'any', 'all', ]:
return True
return False
def convert_built_in(self, op, pytoken):
"""
:param op:
:param pytoken:
:return:
"""
syscall_name = None
if op == 'print':
syscall_name = 'Neo.Runtime.Log'.encode('utf-8')
elif op == 'enumerate':
syscall_name = b'Neo.Enumerator.Create'
elif op == 'iter':
syscall_name = b'Neo.Iterator.Create'
elif op == 'next':
syscall_name = b'Neo.Enumerator.Next'
elif op == 'reversed':
raise NotImplementedError(
"[Compilation error] Built in %s is not implemented. Use array.reverse() instead." % op)
if syscall_name:
length = len(syscall_name)
ba = bytearray([length]) + bytearray(syscall_name)
vmtoken = self.convert1(VMOp.SYSCALL, pytoken, data=ba)
return vmtoken
raise NotImplementedError(
"[Compilation error] Built in %s is not implemented" % op)
def is_notify_event(self, pytoken):
"""
:param pytoken:
:return:
"""
name = pytoken.func_name
for action in self.method.module.actions:
if action.method_name == name:
return True
return False
def convert_notify_event(self, pytoken):
"""
:param pytoken:
:return:
"""
event_action = None
for action in self.method.module.actions:
if action.method_name == pytoken.func_name:
event_action = action
if event_action is None:
raise Exception("Event action not found")
# push the event name
event_name = event_action.event_name.encode('utf-8')
self.convert_push_data(event_name, py_token=None)
# push the num params
self.convert_push_integer(len(event_action.event_args))
# pack the array
self.convert1(VMOp.PACK)
# insert syscall
syscall_name = 'Neo.Runtime.Notify'.encode('utf-8')
length = len(syscall_name)
ba = bytearray([length]) + bytearray(syscall_name)
vmtoken = self.convert1(VMOp.SYSCALL, pytoken, data=ba)
# self.insert1(VMOp.NOP)
return vmtoken
def is_smart_contract_call(self, pytoken):
"""
:param pytoken:
:return:
"""
name = pytoken.func_name
if name == 'DynamicAppCall':
pytoken.is_dynamic_appcall = True
return True
for appcall in self.method.module.app_call_registrations:
if appcall.method_name == name:
return True
return False
def convert_smart_contract_call(self, pytoken, param_len):
"""
:param pytoken:
:return:
"""
if pytoken.is_dynamic_appcall:
# push the contract hash
vmtoken = self.convert1(
VMOp.APPCALL, py_token=pytoken, data=bytearray(20))
# self.insert1(VMOp.NOP)
return vmtoken
# this is used for app calls that are registered
# using RegisterAppCall(script_hash, *args)
sc_appcall = None
for appcall in self.method.module.app_call_registrations:
if appcall.method_name == pytoken.func_name:
sc_appcall = appcall
if sc_appcall is None:
raise Exception("Smart Contract Appcall %s not found " %
pytoken.func_name)
# push the contract hash
vmtoken = self.convert1(
VMOp.APPCALL, py_token=pytoken, data=sc_appcall.script_hash_addr)
return vmtoken
class Nep8VMTokenizer(VMTokenizer):
def convert_default_call(self, pytoken, param_len):
# for NEP8 we need to add in the num params and the return count for the method call
# return count is always 1
method_opts = bytearray([1, param_len, 0, 0])
vmtoken = self.convert1(
VMOp.CALL_I, py_token=pytoken, data=method_opts)
vmtoken.src_method = self.method
vmtoken.target_method = pytoken.func_name
return vmtoken
def convert_smart_contract_call(self, pytoken, param_len):
"""
:param pytoken:
:return:
"""
if pytoken.is_dynamic_appcall:
# in dynamic appcall definition, one of the params is the script hash,
# so we need one less than is passed in as `param_len`
method_opts = bytearray([1, param_len - 1])
# push the call with param count and return count
vmtoken = self.convert1(
VMOp.CALL_ED, py_token=pytoken, data=method_opts)
return vmtoken
# this is used for app calls that are registered
# using RegisterAppCall(script_hash, *args)
sc_appcall = None
for appcall in self.method.module.app_call_registrations:
if appcall.method_name == pytoken.func_name:
sc_appcall = appcall
if sc_appcall is None:
raise Exception("Smart Contract Appcall %s not found " %
pytoken.func_name)
# push the contract hash + p/r count
appcall_data = bytearray([1, param_len]) + sc_appcall.script_hash_addr
vmtoken = self.convert1(
VMOp.CALL_E, py_token=pytoken, data=appcall_data)
return vmtoken