1919import com .tencent .polaris .api .exception .PolarisException ;
2020import com .tencent .polaris .api .listener .ServiceListener ;
2121import com .tencent .polaris .api .pojo .Instance ;
22- import com .tencent .polaris .api .pojo .ServiceChangeEvent ;
2322import com .tencent .polaris .api .utils .StringUtils ;
24- import com .tencent .polaris .client .util .NamedThreadFactory ;
2523import com .tencent .polaris .common .registry .BaseBootConfigHandler ;
2624import com .tencent .polaris .common .registry .BootConfigHandler ;
2725import com .tencent .polaris .common .registry .Consts ;
2826import com .tencent .polaris .common .registry .ConvertUtils ;
2927import com .tencent .polaris .common .registry .PolarisOperator ;
3028import com .tencent .polaris .common .registry .PolarisOperators ;
31- import com .tencent .polaris .common .utils .ExtensionConsts ;
29+ import org .apache .dubbo .common .URL ;
30+ import org .apache .dubbo .common .URLBuilder ;
31+ import org .apache .dubbo .common .constants .CommonConstants ;
32+ import org .apache .dubbo .common .utils .CollectionUtils ;
33+ import org .apache .dubbo .common .utils .ConcurrentHashSet ;
34+ import org .apache .dubbo .registry .NotifyListener ;
35+ import org .apache .dubbo .registry .support .FailbackRegistry ;
36+ import org .apache .dubbo .rpc .cluster .Constants ;
37+ import org .slf4j .Logger ;
38+ import org .slf4j .LoggerFactory ;
39+
3240import java .util .ArrayList ;
3341import java .util .Collection ;
3442import java .util .Collections ;
3745import java .util .Map ;
3846import java .util .Set ;
3947import java .util .concurrent .ConcurrentHashMap ;
40- import java .util .concurrent .ExecutorService ;
41- import java .util .concurrent .Executors ;
4248import java .util .concurrent .atomic .AtomicBoolean ;
43- import org .apache .dubbo .common .URL ;
44- import org .apache .dubbo .common .constants .CommonConstants ;
45- import org .apache .dubbo .common .constants .RegistryConstants ;
46- import org .apache .dubbo .common .extension .ExtensionLoader ;
47- import org .apache .dubbo .common .utils .ConcurrentHashSet ;
48- import org .apache .dubbo .registry .NotifyListener ;
49- import org .apache .dubbo .registry .support .FailbackRegistry ;
50- import org .apache .dubbo .rpc .Filter ;
51- import org .apache .dubbo .rpc .cluster .Constants ;
52- import org .apache .dubbo .rpc .cluster .RouterFactory ;
53- import org .slf4j .Logger ;
54- import org .slf4j .LoggerFactory ;
49+
50+ import static org .apache .dubbo .common .constants .RegistryConstants .CATEGORY_KEY ;
51+ import static org .apache .dubbo .common .constants .RegistryConstants .DEFAULT_CATEGORY ;
52+ import static org .apache .dubbo .common .constants .RegistryConstants .EMPTY_PROTOCOL ;
5553
5654public class PolarisRegistry extends FailbackRegistry {
5755
5856 private static final Logger LOGGER = LoggerFactory .getLogger (PolarisRegistry .class );
5957
60- private static final TaskScheduler taskScheduler = new TaskScheduler ();
61-
6258 private final Set <URL > registeredInstances = new ConcurrentHashSet <>();
6359
6460 private final AtomicBoolean destroyed = new AtomicBoolean (false );
@@ -67,54 +63,22 @@ public class PolarisRegistry extends FailbackRegistry {
6763
6864 private final PolarisOperator polarisOperator ;
6965
70- private final boolean hasCircuitBreaker ;
71-
72- private final boolean hasRouter ;
73-
7466 public PolarisRegistry (URL url ) {
7567 this (url , new BaseBootConfigHandler ());
7668 }
7769
7870 // for test
7971 public PolarisRegistry (URL url , BootConfigHandler ... handlers ) {
8072 super (url );
81- polarisOperator = new PolarisOperator (url .getHost (), url .getPort (), url .getParameters (), handlers );
82- PolarisOperators .INSTANCE .addPolarisOperator (polarisOperator );
83- ExtensionLoader <RouterFactory > routerExtensionLoader = ExtensionLoader .getExtensionLoader (RouterFactory .class );
84- hasRouter = routerExtensionLoader .hasExtension (ExtensionConsts .PLUGIN_ROUTER_NAME );
85- ExtensionLoader <Filter > filterExtensionLoader = ExtensionLoader .getExtensionLoader (Filter .class );
86- hasCircuitBreaker = filterExtensionLoader .hasExtension (ExtensionConsts .PLUGIN_CIRCUITBREAKER_NAME );
87- }
88-
89- private URL buildRouterURL (URL consumerUrl ) {
90- URL routerURL = null ;
91- if (hasRouter ) {
92- URL registryURL = getUrl ();
93- routerURL = new URL (RegistryConstants .ROUTE_PROTOCOL , registryURL .getHost (), registryURL .getPort ());
94- routerURL = routerURL .setServiceInterface (CommonConstants .ANY_VALUE );
95- routerURL = routerURL .addParameter (Constants .ROUTER_KEY , ExtensionConsts .PLUGIN_ROUTER_NAME );
96- String consumerGroup = consumerUrl .getParameter (CommonConstants .GROUP_KEY );
97- String consumerVersion = consumerUrl .getParameter (CommonConstants .VERSION_KEY );
98- String consumerClassifier = consumerUrl .getParameter (CommonConstants .CLASSIFIER_KEY );
99- if (null != consumerGroup ) {
100- routerURL = routerURL .addParameter (CommonConstants .GROUP_KEY , consumerGroup );
101- }
102- if (null != consumerVersion ) {
103- routerURL = routerURL .addParameter (CommonConstants .VERSION_KEY , consumerVersion );
104- }
105- if (null != consumerClassifier ) {
106- routerURL = routerURL .addParameter (CommonConstants .CLASSIFIER_KEY , consumerClassifier );
107- }
108- }
109- return routerURL ;
73+ polarisOperator = PolarisOperators .INSTANCE .loadOrStore (url .getHost (), url .getPort (), url .getParameters (), handlers );
11074 }
11175
11276 @ Override
11377 public void doRegister (URL url ) {
11478 if (!shouldRegister (url )) {
11579 return ;
11680 }
117- LOGGER .info ("[POLARIS] register service to polaris: {}" , url . toString () );
81+ LOGGER .info ("[POLARIS] register service to polaris: {}" , url );
11882 Map <String , String > metadata = new HashMap <>(url .getParameters ());
11983 metadata .put (CommonConstants .PATH_KEY , url .getPath ());
12084 int port = url .getPort ();
@@ -138,7 +102,7 @@ public void doUnregister(URL url) {
138102 if (!shouldRegister (url )) {
139103 return ;
140104 }
141- LOGGER .info ("[POLARIS] unregister service from polaris: {}" , url . toString () );
105+ LOGGER .info ("[POLARIS] unregister service from polaris: {}" , url );
142106 int port = url .getPort ();
143107 if (port > 0 ) {
144108 polarisOperator .deregister (url .getServiceInterface (), url .getHost (), url .getPort ());
@@ -155,79 +119,27 @@ public void destroy() {
155119 doUnregister (url );
156120 }
157121 polarisOperator .destroy ();
158- taskScheduler .destroy ();
159122 }
160123 }
161124
162125 @ Override
163126 public void doSubscribe (URL url , NotifyListener listener ) {
164127 String service = url .getServiceInterface ();
165- Instance [] instances = polarisOperator .getAvailableInstances (service , ! hasCircuitBreaker );
128+ Instance [] instances = polarisOperator .getAvailableInstances (service , true );
166129 onInstances (url , listener , instances );
167130 LOGGER .info ("[POLARIS] submit watch task for service {}" , service );
168- taskScheduler .submitWatchTask (new WatchTask (url , listener , service ));
169- }
170-
171- private class WatchTask implements Runnable {
172-
173- private final String service ;
174-
175- private final ServiceListener serviceListener ;
176-
177- private final NotifyListener listener ;
178-
179- private final FetchTask fetchTask ;
180-
181- public WatchTask (URL url , NotifyListener listener ,
182- String service ) {
183- this .service = service ;
184- this .listener = listener ;
185- fetchTask = new FetchTask (url , listener );
186- serviceListener = new ServiceListener () {
187- @ Override
188- public void onEvent (ServiceChangeEvent event ) {
189- PolarisRegistry .taskScheduler .submitFetchTask (fetchTask );
131+ serviceListeners .computeIfAbsent (listener , notifyListener -> {
132+ ServiceListener serviceListener = event -> {
133+ try {
134+ Instance [] curInstances = polarisOperator .getAvailableInstances (service , true );
135+ onInstances (url , listener , curInstances );
136+ } catch (PolarisException e ) {
137+ LOGGER .error ("[POLARIS] fail to fetch instances for service {}: {}" , service , e .toString ());
190138 }
191139 };
192- }
193-
194- @ Override
195- public void run () {
196- boolean result = polarisOperator .watchService (service , serviceListener );
197- if (result ) {
198- serviceListeners .put (listener , serviceListener );
199- PolarisRegistry .taskScheduler .submitFetchTask (fetchTask );
200- return ;
201- }
202- PolarisRegistry .taskScheduler .submitWatchTask (this );
203- }
204- }
205-
206- private class FetchTask implements Runnable {
207-
208- private final String service ;
209-
210- private final URL url ;
211-
212- private final NotifyListener listener ;
213-
214- public FetchTask (URL url , NotifyListener listener ) {
215- this .service = url .getServiceInterface ();
216- this .url = url ;
217- this .listener = listener ;
218- }
219-
220- @ Override
221- public void run () {
222- Instance [] instances ;
223- try {
224- instances = polarisOperator .getAvailableInstances (service , !hasCircuitBreaker );
225- } catch (PolarisException e ) {
226- LOGGER .error ("[POLARIS] fail to fetch instances for service {}: {}" , service , e .toString ());
227- return ;
228- }
229- onInstances (url , listener , instances );
230- }
140+ polarisOperator .watchService (service , serviceListener );
141+ return serviceListener ;
142+ });
231143 }
232144
233145 private void onInstances (URL url , NotifyListener listener , Instance [] instances ) {
@@ -240,11 +152,7 @@ private void onInstances(URL url, NotifyListener listener, Instance[] instances)
240152 urls .add (instanceToURL (requireInterface , instance ));
241153 }
242154 }
243- URL routerURL = buildRouterURL (url );
244- if (null != routerURL ) {
245- urls .add (routerURL );
246- }
247- PolarisRegistry .this .notify (url , listener , urls );
155+ notify (url , listener , toUrlWithEmpty (url , urls ));
248156 }
249157
250158 private static URL instanceToURL (String requireInterface , Instance instance ) {
@@ -275,68 +183,25 @@ private static URL instanceToURL(String requireInterface, Instance instance) {
275183 newMetadata );
276184 }
277185
278- private static class TaskScheduler {
279-
280- private final ExecutorService fetchExecutor = Executors
281- .newSingleThreadExecutor (new NamedThreadFactory ("agent-fetch" ));
282-
283- private final ExecutorService watchExecutor = Executors
284- .newSingleThreadExecutor (new NamedThreadFactory ("agent-retry-watch" ));
285-
286- private final AtomicBoolean executorDestroyed = new AtomicBoolean (false );
287-
288- private final Object lock = new Object ();
289-
290- void submitFetchTask (Runnable fetchTask ) {
291- if (executorDestroyed .get ()) {
292- return ;
293- }
294- synchronized (lock ) {
295- if (executorDestroyed .get ()) {
296- return ;
297- }
298- fetchExecutor .submit (fetchTask );
299- }
300- }
301-
302- void submitWatchTask (Runnable watchTask ) {
303- if (executorDestroyed .get ()) {
304- return ;
305- }
306- synchronized (lock ) {
307- if (executorDestroyed .get ()) {
308- return ;
309- }
310- watchExecutor .submit (watchTask );
311- }
312- }
313-
314- boolean isDestroyed () {
315- return executorDestroyed .get ();
316- }
317-
318- void destroy () {
319- synchronized (lock ) {
320- if (executorDestroyed .compareAndSet (false , true )) {
321- fetchExecutor .shutdown ();
322- watchExecutor .shutdown ();
323- }
324- }
186+ private List <URL > toUrlWithEmpty (URL providerUrl , List <URL > urls ) {
187+ if (CollectionUtils .isEmpty (urls )) {
188+ LOGGER .warn ("[POLARIS] received empty url address list, will clear current available addresses" );
189+ URL empty = URLBuilder .from (providerUrl )
190+ .setProtocol (EMPTY_PROTOCOL )
191+ .addParameter (CATEGORY_KEY , DEFAULT_CATEGORY )
192+ .build ();
193+ urls .add (empty );
325194 }
195+ return urls ;
326196 }
327197
328198 @ Override
329199 public void doUnsubscribe (URL url , NotifyListener listener ) {
330200 LOGGER .info ("[polaris] unsubscribe service: {}" , url .toString ());
331- taskScheduler .submitWatchTask (new Runnable () {
332- @ Override
333- public void run () {
334- ServiceListener serviceListener = serviceListeners .get (listener );
335- if (null != serviceListener ) {
336- polarisOperator .unwatchService (url .getServiceInterface (), serviceListener );
337- }
338- }
339- });
201+ ServiceListener serviceListener = serviceListeners .get (listener );
202+ if (null != serviceListener ) {
203+ polarisOperator .unwatchService (url .getServiceInterface (), serviceListener );
204+ }
340205 }
341206
342207 @ Override
0 commit comments