Skip to content

Commit 4345689

Browse files
committed
Setup event loop refactor
1 parent eb0ce7e commit 4345689

File tree

9 files changed

+313
-94
lines changed

9 files changed

+313
-94
lines changed
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
from typing import List
2+
from datetime import datetime, timedelta, timezone
3+
from investing_algorithm_framework.domain import TimeUnit, ENVIRONMENT, \
4+
Environment, BACKTESTING_INDEX_DATETIME
5+
from .strategy import TradingStrategy
6+
7+
8+
class EventLoopService:
9+
"""
10+
A service that manages the event loop for the trading bot.
11+
This service is responsible for running the trading strategy and handling
12+
events in its lifecycle, such as pending orders changes, stop loss changes,
13+
take profit changes, and price data updates.
14+
15+
The event loop runs strategies in a so called interation, which consists
16+
out of various tasks. An iteration consists out of the following tasks:
17+
18+
- Collect all strategies and tasks that need to be
19+
run (overdue on schedule)
20+
- Collect all market data for the strategies
21+
- Check all pending orders and update their status if needed
22+
- Check all stop loss orders and update their status if needed
23+
- Check all take profit orders and update their status if needed
24+
- Run all tasks
25+
- Run all strategies
26+
- Run all on_strategy_run hooks
27+
- Snapshot the portfolios based on the defined snapshot interval
28+
29+
The goal of this service is to provide a way to run the trading in the
30+
most efficient way possible in both live trading and backtesting. This
31+
is achieved by running strategies and tasks in a loop, where each
32+
iteration checks which strategies and tasks are due to run based on their
33+
defined intervals and time units (seconds, minutes, hours). The next run
34+
times for each strategy are initialized to the current time in UTC.
35+
The service also collects all data configurations from the strategies and
36+
tasks, and runs them in a single iteration to avoid multiple calls to the
37+
data provider service, which can be expensive in terms of performance.
38+
"""
39+
40+
def __init__(
41+
self,
42+
order_service,
43+
portfolio_service,
44+
configuration_service,
45+
data_provider_service=None
46+
):
47+
"""
48+
Initializes the event loop service with the provided algorithm.
49+
50+
Args:
51+
order_service: The service responsible for managing orders.
52+
portfolio_service: The service responsible for managing portfolios.
53+
configuration_service: The service responsible for configuration.
54+
"""
55+
self.tasks = []
56+
self._algorithm = None
57+
self._strategies = []
58+
self._order_service = order_service
59+
self._portfolio_service = portfolio_service
60+
self._configuration_service = configuration_service
61+
self._data_provider_service = data_provider_service
62+
self._data_configurations = []
63+
self.next_run_times = {}
64+
65+
def _get_due_strategies(self):
66+
"""
67+
Checks which strategies are due to run based on their defined intervals
68+
Returns:
69+
70+
"""
71+
environment = self._configuration_service.config[ENVIRONMENT]
72+
73+
if Environment.BACKTEST.equals(environment):
74+
now = self._configuration_service\
75+
.config[BACKTESTING_INDEX_DATETIME]
76+
else:
77+
now = datetime.now(timezone.utc)
78+
79+
due = []
80+
81+
for strategy in self._strategies:
82+
interval = timedelta(
83+
**{strategy.time_unit.value.lower(): strategy.interval}
84+
)
85+
86+
if now >= self.next_run_times[strategy]:
87+
due.append(strategy)
88+
self.next_run_times[strategy] = now + interval
89+
90+
return due
91+
92+
def initialize(self, algorithm):
93+
"""
94+
Initializes the event loop service by calculating the schedule for
95+
running strategies and tasks based on their defined intervals and time
96+
units (seconds, minutes, hours).
97+
98+
The next run times for each strategy are initialized to the current
99+
time in UTC.
100+
101+
Args:
102+
algorithm: The trading algorithm to be managed by this service.
103+
104+
Returns:
105+
None
106+
"""
107+
108+
self._algorithm = algorithm
109+
self._strategies = algorithm.strategies
110+
self.next_run_times = {
111+
strategy.identifier: {
112+
"next_run": datetime.now(timezone.utc)
113+
"data_configurations": strategy.data_configurations
114+
}
115+
for strategy in self._strategies
116+
}
117+
118+
# Collect all data configurations
119+
for strategy in self._strategies:
120+
self._data_configurations.append(
121+
strategy.data_configurations
122+
)
123+
124+
def start(self, number_of_iterations=None):
125+
"""
126+
Runs the event loop for the trading algorithm.
127+
128+
Args:
129+
number_of_iterations: Optional; the number of iterations to run.
130+
If None, runs indefinitely.
131+
"""
132+
pass
133+
134+
def _run_iteration_backtest(self):
135+
"""
136+
Runs a single iteration of the event loop in backtesting mode.
137+
This method collects all due strategies, fetches their data
138+
configurations, and runs the strategies with the collected data.
139+
140+
Returns:
141+
None
142+
"""
143+
due_strategies = self._get_due_strategies()
144+
145+
if not due_strategies:
146+
return
147+
148+
# Step 1: Collect all data
149+
data_configurations = []
150+
151+
for strategy in due_strategies:
152+
data_configurations.extend(strategy.data_configurations)
153+
154+
# Make sure we have unique data configurations
155+
data_configurations = list(set(data_configurations))
156+
data_object = {}
157+
158+
for data_config in data_configurations:
159+
data_object[data_config.identifier] = \
160+
self._data_provider_service.get_backest_data(
161+
symbol=data_config.symbol,
162+
data_type=data_config.data_type,
163+
date=data_config.date,
164+
market=data_config.market,
165+
time_frame=data_config.time_frame,
166+
start_date=data_config.start_date,
167+
end_date=data_config.end_date,
168+
window_size=data_config.window_size,
169+
pandas=data_config.pandas,
170+
)
171+
172+
# Step 2: Update prices of trades
173+
174+
# Step 3: Check pending orders
175+
176+
# Step 4: Check stop loss orders
177+
178+
# Step 5: Check take profit orders
179+
180+
# Step 6: Run all tasks
181+
182+
# Step 7: Run all strategies
183+
184+
def _run_iteration(self):
185+
"""
186+
Runs a single iteration of the event loop. This method collects all
187+
due strategies, fetches their data configurations, and runs the
188+
strategies with the collected data. It also checks for pending orders,
189+
stop loss orders, and take profit orders, and updates their status if
190+
needed. Finally, it runs all tasks and strategies, and takes a snapshot
191+
of the portfolios if needed.
192+
193+
Returns:
194+
None
195+
"""
196+
due_strategies = self._get_due_strategies()
197+
198+
if not due_strategies:
199+
return
200+
201+
# Step 1: Collect all data
202+
data_configurations = []
203+
204+
for strategy in due_strategies:
205+
data_configurations.extend(strategy.data_configurations)
206+
207+
# Make sure we have unique data configurations
208+
data_configurations = list(set(data_configurations))
209+
data_object = {}
210+
211+
for data_config in data_configurations:
212+
data_object[data_config.identifier] = \
213+
self._data_provider_service.get_data(
214+
symbol=data_config.symbol,
215+
data_type=data_config.data_type,
216+
date=data_config.date,
217+
market=data_config.market,
218+
time_frame=data_config.time_frame,
219+
start_date=data_config.start_date,
220+
end_date=data_config.end_date,
221+
window_size=data_config.window_size,
222+
pandas=data_config.pandas,
223+
)
224+
225+
# Step 2: Update prices of trades
226+
227+
# Step 3: Check pending orders
228+
229+
# Step 4: Check stop loss orders
230+
231+
# Step 5: Check take profit orders
232+
233+
# Step 6: Run all tasks
234+
235+
# Step 7: Run all strategies

investing_algorithm_framework/domain/__init__.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
PortfolioConfiguration, Portfolio, Position, Order, TradeStatus, \
2121
BacktestResult, PortfolioSnapshot, StrategyProfile, \
2222
BacktestPosition, Trade, MarketCredential, PositionSnapshot, \
23-
AppMode, BacktestDateRange, DateRange, \
23+
AppMode, BacktestDateRange, DataType, DataSource, \
2424
MarketDataType, TradeRiskType, TradeTakeProfit, TradeStopLoss, \
2525
DataSource, Event, SnapshotInterval
2626
from .order_executor import OrderExecutor
@@ -113,7 +113,6 @@
113113
"RoundingService",
114114
"BacktestDateRange",
115115
"convert_polars_to_pandas",
116-
"DateRange",
117116
"DEFAULT_LOGGING_CONFIG",
118117
"DATABASE_DIRECTORY_NAME",
119118
"BACKTESTING_INITIAL_AMOUNT",
@@ -138,5 +137,7 @@
138137
"SNAPSHOT_INTERVAL",
139138
"SnapshotInterval",
140139
"AWS_S3_STATE_BUCKET_NAME",
141-
"AWS_LAMBDA_LOGGING_CONFIG"
140+
"AWS_LAMBDA_LOGGING_CONFIG",
141+
"DataType",
142+
"DataSource",
142143
]

investing_algorithm_framework/domain/models/__init__.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,10 @@
1313
TradeRiskType
1414
from .trading_data_types import TradingDataType
1515
from .trading_time_frame import TradingTimeFrame
16-
from .date_range import DateRange
1716
from .market_data_type import MarketDataType
18-
from .data_source import DataSource
1917
from .snapshot_interval import SnapshotInterval
2018
from .event import Event
19+
from .data import DataSource, DataType
2120

2221
__all__ = [
2322
"OrderStatus",
@@ -40,10 +39,10 @@
4039
"Trade",
4140
"MarketCredential",
4241
"TradeStatus",
43-
"BacktestReportsEvaluation",
42+
"DataType",
4443
"AppMode",
4544
"BacktestDateRange",
46-
"DateRange",
45+
"DataSource",
4746
"MarketDataType",
4847
"TradeStopLoss",
4948
"TradeTakeProfit",
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from .data_source import DataSource
2+
from .data_type import DataType
3+
4+
__all__ = [
5+
"DataSource",
6+
"DataType",
7+
]
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from dataclasses import dataclass
2+
from .data_type import DataType
3+
4+
@dataclass()
5+
class DataSource:
6+
"""
7+
Base class for data sources.
8+
"""
9+
identifier: str = None
10+
data_type: DataType = None
11+
symbol: str = None
12+
window_size: int = None
13+
time_frame: str = None
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from enum import Enum
2+
3+
4+
class DataType(Enum):
5+
OHLCV = "OHLCV"
6+
TICKER = "TICKER"
7+
ORDER_BOOK = "ORDER_BOOK"
8+
CUSTOM = "CUSTOM"
9+
10+
@staticmethod
11+
def from_string(value: str):
12+
13+
if isinstance(value, str):
14+
15+
for entry in DataType:
16+
17+
if value.upper() == entry.value:
18+
return entry
19+
20+
raise ValueError(
21+
f"Could not convert {value} to DataType"
22+
)
23+
24+
@staticmethod
25+
def from_value(value):
26+
27+
if isinstance(value, str):
28+
return DataType.from_string(value)
29+
30+
if isinstance(value, DataType):
31+
32+
for entry in DataType:
33+
34+
if value == entry:
35+
return entry
36+
37+
raise ValueError(
38+
f"Could not convert {value} to TimeFrame"
39+
)
40+
41+
def equals(self, other):
42+
43+
if isinstance(other, Enum):
44+
return self.value == other.value
45+
else:
46+
return DataType.from_string(other) == self

investing_algorithm_framework/domain/models/data_source.py

Lines changed: 0 additions & 21 deletions
This file was deleted.

0 commit comments

Comments
 (0)