Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: 3.8
python-version: 3.12
cache: 'pip'

- name: Install dependencies
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [ "3.8" ]
python-version: [ "3.12" ]

steps:
# Check out the repo with credentials that can bypass branch protection, and fetch git history instead of just latest commit
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [ "3.8", "3.12" ]
python-version: [ "3.12" ]

steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/unit_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
runs-on: ${{matrix.os}}
strategy:
matrix:
python-version: [ "3.8", "3.12" ]
python-version: [ "3.12" ]
os: [ubuntu-latest, windows-latest]

steps:
Expand Down
61 changes: 53 additions & 8 deletions devcycle_python_sdk/managers/config_manager.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import json
import logging
import threading
import time
import logging
import json
from datetime import datetime
from typing import Optional

from devcycle_python_sdk.options import DevCycleLocalOptions
from devcycle_python_sdk.api.local_bucketing import LocalBucketing
import ld_eventsource.actions

from devcycle_python_sdk.api.config_client import ConfigAPIClient
from devcycle_python_sdk.api.local_bucketing import LocalBucketing
from devcycle_python_sdk.exceptions import (
CloudClientUnauthorizedError,
CloudClientError,
)
from wsgiref.handlers import format_date_time
from devcycle_python_sdk.options import DevCycleLocalOptions
from devcycle_python_sdk.managers.sse_manager import SSEManager

logger = logging.getLogger(__name__)

Expand All @@ -27,7 +32,9 @@ def __init__(
self._sdk_key = sdk_key
self._options = options
self._local_bucketing = local_bucketing

self._sse_manager: Optional[SSEManager] = None
self._sse_polling_interval = 1000 * 60 * 15 * 60
self._sse_connected = False
self._config: Optional[dict] = None
self._config_etag: Optional[str] = None
self._config_lastmodified: Optional[str] = None
Expand All @@ -41,10 +48,15 @@ def __init__(
def is_initialized(self) -> bool:
return self._config is not None

def _get_config(self):
def _get_config(self, last_modified: Optional[float] = None):
try:
lm_header = self._config_lastmodified
if last_modified is not None:
lm_timestamp = datetime.fromtimestamp(last_modified)
lm_header = format_date_time(time.mktime(lm_timestamp.timetuple()))

new_config, new_etag, new_lastmodified = self._config_api_client.get_config(
config_etag=self._config_etag, last_modified=self._config_lastmodified
config_etag=self._config_etag, last_modified=lm_header
)

# Abort early if the last modified is before the sent one.
Expand All @@ -68,6 +80,14 @@ def _get_config(self):

json_config = json.dumps(self._config)
self._local_bucketing.store_config(json_config)
if self._options.enable_beta_realtime_updates:
if self._sse_manager is None:
self._sse_manager = SSEManager(
self.sse_state,
self.sse_error,
self.sse_message,
)
self._sse_manager.update(self._config)

if (
trigger_on_client_initialized
Expand Down Expand Up @@ -98,7 +118,32 @@ def run(self):
logger.warning(
f"DevCycle: Error polling for config changes: {str(e)}"
)
time.sleep(self._options.config_polling_interval_ms / 1000.0)
if self._sse_connected:
time.sleep(self._sse_polling_interval / 1000.0)
else:
time.sleep(self._options.config_polling_interval_ms / 1000.0)

def sse_message(self, message: ld_eventsource.actions.Event):
if self._sse_connected is False:
self._sse_connected = True
logger.info("DevCycle: Connected to SSE stream")
logger.info(f"DevCycle: Received message: {message.data}")
sse_message = json.loads(message.data)
dvc_data = json.loads(sse_message.get("data"))
if (
dvc_data.get("type") == "refetchConfig"
or dvc_data.get("type") == ""
or dvc_data.get("type") is None
):
logger.info("DevCycle: Received refetchConfig message - updating config")
self._get_config(dvc_data["lastModified"])

def sse_error(self, error: ld_eventsource.actions.Fault):
logger.warning(f"DevCycle: Received SSE error: {error}")

def sse_state(self, state: ld_eventsource.actions.Start):
self._sse_connected = True
logger.info("DevCycle: Connected to SSE stream")

def close(self):
self._polling_enabled = False
63 changes: 63 additions & 0 deletions devcycle_python_sdk/managers/sse_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import threading

import ld_eventsource
import ld_eventsource.actions
import ld_eventsource.config
from typing import Callable


class SSEManager:
def __init__(
self,
handlestate: Callable[[ld_eventsource.actions.Start], None],
handleerror: Callable[[ld_eventsource.actions.Fault], None],
handlemessage: Callable[[ld_eventsource.actions.Event], None],
):
self.client: ld_eventsource.SSEClient = None
self.url = ""
self.handlestate = handlestate
self.handleerror = handleerror
self.handlemessage = handlemessage

self.read_thread = threading.Thread(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does the threading here work?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SSE Manager itself creates a thread managed by it - when a new url is found - it creates/updates the current thread used for processing the events incoming.

This is mostly required due to the eventsource client must run in its own thread - and you must read from the same thread that you started the client in.

target=self.read_events,
args=(self.handlestate, self.handleerror, self.handlemessage),
)

def read_events(
self,
handlestate: Callable[[ld_eventsource.actions.Start], None],
handleerror: Callable[[ld_eventsource.actions.Fault], None],
handlemessage: Callable[[ld_eventsource.actions.Event], None],
):
self.client.start()
for event in self.client.all:
if isinstance(event, ld_eventsource.actions.Start):
handlestate(event)
elif isinstance(event, ld_eventsource.actions.Fault):
handleerror(event)
elif isinstance(event, ld_eventsource.actions.Event):
handlemessage(event)

def update(self, config: dict):
if self.use_new_config(config["sse"]):
self.url = config["sse"]["hostname"] + config["sse"]["path"]
if self.client is not None:
self.client.close()
if self.read_thread.is_alive():
self.read_thread.join()
self.client = ld_eventsource.SSEClient(
connect=ld_eventsource.config.ConnectStrategy.http(self.url),
error_strategy=ld_eventsource.config.ErrorStrategy.CONTINUE,
)
self.read_thread = threading.Thread(
target=self.read_events,
args=(self.handlestate, self.handleerror, self.handlemessage),
)
self.read_thread.start()

def use_new_config(self, config: dict) -> bool:
new_url = config["hostname"] + config["path"]
if self.url == "" or self.url is None and new_url != "":
return True
return self.url != new_url
2 changes: 2 additions & 0 deletions devcycle_python_sdk/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(
event_retry_delay_ms: int = 200, # milliseconds
disable_automatic_event_logging: bool = False,
disable_custom_event_logging: bool = False,
enable_beta_realtime_updates: bool = False,
):
self.events_api_uri = events_api_uri
self.config_cdn_uri = config_cdn_uri
Expand All @@ -60,6 +61,7 @@ def __init__(
self.on_client_initialized = on_client_initialized
self.event_request_timeout_ms = event_request_timeout_ms
self.event_retry_delay_ms = event_retry_delay_ms
self.enable_beta_realtime_updates = enable_beta_realtime_updates

if self.flush_event_queue_size >= self.max_event_queue_size:
logger.warning(
Expand Down
2 changes: 1 addition & 1 deletion example/local_bucketing_client_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def main():

# create an instance of the DevCycle Client object
server_sdk_key = os.environ["DEVCYCLE_SERVER_SDK_KEY"]
options = DevCycleLocalOptions()
options = DevCycleLocalOptions(enable_beta_realtime_updates=True)
client = DevCycleLocalClient(server_sdk_key, options)

# Wait for DevCycle to initialize and load the configuration
Expand Down
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ ignore_missing_imports = true
module = 'test.openfeature.*'
ignore_errors = true

[[tool.mypy.overrides]]
module = 'ld_eventsource.*'
ignore_errors = true
ignore_missing_imports = true


[[tool.mypy.overrides]]
module = 'openfeature.*'
ignore_errors = true
Expand Down
5 changes: 4 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,7 @@ urllib3 >= 1.15.1
requests ~= 2.31
wasmtime == 23.0.0
protobuf >= 4.23.3
openfeature-sdk >= 0.7.0
openfeature-sdk >= 0.7.0
launchdarkly-eventsource >= 1.2.0
responses~=0.23.1
dataclasses~=0.6
Loading