diff --git a/lib/client/address_group.js b/lib/client/address_group.js index 6e9d56f..3eb4a80 100644 --- a/lib/client/address_group.js +++ b/lib/client/address_group.js @@ -7,7 +7,7 @@ const sleep = require('mz-modules/sleep'); const { printAddresses } = require('./utils'); const DynamicConfig = require('./dynamic_config'); const HealthCounter = require('./metric/health_counter'); -const createLoadBalancer = require('./loadbalancer'); +const defaultLoadBalancer = require('./loadbalancer'); const defaultOptions = { loadbalancerClass: 'roundRobin', @@ -36,6 +36,7 @@ class AddressGroup extends Base { this._connectionPoolSize = this.connectionPoolConfig.initConnectionSize; const config = DynamicConfig.instance.metric; this._maxIdleWindow = config.numBuckets * config.bucketSizeInMs; + const createLoadBalancer = options.createLoadBalancer || defaultLoadBalancer; this._loadbalancer = createLoadBalancer(this); // 每个 window 周期更新一遍权重,权重区间 [0, 10],0 代表地址被摘除了 diff --git a/test/client/address_group.test.js b/test/client/address_group.test.js index 003d36d..c40034f 100644 --- a/test/client/address_group.test.js +++ b/test/client/address_group.test.js @@ -6,6 +6,7 @@ const utility = require('utility'); const urlparse = require('url').parse; const sleep = require('mz-modules/sleep'); const AddressGroup = require('../../lib/client/address_group'); +const RandomLoadBalancer = require('../../lib/client/loadbalancer/random'); const DynamicConfig = require('../../lib/client/dynamic_config'); const ConnectionManager = require('../../').client.RpcConnectionMgr; const MockConnection = require('../fixtures/mock_connection'); @@ -1601,4 +1602,68 @@ describe('test/client/address_group.test.js', () => { assert(!addressGroup._loadbalancer._needElasticControl(49)); }); }); + + describe('自定义loadbalancer', () => { + let addressGroup; + let addressList; + const count = 600; + + beforeEach(async function() { + mm(DynamicConfig.instance.metric, 'numBuckets', 5); + mm(DynamicConfig.instance.metric, 'bucketSizeInMs', 100); + mm(DynamicConfig.instance.faultTolerance, 'leastWindowRtMultiple', 3); + + addressList = []; + for (let i = 0; i < count; i++) { + const address = urlparse(`bolt://127.0.0.${i}:12400`, true); + addressList.push(address); + MockConnection.addAvailableAddress(address); + } + + addressGroup = new AddressGroup({ + key: 'com.alipay.TestQueryService:1.0@SOFA@xxxx', + logger, + connectionManager, + connectionClass: MockConnection, + createLoadBalancer: _addressGroup => new RandomLoadBalancer(_addressGroup), + retryFaultInterval: 5000, + connectionPoolConfig: { + minAddressCount: 5, + maxAddressCount: 50, + initConnectionSize: 6, + elasticControl: true, + capacityPerConnection: 300, + }, + }); + addressGroup.addressList = addressList; + await addressGroup.ready(); + + assert(addressGroup.connectionPoolSize === 6); + }); + + afterEach(async function() { + MockConnection.clearAvailableAddress(); + addressGroup.close(); + await connectionManager.closeAllConnections(); + mm.restore(); + }); + + it('各属性值赋值正确', async () => { + assert(addressGroup._allAddressList.length === count); + assert(addressGroup._allAddressList === addressList); + assert(addressGroup.totalSize === count); + assert(addressGroup.choosedSize === 6); + assert(addressGroup.addressList.length === 6); + + for (const address of addressGroup.addressList) { + assert(addressGroup._weightMap.has(address.host)); + assert(addressGroup._weightMap.get(address.host) === 100); + } + + assert(addressGroup._faultAddressMap.size === 0); + assert(addressGroup._degradeEnable); + const connection = await addressGroup.getConnection(req); + assert(connection && connection.isConnected); + }); + }); });