Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,6 @@ miner_objects/miner_dashboard/*.tsbuildinfo

# macOS files
.DS_Store

#vim
*.swp
5 changes: 2 additions & 3 deletions data_generator/base_data_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,22 +371,21 @@ async def handle_msg(self, msg):
def instantiate_not_pickleable_objects(self):
raise NotImplementedError

def get_closes_websocket(self, trade_pairs: List[TradePair], trade_pair_to_last_order_time_ms) -> dict[str: PriceSource]:
def get_closes_websocket(self, trade_pairs: List[TradePair], time_ms) -> dict[str: PriceSource]:
events = {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice cleanup!

for trade_pair in trade_pairs:
symbol = trade_pair.trade_pair
if symbol not in self.trade_pair_to_recent_events:
continue

# Get the closest aligned event
time_ms = trade_pair_to_last_order_time_ms[trade_pair]
symbol = trade_pair.trade_pair
latest_event = self.trade_pair_to_recent_events[symbol].get_closest_event(time_ms)
events[trade_pair] = latest_event

return events

def get_closes_rest(self, trade_pairs: List[TradePair]) -> dict[str: float]:
def get_closes_rest(self, trade_pairs: List[TradePair], time_ms) -> dict[str: float]:
pass

def get_websocket_lag_for_trade_pair_s(self, tp: str, now_ms: int) -> float | None:
Expand Down
8 changes: 4 additions & 4 deletions data_generator/polygon_data_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,12 +392,12 @@ def symbol_to_trade_pair(self, symbol: str):
raise ValueError(f"Unknown symbol: {symbol}")
return tp

def get_closes_rest(self, pairs: List[TradePair]) -> dict:
def get_closes_rest(self, trade_pairs: List[TradePair], time_ms, live=True) -> dict:
all_trade_pair_closes = {}
# Multi-threaded fetching of REST data over all requested trade pairs. Max parallelism is 5.
with ThreadPoolExecutor(max_workers=5) as executor:
# Dictionary to keep track of futures
future_to_trade_pair = {executor.submit(self.get_close_rest, p): p for p in pairs}
future_to_trade_pair = {executor.submit(self.get_close_rest, p, time_ms): p for p in trade_pairs}

for future in as_completed(future_to_trade_pair):
tp = future_to_trade_pair[future]
Expand Down Expand Up @@ -931,7 +931,7 @@ def get_currency_conversion(self, trade_pair: TradePair=None, base: str=None, qu
#if tp != TradePair.GBPUSD:
# continue

print('getting close for', tp.trade_pair_id, ':', polygon_data_provider.get_close_rest(tp))
print('getting close for', tp.trade_pair_id, ':', polygon_data_provider.get_close_rest(tp, TimeUtil.now_in_millis()))

time.sleep(100000)

Expand All @@ -951,4 +951,4 @@ def get_currency_conversion(self, trade_pair: TradePair=None, base: str=None, qu
aggs.append(a)

assert 0, aggs
"""
"""
190 changes: 74 additions & 116 deletions data_generator/tiingo_data_service.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion meta/meta.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"subnet_version": "7.1.2"
"subnet_version": "7.1.3"
}
7 changes: 4 additions & 3 deletions tests/shared_objects/mock_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,13 @@ def __init__(self, secrets, disable_ws):
super().__init__(secrets=secrets, disable_ws=disable_ws)
self.polygon_data_service = MockPolygonDataService(api_key=secrets["polygon_apikey"], disable_ws=disable_ws)

def get_sorted_price_sources_for_trade_pair(self, trade_pair, processed_ms):
return [PriceSource(open=1, high=1, close=1, low=1, bid=1, ask=1)]

def get_close_at_date(self, trade_pair, timestamp_ms, order=None, verbose=True):
return PriceSource(open=1, high=1, close=1, low=1, bid=1, ask=1)

def get_sorted_price_sources_for_trade_pair(self, trade_pair, time_ms=None, live=True):
return [PriceSource(open=1, high=1, close=1, low=1, bid=1, ask=1)]


class MockPolygonDataService(PolygonDataService):
def __init__(self, api_key, disable_ws=True):
super().__init__(api_key, disable_ws=disable_ws)
Expand Down
44 changes: 18 additions & 26 deletions vali_objects/utils/live_price_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ def sorted_valid_price_sources(self, price_events: List[PriceSource | None], cur
if not valid_events:
return None

if not current_time_ms:
current_time_ms = TimeUtil.now_in_millis()

best_event = PriceSource.get_winning_event(valid_events, current_time_ms)
if not best_event:
return None
Expand All @@ -50,10 +53,7 @@ def sorted_valid_price_sources(self, price_events: List[PriceSource | None], cur

return PriceSource.non_null_events_sorted(valid_events, current_time_ms)

def dual_rest_get(
self,
trade_pairs: List[TradePair]
) -> Tuple[Dict[TradePair, PriceSource], Dict[TradePair, PriceSource]]:
def dual_rest_get(self, trade_pairs: List[TradePair], time_ms, live) -> Tuple[Dict[TradePair, PriceSource], Dict[TradePair, PriceSource]]:
"""
Fetch REST closes from both Polygon and Tiingo in parallel,
using ThreadPoolExecutor to run both calls concurrently.
Expand All @@ -62,8 +62,8 @@ def dual_rest_get(
tiingo_results = {}
with ThreadPoolExecutor(max_workers=2) as executor:
# Submit both REST calls to the executor
poly_fut = executor.submit(self.polygon_data_service.get_closes_rest, trade_pairs)
tiingo_fut = executor.submit(self.tiingo_data_service.get_closes_rest, trade_pairs)
poly_fut = executor.submit(self.polygon_data_service.get_closes_rest, trade_pairs, time_ms, live)
tiingo_fut = executor.submit(self.tiingo_data_service.get_closes_rest, trade_pairs, time_ms, live)

try:
# Wait for both futures to complete with a 10s timeout
Expand All @@ -87,37 +87,31 @@ def get_latest_price(self, trade_pair: TradePair, time_ms=None) -> Tuple[float,
Gets the latest price for a single trade pair by utilizing WebSocket and possibly REST data sources.
Tries to get the price as close to time_ms as possible.
"""
if not time_ms:
time_ms = TimeUtil.now_in_millis()
price_sources = self.get_sorted_price_sources_for_trade_pair(trade_pair, time_ms)
winning_event = PriceSource.get_winning_event(price_sources, time_ms)
return winning_event.parse_best_best_price_legacy(time_ms), price_sources

def get_sorted_price_sources_for_trade_pair(self, trade_pair: TradePair, time_ms:int) -> List[PriceSource] | None:
temp = self.get_tp_to_sorted_price_sources([trade_pair], {trade_pair: time_ms})
def get_sorted_price_sources_for_trade_pair(self, trade_pair: TradePair, time_ms: int, live=True) -> List[PriceSource] | None:
temp = self.get_tp_to_sorted_price_sources([trade_pair], time_ms, live)
return temp.get(trade_pair)

def get_tp_to_sorted_price_sources(self, trade_pairs: List[TradePair],
trade_pair_to_last_order_time_ms: Dict[TradePair, int] = None) -> Dict[TradePair, List[PriceSource]]:
def get_tp_to_sorted_price_sources(self, trade_pairs: List[TradePair], time_ms: int, live=True) -> Dict[TradePair, List[PriceSource]]:
"""
Retrieves the latest prices for multiple trade pairs, leveraging both WebSocket and REST APIs as needed.
"""
if not trade_pair_to_last_order_time_ms:
current_time_ms = TimeUtil.now_in_millis()
trade_pair_to_last_order_time_ms = {tp: current_time_ms for tp in trade_pairs}
websocket_prices_polygon = self.polygon_data_service.get_closes_websocket(trade_pairs=trade_pairs,
trade_pair_to_last_order_time_ms=trade_pair_to_last_order_time_ms)
websocket_prices_tiingo_data = self.tiingo_data_service.get_closes_websocket(trade_pairs=trade_pairs,
trade_pair_to_last_order_time_ms=trade_pair_to_last_order_time_ms)
if not time_ms:
time_ms = TimeUtil.now_in_millis()

websocket_prices_polygon = self.polygon_data_service.get_closes_websocket(trade_pairs, time_ms)
websocket_prices_tiingo_data = self.tiingo_data_service.get_closes_websocket(trade_pairs, time_ms)
trade_pairs_needing_rest_data = []

results = {}

# Initial check using WebSocket data
for trade_pair in trade_pairs:
current_time_ms = trade_pair_to_last_order_time_ms[trade_pair]
events = [websocket_prices_polygon.get(trade_pair), websocket_prices_tiingo_data.get(trade_pair)]
sources = self.sorted_valid_price_sources(events, current_time_ms, filter_recent_only=True)
sources = self.sorted_valid_price_sources(events, time_ms, filter_recent_only=True)
if sources:
results[trade_pair] = sources
else:
Expand All @@ -127,16 +121,15 @@ def get_tp_to_sorted_price_sources(self, trade_pairs: List[TradePair],
if not trade_pairs_needing_rest_data:
return results

rest_prices_polygon, rest_prices_tiingo_data = self.dual_rest_get(trade_pairs_needing_rest_data)
rest_prices_polygon, rest_prices_tiingo_data = self.dual_rest_get(trade_pairs_needing_rest_data, time_ms, live)

for trade_pair in trade_pairs_needing_rest_data:
current_time_ms = trade_pair_to_last_order_time_ms[trade_pair]
sources = self.sorted_valid_price_sources([
websocket_prices_polygon.get(trade_pair),
websocket_prices_tiingo_data.get(trade_pair),
rest_prices_polygon.get(trade_pair),
rest_prices_tiingo_data.get(trade_pair)
], current_time_ms, filter_recent_only=False)
], time_ms, filter_recent_only=False)
results[trade_pair] = sources

return results
Expand Down Expand Up @@ -288,12 +281,11 @@ def get_close_at_date(self, trade_pair, timestamp_ms, order=None, verbose=True):
f"Fell back to Polygon get_date_minute_fallback for price of {trade_pair.trade_pair} at {TimeUtil.timestamp_ms_to_eastern_time_str(timestamp_ms)}, price_source: {price_source}")

if price_source is None:
price_source = self.tiingo_data_service.get_close_rest(trade_pair=trade_pair, target_time_ms=timestamp_ms)
price_source = self.tiingo_data_service.get_close_rest(trade_pair=trade_pair, timestamp_ms=timestamp_ms, live=False)
if verbose and price_source is not None:
bt.logging.warning(
f"Fell back to Tiingo get_date for price of {trade_pair.trade_pair} at {TimeUtil.timestamp_ms_to_eastern_time_str(timestamp_ms)}, ms: {timestamp_ms}")


"""
if price is None:
price, time_delta = self.polygon_data_service.get_close_in_past_hour_fallback(trade_pair=trade_pair,
Expand Down
2 changes: 1 addition & 1 deletion vali_objects/utils/mdd_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def get_sorted_price_sources(self, hotkey_positions) -> Dict[TradePair, List[Pri
required_trade_pairs_for_candles.add(tp)

now = TimeUtil.now_in_millis()
trade_pair_to_price_sources = self.live_price_fetcher.get_tp_to_sorted_price_sources(list(required_trade_pairs_for_candles))
trade_pair_to_price_sources = self.live_price_fetcher.get_tp_to_sorted_price_sources(list(required_trade_pairs_for_candles), now)
#bt.logging.info(f"Got candle data for {len(candle_data)} {candle_data}")
for tp, sources in trade_pair_to_price_sources.items():
if sources and any(x and not x.websocket for x in sources):
Expand Down
2 changes: 1 addition & 1 deletion vali_objects/utils/position_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ def close_open_orders_for_suspended_trade_pairs(self):
if position.is_closed_position:
continue
if position.trade_pair in tps_to_eliminate:
price_sources = self.live_price_fetcher.get_sorted_price_sources_for_trade_pair(position.trade_pair, TARGET_MS)
price_sources = self.live_price_fetcher.get_sorted_price_sources_for_trade_pair(position.trade_pair, time_ms=TARGET_MS, live=False)
live_price = price_sources[0].parse_appropriate_price(TARGET_MS, position.trade_pair.is_forex, OrderType.FLAT, position)
flat_order = Order(price=live_price,
price_sources=price_sources,
Expand Down
5 changes: 3 additions & 2 deletions vali_objects/utils/price_slippage_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def get_features(cls, trade_pairs: list[TradePair], processed_ms: int, adv_lookb
try:
bars_df = cls.get_bars_with_features(trade_pair, processed_ms, adv_lookback_window, calc_vol_window)
row_selected = bars_df.iloc[-1]
annualized_volatility = row_selected['annualized_vol']
annualized_volatility = row_selected['annualized_vol'] # recalculate slippage false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsure what this comment means

avg_daily_volume = row_selected[f'adv_last_{adv_lookback_window}_days']

tp_to_vol[trade_pair.trade_pair_id] = annualized_volatility
Expand Down Expand Up @@ -305,12 +305,13 @@ def update_historical_slippage(self, positions_at_t_f):
break

bt.logging.info(f"updating order attributes {o}")

bid = o.bid
ask = o.ask

if self.fetch_slippage_data:

price_sources = self.live_price_fetcher.get_sorted_price_sources_for_trade_pair(trade_pair=o.trade_pair, time_ms=o.processed_ms)
price_sources = self.live_price_fetcher.get_sorted_price_sources_for_trade_pair(trade_pair=o.trade_pair, time_ms=o.processed_ms, live=False)
if not price_sources:
raise ValueError(
f"Ignoring order for [{hk}] due to no live prices being found for trade_pair [{o.trade_pair}]. Please try again.")
Expand Down
7 changes: 6 additions & 1 deletion vali_objects/vali_dataclasses/price_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import bittensor as bt
from typing import Optional
from pydantic import BaseModel
from time_util.time_util import TimeUtil

from vali_objects.enums.order_type_enum import OrderType

Expand Down Expand Up @@ -55,14 +56,18 @@ def end_ms(self):
def get_start_time_ms(self):
return self.start_ms

def time_delta_from_now_ms(self, now_ms: int) -> int:
def time_delta_from_now_ms(self, now_ms:int = None) -> int:
if not now_ms:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice fix. this could have caused errors

now_ms = TimeUtil.now_in_millis()
if self.websocket:
return abs(now_ms - self.start_ms)
else:
return min(abs(now_ms - self.start_ms),
abs(now_ms - self.end_ms))

def parse_best_best_price_legacy(self, now_ms: int):
if not now_ms:
now_ms = TimeUtil.now_in_millis()
if self.websocket:
return self.open
else:
Expand Down
4 changes: 4 additions & 0 deletions vali_objects/vali_dataclasses/recent_event_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ def get_closest_event(self, timestamp_ms):
#print(f"Looking for event at {TimeUtil.millis_to_formatted_date_str(timestamp_ms)}")
if self.count_events() == 0:
return None

# Find the event closest to the given timestamp
if not timestamp_ms:
timestamp_ms = TimeUtil.now_in_millis()

idx = self.events.bisect_left((timestamp_ms,))
if idx == 0:
return self.events[0][1]
Expand Down
Loading