Skip to content

Commit c2b2de2

Browse files
committed
rewind feature to stream method
1 parent 452245e commit c2b2de2

25 files changed

+1134
-791
lines changed

.bumpversion.cfg

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 0.13.2
2+
current_version = 0.13.3
33
commit = False
44
tag = False
55

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
6060

6161
## [0.12.1] - 2020-12-23
6262
- PutDoc, PutDocs, PutConnection, PutConnections for full create-or-replace functionality
63+
64+
## [0.13.2] - 2020-12-28
65+
- Add "rewind" feature to all stream method to capture historical messages

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version := "0.13.2"
1+
version := "0.13.3"
22

33
.DEFAULT_GOAL := help
44

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ https://graphikdb.github.io/graphik/
77

88
`git clone git@github.com:graphikDB/graphik.git`
99

10-
`docker pull graphikdb/graphik:v0.13.2`
10+
`docker pull graphikdb/graphik:v0.13.3`
1111

1212
Graphik is a Backend as a Service implemented as an identity-aware, permissioned, persistant document/graph database & pubsub server written in Go.
1313

@@ -910,7 +910,7 @@ add this docker-compose.yml to ${pwd}:
910910
version: '3.7'
911911
services:
912912
graphik:
913-
image: graphikdb/graphik:v0.13.2
913+
image: graphikdb/graphik:v0.13.3
914914
env_file:
915915
- .env
916916
ports:

database/constants.go

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ var (
2828
dbConstraints = []byte("constraints")
2929
dbIndexDocs = []byte("indexedDocs")
3030
dbIndexConnections = []byte("indexedConnections")
31+
dbMessages = []byte("messages")
3132
// An error indicating a given key does not exist
3233
ErrNotFound = errors.New("not found")
3334
ErrAlreadyExists = errors.New("already exists")

database/graph.go

+60-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package database
22

33
import (
4+
"bytes"
45
"context"
56
"encoding/json"
67
"fmt"
@@ -23,6 +24,7 @@ import (
2324
"google.golang.org/grpc/codes"
2425
"google.golang.org/grpc/metadata"
2526
"google.golang.org/grpc/status"
27+
"google.golang.org/protobuf/proto"
2628
"google.golang.org/protobuf/types/known/structpb"
2729
"google.golang.org/protobuf/types/known/timestamppb"
2830
"io/ioutil"
@@ -39,11 +41,11 @@ import (
3941
type Graph struct {
4042
// db is the underlying handle to the db.
4143
db *bbolt.DB
44+
pubsub *bbolt.DB
4245
jwksMu sync.RWMutex
4346
jwksSet *jwk.Set
4447
jwtCache *generic.Cache
4548
openID *openIDConnect
46-
path string
4749
mu sync.RWMutex
4850
connectionsTo map[string]map[string]struct{}
4951
connectionsFrom map[string]map[string]struct{}
@@ -63,21 +65,24 @@ type Graph struct {
6365
// NewGraph takes a file path and returns a connected Raft backend.
6466
func NewGraph(ctx context.Context, flgs *apipb.Flags, lgger *logger.Logger) (*Graph, error) {
6567
os.MkdirAll(flgs.StoragePath, 0700)
66-
path := filepath.Join(flgs.StoragePath, "graph.db")
67-
handle, err := bbolt.Open(path, dbFileMode, nil)
68+
graphDB, err := bbolt.Open(filepath.Join(flgs.StoragePath, "graph.db"), dbFileMode, nil)
69+
if err != nil {
70+
return nil, err
71+
}
72+
pubsubDB, err := bbolt.Open(filepath.Join(flgs.StoragePath, "pubsub.db"), dbFileMode, nil)
6873
if err != nil {
6974
return nil, err
7075
}
7176

7277
var closers []func()
7378
m := machine.New(ctx, machine.WithMaxRoutines(100000))
7479
g := &Graph{
75-
db: handle,
80+
db: graphDB,
7681
jwksMu: sync.RWMutex{},
7782
jwksSet: nil,
7883
jwtCache: generic.NewCache(5 * time.Minute),
7984
openID: nil,
80-
path: path,
85+
pubsub: pubsubDB,
8186
mu: sync.RWMutex{},
8287
connectionsTo: map[string]map[string]struct{}{},
8388
connectionsFrom: map[string]map[string]struct{}{},
@@ -153,6 +158,16 @@ func NewGraph(ctx context.Context, flgs *apipb.Flags, lgger *logger.Logger) (*Gr
153158
if err != nil {
154159
return nil, err
155160
}
161+
if err := pubsubDB.Update(func(tx *bbolt.Tx) error {
162+
_, err = tx.CreateBucketIfNotExists(dbMessages)
163+
if err != nil {
164+
return errors.Wrap(err, "failed to create messages bucket")
165+
}
166+
return nil
167+
}); err != nil {
168+
return nil, err
169+
}
170+
156171
if err := g.cacheConnectionRefs(); err != nil {
157172
return nil, err
158173
}
@@ -860,7 +875,7 @@ func (g *Graph) Broadcast(ctx context.Context, message *apipb.OutboundMessage) (
860875
return nil, prepareError(ErrFailedToGetUser)
861876
}
862877
if message.GetChannel() == changeChannel {
863-
return nil, status.Error(codes.PermissionDenied, "forbidden from publishing to the changes channel")
878+
return nil, status.Error(codes.PermissionDenied, "forbidden from broadcasting to the state channel")
864879
}
865880
_, err := g.applyCommand(&apipb.RaftCommand{
866881
User: user,
@@ -912,6 +927,41 @@ func (g *Graph) Stream(filter *apipb.StreamFilter, server apipb.DatabaseService_
912927
return true
913928
}
914929
}
930+
if filter.GetRewind() != "" {
931+
dur, err := time.ParseDuration(filter.GetRewind())
932+
if err != nil {
933+
return status.Error(codes.InvalidArgument, err.Error())
934+
}
935+
if err := g.pubsub.View(func(tx *bbolt.Tx) error {
936+
msgsBucket := tx.Bucket(dbMessages)
937+
bucket := msgsBucket.Bucket([]byte(filter.Channel))
938+
if bucket == nil {
939+
bucket, err = msgsBucket.CreateBucketIfNotExists([]byte(filter.Channel))
940+
if err != nil {
941+
return err
942+
}
943+
}
944+
c := bucket.Cursor()
945+
946+
min := helpers.Uint64ToBytes(uint64(time.Now().Truncate(dur).UnixNano()))
947+
max := helpers.Uint64ToBytes(uint64(time.Now().UnixNano()))
948+
for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
949+
var msg = &apipb.Message{}
950+
if err := proto.Unmarshal(v, msg); err != nil {
951+
return errors.Wrap(err, "failed to unmarshal message")
952+
}
953+
if filterFunc(msg) {
954+
if err := server.Send(msg); err != nil {
955+
return err
956+
}
957+
}
958+
}
959+
return nil
960+
}); err != nil {
961+
return prepareError(err)
962+
}
963+
}
964+
915965
if err := g.machine.PubSub().SubscribeFilter(server.Context(), filter.GetChannel(), filterFunc, func(msg interface{}) {
916966
if err, ok := msg.(error); ok && err != nil {
917967
g.logger.Error("failed to send subscription", zap.Error(err))
@@ -939,7 +989,10 @@ func (b *Graph) Close() {
939989
}
940990
b.machine.Wait()
941991
if err := b.db.Close(); err != nil {
942-
b.logger.Error("failed to close db", zap.Error(err))
992+
b.logger.Error("failed to close graph db", zap.Error(err))
993+
}
994+
if err := b.pubsub.Close(); err != nil {
995+
b.logger.Error("failed to close pubsub db", zap.Error(err))
943996
}
944997
})
945998
}

database/raft.go

+21
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
apipb "github.com/graphikDB/graphik/gen/grpc/go"
7+
"github.com/graphikDB/graphik/helpers"
78
"github.com/graphikDB/raft/fsm"
89
"github.com/hashicorp/raft"
910
"github.com/pkg/errors"
@@ -128,6 +129,26 @@ func (g *Graph) fsm() *fsm.FSM {
128129
}
129130
}
130131
if cmd.GetSendMessage() != nil {
132+
if err := g.pubsub.Update(func(tx *bbolt.Tx) error {
133+
msgsBucket := tx.Bucket(dbMessages)
134+
bucket := msgsBucket.Bucket([]byte(cmd.SendMessage.Channel))
135+
if bucket == nil {
136+
bucket, err = msgsBucket.CreateBucketIfNotExists([]byte(cmd.SendMessage.Channel))
137+
if err != nil {
138+
return err
139+
}
140+
}
141+
bits, err := proto.Marshal(cmd.SendMessage)
142+
if err != nil {
143+
return err
144+
}
145+
if err := bucket.Put(helpers.Uint64ToBytes(uint64(cmd.SendMessage.Timestamp.AsTime().UnixNano())), bits); err != nil {
146+
return err
147+
}
148+
return nil
149+
}); err != nil {
150+
return err
151+
}
131152
if err := g.machine.PubSub().Publish(cmd.SendMessage.Channel, cmd.SendMessage); err != nil {
132153
return status.Error(codes.Internal, err.Error())
133154
}

docker-compose.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
version: '3.7'
22
services:
33
graphik:
4-
image: graphikdb/graphik:v0.13.2
4+
image: graphikdb/graphik:v0.13.3
55
env_file:
66
- .env
77
ports:

gen/gql/docs/streamfilter.doc.html

+1-1
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ <h2 id="graphql-schema-definition" class="graphdoc-section__title slds-text-head
464464
</a>
465465
GraphQL Schema definition
466466
</h2>
467-
<code class="highlight"><ul class="code" style="padding-left:28px"><li><span class="keyword operator ts">input</span> <span class="identifier">StreamFilter</span> {</li><li><span class="tab"><li><span class="tab"><span class="comment line"># channel is the target channel to listen on</span></span></li><li><span class="tab"><span class="meta">channel</span>: <a class="support type" href="string.doc.html">String</a>!</span></li></span></li><li><span class="tab"><li><span class="tab"><span class="comment line"># expression is a CEL expression used to filter messages</span></span></li><li><span class="tab"><span class="meta">expression</span>: <a class="support type" href="string.doc.html">String</a></span></li></span></li><li>}</li></ul></code>
467+
<code class="highlight"><ul class="code" style="padding-left:28px"><li><span class="keyword operator ts">input</span> <span class="identifier">StreamFilter</span> {</li><li><span class="tab"><li><span class="tab"><span class="comment line"># channel is the target channel to listen on</span></span></li><li><span class="tab"><span class="meta">channel</span>: <a class="support type" href="string.doc.html">String</a>!</span></li></span></li><li><span class="tab"><li><span class="tab"><span class="comment line"># expression is a CEL expression used to filter messages</span></span></li><li><span class="tab"><span class="meta">expression</span>: <a class="support type" href="string.doc.html">String</a></span></li></span></li><li><span class="tab"><li><span class="tab"><span class="comment line"># rewind time by a specified duration to capture messages from history</span></span></li><li><span class="tab"><span class="meta">rewind</span>: <a class="support type" href="string.doc.html">String</a></span></li></span></li><li>}</li></ul></code>
468468
</div>
469469
</section>
470470
<section>

gen/gql/go/generated/generated.go

+10
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gen/gql/go/model/models_gen.go

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)