Skip to content

Commit 1c8212d

Browse files
committed
Merge pull request #27 from cfieber/cassandra_config_update
Cassandra config update
2 parents 486b426 + 1bb9885 commit 1c8212d

File tree

10 files changed

+457
-46
lines changed

10 files changed

+457
-46
lines changed

kork-cassandra/kork-cassandra.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
dependencies {
2+
compile project(":kork-core")
23
compile spinnaker.dependency('bootAutoConfigure')
4+
runtime 'org.hibernate:hibernate-validator:5.2.2.Final'
35
spinnaker.group('cassandra')
46
testRuntime spinnaker.dependency('slf4jSimple')
57
spinnaker.group('spockBase')

kork-cassandra/src/main/java/com/netflix/spinnaker/kork/astyanax/AstyanaxComponents.java

Lines changed: 84 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,26 @@
1717
package com.netflix.spinnaker.kork.astyanax;
1818

1919
import com.google.common.collect.ImmutableMap;
20+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2021
import com.netflix.astyanax.AstyanaxConfiguration;
2122
import com.netflix.astyanax.Keyspace;
2223
import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration;
23-
import com.netflix.astyanax.connectionpool.ConnectionPoolMonitor;
2424
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
2525
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
2626
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
2727
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolType;
28-
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
2928
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
29+
import com.netflix.astyanax.model.ConsistencyLevel;
3030
import com.netflix.astyanax.test.EmbeddedCassandra;
31+
import com.netflix.discovery.DiscoveryClient;
32+
import com.netflix.spectator.api.Registry;
3133
import org.slf4j.Logger;
3234
import org.slf4j.LoggerFactory;
35+
import org.springframework.beans.factory.annotation.Qualifier;
3336
import org.springframework.beans.factory.annotation.Value;
34-
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
35-
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
36-
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
37-
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
37+
import org.springframework.boot.autoconfigure.condition.*;
38+
import org.springframework.boot.context.properties.ConfigurationProperties;
39+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
3840
import org.springframework.context.annotation.Bean;
3941
import org.springframework.context.annotation.Configuration;
4042

@@ -44,62 +46,109 @@
4446
import java.io.IOException;
4547
import java.net.Socket;
4648
import java.util.Map;
47-
import java.util.concurrent.Callable;
48-
import java.util.concurrent.Executors;
49-
import java.util.concurrent.Future;
50-
import java.util.concurrent.TimeUnit;
49+
import java.util.concurrent.*;
5150

5251
@Configuration
53-
@ConditionalOnMissingClass(name = {"com.netflix.cassandra.NFAstyanaxManager"})
52+
@EnableConfigurationProperties
5453
@ConditionalOnClass(AstyanaxConfiguration.class)
5554
public class AstyanaxComponents {
5655

56+
@Value("${cassandra.host:127.0.0.1}")
57+
String seeds;
58+
59+
@Bean
60+
public ExecutorService cassandraAsyncExecutor(@Value("${cassandra.asyncExecutorPoolSize:5}") int asyncPoolSize) {
61+
return Executors.newFixedThreadPool(asyncPoolSize,
62+
new ThreadFactoryBuilder().setDaemon(true)
63+
.setNameFormat("AstyanaxAsync-%d")
64+
.build());
65+
}
66+
5767
@Bean
58-
public AstyanaxConfiguration astyanaxConfiguration() {
68+
@ConditionalOnMissingBean(AstyanaxConfiguration.class)
69+
@ConfigurationProperties("cassandra")
70+
public AstyanaxConfiguration astyanaxConfiguration(@Qualifier("cassandraAsyncExecutor") ExecutorService cassandraAsyncExecutor) {
5971
return new AstyanaxConfigurationImpl()
72+
.setDefaultReadConsistencyLevel(ConsistencyLevel.CL_LOCAL_QUORUM)
73+
.setDefaultWriteConsistencyLevel(ConsistencyLevel.CL_LOCAL_QUORUM)
6074
.setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
6175
.setConnectionPoolType(ConnectionPoolType.TOKEN_AWARE)
6276
.setCqlVersion("3.0.0")
63-
.setTargetCassandraVersion("2.0");
77+
.setTargetCassandraVersion("2.0")
78+
.setAsyncExecutor(cassandraAsyncExecutor);
6479
}
6580

81+
82+
6683
@Bean
67-
public ConnectionPoolMonitor connectionPoolMonitor() {
68-
return new CountingConnectionPoolMonitor();
84+
@ConditionalOnBean(DiscoveryClient.class)
85+
@ConditionalOnProperty("cassandra.eureka.enabled")
86+
public ClusterHostSupplierFactory eurekaHostSupplier(DiscoveryClient discoveryClient) {
87+
return EurekaHostSupplier.factory(discoveryClient);
6988
}
7089

7190
@Bean
72-
public ConnectionPoolConfiguration connectionPoolConfiguration(@Value("${cassandra.port:9160}") int port, @Value("${cassandra.host:127.0.0.1}") String seeds, @Value("${cassandra.maxConns:3}") int maxConns) {
73-
return new ConnectionPoolConfigurationImpl("cpConfig").setPort(port).setSeeds(seeds).setMaxConns(maxConns);
91+
@ConditionalOnMissingBean(ClusterHostSupplierFactory.class)
92+
public ClusterHostSupplierFactory clusterHostSupplierFactory() {
93+
return ClusterHostSupplierFactory.nullSupplierFactory();
94+
}
95+
96+
@Bean
97+
@ConditionalOnBean(Registry.class)
98+
public KeyspaceConnectionPoolMonitorFactory spectatorConnectionPoolMonitor(Registry registry) {
99+
return SpectatorConnectionPoolMonitor.factory(registry);
100+
}
101+
102+
@Bean
103+
@ConditionalOnMissingBean(KeyspaceConnectionPoolMonitorFactory.class)
104+
public KeyspaceConnectionPoolMonitorFactory countingConnectionPoolMonitor() {
105+
return KeyspaceConnectionPoolMonitorFactory.defaultFactory();
106+
}
107+
108+
@Bean
109+
@ConfigurationProperties("cassandra")
110+
public ConnectionPoolConfiguration connectionPoolConfiguration() {
111+
return new ConnectionPoolConfigurationImpl("cpConfig").setSeeds(seeds);
74112
}
75113

76114
@Bean
77115
public AstyanaxKeyspaceFactory keyspaceFactory(AstyanaxConfiguration config,
78116
ConnectionPoolConfiguration poolConfig,
79-
ConnectionPoolMonitor poolMonitor) {
80-
return new DefaultAstyanaxKeyspaceFactory(config, poolConfig, poolMonitor);
117+
KeyspaceConnectionPoolMonitorFactory connectionPoolMonitorFactory,
118+
ClusterHostSupplierFactory clusterHostSupplierFactory,
119+
KeyspaceInitializer keyspaceInitializer) {
120+
return new DefaultAstyanaxKeyspaceFactory(config, poolConfig, connectionPoolMonitorFactory, clusterHostSupplierFactory, keyspaceInitializer);
81121
}
82122

83123
@ConditionalOnExpression("${cassandra.embedded:true} and '${cassandra.host:127.0.0.1}' == '127.0.0.1'")
124+
@Bean
84125
@ConditionalOnBean(Keyspace.class)
126+
public KeyspaceInitializer embeddedCassandra(@Value("${cassandra.port:9160}") int port,
127+
@Value("${cassandra.storagePort:7000}") int storagePort,
128+
@Value("${cassandra.host:127.0.0.1}") String host) {
129+
return new EmbeddedCassandraRunner(port, storagePort, host);
130+
}
131+
132+
@ConditionalOnMissingBean(KeyspaceInitializer.class)
85133
@Bean
86-
public EmbeddedCassandraRunner embeddedCassandra(Keyspace keyspace, @Value("${cassandra.port:9160}") int port,
87-
@Value("${cassandra.storagePort:7000}") int storagePort,
88-
@Value("${cassandra.host:127.0.0.1}") String host) {
89-
return new EmbeddedCassandraRunner(keyspace, port, storagePort, host);
134+
public KeyspaceInitializer noopKeyspaceInitializer() {
135+
return new KeyspaceInitializer() {
136+
@Override
137+
public void initKeyspace(Keyspace keyspace) throws ConnectionException {
138+
//noop
139+
}
140+
};
90141
}
91142

92-
public static class EmbeddedCassandraRunner {
143+
public static class EmbeddedCassandraRunner implements KeyspaceInitializer {
93144
private static final Logger log = LoggerFactory.getLogger(EmbeddedCassandraRunner.class);
94145

95-
private final Keyspace keyspace;
96146
private final int port;
97147
private final int storagePort;
98148
private final String host;
99149
private EmbeddedCassandra embeddedCassandra;
100150

101-
public EmbeddedCassandraRunner(Keyspace keyspace, int port, int storagePort, String host) {
102-
this.keyspace = keyspace;
151+
public EmbeddedCassandraRunner(int port, int storagePort, String host) {
103152
this.port = port;
104153
this.storagePort = storagePort;
105154
this.host = host;
@@ -126,14 +175,18 @@ public Object call() throws Exception {
126175
});
127176
waitForCassandraFuture.get(60, TimeUnit.SECONDS);
128177
log.info("Embedded cassandra started.");
178+
}
179+
180+
@Override
181+
public void initKeyspace(Keyspace keyspace) throws ConnectionException {
129182
try {
130183
keyspace.describeKeyspace();
131184
} catch (ConnectionException e) {
132185
Map<String, Object> options = ImmutableMap.<String, Object>builder()
133-
.put("name", keyspace.getKeyspaceName())
134-
.put("strategy_class", "SimpleStrategy")
135-
.put("strategy_options", ImmutableMap.of("replication_factor", "1"))
136-
.build();
186+
.put("name", keyspace.getKeyspaceName())
187+
.put("strategy_class", "SimpleStrategy")
188+
.put("strategy_options", ImmutableMap.of("replication_factor", "1"))
189+
.build();
137190
keyspace.createKeyspace(options);
138191
}
139192
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License")
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.netflix.spinnaker.kork.astyanax;
18+
19+
import com.google.common.base.Supplier;
20+
import com.netflix.astyanax.connectionpool.Host;
21+
22+
import java.util.List;
23+
24+
public interface ClusterHostSupplierFactory {
25+
static ClusterHostSupplierFactory nullSupplierFactory() {
26+
return new ClusterHostSupplierFactory() {
27+
@Override
28+
public Supplier<List<Host>> createHostSupplier(String clusterName) {
29+
return null;
30+
}
31+
};
32+
}
33+
Supplier<List<Host>> createHostSupplier(String clusterName);
34+
}

kork-cassandra/src/main/java/com/netflix/spinnaker/kork/astyanax/DefaultAstyanaxKeyspaceFactory.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,21 @@
1717
package com.netflix.spinnaker.kork.astyanax;
1818

1919
import com.google.common.base.Preconditions;
20+
import com.google.common.base.Supplier;
2021
import com.google.common.cache.*;
22+
import com.google.common.util.concurrent.UncheckedExecutionException;
2123
import com.netflix.astyanax.AstyanaxConfiguration;
2224
import com.netflix.astyanax.AstyanaxContext;
2325
import com.netflix.astyanax.Keyspace;
2426
import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration;
25-
import com.netflix.astyanax.connectionpool.ConnectionPoolMonitor;
27+
import com.netflix.astyanax.connectionpool.Host;
2628
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
2729
import com.netflix.astyanax.connectionpool.exceptions.UnknownException;
30+
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
2831
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
2932

3033
import javax.annotation.PreDestroy;
34+
import java.util.List;
3135
import java.util.concurrent.ExecutionException;
3236

3337
public class DefaultAstyanaxKeyspaceFactory implements AstyanaxKeyspaceFactory {
@@ -36,11 +40,13 @@ public class DefaultAstyanaxKeyspaceFactory implements AstyanaxKeyspaceFactory {
3640

3741
public DefaultAstyanaxKeyspaceFactory(final AstyanaxConfiguration astyanaxConfiguration,
3842
final ConnectionPoolConfiguration connectionPoolConfiguration,
39-
final ConnectionPoolMonitor connectionPoolMonitor) {
43+
final KeyspaceConnectionPoolMonitorFactory connectionPoolMonitorFactory,
44+
final ClusterHostSupplierFactory clusterHostSupplierFactory,
45+
final KeyspaceInitializer keyspaceInitializer) {
4046
keyspaces = CacheBuilder
4147
.newBuilder()
4248
.removalListener(createRemovalListener())
43-
.build(createCacheLoader(astyanaxConfiguration, connectionPoolConfiguration, connectionPoolMonitor));
49+
.build(createCacheLoader(astyanaxConfiguration, connectionPoolConfiguration, connectionPoolMonitorFactory, clusterHostSupplierFactory, keyspaceInitializer));
4450
}
4551

4652
@Override
@@ -52,6 +58,9 @@ public Keyspace getKeyspace(final String clusterName, final String keyspaceName)
5258
throw (ConnectionException) ex.getCause();
5359
}
5460
throw new UnknownException(ex.getCause());
61+
} catch (UncheckedExecutionException uee) {
62+
uee.printStackTrace();
63+
throw uee;
5564
}
5665
}
5766

@@ -114,18 +123,27 @@ public void onRemoval(RemovalNotification<KeyspaceKey, AstyanaxContext<Keyspace>
114123

115124
CacheLoader<KeyspaceKey, AstyanaxContext<Keyspace>> createCacheLoader(final AstyanaxConfiguration astyanaxConfiguration,
116125
final ConnectionPoolConfiguration connectionPoolConfiguration,
117-
final ConnectionPoolMonitor connectionPoolMonitor) {
126+
final KeyspaceConnectionPoolMonitorFactory connectionPoolMonitorFactory,
127+
final ClusterHostSupplierFactory clusterHostSupplierFactory,
128+
final KeyspaceInitializer keyspaceInitializer) {
118129
return new CacheLoader<KeyspaceKey, AstyanaxContext<Keyspace>>() {
119130
@Override
120131
public AstyanaxContext<Keyspace> load(KeyspaceKey key) throws Exception {
132+
Supplier<List<Host>> hostSupplier = clusterHostSupplierFactory.createHostSupplier(key.getClusterName());
133+
if (hostSupplier != null) {
134+
hostSupplier.get();
135+
((ConnectionPoolConfigurationImpl) connectionPoolConfiguration).setSeeds(null);
136+
}
121137
AstyanaxContext<Keyspace> context = new AstyanaxContext.Builder()
122138
.forCluster(key.getClusterName())
123139
.forKeyspace(key.getKeyspaceName())
124140
.withAstyanaxConfiguration(astyanaxConfiguration)
125141
.withConnectionPoolConfiguration(connectionPoolConfiguration)
126-
.withConnectionPoolMonitor(connectionPoolMonitor)
142+
.withConnectionPoolMonitor(connectionPoolMonitorFactory.createMonitorForKeyspace(key.getClusterName(), key.getKeyspaceName()))
143+
.withHostSupplier(hostSupplier)
127144
.buildKeyspace(ThriftFamilyFactory.getInstance());
128145
context.start();
146+
keyspaceInitializer.initKeyspace(context.getClient());
129147
return context;
130148
}
131149
};

0 commit comments

Comments
 (0)