diff --git a/kinsumer.go b/kinsumer.go index b1dcace..cda3fea 100644 --- a/kinsumer.go +++ b/kinsumer.go @@ -4,6 +4,7 @@ package kinsumer import ( "fmt" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" "sync" "sync/atomic" "time" @@ -118,6 +119,35 @@ func NewWithInterfaces(kinesis kinesisiface.KinesisAPI, dynamodb dynamodbiface.D return consumer, nil } +func (k *Kinsumer) ResetAllToLatest() error { + + shardIDs, err := loadShardIDsFromKinesis(k.kinesis, k.streamName) + if err != nil { + return err + } + + attrVals, err := dynamodbattribute.MarshalMap(map[string]interface{}{ + ":sn": aws.String("LATEST"), + }) + + for _, shardId := range shardIDs { + _, err := k.dynamodb.UpdateItem( + &dynamodb.UpdateItemInput{ + TableName: aws.String(k.checkpointTableName), + UpdateExpression: aws.String("SET SequenceNumber = :sn"), + Key: map[string]*dynamodb.AttributeValue{ + "Shard": {S: aws.String(shardId)}, + }, + ExpressionAttributeValues: attrVals, + }) + if err != nil { + return err + } + } + + return nil +} + // refreshShards registers our client, refreshes the lists of clients and shards, checks if we // have become/unbecome the leader, and returns whether the shards/clients changed. //TODO: Write unit test - needs dynamo _and_ kinesis mocking @@ -465,6 +495,27 @@ func (k *Kinsumer) Next() (data []byte, err error) { return data, err } +// NextRaw is a blocking function used to get the next record from the kinesis queue, or errors that +// occurred during the processing of kinesis. It's up to the caller to stop processing by calling 'Stop()' +// +// It is different from Next in that it returns the Kinesis record object directly +// +// if err is non nil an error occurred in the system. +// if err is nil and data is nil then kinsumer has been stopped +func (k *Kinsumer) NextRaw() (data *kinesis.Record, err error) { + select { + case err = <-k.errors: + return nil, err + case record, ok := <-k.output: + if ok { + k.config.stats.EventToClient(*record.record.ApproximateArrivalTimestamp, record.retrievedAt) + data = record.record + } + } + + return data, err +} + // CreateRequiredTables will create the required dynamodb tables // based on the applicationName func (k *Kinsumer) CreateRequiredTables() error {