Skip to content

Commit 122fca5

Browse files
committed
Major changes to how runners work, implicit service runners.
1 parent 16246d0 commit 122fca5

File tree

14 files changed

+481
-348
lines changed

14 files changed

+481
-348
lines changed

.github/workflows/release.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@ jobs:
1515
with:
1616
go-version: "1.22"
1717

18-
- name: Build
19-
run: go build -v .
20-
2118
- name: Run GoReleaser
2219
uses: goreleaser/goreleaser-action@master
2320
with:

autonats/server.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,8 @@ func StartServer(opts Options) (srv *Server, err error) {
340340
// Wait for the server to be ready
341341
go func() {
342342
deadline := time.Now().Add(ServerStartTimeout)
343+
ctx, cancel := context.WithDeadline(context.Background(), deadline)
344+
defer cancel()
343345

344346
var conn *nats.Conn
345347
defer func() {
@@ -386,7 +388,11 @@ func StartServer(opts Options) (srv *Server, err error) {
386388

387389
// Test jetstream node placement
388390
jsc := lo.Must(jetstream.New(conn)) // There's nothing to fail here
389-
kv, err := jsc.CreateOrUpdateKeyValue(context.Background(), jetstream.KeyValueConfig{
391+
if _, err := jsc.AccountInfo(ctx); err != nil {
392+
logger.Info().Err(err).Msg("Init: Waiting for Jetstream to become ready")
393+
continue
394+
}
395+
kv, err := jsc.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
390396
Bucket: "pmesh-probe",
391397
Description: "pmesh-probe",
392398
TTL: 1 * time.Minute,
@@ -398,7 +404,7 @@ func StartServer(opts Options) (srv *Server, err error) {
398404
continue
399405
}
400406

401-
if _, err := kv.Put(context.Background(), "test", []byte{0x1}); err != nil {
407+
if _, err := kv.Put(ctx, "test", []byte{0x1}); err != nil {
402408
logger.Info().Err(err).Msg("Init: Waiting for Jetstream RAFT log to become ready (write)")
403409
time.Sleep(1 * time.Second)
404410
continue

client/api_kv.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"encoding/json"
5+
"fmt"
56

67
"get.pme.sh/pmesh/session"
78
)
@@ -50,15 +51,8 @@ func (c Client) KVList() (res []string, err error) {
5051
err = c.Call("GET /kv", nil, &res)
5152
return
5253
}
53-
func (c Client) KVDailyList() (res []string, err error) {
54-
err = c.Call("GET /kv/d", nil, &res)
55-
return
56-
}
57-
func (c Client) KVWeeklyList() (res []string, err error) {
58-
err = c.Call("GET /kv/w", nil, &res)
59-
return
60-
}
61-
func (c Client) KVMonthlyList() (res []string, err error) {
62-
err = c.Call("GET /kv/m", nil, &res)
54+
func (c Client) Result(stream string, seq int) (res json.RawMessage, err error) {
55+
path := fmt.Sprintf("/result/%s/%d", stream, seq)
56+
err = c.Call(path, nil, &res)
6357
return
6458
}

enats/gateway.go

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@ type Gateway struct {
2929
// Internal resources
3030
PeerKV, SchedulerKV jetstream.KeyValue
3131
// Global resources for the user
32-
DefaultKV, DailyKV, WeeklyKV, MonthlyKV jetstream.KeyValue
32+
DefaultKV, ResultKV jetstream.KeyValue
3333

3434
EventStream jetstream.Stream
3535
}
3636

37+
const EventStreamPrefix = "ev."
38+
3739
func New() (r *Gateway) {
3840
r = &Gateway{}
3941

@@ -119,13 +121,7 @@ func (r *Gateway) Open(ctx context.Context) (err error) {
119121
if makeKV(&r.DefaultKV, "global", 0); err != nil {
120122
return
121123
}
122-
if makeKV(&r.DailyKV, "daily", 24*time.Hour); err != nil {
123-
return
124-
}
125-
if makeKV(&r.WeeklyKV, "weekly", 7*24*time.Hour); err != nil {
126-
return
127-
}
128-
if makeKV(&r.MonthlyKV, "monthly", 30*24*time.Hour); err != nil {
124+
if makeKV(&r.ResultKV, "results", 0); err != nil {
129125
return
130126
}
131127

@@ -142,7 +138,7 @@ func (r *Gateway) Open(ctx context.Context) (err error) {
142138
MaxBytes: -1,
143139
Duplicates: 0,
144140
AllowDirect: true,
145-
Subjects: []string{"ev.>"},
141+
Subjects: []string{EventStreamPrefix + ">"},
146142
})
147143
if err != nil {
148144
return
@@ -166,20 +162,6 @@ func (r *Gateway) Close(ctx context.Context) (err error) {
166162
return
167163
}
168164

169-
// Gets the KV store for the default kv usage given the key.
170-
func (r *Gateway) DefaultKVStore(key string) (kv jetstream.KeyValue) {
171-
if strings.HasPrefix(key, "d.") {
172-
kv = r.DailyKV
173-
} else if strings.HasPrefix(key, "w.") {
174-
kv = r.WeeklyKV
175-
} else if strings.HasPrefix(key, "m.") {
176-
kv = r.MonthlyKV
177-
} else {
178-
kv = r.DefaultKV
179-
}
180-
return
181-
}
182-
183165
// Wrappers
184166
func (r *Client) KVStore(ctx context.Context, cfg jetstream.KeyValueConfig) (kv jetstream.KeyValue, err error) {
185167
kv, err = r.Jet.CreateOrUpdateKeyValue(ctx, cfg)

enats/topics.go

Lines changed: 19 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,99 +1,39 @@
11
package enats
22

33
import (
4-
"strconv"
54
"strings"
6-
7-
"get.pme.sh/pmesh/config"
8-
"get.pme.sh/pmesh/xlog"
95
)
106

11-
var AnyCastMachineID = config.MachineID(0)
12-
var remoteQueueName = "ev." + AnyCastMachineID.String() + "."
13-
var localQueueName = "ev." + config.GetMachineID().String() + "."
7+
const (
8+
userPrefix = "raw."
9+
)
1410

1511
// Given a nats subject, return the user-specified topic.
16-
func ToTopic(subject string) (topic string, dest config.MachineID) {
17-
dest = AnyCastMachineID
12+
func ToTopic(subject string) string {
13+
return strings.TrimPrefix(subject, EventStreamPrefix)
14+
}
1815

16+
// Given a user-specified topic, return the subject names to use for NATS consumers/publishers.
17+
func ToSubject(topic string) string {
1918
// User specified topic
20-
if strings.HasPrefix(subject, "jet.") {
21-
topic = strings.TrimPrefix(subject, "jet.")
22-
return
23-
}
24-
25-
// Remote topic
26-
if strings.HasPrefix(subject, remoteQueueName) {
27-
topic = subject[len(remoteQueueName):]
28-
return
29-
}
30-
31-
// Local topic
32-
if strings.HasPrefix(subject, localQueueName) {
33-
topic = subject[len(localQueueName):]
34-
dest = config.GetMachineID()
35-
return
36-
}
37-
38-
if strings.HasPrefix(subject, "ev.") {
39-
if len(subject) <= len(remoteQueueName) || subject[len(remoteQueueName)-1] != '.' {
40-
xlog.Warn().Str("subject", subject).Msg("Invalid subject")
41-
return
42-
}
43-
44-
u64, err := strconv.ParseUint(subject[len("ev."):len(remoteQueueName)-1], 16, 32)
45-
if err != nil {
46-
xlog.Warn().Str("subject", subject).Err(err).Msg("Invalid subject")
47-
return
48-
}
49-
50-
dest = config.MachineID(uint32(u64))
51-
topic = subject[len(remoteQueueName):]
52-
return
19+
if subject, ok := strings.CutPrefix(topic, userPrefix); ok {
20+
topic = subject
21+
} else if !strings.HasPrefix(topic, EventStreamPrefix) {
22+
topic = EventStreamPrefix + topic
5323
}
5424

55-
topic = subject
56-
return
57-
}
58-
59-
// Given a user-specified topic, return the subject names to use for NATS consumers.
60-
func ToConsumerSubjects(topic string) []string {
25+
// Wildcard
6126
if strings.HasSuffix(topic, ".") {
6227
topic += ">"
6328
}
64-
if strings.HasPrefix(topic, "jet.") {
65-
return []string{strings.TrimPrefix(topic, "jet.")}
66-
}
67-
if strings.HasPrefix(topic, "$local.") {
68-
return []string{localQueueName + strings.TrimPrefix(topic, "$local.")}
69-
}
70-
return []string{
71-
remoteQueueName + topic,
72-
localQueueName + topic,
73-
}
74-
}
75-
76-
// Given a user-specified topic, return the subject name to use for NATS publishers.
77-
func ToPublisherSubject(topic string) string {
78-
if strings.HasPrefix(topic, "jet.") {
79-
return strings.TrimPrefix(topic, "jet.")
80-
} else if !strings.HasPrefix(topic, "$local.") {
81-
return remoteQueueName + topic
82-
} else {
83-
return localQueueName + strings.TrimPrefix(topic, "$local.")
84-
}
85-
}
86-
func ToPublisherSubjectWithTarget(topic string, target config.MachineID) string {
87-
if strings.HasPrefix(topic, "jet.") {
88-
return strings.TrimPrefix(topic, "jet.")
89-
} else {
90-
return "ev." + config.GetMachineID().String() + "." + strings.TrimPrefix(topic, "$local.")
91-
}
29+
return topic
9230
}
9331

32+
// Given a user-specified topic, return the queue name to use for NATS consumers.
9433
func ToConsumerQueueName(pfx, topic string) string {
95-
queue := strings.ReplaceAll(pfx+topic, ".", "-")
96-
queue = strings.ReplaceAll(queue, "*", "all")
97-
queue = strings.ReplaceAll(queue, ">", "matchall")
34+
queue := pfx + ToSubject(topic)
35+
queue = strings.ReplaceAll(queue, ".", "-")
36+
queue = strings.ReplaceAll(queue, "*", "any")
37+
queue = strings.ReplaceAll(queue, ">", "all")
9838
return queue
9939
}

retry/policy.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"get.pme.sh/pmesh/util"
77
)
88

9+
const RetrierMaxDelayCoeff = 20
10+
911
type Policy struct {
1012
Attempts int `json:"attempts,omitempty" yaml:"attempts,omitempty"` // The maximum number of retries.
1113
Backoff util.Duration `json:"backoff,omitempty" yaml:"backoff,omitempty"` // The base delay between retries.
@@ -27,13 +29,33 @@ func Long() Policy {
2729
}
2830
}
2931

30-
func (p Policy) adjust() Policy {
31-
if p.Attempts == 0 {
32-
p.Attempts = 8
32+
func (retry Policy) WithDefaults() Policy {
33+
if retry.Attempts == 0 {
34+
retry.Attempts = 8
35+
}
36+
retry.Backoff = retry.Backoff.Or(150 * time.Millisecond)
37+
retry.Timeout = retry.Timeout.Or(30 * time.Second)
38+
return retry
39+
}
40+
41+
func (retry Policy) MaxDelay() time.Duration {
42+
return retry.Backoff.Duration() * RetrierMaxDelayCoeff
43+
}
44+
45+
func (retry Policy) StepN(n int) (delay time.Duration, err error) {
46+
if retry.Attempts > 0 && n >= retry.Attempts {
47+
return 0, ErrMaxAttemptsExceeded
48+
}
49+
delay = retry.Backoff.Duration()
50+
maxdelay := retry.MaxDelay()
51+
for ; n > 0; n-- {
52+
delay += retry.Backoff.Duration()
53+
delay = delay + (delay >> 1) // Exponential backoff
54+
if delay > maxdelay {
55+
return maxdelay, nil
56+
}
3357
}
34-
p.Backoff = p.Backoff.Or(150 * time.Millisecond)
35-
p.Timeout = p.Timeout.Or(30 * time.Second)
36-
return p
58+
return
3759
}
3860

3961
func (retry Policy) Step(step *int, delay *time.Duration) error {
@@ -50,6 +72,7 @@ func (retry Policy) Step(step *int, delay *time.Duration) error {
5072
} else {
5173
*delay += retry.Backoff.Duration()
5274
*delay = *delay + (*delay >> 1) // Exponential backoff
75+
*delay = min(*delay, retry.MaxDelay())
5376
}
5477
return nil
5578
}

retry/retrier.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,11 @@ type Retrier struct {
1414
Deadline time.Time
1515
}
1616

17-
const RetrierMaxDelayCoeff = 20
18-
1917
// Creates a retries with the given policy and context
2018
func (policy Policy) RetrierContext(ctx context.Context) Retrier {
2119
rt := Retrier{
2220
Context: ctx,
23-
Policy: policy.adjust(),
21+
Policy: policy.WithDefaults(),
2422
}
2523
rt.Deadline = time.Now().Add(rt.Policy.Timeout.Duration())
2624
if deadline, ok := ctx.Deadline(); ok && deadline.Before(rt.Deadline) {
@@ -38,7 +36,6 @@ func (policy Policy) Retrier() Retrier {
3836
func (retry *Retrier) NextDelay() (delay time.Duration, err error) {
3937
err = retry.Policy.Step(&retry.Step, &retry.Delay)
4038
if err == nil {
41-
retry.Delay = min(retry.Delay, retry.Policy.Backoff.Duration()*RetrierMaxDelayCoeff)
4239
delay = retry.Delay
4340
}
4441
return

session/api.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ func writeOutput(req *http.Request, w http.ResponseWriter, result any, e error)
9898
output = result
9999
}
100100

101+
if raw, ok := output.(json.RawMessage); ok {
102+
w.Write(raw)
103+
return
104+
}
105+
101106
enc := json.NewEncoder(w)
102107
enc.SetEscapeHTML(false)
103108
if req.URL.Query().Has("pretty") {

session/api_control.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"get.pme.sh/pmesh/vhttp"
1414
"get.pme.sh/pmesh/xpost"
1515

16-
"github.com/nats-io/nats.go/jetstream"
16+
"github.com/nats-io/nats.go"
1717
)
1818

1919
func init() {
@@ -67,14 +67,23 @@ func init() {
6767
res = session.Peerlist.List(true)
6868
return
6969
})
70-
Match("/publish/{topic}", func(session *Session, r *http.Request, p json.RawMessage) (ack jetstream.PubAck, err error) {
71-
subject := enats.ToPublisherSubject(r.PathValue("topic"))
72-
a, e := session.Nats.Jet.Publish(r.Context(), subject, p)
73-
if e != nil {
74-
err = e
70+
Match("/publish/{topic}", func(session *Session, r *http.Request, p json.RawMessage) (ack json.RawMessage, err error) {
71+
subject := enats.ToSubject(r.PathValue("topic"))
72+
73+
deadline, ok := r.Context().Deadline()
74+
if !ok {
75+
deadline = time.Now().Add(30 * time.Second)
76+
}
77+
res, err := session.Nats.RequestMsg(&nats.Msg{
78+
Subject: subject,
79+
Data: p,
80+
Header: nats.Header(r.Header),
81+
}, time.Until(deadline))
82+
if err != nil {
7583
return
7684
}
77-
return *a, nil
85+
ack = res.Data
86+
return
7887
})
7988
MatchLocked("/reload", func(session *Session, r *http.Request, p ServiceInvalidate) (_ any, err error) {
8089
err = session.ReloadLocked(p.Invalidate)

0 commit comments

Comments
 (0)