Open
Description
nacos-sdk-python==2.0.9
nacos-server=3.0.3
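Symptom: an instance registered from a local machine never goes offline. An instance registered from a k8s pod in the test environment disappears some time after registration.
The error log is as follows: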
Task exception was never retrieved
future: <Task finished name='Task-12' coro=<GrpcClient._server_request_watcher() done, defined at /app/__pypackages__/3.12/lib/v2/nacos/transport/grpc_client.py:145> exception=<AioRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "Socket closed"
debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"Socket closed", grpc_status:14}"
>>
Traceback (most recent call last):
  File "/app/__pypackages__/3.12/lib/v2/nacos/transport/grpc_client.py", line 146, in _server_request_watcher
    async for payload in grpc_conn.bi_stream_send():
  File "/app/__pypackages__/3.12/lib/grpc/aio/_call.py", line 365, in _fetch_stream_responses
    await self._raise_for_status()
  File "/app/__pypackages__/3.12/lib/grpc/aio/_call.py", line 272, in _raise_for_status
    raise _create_rpc_error(
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "Socket closed"
debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"Socket closed", grpc_status:14}"
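For context on the first line of the log: "Task exception was never retrieved" is the message asyncio emits when a task finishes with an exception that nothing ever reads. The sketch below reproduces that situation with plain asyncio and shows one way to retrieve the exception through a done callback. `StreamClosed`, `watcher`, and `on_done` are hypothetical stand-ins and not part of nacos-sdk-python or grpc; whether the SDK reconnects after the watcher task dies is not something this log shows.

```python
import asyncio


class StreamClosed(Exception):
    """Hypothetical stand-in for grpc.aio.AioRpcError with StatusCode.UNAVAILABLE."""


async def watcher():
    # Stand-in for a long-running stream reader that fails once the peer closes the socket.
    await asyncio.sleep(0)
    raise StreamClosed("Socket closed")


def on_done(task, seen):
    # Reading task.exception() marks the exception as retrieved, so asyncio
    # no longer logs "Task exception was never retrieved" for this task.
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        seen.append(exc)


async def main():
    seen = []
    task = asyncio.create_task(watcher())
    task.add_done_callback(lambda t: on_done(t, seen))
    # asyncio.wait does not re-raise the task's exception; it only waits for completion.
    await asyncio.wait([task])
    await asyncio.sleep(0)  # give any pending callbacks a chance to run
    return seen


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In a real service, the callback is a natural place to log the failure or trigger a reconnect, instead of letting the task fail silently.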
Sample code:
import os
import socket

from loguru import logger
from v2.nacos import NacosNamingService, ClientConfigBuilder, GRPCConfig, Instance, SubscribeServiceParam, \
    RegisterInstanceParam, DeregisterInstanceParam, BatchRegisterInstanceParam, GetServiceParam, ListServiceParam, \
    ListInstanceParam, NacosConfigService, ConfigParam


def get_local_ip() -> str:
    """Return the local intranet IP."""
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect(("8.8.8.8", 80))  # UDP connect: no packet is actually sent
        return s.getsockname()[0]
    finally:
        s.close()


async def register(service_name: str, port: int, enabled: bool = False, k8s_namespace: str = None):
    """
    Register the service with Nacos.
    :param service_name: service name
    :param port: service port
    :param enabled: whether to register (default False; when False, registration is skipped)
    :param k8s_namespace: k8s namespace
    """
    if not enabled:
        logger.info(f"Skip Nacos registration for service={service_name}, port={port}")
        return
    if k8s_namespace:
        # K8s environment: use the service domain name
        ip_or_host = f"{service_name}.{k8s_namespace}.svc.cluster.local"
    else:
        # Local environment: use the intranet IP
        ip_or_host = get_local_ip()
    client_config = (ClientConfigBuilder()
                     .server_address(os.getenv("ALGO_NACOS_SERVER")).namespace_id(os.getenv("ALGO_NACOS_NAMESPACE"))
                     .username(os.getenv("ALGO_NACOS_USERNAME")).password(os.getenv("ALGO_NACOS_PASSWORD"))
                     .log_level('INFO')
                     .grpc_config(GRPCConfig(grpc_timeout=5000))
                     .build())
    naming_client = await NacosNamingService.create_naming_service(client_config)
    response = await naming_client.register_instance(
        request=RegisterInstanceParam(
            service_name=service_name,
            group_name=os.getenv("ALGO_NACOS_GROUP"),
            ip=ip_or_host,
            port=port,
            ephemeral=True
        )
    )
    if response:
        logger.info(
            f"✅ Registered service={service_name}, host={ip_or_host}, port={port} to Nacos successfully"
        )
    else:
        logger.error(
            f"❌ Failed to register service={service_name}, host={ip_or_host}, port={port} to Nacos"
        )
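As a possible stopgap while the root cause is unknown, the registration call could be repeated on a timer so that a dropped ephemeral instance reappears. The sketch below is plain asyncio and deliberately does not import the SDK: `keep_registered` and `register_once` are hypothetical names, and `register_once` is assumed to wrap the `naming_client.register_instance(...)` call from the sample above and return a truthy value on success. Whether re-registering over a connection that has already failed actually works with nacos-sdk-python 2.0.9 is an untested assumption.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Optional

log = logging.getLogger("nacos_keepalive")


async def keep_registered(
    register_once: Callable[[], Awaitable[bool]],
    interval: float,
    rounds: Optional[int] = None,
) -> int:
    """Call register_once every `interval` seconds; return the number of successes.

    rounds=None loops until the task is cancelled; a finite value is mainly for tests.
    """
    successes = 0
    attempts = 0
    while rounds is None or attempts < rounds:
        attempts += 1
        try:
            if await register_once():
                successes += 1
            else:
                log.warning("registration attempt %d returned a falsy response", attempts)
        except Exception:
            # Keep the loop alive on errors such as a closed gRPC stream.
            log.exception("registration attempt %d failed; will retry", attempts)
        await asyncio.sleep(interval)
    return successes
```

A caller would typically start this with `asyncio.create_task(keep_registered(register_once, interval=30))` and keep a reference to both the task and the naming client for the lifetime of the process.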