Skip to content

Commit 31ce192

Browse files
authored
MEDIUM: Add support for prepared queries (#50)
This adds support for prepared queries destinations in addition to service ones: an upstream can now target a prepared query and benefit from features such as fallback between DCs. Since prepared queries do not support bloquing requests this uses polling to periodically check for changes. The default polling rate is 30s: the same as the one consul uses, and can be overriden with the "poll_interval" config option.
1 parent eaa0576 commit 31ce192

File tree

6 files changed

+165
-19
lines changed

6 files changed

+165
-19
lines changed

consul/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ type Config struct {
1515
}
1616

1717
type Upstream struct {
18-
Service string
18+
Name string
1919
LocalBindAddress string
2020
LocalBindPort int
2121
Protocol string

consul/watcher.go

Lines changed: 89 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package consul
22

33
import (
44
"crypto/x509"
5+
"fmt"
6+
"reflect"
57
"sync"
68
"time"
79

@@ -13,13 +15,14 @@ const (
1315
defaultDownstreamBindAddr = "0.0.0.0"
1416
defaultUpstreamBindAddr = "127.0.0.1"
1517

16-
errorWaitTime = 5 * time.Second
18+
errorWaitTime = 5 * time.Second
19+
preparedQueryPollInterval = 30 * time.Second
1720
)
1821

1922
type upstream struct {
2023
LocalBindAddress string
2124
LocalBindPort int
22-
Service string
25+
Name string
2326
Datacenter string
2427
Protocol string
2528
Nodes []*api.ServiceEntry
@@ -138,12 +141,18 @@ func (w *Watcher) handleProxyChange(first bool, srv *api.AgentService) {
138141

139142
if srv.Proxy != nil {
140143
for _, up := range srv.Proxy.Upstreams {
141-
keep[up.DestinationName] = true
144+
name := fmt.Sprintf("%s_%s", up.DestinationType, up.DestinationName)
145+
keep[name] = true
142146
w.lock.Lock()
143-
_, ok := w.upstreams[up.DestinationName]
147+
_, ok := w.upstreams[name]
144148
w.lock.Unlock()
145149
if !ok {
146-
w.startUpstream(up)
150+
switch up.DestinationType {
151+
case api.UpstreamDestTypePreparedQuery:
152+
w.startUpstreamPreparedQuery(up, name)
153+
default:
154+
w.startUpstreamService(up, name)
155+
}
147156
}
148157
}
149158
}
@@ -159,24 +168,22 @@ func (w *Watcher) handleProxyChange(first bool, srv *api.AgentService) {
159168
}
160169
}
161170

162-
func (w *Watcher) startUpstream(up api.Upstream) {
171+
func (w *Watcher) startUpstreamService(up api.Upstream, name string) {
163172
w.log.Infof("consul: watching upstream for service %s", up.DestinationName)
164173

165174
u := &upstream{
166175
LocalBindAddress: up.LocalBindAddress,
167176
LocalBindPort: up.LocalBindPort,
168-
Service: up.DestinationName,
177+
Name: name,
169178
Datacenter: up.Datacenter,
170179
}
171180

172-
if up.Config["protocol"] != nil {
173-
if p, ok := up.Config["protocol"].(string); ok {
174-
u.Protocol = p
175-
}
181+
if p, ok := up.Config["protocol"].(string); ok {
182+
u.Protocol = p
176183
}
177184

178185
w.lock.Lock()
179-
w.upstreams[up.DestinationName] = u
186+
w.upstreams[name] = u
180187
w.lock.Unlock()
181188

182189
go func() {
@@ -209,6 +216,75 @@ func (w *Watcher) startUpstream(up api.Upstream) {
209216
}()
210217
}
211218

219+
func (w *Watcher) startUpstreamPreparedQuery(up api.Upstream, name string) {
220+
w.log.Infof("consul: watching upstream for prepared_query %s", up.DestinationName)
221+
222+
u := &upstream{
223+
LocalBindAddress: up.LocalBindAddress,
224+
LocalBindPort: up.LocalBindPort,
225+
Name: name,
226+
Datacenter: up.Datacenter,
227+
}
228+
229+
if p, ok := up.Config["protocol"].(string); ok {
230+
u.Protocol = p
231+
}
232+
233+
interval := preparedQueryPollInterval
234+
if p, ok := up.Config["poll_interval"].(string); ok {
235+
dur, err := time.ParseDuration(p)
236+
if err != nil {
237+
w.log.Errorf(
238+
"consul: upstream %s %s: invalid poll interval %s: %s",
239+
up.DestinationType,
240+
up.DestinationName,
241+
p,
242+
err,
243+
)
244+
return
245+
}
246+
interval = dur
247+
}
248+
249+
w.lock.Lock()
250+
w.upstreams[name] = u
251+
w.lock.Unlock()
252+
253+
go func() {
254+
var last []*api.ServiceEntry
255+
for {
256+
if u.done {
257+
return
258+
}
259+
nodes, _, err := w.consul.PreparedQuery().Execute(up.DestinationName, &api.QueryOptions{
260+
Connect: true,
261+
Datacenter: up.Datacenter,
262+
WaitTime: 10 * time.Minute,
263+
})
264+
if err != nil {
265+
w.log.Errorf("consul: error fetching service definition for service %s: %s", up.DestinationName, err)
266+
time.Sleep(errorWaitTime)
267+
continue
268+
}
269+
270+
nodesP := []*api.ServiceEntry{}
271+
for i := range nodes.Nodes {
272+
nodesP = append(nodesP, &nodes.Nodes[i])
273+
}
274+
275+
if !reflect.DeepEqual(last, nodesP) {
276+
w.lock.Lock()
277+
u.Nodes = nodesP
278+
w.lock.Unlock()
279+
w.notifyChanged()
280+
last = nodesP
281+
}
282+
283+
time.Sleep(interval)
284+
}
285+
}()
286+
}
287+
212288
func (w *Watcher) removeUpstream(name string) {
213289
w.log.Infof("consul: removing upstream for service %s", name)
214290

@@ -366,7 +442,7 @@ func (w *Watcher) genCfg() Config {
366442

367443
for _, up := range w.upstreams {
368444
upstream := Upstream{
369-
Service: up.Service,
445+
Name: up.Name,
370446
LocalBindAddress: up.LocalBindAddress,
371447
LocalBindPort: up.LocalBindPort,
372448
Protocol: up.Protocol,

haproxy/state/snapshot_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func GetTestConsulConfig() consul.Config {
2121
},
2222
Upstreams: []consul.Upstream{
2323
consul.Upstream{
24-
Service: "service_1",
24+
Name: "service_1",
2525
LocalBindAddress: "127.0.0.1",
2626
LocalBindPort: 10000,
2727
Nodes: []consul.UpstreamNode{

haproxy/state/upstream.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import (
88
)
99

1010
func generateUpstream(opts Options, certStore CertificateStore, cfg consul.Upstream, oldState, newState State) (State, error) {
11-
feName := fmt.Sprintf("front_%s", cfg.Service)
12-
beName := fmt.Sprintf("back_%s", cfg.Service)
11+
feName := fmt.Sprintf("front_%s", cfg.Name)
12+
beName := fmt.Sprintf("back_%s", cfg.Name)
1313
feMode := models.FrontendModeHTTP
1414
beMode := models.BackendModeHTTP
1515

haproxy_test.go

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"io/ioutil"
66
"testing"
7+
"time"
78

89
"net/http"
910

@@ -13,7 +14,7 @@ import (
1314
"github.com/stretchr/testify/require"
1415
)
1516

16-
func TestSetup(t *testing.T) {
17+
func TestService(t *testing.T) {
1718
err := haproxy_cmd.CheckEnvironment(haproxy_cmd.DefaultDataplaneBin, haproxy_cmd.DefaultHAProxyBin)
1819
if err != nil {
1920
t.Skipf("CANNOT Run test because of missing requirement: %s", err.Error())
@@ -64,3 +65,72 @@ func TestSetup(t *testing.T) {
6465
res.Body.Close()
6566
require.Equal(t, "hello connect", string(body))
6667
}
68+
69+
func TestPreparedQuery(t *testing.T) {
70+
err := haproxy_cmd.CheckEnvironment(haproxy_cmd.DefaultDataplaneBin, haproxy_cmd.DefaultHAProxyBin)
71+
if err != nil {
72+
t.Skipf("CANNOT Run test because of missing requirement: %s", err.Error())
73+
}
74+
sd := lib.NewShutdown()
75+
client := startAgent(t, sd)
76+
defer func() {
77+
sd.Shutdown("test end")
78+
sd.Wait()
79+
}()
80+
81+
_, _, err = client.PreparedQuery().Create(&api.PreparedQueryDefinition{
82+
Name: "pq-",
83+
Service: api.ServiceQuery{
84+
Service: "${match(1)}",
85+
OnlyPassing: true,
86+
},
87+
Template: api.QueryTemplate{
88+
Type: "name_prefix_match",
89+
Regexp: "^pq-(.+)$",
90+
},
91+
}, &api.WriteOptions{})
92+
require.NoError(t, err)
93+
94+
csd, _, upstreamPorts := startConnectService(t, sd, client, &api.AgentServiceRegistration{
95+
Name: "source",
96+
ID: "source-1",
97+
98+
Connect: &api.AgentServiceConnect{
99+
SidecarService: &api.AgentServiceRegistration{
100+
Proxy: &api.AgentServiceConnectProxyConfig{
101+
Upstreams: []api.Upstream{
102+
api.Upstream{
103+
DestinationType: api.UpstreamDestTypePreparedQuery,
104+
DestinationName: "pq-target",
105+
Config: map[string]interface{}{
106+
"poll_interval": (100 * time.Millisecond).String(),
107+
},
108+
},
109+
},
110+
},
111+
},
112+
},
113+
})
114+
115+
tsd, servicePort, _ := startConnectService(t, sd, client, &api.AgentServiceRegistration{
116+
Name: "target",
117+
ID: "target-1",
118+
119+
Connect: &api.AgentServiceConnect{
120+
SidecarService: &api.AgentServiceRegistration{
121+
Proxy: &api.AgentServiceConnectProxyConfig{},
122+
},
123+
},
124+
})
125+
126+
startServer(t, sd, servicePort, "hello connect prepared query")
127+
wait(sd, csd, tsd)
128+
res, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d", upstreamPorts["pq-target"]))
129+
require.NoError(t, err)
130+
require.Equal(t, 200, res.StatusCode)
131+
132+
body, err := ioutil.ReadAll(res.Body)
133+
require.NoError(t, err)
134+
res.Body.Close()
135+
require.Equal(t, "hello connect prepared query", string(body))
136+
}

utils_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func startServer(t *testing.T, sd *lib.Shutdown, port int, response string) {
139139
sd.Add(1)
140140
go func() {
141141
http.Serve(lis, http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
142-
rw.Write([]byte("hello connect"))
142+
rw.Write([]byte(response))
143143
}))
144144
}()
145145
go func() {

0 commit comments

Comments
 (0)