Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions api/api_gomux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

"github.com/gorilla/websocket"
httpSwagger "github.com/swaggo/http-swagger"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/v2/bson"

"github.com/square/etre"
"github.com/square/etre/app"
Expand Down Expand Up @@ -494,13 +494,13 @@ func (api *API) id(next http.Handler) http.Handler {
rc := ctx.Value(reqKey).(*req)

var err error
var entityId primitive.ObjectID
var entityId bson.ObjectID

id := r.PathValue("id") // 1. from URL
if id == "" {
err = ErrMissingParam.New("missing id param")
} else {
entityId, err = primitive.ObjectIDFromHex(id) // 2. convert to/validate as ObjectID
entityId, err = bson.ObjectIDFromHex(id) // 2. convert to/validate as ObjectID
if err != nil {
err = ErrInvalidParam.New("id '%s' is not a valid ObjectID: %v", id, err)
}
Expand Down Expand Up @@ -1309,8 +1309,8 @@ func (api *API) WriteResult(rc *req, w http.ResponseWriter, ids interface{}, err
diffs := ids.([]etre.Entity)
writes = make([]etre.Write, len(diffs))
for i, diff := range diffs {
// _id from db is primitive.ObjectID, convert to string
id := diff["_id"].(primitive.ObjectID).Hex()
// _id from db is bson.ObjectID, convert to string
id := diff["_id"].(bson.ObjectID).Hex()
writes[i] = etre.Write{
EntityId: id,
URI: api.addr + etre.API_ROOT + "/entity/" + id,
Expand All @@ -1336,9 +1336,9 @@ func (api *API) WriteResult(rc *req, w http.ResponseWriter, ids interface{}, err
// Entity from DeleteLabel
diff := ids.(etre.Entity)

// _id from db is primitive.ObjectID, convert to string
// _id from db is bson.ObjectID, convert to string
if diff != nil && diff["_id"] != nil {
id = diff["_id"].(primitive.ObjectID).Hex()
id = diff["_id"].(bson.ObjectID).Hex()
}
writes = []etre.Write{
{
Expand Down
8 changes: 4 additions & 4 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/v2/bson"

"github.com/square/etre"
"github.com/square/etre/api"
Expand Down Expand Up @@ -60,9 +60,9 @@ var testEntities = []etre.Entity{
var testEntityIds = []string{"59f10d2a5669fc79103a0000", "59f10d2a5669fc79103a1111", "59f10d2a5669fc79103a2222"}

var (
testEntityId0, _ = primitive.ObjectIDFromHex(testEntityIds[0])
testEntityId1, _ = primitive.ObjectIDFromHex(testEntityIds[1])
testEntityId2, _ = primitive.ObjectIDFromHex(testEntityIds[2])
testEntityId0, _ = bson.ObjectIDFromHex(testEntityIds[0])
testEntityId1, _ = bson.ObjectIDFromHex(testEntityIds[1])
testEntityId2, _ = bson.ObjectIDFromHex(testEntityIds[2])
)

var testEntitiesWithObjectIDs = []etre.Entity{
Expand Down
4 changes: 2 additions & 2 deletions cdc/changestream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (

"github.com/square/etre"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
)

var (
Expand Down
4 changes: 2 additions & 2 deletions cdc/changestream/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"

"github.com/square/etre"
"github.com/square/etre/cdc"
Expand Down
12 changes: 7 additions & 5 deletions cdc/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"sort"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"

"github.com/square/etre"
)
Expand Down Expand Up @@ -107,8 +107,10 @@ func (s *store) Read(f Filter) ([]etre.CDCEvent, error) {
// etre.CDC to match so, below, cursor.All() doesn't have to realloc the
// slice. For small fetches, this is overkill, but it makes large fetchs
// (>100k events) very quick and efficient.
opts := options.Count().SetMaxTime(5 * time.Second)
count, err := s.coll.CountDocuments(context.TODO(), q, opts)
opts := options.Count()
cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
count, err := s.coll.CountDocuments(cctx, q, opts)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions cdc/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"

"github.com/square/etre"
"github.com/square/etre/cdc"
Expand Down Expand Up @@ -47,7 +47,7 @@ func setup(t *testing.T, fallbackFile string, wrp cdc.RetryPolicy) cdc.Store {
// First time, create unique index on "x"
if coll == nil {
iv := cdcColl.Indexes()
if _, err := iv.DropAll(context.TODO()); err != nil {
if err := iv.DropAll(context.TODO()); err != nil {
t.Fatal(err)
}
idx := mongo.IndexModel{
Expand Down
22 changes: 4 additions & 18 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
package db

import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"log"
"time"

"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"

"github.com/square/etre/config"
)
Expand Down Expand Up @@ -68,24 +67,11 @@ func (d Default) Connect(cfg config.DatasourceConfig) (*mongo.Client, error) {
log.Printf("WARNING: No database username for %s specified in config. Authentication will fail unless MongoDB access control is disabled.", cfg.URL)
}

client, err := mongo.NewClient(opts)
if err != nil {
return nil, err
}

// mongo.Connect() does not actually connect:
// The Client.Connect method starts background goroutines to monitor the
// state of the deployment and does not do any I/O in the main goroutine to
// prevent the main goroutine from blocking. Therefore, it will not error if
// the deployment is down.
// https://pkg.go.dev/go.mongodb.org/mongo-driver/mongo?tab=doc#Connect
// mongo.Connect() does not actually connect to the database.
// The caller must call client.Ping() to actually connect. Consequently,
// we don't need a context here. As long as there's not a bug in the mongo
// driver, this won't block.
if err := client.Connect(context.Background()); err != nil {
return nil, err
}
return client, nil
return mongo.Connect(opts)
}

func loadTLS(cfg config.DatasourceConfig) (*tls.Config, error) {
Expand Down
13 changes: 6 additions & 7 deletions entity/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import (
"github.com/square/etre"
"github.com/square/etre/query"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
)

type DbError struct {
Expand Down Expand Up @@ -66,16 +65,16 @@ func Filter(q query.Query) bson.M {
if p.Label == etre.META_LABEL_ID {
switch p.Value.(type) {
case string:
id, _ := primitive.ObjectIDFromHex(p.Value.(string))
id, _ := bson.ObjectIDFromHex(p.Value.(string))
filter[p.Label] = bson.M{operatorMap[p.Operator]: id}
case []string:
vals := p.Value.([]string)
oids := make([]primitive.ObjectID, len(vals))
oids := make([]bson.ObjectID, len(vals))
for i, v := range vals {
oids[i], _ = primitive.ObjectIDFromHex(v)
oids[i], _ = bson.ObjectIDFromHex(v)
}
filter[p.Label] = bson.M{operatorMap[p.Operator]: oids}
case primitive.ObjectID:
case bson.ObjectID:
filter[p.Label] = bson.M{operatorMap[p.Operator]: p.Value}
default:
panic(fmt.Sprintf("invalid _id value type: %T", p.Value))
Expand Down
36 changes: 23 additions & 13 deletions entity/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ package entity

import (
"context"
"errors"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"

"github.com/square/etre"
"github.com/square/etre/cdc"
Expand Down Expand Up @@ -58,7 +58,17 @@ func (s store) ReadEntities(ctx context.Context, entityType string, q query.Quer
// "es -u node.metacluster zone=pd" returns a list of unique metacluster names.
// This is 10x faster than "es node.metacluster zone=pd | sort -u".
if len(f.ReturnLabels) == 1 && f.Distinct {
values, err := c.Distinct(ctx, f.ReturnLabels[0], Filter(q))
dr := c.Distinct(ctx, f.ReturnLabels[0], Filter(q))
if err := dr.Err(); err != nil {
nfe := mongo.ErrNoDocuments
if errors.Is(err, nfe) {
// No documents found, return empty slice
return []etre.Entity{}, nil
}
return nil, s.dbError(ctx, err, "db-read-distinct")
}
var values []string
err := dr.Decode(&values)
if err != nil {
return nil, s.dbError(ctx, err, "db-read-distinct")
}
Expand Down Expand Up @@ -122,7 +132,7 @@ func (s store) CreateEntities(ctx context.Context, wo WriteOp, entities []etre.E

now := time.Now().UnixNano()
for i := range entities {
entities[i]["_id"] = primitive.NewObjectID()
entities[i]["_id"] = bson.NewObjectID()
entities[i]["_type"] = wo.EntityType
entities[i]["_rev"] = int64(0)
entities[i]["_created"] = now
Expand All @@ -132,7 +142,7 @@ func (s store) CreateEntities(ctx context.Context, wo WriteOp, entities []etre.E
if err != nil {
return newIds, s.dbError(ctx, err, "db-insert")
}
id := res.InsertedID.(primitive.ObjectID)
id := res.InsertedID.(bson.ObjectID)
newIds = append(newIds, id.Hex())

// Create a CDC event.
Expand Down Expand Up @@ -195,7 +205,7 @@ func (s store) UpdateEntities(ctx context.Context, wo WriteOp, q query.Query, pa
}
opts := options.FindOneAndUpdate().SetProjection(p)

nextId := map[string]primitive.ObjectID{}
nextId := map[string]bson.ObjectID{}
for cursor.Next(ctx) {
if err := cursor.Decode(&nextId); err != nil {
return diffs, s.dbError(ctx, err, "db-cursor-decode")
Expand All @@ -222,7 +232,7 @@ func (s store) UpdateEntities(ctx context.Context, wo WriteOp, q query.Query, pa

cp := cdcPartial{
op: "u",
id: orig["_id"].(primitive.ObjectID),
id: orig["_id"].(bson.ObjectID),
rev: orig.Rev() + 1,
old: &old,
new: &patch,
Expand Down Expand Up @@ -266,7 +276,7 @@ func (s store) DeleteEntities(ctx context.Context, wo WriteOp, q query.Query) ([
deleted = append(deleted, old)
ce := cdcPartial{
op: "d",
id: old["_id"].(primitive.ObjectID),
id: old["_id"].(bson.ObjectID),
old: &old,
new: nil,
rev: old.Rev() + 1,
Expand All @@ -286,7 +296,7 @@ func (s store) DeleteLabel(ctx context.Context, wo WriteOp, label string) (etre.
panic("invalid entity type passed to DeleteLabel: " + wo.EntityType)
}

id, _ := primitive.ObjectIDFromHex(wo.EntityId)
id, _ := bson.ObjectIDFromHex(wo.EntityId)
filter := bson.M{"_id": id}
update := bson.M{
"$unset": bson.M{label: ""}, // removes label, Mongo expects "" (see $unset docs)
Expand Down Expand Up @@ -322,7 +332,7 @@ func (s store) DeleteLabel(ctx context.Context, wo WriteOp, label string) (etre.

cp := cdcPartial{
op: "u",
id: old["_id"].(primitive.ObjectID),
id: old["_id"].(bson.ObjectID),
new: &new,
old: &old,
rev: old.Rev() + 1,
Expand Down Expand Up @@ -355,7 +365,7 @@ func (s store) dbError(ctx context.Context, err error, errType string) error {
// which makes a complete CDCEvent from the partial and a WriteOp.
type cdcPartial struct {
op string
id primitive.ObjectID
id bson.ObjectID
old *etre.Entity
new *etre.Entity
rev int64
Expand Down
Loading