Skip to content

Commit d07a23f

Browse files
authored
Merge pull request #65 from salesforce/feature/instance-mode
Instance mode WIP
2 parents d198f7d + 8b9feaa commit d07a23f

File tree

10 files changed

+748
-0
lines changed

10 files changed

+748
-0
lines changed
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright (c) 2017, salesforce.com, inc.
3+
* All rights reserved.
4+
* Licensed under the BSD 3-Clause license.
5+
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
6+
*/
7+
8+
package com.salesforce.grpc.contrib.instancemode;
9+
10+
import io.grpc.*;
11+
12+
import java.util.Collection;
13+
import java.util.function.Supplier;
14+
15+
/**
16+
* {@code PerCallService} forces gRPC to instantiate a fresh service implementation object with every request,
17+
* instead of reusing a well-known singleton instance.
18+
*
19+
* <p>{@code PerCallService} is useful when you need total isolation between requests. Since every request instantiates
20+
* a fresh service implementation object, there is no opportunity for shared state to leak between requests. However,
21+
* isolation comes at a cost to performance. Service implementation initialization time is added to every request. If
22+
* initialization is costly or time consuming, gRPC throughput will noticeably degrade.
23+
*
24+
* <p>{@code PerCallService} is also useful when you only want to hold references to expensive resources for the
25+
* duration of a single operation -- for example, database connections or file handles. With a traditional singleton
26+
* gRPC service implementation, you would be responsible for acquiring and freeing resources manually with every
27+
* request. Using a {@code PerCallService}, you can use more traditional Object Oriented patterns for resource
28+
* management, like constructors and {@link AutoCloseable}.
29+
*
30+
* <p>If the decorated service instance implements {@link AutoCloseable}, the instance's {@link AutoCloseable#close()}
31+
* method will be called upon completion or cancellation of each gRPC request. Use this opportunity to free any
32+
* shared resources.
33+
*
34+
* @param <T> a {@code BindableService} implementation to decorate
35+
*/
36+
public class PerCallService<T extends BindableService> implements BindableService {
37+
private ServerServiceDefinition perCallBinding;
38+
39+
/**
40+
* Create a {@code PerCallService} for a provided service implementation class, generated by a factory method.
41+
*
42+
* @param factory A factory that will initialize a new service implementation object for every call.
43+
*/
44+
public PerCallService(Supplier<T> factory) {
45+
perCallBinding = bindService(factory);
46+
}
47+
48+
/**
49+
* Create a {@code PerCallService} for a provided service implementation class. The provided class must have a
50+
* default constructor.
51+
*
52+
* @param clazz The service implementation class to decorate.
53+
*/
54+
public PerCallService(Class<T> clazz) {
55+
this (() -> {
56+
try {
57+
return clazz.newInstance();
58+
} catch (ReflectiveOperationException e) {
59+
throw new IllegalArgumentException("Class " + clazz.getName() + " must have a public default constructor", e);
60+
}
61+
});
62+
}
63+
64+
@SuppressWarnings("unchecked")
65+
private ServerServiceDefinition bindService(Supplier<T> factory) {
66+
ServerServiceDefinition baseDefinition = factory.get().bindService();
67+
ServiceDescriptor descriptor = baseDefinition.getServiceDescriptor();
68+
Collection<ServerMethodDefinition<?, ?>> methods = baseDefinition.getMethods();
69+
70+
ServerServiceDefinition.Builder builder = ServerServiceDefinition.builder(descriptor);
71+
methods.forEach(method -> builder.addMethod(ServerMethodDefinition.create(method.getMethodDescriptor(), new PerCallServerCallHandler(factory))));
72+
return builder.build();
73+
}
74+
75+
@Override
76+
public ServerServiceDefinition bindService() {
77+
return perCallBinding;
78+
}
79+
80+
/**
81+
* Internal class implementing the per-call service pattern.
82+
*/
83+
private class PerCallServerCallHandler implements ServerCallHandler {
84+
private Supplier<T> factory;
85+
86+
PerCallServerCallHandler(Supplier<T> factory) {
87+
this.factory = factory;
88+
}
89+
90+
@Override
91+
@SuppressWarnings("unchecked")
92+
public ServerCall.Listener startCall(ServerCall call, Metadata headers) {
93+
BindableService instance = factory.get();
94+
ServerServiceDefinition definition = instance.bindService();
95+
ServerMethodDefinition method = definition.getMethod(call.getMethodDescriptor().getFullMethodName());
96+
97+
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<T>(method.getServerCallHandler().startCall(call, headers)) {
98+
@Override
99+
public void onCancel() {
100+
super.onCancel();
101+
close();
102+
}
103+
104+
@Override
105+
public void onComplete() {
106+
super.onComplete();
107+
close();
108+
}
109+
110+
private void close() {
111+
if (instance instanceof AutoCloseable) {
112+
try {
113+
((AutoCloseable) instance).close();
114+
} catch (Throwable t) {
115+
throw new RuntimeException(t);
116+
}
117+
}
118+
}
119+
};
120+
}
121+
}
122+
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright (c) 2017, salesforce.com, inc.
3+
* All rights reserved.
4+
* Licensed under the BSD 3-Clause license.
5+
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
6+
*/
7+
8+
package com.salesforce.grpc.contrib.instancemode;
9+
10+
import com.salesforce.grpc.contrib.session.ClientSessionTransportFilter;
11+
import com.salesforce.grpc.contrib.session.SessionLifecycleEvent;
12+
import com.salesforce.grpc.contrib.session.SessionLifecycleEventListener;
13+
import com.salesforce.grpc.contrib.session.SessionLifecycleEventSource;
14+
import io.grpc.*;
15+
16+
import java.util.Collection;
17+
import java.util.Map;
18+
import java.util.UUID;
19+
import java.util.concurrent.ConcurrentHashMap;
20+
import java.util.function.Supplier;
21+
22+
import static com.google.common.base.Preconditions.checkNotNull;
23+
24+
/**
25+
* {@code PerSessionService} forces gRPC to instantiate a fresh service implementation object for each unique client
26+
* session connecting to a service.
27+
*
28+
* <p>{@code PerSessionService} is useful when you want to share state between service operations on a per-session
29+
* basis. Each time a client {@code ManagedChannel} connects to a {@code PerSessionService}, a new service
30+
* implementation instance is created. This instance will be used for all calls made by the {@code ManagedChannel}, but
31+
* will be isolated from calls made from other clients. However, isolation comes at a cost to performance. Service
32+
* implementation initialization time is added to every request. If initialization is costly or time consuming, gRPC
33+
* throughput will noticeably degrade. Additionally, resource consumption will grow linearly with the number of
34+
* concurrent connections. Resource exhaustion and poor scalability will happen the service is not implemented with
35+
* care.
36+
*
37+
* <p>If the decorated service instance implements {@link AutoCloseable}, the instance's {@link AutoCloseable#close()}
38+
* method will be called when the client's connection is closed. Use this opportunity to free any shared resources.
39+
*
40+
* @param <T> a {@code BindableService} implementation to decorate
41+
*/
42+
public class PerSessionService<T extends BindableService> implements BindableService, SessionLifecycleEventListener {
43+
private ServerServiceDefinition perSessionBinding;
44+
private Map<UUID, T> sessionServices = new ConcurrentHashMap<>();
45+
46+
/**
47+
* Create a {@code PerSessionService} for a provided service implementation class, generated by a factory method.
48+
*
49+
* @param factory A factory that will initialize a new service implementation object for every call.
50+
* @param transportFilter A {@link SessionLifecycleEventSource} emitting {@link SessionLifecycleEvent}s.
51+
*/
52+
public PerSessionService(Supplier<T> factory, SessionLifecycleEventSource transportFilter) {
53+
checkNotNull(factory, "factory");
54+
checkNotNull(transportFilter, "transportFilter");
55+
56+
perSessionBinding = bindFactory(factory);
57+
transportFilter.addSessionEventListener(this);
58+
}
59+
60+
/**
61+
* Create a {@code PerSessionService} for a provided service implementation class. The provided class must have a
62+
* default constructor.
63+
*
64+
* @param clazz The service implementation class to decorate.
65+
*/
66+
public PerSessionService(Class<T> clazz, SessionLifecycleEventSource transportFilter) {
67+
this (() -> {
68+
try {
69+
checkNotNull(clazz, "clazz");
70+
return clazz.newInstance();
71+
} catch (ReflectiveOperationException e) {
72+
throw new IllegalArgumentException("Class " + clazz.getName() + " must have a public default constructor", e);
73+
}
74+
}, transportFilter);
75+
}
76+
77+
@SuppressWarnings("unchecked")
78+
private ServerServiceDefinition bindFactory(Supplier<T> factory) {
79+
ServerServiceDefinition baseDefinition = factory.get().bindService();
80+
ServiceDescriptor descriptor = baseDefinition.getServiceDescriptor();
81+
Collection<ServerMethodDefinition<?, ?>> methods = baseDefinition.getMethods();
82+
83+
ServerServiceDefinition.Builder builder = ServerServiceDefinition.builder(descriptor);
84+
methods.forEach(method -> builder.addMethod(ServerMethodDefinition.create(method.getMethodDescriptor(), new PerSessionServerCallHandler(factory))));
85+
return builder.build();
86+
}
87+
88+
@Override
89+
public ServerServiceDefinition bindService() {
90+
return perSessionBinding;
91+
}
92+
93+
@Override
94+
public void sessionStart(SessionLifecycleEvent event) {
95+
96+
}
97+
98+
@Override
99+
public void sessionEnd(SessionLifecycleEvent event) {
100+
T instance = sessionServices.remove(event.getSessionId());
101+
if (instance instanceof AutoCloseable) {
102+
try {
103+
((AutoCloseable) instance).close();
104+
} catch (Throwable t) {
105+
throw new RuntimeException(t);
106+
}
107+
}
108+
}
109+
110+
/**
111+
* Internal class implementing the per-session service pattern.
112+
*/
113+
private class PerSessionServerCallHandler implements ServerCallHandler {
114+
private Supplier<T> factory;
115+
116+
PerSessionServerCallHandler(Supplier<T> factory) {
117+
this.factory = factory;
118+
}
119+
120+
@Override
121+
@SuppressWarnings("unchecked")
122+
public ServerCall.Listener startCall(ServerCall call, Metadata headers) {
123+
UUID sessionId = call.getAttributes().get(ClientSessionTransportFilter.TRANSPORT_ATTRIBUTES_SESSION_ID);
124+
if (sessionId != null) {
125+
if (!sessionServices.containsKey(sessionId)) {
126+
T instance = factory.get();
127+
sessionServices.put(sessionId, instance);
128+
129+
ServerServiceDefinition definition = instance.bindService();
130+
ServerMethodDefinition method = definition.getMethod(call.getMethodDescriptor().getFullMethodName());
131+
132+
return method.getServerCallHandler().startCall(call, headers);
133+
} else {
134+
T instance = sessionServices.get(sessionId);
135+
ServerServiceDefinition definition = instance.bindService();
136+
ServerMethodDefinition method = definition.getMethod(call.getMethodDescriptor().getFullMethodName());
137+
138+
return method.getServerCallHandler().startCall(call, headers);
139+
}
140+
} else {
141+
throw new IllegalStateException("ClientSessionTransportFilter was not registered with " +
142+
"ServerBuilder.addTransportFilter(new ClientSessionTransportFilter())");
143+
}
144+
}
145+
}
146+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) 2017, salesforce.com, inc.
3+
* All rights reserved.
4+
* Licensed under the BSD 3-Clause license.
5+
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
6+
*/
7+
8+
package com.salesforce.grpc.contrib.interceptor;
9+
10+
import com.salesforce.grpc.contrib.session.ClientSessionTransportFilter;
11+
import io.grpc.*;
12+
13+
import java.util.UUID;
14+
15+
/**
16+
* The {@code SessionIdServerInterceptor} is used in conjunction with {@link ClientSessionTransportFilter} to attach a
17+
* unique {@link UUID} SessionID to all requests from a common client session. The SessionID for a request is stored
18+
* in the gRPC {@code Session} and is available via the {@link #SESSION_ID} context key.
19+
*
20+
* <p>A client session is created each time a client-side {@code ManagedChannel} connects to the server.
21+
*/
22+
public class SessionIdServerInterceptor implements ServerInterceptor {
23+
/**
24+
* The gRPC {@code Context.Key} used to access SessionID.
25+
*/
26+
public static final Context.Key<UUID> SESSION_ID = Context.key("SESSION_ID");
27+
28+
@Override
29+
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
30+
UUID sessionId = call.getAttributes().get(ClientSessionTransportFilter.TRANSPORT_ATTRIBUTES_SESSION_ID);
31+
if (sessionId != null) {
32+
return Contexts.interceptCall(Context.current().withValue(SESSION_ID, sessionId), call, headers, next);
33+
} else {
34+
throw new IllegalStateException("ClientSessionTransportFilter was not registered with " +
35+
"ServerBuilder.addTransportFilter(new ClientSessionTransportFilter())");
36+
}
37+
}
38+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright (c) 2017, salesforce.com, inc.
3+
* All rights reserved.
4+
* Licensed under the BSD 3-Clause license.
5+
* For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
6+
*/
7+
8+
package com.salesforce.grpc.contrib.session;
9+
10+
import io.grpc.Attributes;
11+
import io.grpc.ServerTransportFilter;
12+
13+
import java.util.Set;
14+
import java.util.UUID;
15+
import java.util.concurrent.CopyOnWriteArraySet;
16+
17+
import static com.google.common.base.Preconditions.checkNotNull;
18+
19+
/**
20+
* {@code ClientSessionTransportFilter} is a gRPC {@code TransportFilter} that attaches a unique SessionID to all requests
21+
* from a common client session. A client session is created each time a client-side {@code ManagedChannel} connects
22+
* to the server. For easy access to the SessionID from service implementations, add a
23+
* {@link com.salesforce.grpc.contrib.interceptor.SessionIdServerInterceptor} to the request chain.
24+
*
25+
* <p>The {@code ClientSessionTransportFilter} is installed using {@code "ServerBuilder.addTransportFilter(new ClientSessionTransportFilter())}.
26+
* When installed, {@code ClientSessionTransportFilter} attaches the SessionID to gRPC's {@code ServerCall} transport
27+
* attributes.
28+
*/
29+
public class ClientSessionTransportFilter extends ServerTransportFilter implements SessionLifecycleEventSource {
30+
/**
31+
* The key used to retrieve a SessionID from gRPC's {@code ServerCall} transport attributes.
32+
*/
33+
public static final Attributes.Key<UUID> TRANSPORT_ATTRIBUTES_SESSION_ID =
34+
Attributes.Key.of("TRANSPORT_ATTRIBUTES_SESSION_ID");
35+
36+
private final Set<SessionLifecycleEventListener> sessionLifecycleEventListeners = new CopyOnWriteArraySet<>();
37+
38+
@Override
39+
public void addSessionEventListener(SessionLifecycleEventListener listener) {
40+
checkNotNull(listener, "listener");
41+
sessionLifecycleEventListeners.add(listener);
42+
}
43+
44+
@Override
45+
public void removeSessionEventListener(SessionLifecycleEventListener listener) {
46+
checkNotNull(listener, "listener");
47+
sessionLifecycleEventListeners.remove(listener);
48+
}
49+
50+
@Override
51+
public Attributes transportReady(Attributes transportAttrs) {
52+
checkNotNull(transportAttrs, "transportAttrs");
53+
54+
UUID sessionId = UUID.randomUUID();
55+
SessionLifecycleEvent event = new SessionLifecycleEvent(this, sessionId);
56+
sessionLifecycleEventListeners.forEach(listener -> listener.sessionStart(event));
57+
58+
return Attributes.newBuilder(transportAttrs).set(TRANSPORT_ATTRIBUTES_SESSION_ID, UUID.randomUUID()).build();
59+
}
60+
61+
@Override
62+
public void transportTerminated(Attributes transportAttrs) {
63+
checkNotNull(transportAttrs, "transportAttrs");
64+
65+
UUID sessionId = transportAttrs.get(TRANSPORT_ATTRIBUTES_SESSION_ID);
66+
if (sessionId != null) {
67+
SessionLifecycleEvent event = new SessionLifecycleEvent(this, sessionId);
68+
sessionLifecycleEventListeners.forEach(listener -> listener.sessionEnd(event));
69+
}
70+
}
71+
}

0 commit comments

Comments
 (0)