1
+ """
2
+ Graceful Degradation Manager
3
+ Handles service failures and provides fallback functionality
4
+ """
5
+
6
+ from enum import IntEnum
7
+ from typing import Dict , Any , Optional , List
8
+ import logging
9
+ from datetime import datetime , timedelta
10
+
11
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
12
+
13
+
14
class DegradationLevel(IntEnum):
    """Ordered service degradation levels.

    Members are integers, so levels can be compared directly; a smaller
    value means a healthier system.
    """

    # Everything works.
    FULL = 0
    # Qdrant is down: keyword search only.
    NO_VECTOR = 1
    # Storage problems: memory-only mode.
    NO_PERSISTENCE = 2
    # Critical issues: read-only mode.
    READONLY = 3
    # Maintenance mode: minimal functionality.
    MAINTENANCE = 4
21
+
22
+
23
class ServiceStatus:
    """Track the health and retry schedule of one named service.

    Attributes:
        name: Service name given at construction.
        is_healthy: Result of the most recent status update.
        last_check: Naive local timestamp of the most recent status update.
        consecutive_failures: Failures recorded since the last healthy mark.
        error_message: Error text of the latest failure, or None.
        retry_after: Earliest time a new probe is allowed, or None.
    """

    def __init__(self, name: str):
        """Create a status record that starts out healthy."""
        self.name = name
        self.is_healthy = True
        self.last_check = datetime.now()
        self.consecutive_failures = 0
        self.error_message: Optional[str] = None
        self.retry_after: Optional[datetime] = None

    def mark_healthy(self):
        """Mark service as healthy and clear all failure bookkeeping."""
        self.is_healthy = True
        self.consecutive_failures = 0
        self.error_message = None
        self.retry_after = None
        self.last_check = datetime.now()

    def mark_unhealthy(self, error: str):
        """Mark service as unhealthy and schedule the next retry.

        The retry delay doubles with every consecutive failure:
        1s, 2s, 4s, 8s, 16s, 32s, then stays at 60s.

        Args:
            error: Description of the failure, stored in ``error_message``.
        """
        # One timestamp for both fields keeps retry_after - last_check exact.
        now = datetime.now()

        self.is_healthy = False
        self.consecutive_failures += 1
        self.error_message = error
        self.last_check = now

        # The first failure waits 2**0 = 1s. The exponent is capped at 6
        # (2**6 = 64 already exceeds the 60s ceiling) so a long outage never
        # builds an enormous integer just to clamp it afterwards.
        exponent = min(self.consecutive_failures - 1, 6)
        backoff_seconds = min(2 ** exponent, 60)
        self.retry_after = now + timedelta(seconds=backoff_seconds)

    def should_retry(self) -> bool:
        """Return True when the service is unhealthy and its backoff elapsed.

        A healthy service never needs a retry, so this returns False for it.
        """
        if self.is_healthy:
            return False
        return self.retry_after is None or datetime.now() >= self.retry_after
60
+
61
+
62
class DegradationManager:
    """Manages graceful degradation of services.

    Holds one ServiceStatus per known service ("qdrant", "persistence",
    "openai", "anthropic") and derives from their health a single
    DegradationLevel plus the list of currently disabled features.
    """

    def __init__(self):
        """Register every known service as healthy and start at FULL."""
        self.services: Dict[str, ServiceStatus] = {
            "qdrant": ServiceStatus("qdrant"),
            "persistence": ServiceStatus("persistence"),
            "openai": ServiceStatus("openai"),
            "anthropic": ServiceStatus("anthropic"),
        }
        self.current_level = DegradationLevel.FULL
        self.features_disabled: List[str] = []

    def check_service(self, name: str, check_func) -> bool:
        """
        Check if a service is available

        Args:
            name: Service name
            check_func: Zero-argument callable that returns True if healthy;
                a falsy return value or a raised exception counts as a failure

        Returns:
            True if service is healthy
        """
        service = self.services.get(name)
        if service is None:
            logger.warning(f"Unknown service: {name}")
            return False

        # Only an unhealthy service still inside its backoff window is
        # skipped. A healthy service must always be probed; otherwise a
        # service that starts out healthy could never be seen to fail.
        if not service.is_healthy and not service.should_retry():
            return False

        try:
            # Attempt health check
            if not check_func():
                raise RuntimeError("Health check returned False")
        except Exception as e:
            service.mark_unhealthy(str(e))
            logger.warning(
                f"Service {name} unhealthy (attempt {service.consecutive_failures}): {e}"
            )
            return False

        service.mark_healthy()
        logger.info(f"Service {name} is healthy")
        return True

    def update_degradation_level(self):
        """Update the current degradation level based on service status.

        Every failing subsystem contributes its disabled features, and the
        resulting level is the most severe one among the failing subsystems,
        so one outage can no longer hide another.
        """
        old_level = self.current_level

        persistence_down = not self.services["persistence"].is_healthy
        vector_down = not self.services["qdrant"].is_healthy
        # AI features stay available while at least one provider is healthy.
        ai_down = not (
            self.services["openai"].is_healthy
            or self.services["anthropic"].is_healthy
        )

        new_level = DegradationLevel.FULL
        disabled: List[str] = []
        if persistence_down:
            new_level = max(new_level, DegradationLevel.NO_PERSISTENCE)
            disabled.extend(["persistence", "export", "import"])
        if vector_down:
            new_level = max(new_level, DegradationLevel.NO_VECTOR)
            disabled.extend(["semantic_search", "embeddings"])
        if ai_down:
            new_level = max(new_level, DegradationLevel.READONLY)
            disabled.extend(["ai_features", "auto_tagging", "importance_scoring"])

        self.current_level = new_level
        self.features_disabled = disabled

        # Log level changes
        if old_level != self.current_level:
            if self.current_level > old_level:
                logger.warning(
                    f"Degradation level increased: {old_level.name} -> {self.current_level.name}"
                )
            else:
                logger.info(
                    f"Degradation level improved: {old_level.name} -> {self.current_level.name}"
                )

    def is_feature_available(self, feature: str) -> bool:
        """Check if a feature is available at current degradation level"""
        return feature not in self.features_disabled

    def get_status(self) -> Dict[str, Any]:
        """Get current degradation status.

        Returns:
            A dict with the level name and value, a per-service summary
            (timestamps as ISO strings), the disabled features, and the
            capability flags derived from the current level.
        """
        return {
            "level": self.current_level.name,
            "level_value": self.current_level.value,
            "services": {
                name: {
                    "healthy": service.is_healthy,
                    "last_check": service.last_check.isoformat(),
                    "consecutive_failures": service.consecutive_failures,
                    "error": service.error_message,
                    "retry_after": (
                        service.retry_after.isoformat()
                        if service.retry_after
                        else None
                    ),
                }
                for name, service in self.services.items()
            },
            # Copy so callers cannot mutate the manager's own list.
            "features_disabled": list(self.features_disabled),
            "can_write": self.current_level < DegradationLevel.READONLY,
            "can_persist": self.current_level < DegradationLevel.NO_PERSISTENCE,
            "can_use_ai": self.current_level < DegradationLevel.READONLY,
            "can_vector_search": self.current_level == DegradationLevel.FULL,
        }

    async def perform_health_checks(self):
        """Perform health checks on all services, then update the level.

        NOTE(review): the individual checks are synchronous (file I/O and an
        HTTP request with a 2s timeout), so they run inline in this coroutine.
        """
        # Check persistence
        self.check_service("persistence", self._check_persistence)

        # Check Qdrant (if configured)
        if self._is_qdrant_configured():
            self.check_service("qdrant", self._check_qdrant)

        # Check AI services
        self.check_service("openai", self._check_openai)
        self.check_service("anthropic", self._check_anthropic)

        # Update degradation level
        self.update_degradation_level()

    def _check_persistence(self) -> bool:
        """Check if persistence is available.

        Ensures the directory named by MEMORY_PERSIST_PATH (default "/data")
        exists, then writes and removes a small probe file inside it.

        Returns:
            True if the directory exists (or could be created) and is
            writable, False on any OS-level error.
        """
        import os

        persist_path = os.getenv("MEMORY_PERSIST_PATH", "/data")
        probe_file = os.path.join(persist_path, ".write_test")
        try:
            # Also covers the "does not exist yet" case; a freshly created
            # directory is write-tested like an existing one.
            os.makedirs(persist_path, exist_ok=True)
            with open(probe_file, "w") as f:
                f.write("test")
            os.remove(probe_file)
        except OSError:
            return False
        return True

    def _is_qdrant_configured(self) -> bool:
        """Check if Qdrant is configured (QDRANT_URL is set and non-empty)"""
        import os
        return bool(os.getenv("QDRANT_URL"))

    def _check_qdrant(self) -> bool:
        """Check if Qdrant is available.

        Sends a GET to ``<QDRANT_URL>/collections`` with a 2 second timeout.

        Returns:
            True on an HTTP 200 response, False on any other status or on a
            request error.
        """
        import os
        import requests

        # Drop a trailing slash so the joined URL has no "//".
        qdrant_url = os.getenv("QDRANT_URL", "http://localhost:6333").rstrip("/")
        try:
            response = requests.get(f"{qdrant_url}/collections", timeout=2)
        except requests.RequestException:
            return False
        return response.status_code == 200

    def _check_openai(self) -> bool:
        """Check if OpenAI is configured (OPENAI_API_KEY is set, non-empty).

        Only the environment variable is inspected; no request is made.
        """
        import os
        return bool(os.getenv("OPENAI_API_KEY"))

    def _check_anthropic(self) -> bool:
        """Check if Anthropic is configured (ANTHROPIC_API_KEY is set, non-empty).

        Only the environment variable is inspected; no request is made.
        """
        import os
        return bool(os.getenv("ANTHROPIC_API_KEY"))
235
+
236
+
237
# Module-level manager instance; stays None until first requested.
_degradation_manager: Optional[DegradationManager] = None


def get_degradation_manager() -> DegradationManager:
    """Return the shared DegradationManager, building it on first use."""
    global _degradation_manager
    if _degradation_manager is not None:
        return _degradation_manager
    _degradation_manager = DegradationManager()
    return _degradation_manager
0 commit comments