Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
415 changes: 415 additions & 0 deletions swap_router/contracts/v3/swap_router_v3_approval.tl

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions swap_router/contracts/v3/swap_router_v3_clear_state.tl
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#pragma version 10
#tealish version git+https://github.com/Hipo/tealish.git@d7441973671cf6b79dd55843016892f4b86ceeba

exit(1)
Empty file added swap_router/sdk/__init__.py
Empty file.
77 changes: 48 additions & 29 deletions swap_router/sdk/base_client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from base64 import b64decode, b64encode
import time
from algosdk.encoding import decode_address
from tinyman.utils import TransactionGroup, int_to_bytes
from base64 import b64decode, b64encode
from algosdk import transaction
from algosdk.encoding import decode_address, encode_address
from algosdk.logic import get_application_address
from algosdk.account import generate_account

from tinyman.utils import TransactionGroup
from swap_router.sdk.struct import get_struct, get_box_costs


class BaseClient():
Expand All @@ -18,13 +17,13 @@ def __init__(self, algod, app_id, user_address, user_sk) -> None:
self.add_key(user_address, user_sk)
self.current_timestamp = None
self.simulate = False

def get_suggested_params(self):
return self.algod.suggested_params()

def get_current_timestamp(self):
return self.current_timestamp or time.time()

def _submit(self, transactions, additional_fees=0):
transactions = self.flatten_transactions(transactions)
fee = transactions[0].fee
Expand All @@ -45,7 +44,7 @@ def _submit(self, transactions, additional_fees=0):
else:
txn_info = txn_group.submit(self.algod, wait=True)
return txn_info

def flatten_transactions(self, txns):
result = []
if isinstance(txns, transaction.Transaction):
Expand All @@ -54,23 +53,17 @@ def flatten_transactions(self, txns):
for txn in txns:
result += self.flatten_transactions(txn)
return result


def calculate_min_balance(self, accounts=0, assets=0, boxes=None):
cost = 0
cost += accounts * 100_000
cost += assets * 100_000
cost += get_box_costs(boxes or {})
return cost

def add_key(self, address, key):
self.keys[address] = key

def get_global(self, key, default=None, app_id=None):
app_id = app_id or self.app_id
global_state = {s["key"]: s["value"] for s in self.algod.application_info(app_id)["params"]["global-state"]}
key = b64encode(key).decode()
if key in global_state:
value = global_state[key]
if value["type"] == 2:
return value["uint"]
else:
return b64decode(value["bytes"])
else:
return default

def get_globals(self, app_id=None):
app_id = app_id or self.app_id
gs = self.algod.application_info(app_id)["params"]["global-state"]
Expand All @@ -86,6 +79,26 @@ def get_globals(self, app_id=None):
state = dict(sorted(state.items(), key=lambda x: x[0]))
return state

def get_global(self, key, default=None, app_id=None):
app_id = app_id or self.app_id
global_state = {s["key"]: s["value"] for s in self.algod.application_info(app_id)["params"]["global-state"]}
key = b64encode(key).decode()
if key in global_state:
value = global_state[key]
if value["type"] == 2:
return value["uint"]
else:
return b64decode(value["bytes"])
else:
return default

def get_box(self, box_name, struct_name, app_id=None):
app_id = app_id or self.app_id
box_value = b64decode(self.algod.application_box_by_name(app_id, box_name)["value"])
struct_class = get_struct(struct_name)
struct = struct_class(box_value)
return struct

def box_exists(self, box_name, app_id=None):
app_id = app_id or self.app_id
try:
Expand All @@ -95,6 +108,9 @@ def box_exists(self, box_name, app_id=None):
return False

def is_opted_in(self, address, asset_id):
if asset_id == 0:
return True

try:
self.algod.account_asset_info(address, asset_id)
return True
Expand All @@ -103,9 +119,12 @@ def is_opted_in(self, address, asset_id):

def get_optin_if_needed_txn(self, sender, asset_id):
if not self.is_opted_in(sender, asset_id):
txn = transaction.AssetOptInTxn(
sender=sender,
sp=self.get_suggested_params(),
index=asset_id,
)
return txn
return self.get_optin_txn(sender, asset_id)

def get_optin_txn(self, sender, asset_id):
txn = transaction.AssetOptInTxn(
sender=sender,
sp=self.get_suggested_params(),
index=asset_id,
)
return txn
162 changes: 162 additions & 0 deletions swap_router/sdk/struct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import json
import re
from typing import Any, Dict


MINIMUM_BALANCE_REQUIREMENT_PER_BOX = 2_500
MINIMUM_BALANCE_REQUIREMENT_PER_BOX_BYTE = 400


class StructRegistry:
def __init__(self):
self.struct_definitions: Dict[str, Dict] = {}

def load_from_file(self, filepath: str) -> None:
"""Load struct definitions from a JSON file."""
with open(filepath, 'r') as f:
data = json.load(f)
self.load_from_dict(data.get('structs', {}))

def load_from_dict(self, struct_dict: Dict) -> None:
"""Load struct definitions from a dictionary."""
self.struct_definitions.update(struct_dict)

def get_type(self, name: str) -> Any:
"""Get the appropriate type handler for a given type name."""
if name == "int":
return TealishInt()
elif name.startswith("uint"):
return TealishInt()
elif name.startswith("bytes"):
return TealishBytes()
elif name in self.struct_definitions:
return Struct(name=name, manager=self, **self.struct_definitions[name])
elif "[" in name:
name, length = re.match(r"([A-Za-z_0-9]+)\[(\d+)\]", name).groups()
return ArrayData(
Struct(name=name, manager=self, **self.struct_definitions[name]),
int(length)
)
else:
raise KeyError(f"Unknown type: {name}")

def get_struct(self, name: str) -> 'Struct':
"""Get a Struct instance by name."""
if name not in self.struct_definitions:
raise KeyError(f"Struct '{name}' not found")
return Struct(name=name, **self.struct_definitions[name])


STRUCT_REGISTRY = StructRegistry()


def register_struct_file(filepath: str) -> None:
STRUCT_REGISTRY.load_from_file(filepath)


def get_struct(name: str) -> 'Struct':
return STRUCT_REGISTRY.get_struct(name)


class Struct():
def __init__(self, name, size, fields):
self._name = name
self._size = size
self._fields = fields
self._data = None

def __call__(self, data=None) -> Any:
if data is None:
data = bytearray(self._size)
struct = Struct(self._name, self._size, self._fields)
struct._data = memoryview(data)
return struct

def __getattribute__(self, name: str) -> Any:
if name.startswith("_"):
return super().__getattribute__(name)
field = self._fields[name]
start = field["offset"]
end = field["offset"] + field["size"]
value = self._data[start:end]
type = STRUCT_REGISTRY.get_type(field["type"])
return type(value)

def __setattr__(self, name: str, value: Any) -> None:
if name.startswith("_"):
return super().__setattr__(name, value)
field = self._fields[name]
start = field["offset"]
end = field["offset"] + field["size"]
if field["type"] in ("int",):
value = value.to_bytes(field["size"], "big")
if isinstance(value, (Struct, ArrayData)):
value = value._data
self._data[start:end] = value

def __setitem__(self, index, value):
if isinstance(value, (Struct, ArrayData)):
value = value._data
self._data[:] = value

def __str__(self) -> str:
return repr(bytes(self._data))

def __repr__(self) -> str:
fields = {f: getattr(self, f) for f in self._fields}
return f"{self._name}({fields})"

def __len__(self):
return len(self._data)

def __conform__(self, protocol):
return bytes(self._data)

def __bytes__(self):
return bytes(self._data.tobytes())


class ArrayData():
def __init__(self, struct, length):
self._struct = struct
self._length = length

def __call__(self, data=None) -> Any:
if data is None:
data = bytearray(self._struct._size * self.length)
self._data = memoryview(data)
return self

def __getitem__(self, index):
offset = self._struct._size * index
end = offset + self._struct._size
value = self._data[offset:end]
return self._struct(value)

def __setitem__(self, index, value):
offset = self._struct._size * index
end = offset + self._struct._size
if isinstance(value, Struct):
value = value._data
self._data[offset:end] = value

def __repr__(self) -> str:
return ", ".join(repr(self[i]) for i in range(self._length))


class TealishInt():
def __call__(self, value) -> Any:
return int.from_bytes(value, "big")


class TealishBytes():
def __call__(self, value) -> Any:
return value


def get_box_costs(boxes):
cost = MINIMUM_BALANCE_REQUIREMENT_PER_BOX
for name, struct in boxes.items():
cost += len(name) * MINIMUM_BALANCE_REQUIREMENT_PER_BOX_BYTE
cost += struct._size * MINIMUM_BALANCE_REQUIREMENT_PER_BOX_BYTE
return cost
7 changes: 4 additions & 3 deletions swap_router/sdk/client.py → swap_router/sdk/v2/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from algosdk.encoding import decode_address, encode_address
from algosdk.logic import get_application_address
from algosdk.constants import ZERO_ADDRESS
from .base_client import BaseClient
from .utils import int_array, bytes_array
from ..base_client import BaseClient
from ..utils import int_array, bytes_array


class SwapRouterClient(BaseClient):
Expand All @@ -12,6 +12,7 @@ def __init__(self, algod, app_id, tinyman_amm_app_id, talgo_app_id, user_address
super().__init__(algod, app_id, user_address, user_sk)
self.amm_app_id = tinyman_amm_app_id
self.talgo_app_id = talgo_app_id

state = self.get_globals(talgo_app_id)
self.talgo_app_address = encode_address(state[b"account_0"])
self.talgo_asset_id = state[b"talgo_asset_id"]
Expand Down Expand Up @@ -134,7 +135,7 @@ def swap(self, input_amount, output_amount, route, pools):
))
inner_txns = sum(tx.get("inner_txns", 0) for tx in transactions)
return self._submit(txns, additional_fees=inner_txns)

def claim_extra(self, asset_id):
sp = self.get_suggested_params()
txns = [
Expand Down
Loading