UNKNOWN:Error received from peer {grpc_message:"Socket closed", grpc_status:14} #252

@cxhello

Description

nacos-sdk-python==2.0.9
nacos-server=3.0.3

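Symptom: when the instance is registered from a local machine, it stays online. When it is registered from a k8s pod in the test environment, the instance disappears some time after registration.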

The error log is as follows:

Task exception was never retrieved
future: <Task finished name='Task-12' coro=<GrpcClient._server_request_watcher() done, defined at /app/__pypackages__/3.12/lib/v2/nacos/transport/grpc_client.py:145> exception=<AioRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "Socket closed"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Socket closed", grpc_status:14}"
>>
Traceback (most recent call last):
  File "/app/__pypackages__/3.12/lib/v2/nacos/transport/grpc_client.py", line 146, in _server_request_watcher
    async for payload in grpc_conn.bi_stream_send():
  File "/app/__pypackages__/3.12/lib/grpc/aio/_call.py", line 365, in _fetch_stream_responses
    await self._raise_for_status()
  File "/app/__pypackages__/3.12/lib/grpc/aio/_call.py", line 272, in _raise_for_status
    raise _create_rpc_error(
grpc.aio._call.AioRpcError: <AioRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "Socket closed"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Socket closed", grpc_status:14}"
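
For context on the first line of the log: asyncio prints "Task exception was never retrieved" when a background task fails and nothing ever awaits it or reads its exception. The sketch below reproduces that situation with plain asyncio and shows one way to surface the failure through a done callback. `StreamClosed`, `fake_watcher`, and `record_failure` are hypothetical stand-ins invented for illustration; they are not part of nacos-sdk-python or grpc, and this is not a claim about how the SDK should handle the error.

```python
import asyncio
import logging

logger = logging.getLogger("watcher-demo")

# Failures observed by the done callback, kept so callers can inspect them.
failures: list[str] = []


class StreamClosed(Exception):
    """Hypothetical stand-in for an AioRpcError with StatusCode.UNAVAILABLE."""


async def fake_watcher() -> None:
    # Hypothetical stand-in for a coroutine that reads a bidirectional stream
    # and fails when the peer closes the socket.
    await asyncio.sleep(0)
    raise StreamClosed("Socket closed")


def record_failure(task: asyncio.Task) -> None:
    # Calling task.exception() retrieves the exception, so asyncio no longer
    # reports it as "never retrieved" when the task is garbage collected.
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        failures.append(type(exc).__name__)
        logger.error("background task %s failed: %r", task.get_name(), exc)


async def main() -> list[str]:
    task = asyncio.create_task(fake_watcher(), name="watcher")
    task.add_done_callback(record_failure)
    # Wait for completion without re-raising the task's exception here.
    await asyncio.wait([task])
    return failures


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With a callback like this the failure is logged at the moment the stream dies, which makes it easier to correlate the "Socket closed" event with the time the instance disappears from the server.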

The sample code is as follows:

import os
import socket
from loguru import logger

from v2.nacos import NacosNamingService, ClientConfigBuilder, GRPCConfig, Instance, SubscribeServiceParam, \
    RegisterInstanceParam, DeregisterInstanceParam, BatchRegisterInstanceParam, GetServiceParam, ListServiceParam, \
    ListInstanceParam, NacosConfigService, ConfigParam

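def get_local_ip() -> str:
    """Return the local intranet IP address."""
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # connect() on a UDP socket only selects the outbound interface; no packet is sent
        s.connect(("8.8.8.8", 80))
        return s.getsockname()[0]
    finally:
        s.close()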

async def register(service_name: str, port: int, enabled: bool = False, k8s_namespace: str = None):
    """
    Register the service with Nacos.
    :param service_name: service name
    :param port: service port
    :param enabled: whether to register (defaults to False; when False, registration is skipped)
    :param k8s_namespace: k8s namespace
    """
    if not enabled:
        logger.info(f"Skip Nacos registration for service={service_name}, port={port}")
        return

    if k8s_namespace:
        # K8s environment: use the service domain name
        ip_or_host = f"{service_name}.{k8s_namespace}.svc.cluster.local"
    else:
        # Local environment: use the intranet IP
        ip_or_host = get_local_ip()


    client_config = (ClientConfigBuilder()
                     .server_address(os.getenv("ALGO_NACOS_SERVER")).namespace_id(os.getenv("ALGO_NACOS_NAMESPACE"))
                     .username(os.getenv("ALGO_NACOS_USERNAME")).password(os.getenv("ALGO_NACOS_PASSWORD"))
                     .log_level('INFO')
                     .grpc_config(GRPCConfig(grpc_timeout=5000))
                     .build())

    naming_client = await NacosNamingService.create_naming_service(client_config)

    response = await naming_client.register_instance(
        request=RegisterInstanceParam(
            service_name=service_name,
            group_name=os.getenv("ALGO_NACOS_GROUP"),
            ip=ip_or_host,
            port=port,
            ephemeral=True
        )
    )

    if response:
        logger.info(
            f"✅ Registered service={service_name}, host={ip_or_host}, port={port} to Nacos successfully"
        )
    else:
        logger.error(
            f"❌ Failed to register service={service_name}, host={ip_or_host}, port={port} to Nacos"
        )
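
While the root cause is open, one client-side mitigation I could try is retrying a failed call with exponential backoff. This is only a sketch under assumptions: `retry_with_backoff` is a hypothetical helper written for this issue, not an API of nacos-sdk-python, and it does not by itself restore an instance that the server has already dropped.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    op: Callable[[], Awaitable[T]],
    attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> T:
    """Await op() up to `attempts` times, doubling the delay after each failure.

    Hypothetical helper for illustration; the defaults are arbitrary.
    """
    delay = base_delay
    for attempt in range(1, attempts + 1):
        try:
            return await op()
        except retry_on:
            if attempt == attempts:
                # Out of attempts: let the last error propagate to the caller.
                raise
            await asyncio.sleep(delay)
            delay = min(delay * 2, max_delay)
    raise RuntimeError("attempts must be at least 1")
```

In the sample above, the call could be wrapped as `await retry_with_backoff(lambda: naming_client.register_instance(request=param))`, where `param` is the `RegisterInstanceParam` already being built.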
