From 06f5121a4a52170debf0c8dc356d3eda77980f9f Mon Sep 17 00:00:00 2001 From: Yoav Tock Date: Tue, 15 Jul 2025 11:54:38 +0300 Subject: [PATCH] A connection source to hold shard and consensus endpoints Intended use: Used by assemblers to connect to shards Used by assemblers to connect to consensus Used by batchers to connect to other batchers Used by batchers to connect to consensus Used by consenter to synch from other consensus nodes Signed-off-by: Yoav Tock --- node/delivery/client/orderers/connection.go | 199 +++ .../client/orderers/connection_factory.go | 32 + .../orderers/connection_factory_test.go | 25 + .../client/orderers/connection_test.go | 1234 +++++++++++++++++ 4 files changed, 1490 insertions(+) create mode 100644 node/delivery/client/orderers/connection.go create mode 100644 node/delivery/client/orderers/connection_factory.go create mode 100644 node/delivery/client/orderers/connection_factory_test.go create mode 100644 node/delivery/client/orderers/connection_test.go diff --git a/node/delivery/client/orderers/connection.go b/node/delivery/client/orderers/connection.go new file mode 100644 index 00000000..8becdaa0 --- /dev/null +++ b/node/delivery/client/orderers/connection.go @@ -0,0 +1,199 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package orderers + +import ( + "bytes" + "crypto/md5" + "fmt" + "math/rand" + "sync" + + "github.com/hyperledger/fabric-lib-go/common/flogging" + "github.com/hyperledger/fabric-x-orderer/common/types" + "github.com/pkg/errors" +) + +// Endpoint represents a source of replication items, e.g. batches or decisions. +// The refreshed channel is closed when the ConnectionSource is updated. +type Endpoint struct { + Address string + RootCerts [][]byte + Refreshed chan struct{} +} + +func (e *Endpoint) String() string { + if e == nil { + return "" + } + + certHashStr := "" + + if e.RootCerts != nil { + hasher := md5.New() + for _, cert := range e.RootCerts { + hasher.Write(cert) + } + hash := hasher.Sum(nil) + certHashStr = fmt.Sprintf("%X", hash) + } + + return fmt.Sprintf("Address: %s, CertHash: %s", e.Address, certHashStr) +} + +type SourceEndpoint struct { + Address string + RootCerts [][]byte +} + +type Party2SourceEndpoint map[types.PartyID]*SourceEndpoint + +type ConnectionSource struct { + party types.PartyID // The party holding the object + targetShard types.ShardID // The target shard or consensus cluster + partOfTarget bool // Whether the holder of the object is part of the target shard + + mutex sync.RWMutex + allEndpoints []*Endpoint // All endpoints, excluding the self-endpoint. + partyToEndpoints map[types.PartyID]*Endpoint // All endpoints, including self party, used to detect changes + logger *flogging.FabricLogger +} + +func NewConnectionSource(logger *flogging.FabricLogger, selfParty types.PartyID, targetShard types.ShardID, partOfTarget bool) *ConnectionSource { + return &ConnectionSource{ + partyToEndpoints: make(map[types.PartyID]*Endpoint), + logger: logger, + party: selfParty, + targetShard: targetShard, + partOfTarget: partOfTarget, + } +} + +// PartyEndpoint returns a random endpoint. +func (cs *ConnectionSource) PartyEndpoint(party types.PartyID) (*Endpoint, error) { + cs.mutex.RLock() + defer cs.mutex.RUnlock() + + if len(cs.partyToEndpoints) == 0 { + return nil, errors.Errorf("no endpoints currently defined") + } + + ep, ok := cs.partyToEndpoints[party] + if !ok { + return nil, errors.Errorf("not found") + } + + return ep, nil +} + +// RandomEndpoint returns a random endpoint. +func (cs *ConnectionSource) RandomEndpoint() (*Endpoint, error) { + cs.mutex.RLock() + defer cs.mutex.RUnlock() + if len(cs.allEndpoints) == 0 { + return nil, errors.Errorf("no endpoints currently defined") + } + return cs.allEndpoints[rand.Intn(len(cs.allEndpoints))], nil +} + +func (cs *ConnectionSource) Endpoints() []*Endpoint { + cs.mutex.RLock() + defer cs.mutex.RUnlock() + + return cs.allEndpoints +} + +// ShuffledEndpoints returns a shuffled array of endpoints in a new slice. +func (cs *ConnectionSource) ShuffledEndpoints() []*Endpoint { + cs.mutex.RLock() + defer cs.mutex.RUnlock() + + n := len(cs.allEndpoints) + returnedSlice := make([]*Endpoint, n) + indices := rand.Perm(n) + for i, idx := range indices { + returnedSlice[i] = cs.allEndpoints[idx] + } + return returnedSlice +} + +// Update calculates whether there was a change in the endpoints or certificates, and updates the endpoints if there was +// a change. When endpoints are updated, all the 'refreshed' channels of the old endpoints are closed and a new set of +// endpoints is prepared. +// +// Update skips the self-endpoint (if part od the target cluster) when preparing the endpoint array. However, changes to the +// self-endpoint do trigger the refresh of all the endpoints. +func (cs *ConnectionSource) Update(party2SourceEndpoint map[types.PartyID]*SourceEndpoint) { + cs.mutex.Lock() + defer cs.mutex.Unlock() + cs.logger.Infof("Processing updates for shard %d endpoints", cs.targetShard) + + anyChange := cs.detectChange(party2SourceEndpoint) + + if !anyChange { + cs.logger.Debugf("No sourceEndpoint addresses or TLS certs were changed") + // No TLS certs changed, no org specified endpoints changed, + // and if we are using global endpoints, they are the same + // as our last set. No need to update anything. + return + } + + for _, endpoint := range cs.allEndpoints { + // Alert any existing consumers that have a reference to the old endpoints + // that their reference is now stale, and they should get a new one. + // This is done even for endpoints which have the same TLS certs and address. + close(endpoint.Refreshed) + } + + cs.allEndpoints = nil + cs.partyToEndpoints = make(map[types.PartyID]*Endpoint) + for party, sourceEndpoint := range party2SourceEndpoint { + endpoint := &Endpoint{ + Address: sourceEndpoint.Address, + RootCerts: sourceEndpoint.RootCerts, + Refreshed: make(chan struct{}), + } + cs.partyToEndpoints[party] = endpoint + + if cs.partOfTarget && cs.party == party { + cs.logger.Debugf("Skipping self sourceEndpoint [%s] of party %d ", sourceEndpoint.Address, party) + continue + } + cs.allEndpoints = append(cs.allEndpoints, endpoint) + } + + cs.logger.Infof("Processed updates for shard %d endpoints: %v", cs.targetShard, cs.partyToEndpoints) +} + +func (cs *ConnectionSource) detectChange(party2SourceEndpoint map[types.PartyID]*SourceEndpoint) bool { + if len(party2SourceEndpoint) != len(cs.partyToEndpoints) { + return true + } + + for party, sourceEP := range party2SourceEndpoint { + ep, ok := cs.partyToEndpoints[party] + if !ok { + return true + } + + if ep.Address != sourceEP.Address { + return true + } + + if len(ep.RootCerts) != len(sourceEP.RootCerts) { + return true + } + + for i, cert := range ep.RootCerts { + if !bytes.Equal(cert, sourceEP.RootCerts[i]) { + return true + } + } + } + + return false +} diff --git a/node/delivery/client/orderers/connection_factory.go b/node/delivery/client/orderers/connection_factory.go new file mode 100644 index 00000000..3ab04bc6 --- /dev/null +++ b/node/delivery/client/orderers/connection_factory.go @@ -0,0 +1,32 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package orderers + +import ( + "github.com/hyperledger/fabric-lib-go/common/flogging" + "github.com/hyperledger/fabric-x-orderer/common/types" +) + +type ConnectionSourcer interface { + PartyEndpoint(party types.PartyID) (*Endpoint, error) + RandomEndpoint() (*Endpoint, error) + ShuffledEndpoints() []*Endpoint + Update(partyToEndpoints map[types.PartyID]*SourceEndpoint) +} + +type ConnectionSourceCreator interface { + // CreateConnectionSource creates a ConnectionSourcer implementation. + // In a peer, selfEndpoint == ""; + // In an orderer selfEndpoint carries the (replication service) endpoint of the orderer. + CreateConnectionSource(logger *flogging.FabricLogger, selfParty types.PartyID, targetShard types.ShardID, partOfTarget bool) ConnectionSourcer +} + +type ConnectionSourceFactory struct{} + +func (f *ConnectionSourceFactory) CreateConnectionSource(logger *flogging.FabricLogger, selfParty types.PartyID, targetShard types.ShardID, partOfTarget bool) ConnectionSourcer { + return NewConnectionSource(logger, selfParty, targetShard, partOfTarget) +} diff --git a/node/delivery/client/orderers/connection_factory_test.go b/node/delivery/client/orderers/connection_factory_test.go new file mode 100644 index 00000000..9e760f52 --- /dev/null +++ b/node/delivery/client/orderers/connection_factory_test.go @@ -0,0 +1,25 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package orderers_test + +import ( + "testing" + + "github.com/hyperledger/fabric-lib-go/common/flogging" + "github.com/hyperledger/fabric-x-orderer/node/delivery/client/orderers" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCreateConnectionSource(t *testing.T) { + factory := &orderers.ConnectionSourceFactory{} + require.NotNil(t, factory) + lg := flogging.MustGetLogger("test") + connSource := factory.CreateConnectionSource(lg, 1, 2, false) + require.NotNil(t, connSource) + assert.IsType(t, &orderers.ConnectionSource{}, connSource) +} diff --git a/node/delivery/client/orderers/connection_test.go b/node/delivery/client/orderers/connection_test.go new file mode 100644 index 00000000..ebba3dd3 --- /dev/null +++ b/node/delivery/client/orderers/connection_test.go @@ -0,0 +1,1234 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package orderers_test + +import ( + "bytes" + "crypto/rand" + "sort" + "strings" + "testing" + + "github.com/hyperledger/fabric-lib-go/common/flogging" + "github.com/hyperledger/fabric-x-orderer/common/types" + "github.com/hyperledger/fabric-x-orderer/node/delivery/client/orderers" + "github.com/stretchr/testify/assert" +) + +type testSetup struct { + cert1 []byte + cert2 []byte + cert3 []byte + cert4 []byte + cert5 []byte + cert6 []byte + + party2source map[types.PartyID]*orderers.SourceEndpoint +} + +func newTestSetup(t *testing.T) *testSetup { + s := &testSetup{ + cert1: make([]byte, 847), + cert2: make([]byte, 847), + cert3: make([]byte, 847), + cert4: make([]byte, 847), + cert5: make([]byte, 847), + cert6: make([]byte, 847), + party2source: make(map[types.PartyID]*orderers.SourceEndpoint), + } + + rand.Read(s.cert1) + rand.Read(s.cert2) + rand.Read(s.cert3) + rand.Read(s.cert4) + rand.Read(s.cert5) + rand.Read(s.cert6) + + s.party2source[1] = &orderers.SourceEndpoint{ + Address: "party1-address", + RootCerts: [][]byte{s.cert1, s.cert5}, + } + + s.party2source[2] = &orderers.SourceEndpoint{ + Address: "party2-address", + RootCerts: [][]byte{s.cert2, s.cert6}, + } + + s.party2source[3] = &orderers.SourceEndpoint{ + Address: "party3-address", + RootCerts: [][]byte{s.cert3}, + } + + s.party2source[4] = &orderers.SourceEndpoint{ + Address: "party4-address", + RootCerts: [][]byte{s.cert4}, + } + + return s +} + +func TestConnectionSource_NotPartOf(t *testing.T) { + lg := flogging.MustGetLogger("arma.test") + setup := newTestSetup(t) + + cs := orderers.NewConnectionSource(lg, 1, 2, false) + assert.NotNil(t, cs) + + cs.Update(setup.party2source) + endpoints := cs.Endpoints() + expectedEndpoints := []*orderers.Endpoint{ + { + Address: "party1-address", + RootCerts: [][]byte{setup.cert1, setup.cert5}, + }, + { + Address: "party2-address", + RootCerts: [][]byte{setup.cert2, setup.cert6}, + }, + { + Address: "party3-address", + RootCerts: [][]byte{setup.cert3}, + }, + { + Address: "party4-address", + RootCerts: [][]byte{setup.cert4}, + }, + } + + t.Run("all endpoints are as defined", func(t *testing.T) { + consistsOf(t, expectedEndpoints, endpoints) + }) + + t.Run("endpoints are not marked as refreshed", func(t *testing.T) { + for _, ep := range endpoints { + select { + case <-ep.Refreshed: + t.FailNow() + default: + } + } + }) + + t.Run("endpoint stringer works", func(t *testing.T) { + for _, endpoint := range endpoints { + assert.Regexp(t, "Address: party[1234]-address", endpoint.String()) + assert.Regexp(t, "CertHash: [A-F0-9]+", endpoint.String()) + } + e := &orderers.Endpoint{Address: "localhost"} + assert.Equal(t, "Address: localhost, CertHash: ", e.String()) + e = nil + assert.Equal(t, "", e.String()) + }) + + t.Run("returns shuffled endpoints", func(t *testing.T) { // there is a chance of failure here, but it is very small. + combinationSet := make(map[string]bool) + for i := 0; i < 10000; i++ { + shuffledEndpoints := cs.ShuffledEndpoints() + + consistsOf(t, endpoints, shuffledEndpoints) + key := strings.Builder{} + for _, ep := range shuffledEndpoints { + key.WriteString(ep.Address) + key.WriteString(" ") + } + combinationSet[key.String()] = true + } + + assert.Len(t, combinationSet, (4 * 3 * 2 * 1)) + }) + + t.Run("returns random endpoint", func(t *testing.T) { // there is a chance of failure here, but it is very small. + combinationMap := make(map[string]*orderers.Endpoint) + for i := 0; i < 10000; i++ { + r, _ := cs.RandomEndpoint() + combinationMap[r.Address] = r + } + var all []*orderers.Endpoint + for _, ep := range combinationMap { + all = append(all, ep) + } + consistsOf(t, endpoints, all) + }) + + t.Run("returns endpoint by party", func(t *testing.T) { + var all []*orderers.Endpoint + for i := 1; i <= 4; i++ { + ep, err := cs.PartyEndpoint(types.PartyID(i)) + assert.NoError(t, err) + all = append(all, ep) + } + consistsOf(t, endpoints, all) + + ep, err := cs.PartyEndpoint(5) + assert.Nil(t, ep) + assert.EqualError(t, err, "not found") + }) + + t.Run("an update that does not change the endpoints", func(t *testing.T) { + cs.Update(setup.party2source) + + t.Log("endpoints are not refreshed") + endpointsNotRefreshed(t, endpoints) + + t.Log("endpoints do not change") + newEndpoints := cs.Endpoints() + consistsOf(t, expectedEndpoints, newEndpoints) + }) +} + +func TestConnectionSource_Update(t *testing.T) { + lg := flogging.MustGetLogger("arma.test") + + t.Run("an update to a party CA cert", func(t *testing.T) { + setup := newTestSetup(t) + cs := orderers.NewConnectionSource(lg, 1, 2, false) + assert.NotNil(t, cs) + + cs.Update(setup.party2source) + endpoints := cs.Endpoints() + t.Log("drop cert5 from party 1") + setup.party2source[1].RootCerts = [][]byte{setup.cert1} // drop cert5 + cs.Update(setup.party2source) + + t.Log("endpoints are refreshed") + endpointsRefreshed(t, endpoints) + + expectedEndpoints := []*orderers.Endpoint{ + { + Address: "party1-address", + RootCerts: [][]byte{setup.cert1}, + }, + { + Address: "party2-address", + RootCerts: [][]byte{setup.cert2, setup.cert6}, + }, + { + Address: "party3-address", + RootCerts: [][]byte{setup.cert3}, + }, + { + Address: "party4-address", + RootCerts: [][]byte{setup.cert4}, + }, + } + + t.Log("new endpoints are as expected") + newEndpoints := cs.Endpoints() + consistsOf(t, expectedEndpoints, newEndpoints) + endpointsNotRefreshed(t, newEndpoints) + }) + + t.Run("an update to a party address", func(t *testing.T) { + setup := newTestSetup(t) + cs := orderers.NewConnectionSource(lg, 1, 2, false) + assert.NotNil(t, cs) + + cs.Update(setup.party2source) + endpoints := cs.Endpoints() + t.Log("change party 1 address") + setup.party2source[1].Address = "party1-address-new" + cs.Update(setup.party2source) + + t.Log("endpoints are refreshed") + endpointsRefreshed(t, endpoints) + + expectedEndpoints := []*orderers.Endpoint{ + { + Address: "party1-address-new", + RootCerts: [][]byte{setup.cert1, setup.cert5}, + }, + { + Address: "party2-address", + RootCerts: [][]byte{setup.cert2, setup.cert6}, + }, + { + Address: "party3-address", + RootCerts: [][]byte{setup.cert3}, + }, + { + Address: "party4-address", + RootCerts: [][]byte{setup.cert4}, + }, + } + + t.Log("new endpoints are as expected") + newEndpoints := cs.Endpoints() + consistsOf(t, expectedEndpoints, newEndpoints) + endpointsNotRefreshed(t, newEndpoints) + }) + + t.Run("remove a party", func(t *testing.T) { + setup := newTestSetup(t) + cs := orderers.NewConnectionSource(lg, 1, 2, false) + assert.NotNil(t, cs) + + cs.Update(setup.party2source) + endpoints := cs.Endpoints() + t.Log("delete party 1") + delete(setup.party2source, 1) + cs.Update(setup.party2source) + + t.Log("endpoints are refreshed") + endpointsRefreshed(t, endpoints) + + expectedEndpoints := []*orderers.Endpoint{ + { + Address: "party2-address", + RootCerts: [][]byte{setup.cert2, setup.cert6}, + }, + { + Address: "party3-address", + RootCerts: [][]byte{setup.cert3}, + }, + { + Address: "party4-address", + RootCerts: [][]byte{setup.cert4}, + }, + } + + t.Log("new endpoints are as expected") + newEndpoints := cs.Endpoints() + consistsOf(t, expectedEndpoints, newEndpoints) + endpointsNotRefreshed(t, newEndpoints) + ep, err := cs.PartyEndpoint(1) + assert.EqualError(t, err, "not found") + assert.Nil(t, ep) + }) + + t.Run("add a party", func(t *testing.T) { + setup := newTestSetup(t) + cs := orderers.NewConnectionSource(lg, 1, 2, false) + assert.NotNil(t, cs) + + cs.Update(setup.party2source) + endpoints := cs.Endpoints() + t.Log("add party 5") + setup.party2source[5] = &orderers.SourceEndpoint{ + Address: "party5-address", + RootCerts: [][]byte{setup.cert5}, + } + cs.Update(setup.party2source) + + t.Log("endpoints are refreshed") + endpointsRefreshed(t, endpoints) + + expectedEndpoints := []*orderers.Endpoint{ + { + Address: "party1-address", + RootCerts: [][]byte{setup.cert1, setup.cert5}, + }, + { + Address: "party2-address", + RootCerts: [][]byte{setup.cert2, setup.cert6}, + }, + { + Address: "party3-address", + RootCerts: [][]byte{setup.cert3}, + }, + { + Address: "party4-address", + RootCerts: [][]byte{setup.cert4}, + }, + { + Address: "party5-address", + RootCerts: [][]byte{setup.cert5}, + }, + } + + t.Log("new endpoints are as expected") + newEndpoints := cs.Endpoints() + consistsOf(t, expectedEndpoints, newEndpoints) + endpointsNotRefreshed(t, newEndpoints) + + ep, err := cs.PartyEndpoint(5) + assert.NoError(t, err) + assert.Equal(t, "party5-address", ep.Address) + }) +} + +func TestConnectionSource_PartOf(t *testing.T) { + lg := flogging.MustGetLogger("arma.test") + setup := newTestSetup(t) + + cs := orderers.NewConnectionSource(lg, 1, 2, true) + assert.NotNil(t, cs) + + cs.Update(setup.party2source) + endpoints := cs.Endpoints() + expectedEndpoints := []*orderers.Endpoint{ + { + Address: "party1-address", + RootCerts: [][]byte{setup.cert1, setup.cert5}, + }, + { + Address: "party2-address", + RootCerts: [][]byte{setup.cert2, setup.cert6}, + }, + { + Address: "party3-address", + RootCerts: [][]byte{setup.cert3}, + }, + { + Address: "party4-address", + RootCerts: [][]byte{setup.cert4}, + }, + } + + t.Run("all endpoints do not include self party", func(t *testing.T) { + consistsOf(t, expectedEndpoints[1:], endpoints) + }) + + t.Run("endpoints are not marked as refreshed", func(t *testing.T) { + for _, ep := range endpoints { + select { + case <-ep.Refreshed: + t.FailNow() + default: + } + } + }) + + t.Run("shuffled endpoints do not include self party", func(t *testing.T) { // there is a chance of failure here, but it is very small. + combinationSet := make(map[string]bool) + for i := 0; i < 10000; i++ { + shuffledEndpoints := cs.ShuffledEndpoints() + + consistsOf(t, endpoints, shuffledEndpoints) + key := strings.Builder{} + for _, ep := range shuffledEndpoints { + key.WriteString(ep.Address) + key.WriteString(" ") + } + combinationSet[key.String()] = true + } + + assert.Len(t, combinationSet, (3 * 2 * 1)) + }) + + t.Run("random endpoint does not include self party", func(t *testing.T) { // there is a chance of failure here, but it is very small. + combinationMap := make(map[string]*orderers.Endpoint) + for i := 0; i < 10000; i++ { + r, _ := cs.RandomEndpoint() + combinationMap[r.Address] = r + } + var all []*orderers.Endpoint + for _, ep := range combinationMap { + all = append(all, ep) + } + consistsOf(t, endpoints, all) + }) + + t.Run("returns endpoint by party including self party", func(t *testing.T) { + var all []*orderers.Endpoint + for i := 1; i <= 4; i++ { + ep, err := cs.PartyEndpoint(types.PartyID(i)) + assert.NoError(t, err) + all = append(all, ep) + } + consistsOf(t, expectedEndpoints, all) + + ep, err := cs.PartyEndpoint(5) + assert.Nil(t, ep) + assert.EqualError(t, err, "not found") + }) + + t.Run("an update that does not change the endpoints", func(t *testing.T) { + cs.Update(setup.party2source) + + t.Log("endpoints are not refreshed") + endpointsNotRefreshed(t, endpoints) + + t.Log("endpoints do not change") + newEndpoints := cs.Endpoints() + consistsOf(t, expectedEndpoints[1:], newEndpoints) + ep, err := cs.PartyEndpoint(1) + assert.NoError(t, err) + assert.Equal(t, "party1-address", ep.Address) + }) +} + +func TestConnectionSource_Update_PartOf(t *testing.T) { + lg := flogging.MustGetLogger("arma.test") + + t.Run("an update to a party CA cert", func(t *testing.T) { + setup := newTestSetup(t) + cs := orderers.NewConnectionSource(lg, 1, 2, true) + assert.NotNil(t, cs) + + cs.Update(setup.party2source) + endpoints := cs.Endpoints() + t.Log("drop cert5 from party 1") + setup.party2source[1].RootCerts = [][]byte{setup.cert1} // drop cert5 + cs.Update(setup.party2source) + + t.Log("endpoints are refreshed") + endpointsRefreshed(t, endpoints) + + expectedEndpoints := []*orderers.Endpoint{ + { + Address: "party1-address", + RootCerts: [][]byte{setup.cert1}, + }, + { + Address: "party2-address", + RootCerts: [][]byte{setup.cert2, setup.cert6}, + }, + { + Address: "party3-address", + RootCerts: [][]byte{setup.cert3}, + }, + { + Address: "party4-address", + RootCerts: [][]byte{setup.cert4}, + }, + } + + t.Log("new endpoints are as expected") + newEndpoints := cs.Endpoints() + consistsOf(t, expectedEndpoints[1:], newEndpoints) + endpointsNotRefreshed(t, newEndpoints) + ep, err := cs.PartyEndpoint(1) + assert.NoError(t, err) + assert.Len(t, ep.RootCerts, 1) + assert.Equal(t, setup.cert1, ep.RootCerts[0]) + }) + + t.Run("an update to a party address", func(t *testing.T) { + setup := newTestSetup(t) + cs := orderers.NewConnectionSource(lg, 1, 2, true) + assert.NotNil(t, cs) + + cs.Update(setup.party2source) + endpoints := cs.Endpoints() + t.Log("change party 2 address") + setup.party2source[2].Address = "party2-address-new" + cs.Update(setup.party2source) + + t.Log("endpoints are refreshed") + endpointsRefreshed(t, endpoints) + + expectedEndpoints := []*orderers.Endpoint{ + { + Address: "party1-address", + RootCerts: [][]byte{setup.cert1, setup.cert5}, + }, + { + Address: "party2-address-new", + RootCerts: [][]byte{setup.cert2, setup.cert6}, + }, + { + Address: "party3-address", + RootCerts: [][]byte{setup.cert3}, + }, + { + Address: "party4-address", + RootCerts: [][]byte{setup.cert4}, + }, + } + + t.Log("new endpoints are as expected") + newEndpoints := cs.Endpoints() + consistsOf(t, expectedEndpoints[1:], newEndpoints) + endpointsNotRefreshed(t, newEndpoints) + }) + + t.Run("remove a party", func(t *testing.T) { + setup := newTestSetup(t) + cs := orderers.NewConnectionSource(lg, 1, 2, true) + assert.NotNil(t, cs) + + cs.Update(setup.party2source) + endpoints := cs.Endpoints() + t.Log("delete party 1") + delete(setup.party2source, 1) + cs.Update(setup.party2source) + + t.Log("endpoints are refreshed") + endpointsRefreshed(t, endpoints) + + expectedEndpoints := []*orderers.Endpoint{ + { + Address: "party2-address", + RootCerts: [][]byte{setup.cert2, setup.cert6}, + }, + { + Address: "party3-address", + RootCerts: [][]byte{setup.cert3}, + }, + { + Address: "party4-address", + RootCerts: [][]byte{setup.cert4}, + }, + } + + t.Log("new endpoints are as expected") + newEndpoints := cs.Endpoints() + consistsOf(t, expectedEndpoints, newEndpoints) + endpointsNotRefreshed(t, newEndpoints) + ep, err := cs.PartyEndpoint(1) + assert.EqualError(t, err, "not found") + assert.Nil(t, ep) + }) + + t.Run("add a party", func(t *testing.T) { + setup := newTestSetup(t) + cs := orderers.NewConnectionSource(lg, 1, 2, true) + assert.NotNil(t, cs) + + cs.Update(setup.party2source) + endpoints := cs.Endpoints() + t.Log("add party 5") + setup.party2source[5] = &orderers.SourceEndpoint{ + Address: "party5-address", + RootCerts: [][]byte{setup.cert5}, + } + cs.Update(setup.party2source) + + t.Log("endpoints are refreshed") + endpointsRefreshed(t, endpoints) + + expectedEndpoints := []*orderers.Endpoint{ + { + Address: "party1-address", + RootCerts: [][]byte{setup.cert1, setup.cert5}, + }, + { + Address: "party2-address", + RootCerts: [][]byte{setup.cert2, setup.cert6}, + }, + { + Address: "party3-address", + RootCerts: [][]byte{setup.cert3}, + }, + { + Address: "party4-address", + RootCerts: [][]byte{setup.cert4}, + }, + { + Address: "party5-address", + RootCerts: [][]byte{setup.cert5}, + }, + } + + t.Log("new endpoints are as expected") + newEndpoints := cs.Endpoints() + consistsOf(t, expectedEndpoints[1:], newEndpoints) + endpointsNotRefreshed(t, newEndpoints) + + ep, err := cs.PartyEndpoint(5) + assert.NoError(t, err) + assert.Equal(t, "party5-address", ep.Address) + }) +} + +func endpointsRefreshed(t *testing.T, endpoints []*orderers.Endpoint) { + for _, ep := range endpoints { + endpointRefreshed(t, ep) + } +} + +func endpointsNotRefreshed(t *testing.T, endpoints []*orderers.Endpoint) { + for _, ep := range endpoints { + endpointNotRefreshed(t, ep) + } +} + +func endpointNotRefreshed(t *testing.T, ep *orderers.Endpoint) { + select { + case <-ep.Refreshed: + t.FailNow() + default: + } +} + +func endpointRefreshed(t *testing.T, ep *orderers.Endpoint) { + select { + case <-ep.Refreshed: + default: + t.FailNow() + } +} + +func consistsOf(t *testing.T, expected, actual []*orderers.Endpoint) { + assert.Equal(t, len(expected), len(actual)) + strippedExpected := stripEndpoints(expected) + for _, ep := range stripEndpoints(actual) { + assert.Contains(t, strippedExpected, ep) + } +} + +// stripEndpoints makes a comparable version of the endpoints specified. This +// is necessary because the endpoint contains a channel which is not +// comparable. +func stripEndpoints(endpoints []*orderers.Endpoint) []orderers.SourceEndpoint { + endpointsWithChannelStripped := make([]orderers.SourceEndpoint, len(endpoints)) + for i, endpoint := range endpoints { + certs := endpoint.RootCerts + sort.Slice(certs, func(i, j int) bool { + return bytes.Compare(certs[i], certs[j]) >= 0 + }) + endpointsWithChannelStripped[i].Address = endpoint.Address + endpointsWithChannelStripped[i].RootCerts = certs + } + return endpointsWithChannelStripped +} + +// It("does not include the self endpoint in random endpoint", func() { // there is a chance of failure here, but it is very small. +// combinationMap := make(map[string]*orderers.Endpoint) +// for i := 0; i < 10000; i++ { +// r, _ := cs.RandomEndpoint() +// combinationMap[r.Address] = r +// } +// var all []*orderers.Endpoint +// for _, ep := range combinationMap { +// all = append(all, ep) +// } +// Expect(stripEndpoints(all)).To(ConsistOf( +// stripEndpoints(endpoints), +// )) +// }) +// +// It("does not mark any of the endpoints as refreshed", func() { +// for _, endpoint := range endpoints { +// Expect(endpoint.Refreshed).NotTo(BeClosed()) +// } +// }) +// +// When("an update does not modify the endpoint set", func() { +// BeforeEach(func() { +// cs.Update(nil, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }) +// }) +// +// It("does not update the endpoints", func() { +// newEndpoints := cs.Endpoints() +// Expect(newEndpoints).To(Equal(endpoints)) +// }) +// +// It("does not close any of the refresh channels", func() { +// for _, endpoint := range endpoints { +// Expect(endpoint.Refreshed).NotTo(BeClosed()) +// } +// }) +// }) +// +// When("an update changes an org's TLS CA", func() { +// BeforeEach(func() { +// org1.RootCerts = [][]byte{cert1} +// +// cs.Update(nil, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }) +// }) +// +// It("creates a new set of orderer endpoints yet skips the self-endpoint", func() { +// newOrg1Certs := [][]byte{cert1} +// +// newEndpoints := cs.Endpoints() +// Expect(stripEndpoints(newEndpoints)).To(ConsistOf( +// stripEndpoints([]*orderers.Endpoint{ +// { +// Address: "org1-address2", +// RootCerts: newOrg1Certs, +// }, +// { +// Address: "org2-address1", +// RootCerts: org2Certs, +// }, +// { +// Address: "org2-address2", +// RootCerts: org2Certs, +// }, +// }), +// )) +// }) +// +// It("closes the refresh channel for all of the old endpoints", func() { +// for _, endpoint := range endpoints { +// Expect(endpoint.Refreshed).To(BeClosed()) +// } +// }) +// }) +// +// When("an update changes an org's endpoint addresses", func() { +// BeforeEach(func() { +// org1.Addresses = []string{"org1-address1", "org1-address3"} +// cs.Update(nil, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }) +// }) +// +// It("creates a new set of orderer endpoints, yet skips the self-endpoint", func() { +// newEndpoints := cs.Endpoints() +// Expect(stripEndpoints(newEndpoints)).To(ConsistOf( +// stripEndpoints([]*orderers.Endpoint{ +// { +// Address: "org1-address3", +// RootCerts: org1Certs, +// }, +// { +// Address: "org2-address1", +// RootCerts: org2Certs, +// }, +// { +// Address: "org2-address2", +// RootCerts: org2Certs, +// }, +// }), +// )) +// }) +// +// It("closes the refresh channel for all of the old endpoints", func() { +// for _, endpoint := range endpoints { +// Expect(endpoint.Refreshed).To(BeClosed()) +// } +// }) +// }) +// +// When("an update removes an ordering organization", func() { +// BeforeEach(func() { +// cs.Update(nil, map[string]orderers.OrdererOrg{ +// "org2": org2, +// }) +// }) +// +// It("creates a new set of orderer endpoints, self-endpoint matches nothing", func() { +// newEndpoints := cs.Endpoints() +// Expect(stripEndpoints(newEndpoints)).To(ConsistOf( +// stripEndpoints([]*orderers.Endpoint{ +// { +// Address: "org2-address1", +// RootCerts: org2Certs, +// }, +// { +// Address: "org2-address2", +// RootCerts: org2Certs, +// }, +// }), +// )) +// }) +// +// It("closes the refresh channel for all of the old endpoints", func() { +// for _, endpoint := range endpoints { +// Expect(endpoint.Refreshed).To(BeClosed()) +// } +// }) +// +// When("the org is added back", func() { +// BeforeEach(func() { +// cs.Update(nil, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }) +// }) +// +// It("returns to the set of orderer endpoints, yet skips the self-endpoint", func() { +// newEndpoints := cs.Endpoints() +// Expect(stripEndpoints(newEndpoints)).To(ConsistOf( +// stripEndpoints([]*orderers.Endpoint{ +// { +// Address: "org1-address2", +// RootCerts: org1Certs, +// }, +// { +// Address: "org2-address1", +// RootCerts: org2Certs, +// }, +// { +// Address: "org2-address2", +// RootCerts: org2Certs, +// }, +// }), +// )) +// }) +// }) +// }) +// +// When("an update modifies the global endpoints but does not affect the org endpoints", func() { +// BeforeEach(func() { +// cs.Update(nil, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }) +// }) +// +// It("does not update the endpoints", func() { +// newEndpoints := cs.Endpoints() +// Expect(newEndpoints).To(Equal(endpoints)) +// }) +// +// It("does not close any of the refresh channels", func() { +// for _, endpoint := range endpoints { +// Expect(endpoint.Refreshed).NotTo(BeClosed()) +// } +// }) +// }) +// +// When("the configuration does not contain orderer org endpoints", func() { +// var globalCerts [][]byte +// +// BeforeEach(func() { +// org1.Addresses = nil +// org2.Addresses = nil +// +// globalCerts = [][]byte{cert1, cert2, cert3} +// +// cs.Update([]string{"global-addr1", "global-addr2"}, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }) +// }) +// +// It("creates endpoints for the global addrs", func() { +// newEndpoints := cs.Endpoints() +// Expect(stripEndpoints(newEndpoints)).To(ConsistOf( +// stripEndpoints([]*orderers.Endpoint{ +// { +// Address: "global-addr1", +// RootCerts: globalCerts, +// }, +// { +// Address: "global-addr2", +// RootCerts: globalCerts, +// }, +// }), +// )) +// }) +// +// It("closes the refresh channel for all of the old endpoints", func() { +// for _, endpoint := range endpoints { +// Expect(endpoint.Refreshed).To(BeClosed()) +// } +// }) +// +// When("the global list of addresses grows", func() { +// BeforeEach(func() { +// cs.Update([]string{"global-addr1", "global-addr2", "global-addr3"}, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }) +// }) +// +// It("creates endpoints for the global addrs", func() { +// newEndpoints := cs.Endpoints() +// Expect(stripEndpoints(newEndpoints)).To(ConsistOf( +// stripEndpoints([]*orderers.Endpoint{ +// { +// Address: "global-addr1", +// RootCerts: globalCerts, +// }, +// { +// Address: "global-addr2", +// RootCerts: globalCerts, +// }, +// { +// Address: "global-addr3", +// RootCerts: globalCerts, +// }, +// }), +// )) +// }) +// +// It("closes the refresh channel for all of the old endpoints", func() { +// for _, endpoint := range endpoints { +// Expect(endpoint.Refreshed).To(BeClosed()) +// } +// }) +// }) +// +// When("the global set of addresses shrinks", func() { +// BeforeEach(func() { +// cs.Update([]string{"global-addr1"}, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }) +// }) +// +// It("creates endpoints for the global addrs", func() { +// newEndpoints := cs.Endpoints() +// Expect(stripEndpoints(newEndpoints)).To(ConsistOf( +// stripEndpoints([]*orderers.Endpoint{ +// { +// Address: "global-addr1", +// RootCerts: globalCerts, +// }, +// }), +// )) +// }) +// +// It("closes the refresh channel for all of the old endpoints", func() { +// for _, endpoint := range endpoints { +// Expect(endpoint.Refreshed).To(BeClosed()) +// } +// }) +// }) +// +// When("the global set of addresses is modified", func() { +// BeforeEach(func() { +// cs.Update([]string{"global-addr1", "global-addr3"}, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }) +// }) +// +// It("creates endpoints for the global addrs", func() { +// newEndpoints := cs.Endpoints() +// Expect(stripEndpoints(newEndpoints)).To(ConsistOf( +// stripEndpoints([]*orderers.Endpoint{ +// { +// Address: "global-addr1", +// RootCerts: globalCerts, +// }, +// { +// Address: "global-addr3", +// RootCerts: globalCerts, +// }, +// }), +// )) +// }) +// +// It("closes the refresh channel for all of the old endpoints", func() { +// for _, endpoint := range endpoints { +// Expect(endpoint.Refreshed).To(BeClosed()) +// } +// }) +// }) +// +// When("an update to the global addrs references an overridden org endpoint address", func() { +// BeforeEach(func() { +// cs.Update([]string{"global-addr1", "override-address"}, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }, +// ) +// }) +// +// It("creates a new set of orderer endpoints with overrides", func() { +// newEndpoints := cs.Endpoints() +// Expect(stripEndpoints(newEndpoints)).To(ConsistOf( +// stripEndpoints([]*orderers.Endpoint{ +// { +// Address: "global-addr1", +// RootCerts: globalCerts, +// }, +// { +// Address: "re-mapped-address", +// RootCerts: overrideCerts, +// }, +// }), +// )) +// }) +// }) +// +// When("an orderer org adds an endpoint", func() { +// BeforeEach(func() { +// org1.Addresses = []string{"new-org1-address"} +// cs.Update([]string{"global-addr1", "global-addr2"}, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }) +// }) +// +// It("removes the global endpoints and uses only the org level ones", func() { +// newEndpoints := cs.Endpoints() +// Expect(stripEndpoints(newEndpoints)).To(ConsistOf( +// stripEndpoints([]*orderers.Endpoint{ +// { +// Address: "new-org1-address", +// RootCerts: org1Certs, +// }, +// }), +// )) +// }) +// +// It("closes the refresh channel for all of the old endpoints", func() { +// for _, endpoint := range endpoints { +// Expect(endpoint.Refreshed).To(BeClosed()) +// } +// }) +// }) +// }) +// +// When("global endpoints are in effect and self-endpoint is from them", func() { +// var globalCerts [][]byte +// +// BeforeEach(func() { +// cs = orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), +// map[string]*orderers.Endpoint{ +// "override-address": { +// Address: "re-mapped-address", +// RootCerts: overrideCerts, +// }, +// }, +// "global-addr1") //<< self-endpoint from global endpoints +// +// org1.Addresses = nil +// org2.Addresses = nil +// +// globalCerts = [][]byte{cert1, cert2, cert3} +// +// cs.Update([]string{"global-addr1", "global-addr2"}, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }) +// +// endpoints = cs.Endpoints() +// }) +// +// It("creates endpoints for the global endpoints, yet skips the self-endpoint", func() { +// Expect(stripEndpoints(endpoints)).To(ConsistOf( +// stripEndpoints([]*orderers.Endpoint{ +// { +// Address: "global-addr2", +// RootCerts: globalCerts, +// }, +// }), +// )) +// }) +// +// When("the global list of addresses grows", func() { +// BeforeEach(func() { +// cs.Update([]string{"global-addr1", "global-addr2", "global-addr3"}, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }) +// }) +// +// It("creates endpoints for the global endpoints, yet skips the self-endpoint", func() { +// newEndpoints := cs.Endpoints() +// Expect(stripEndpoints(newEndpoints)).To(ConsistOf( +// stripEndpoints([]*orderers.Endpoint{ +// { +// Address: "global-addr2", +// RootCerts: globalCerts, +// }, +// { +// Address: "global-addr3", +// RootCerts: globalCerts, +// }, +// }), +// )) +// }) +// +// It("closes the refresh channel for all of the old endpoints", func() { +// for _, endpoint := range endpoints { +// Expect(endpoint.Refreshed).To(BeClosed()) +// } +// }) +// }) +// +// When("the global set of addresses shrinks, removing self endpoint", func() { +// flogging.ActivateSpec("debug") +// BeforeEach(func() { +// cs.Update([]string{"global-addr2"}, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }) +// }) +// +// It("creates endpoints for the global addrs", func() { +// newEndpoints := cs.Endpoints() +// Expect(stripEndpoints(newEndpoints)).To(ConsistOf( +// stripEndpoints([]*orderers.Endpoint{ +// { +// Address: "global-addr2", +// RootCerts: globalCerts, +// }, +// }), +// )) +// }) +// +// It("does not close the refresh channel for all of the old endpoints", func() { +// for _, endpoint := range endpoints { +// Expect(endpoint.Refreshed).NotTo(BeClosed()) +// } +// }) +// }) +// +// When("the global set of addresses is modified", func() { +// BeforeEach(func() { +// cs.Update([]string{"global-addr1", "global-addr3"}, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }) +// }) +// +// It("creates endpoints for the global addrs", func() { +// newEndpoints := cs.Endpoints() +// Expect(stripEndpoints(newEndpoints)).To(ConsistOf( +// stripEndpoints([]*orderers.Endpoint{ +// { +// Address: "global-addr3", +// RootCerts: globalCerts, +// }, +// }), +// )) +// }) +// +// It("closes the refresh channel for all of the old endpoints", func() { +// for _, endpoint := range endpoints { +// Expect(endpoint.Refreshed).To(BeClosed()) +// } +// }) +// }) +// +// When("an update to the global addrs references an overridden org endpoint address", func() { +// BeforeEach(func() { +// cs.Update([]string{"global-addr1", "override-address"}, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }, +// ) +// }) +// +// It("creates a new set of orderer endpoints with overrides", func() { +// newEndpoints := cs.Endpoints() +// Expect(stripEndpoints(newEndpoints)).To(ConsistOf( +// stripEndpoints([]*orderers.Endpoint{ +// { +// Address: "re-mapped-address", +// RootCerts: overrideCerts, +// }, +// }), +// )) +// }) +// }) +// +// When("an orderer org adds an endpoint", func() { +// BeforeEach(func() { +// org1.Addresses = []string{"new-org1-address"} +// cs.Update([]string{"global-addr1", "global-addr2"}, map[string]orderers.OrdererOrg{ +// "org1": org1, +// "org2": org2, +// }) +// }) +// +// It("removes the global endpoints and uses only the org level ones", func() { +// newEndpoints := cs.Endpoints() +// Expect(stripEndpoints(newEndpoints)).To(ConsistOf( +// stripEndpoints([]*orderers.Endpoint{ +// { +// Address: "new-org1-address", +// RootCerts: org1Certs, +// }, +// }), +// )) +// }) +// +// It("closes the refresh channel for all of the old endpoints", func() { +// for _, endpoint := range endpoints { +// Expect(endpoint.Refreshed).To(BeClosed()) +// } +// }) +// }) +// }) +// }) +//})