@@ -24,6 +24,7 @@ import ( //nolint:gci // False positive, seemingly caused by the CouchDB driver
2424
2525const (
2626 couchDBUsersTable = "_users"
27+ revIDFieldKey = "_rev"
2728
2829 designDocumentName = "AriesStorageDesignDocument"
2930 payloadFieldKey = "payload"
@@ -34,19 +35,28 @@ const (
3435 databaseNotFoundErrMsgFromKivik = "Not Found: Database does not exist."
3536 documentUpdateConflictErrMsgFromKivik = "Conflict: Document update conflict."
3637
37- failGetDatabaseHandle = "failed to get database handle: %w"
38- failGetExistingIndexes = "failed to get existing indexes: %w"
39- failureWhileScanningRow = "failure while scanning row: %w"
40- failGetTagsFromRawDoc = "failed to get tags from raw CouchDB document: %w"
41- failGetRevisionID = "failed to get revision ID: %w"
42- failPutValueViaClient = "failed to put value via client: %w"
38+ failGetDatabaseHandle = "failed to get database handle: %w"
39+ failGetExistingIndexes = "failed to get existing indexes: %w"
40+ failureWhileScanningRow = "failure while scanning row: %w"
41+ failGetTagsFromRawDoc = "failed to get tags from raw CouchDB document: %w"
42+ failGetRevisionID = "failed to get revision ID: %w"
43+ failPutValueViaClient = "failed to put value via client: %w"
44+ failWhileScanResultRows = "failure while scanning result rows: %w"
45+ failSendRequestToFindEndpoint = "failure while sending request to CouchDB find endpoint: %w"
46+
47+ expressionTagNameOnlyLength = 1
48+ expressionTagNameAndValueLength = 2
4349)
4450
51+ var errInvalidQueryExpressionFormat = errors .New ("invalid expression format. " +
52+ "it must be in the following format: TagName:TagValue" )
53+
4554type marshalFunc func (interface {}) ([]byte , error )
4655
4756type db interface {
4857 Get (ctx context.Context , docID string , options ... kivik.Options ) * kivik.Row
4958 Put (ctx context.Context , docID string , doc interface {}, options ... kivik.Options ) (rev string , err error )
59+ Find (ctx context.Context , query interface {}, options ... kivik.Options ) (* kivik.Rows , error )
5060 Delete (ctx context.Context , docID , rev string , options ... kivik.Options ) (newRev string , err error )
5161 Close (ctx context.Context ) error
5262}
@@ -260,6 +270,8 @@ type Store struct {
260270// Put stores the key + value pair along with the (optional) tags.
261271// TODO (#40) Values stored under keys containing special URL characters like `/`
262272// are not retrievable due to a bug in the underlying Kivik library.
273+ // TODO (#44) Tags do not have to be defined in the store config prior to storing data that uses them.
274+ // Should all store implementations require tags to be defined in store config before allowing them to be used?
263275func (s * Store ) Put (k string , v []byte , tags ... newstorage.Tag ) error {
264276 if k == "" {
265277 return errors .New ("key cannot be empty" )
@@ -314,12 +326,12 @@ func (s *Store) Get(k string) ([]byte, error) {
314326 return nil , fmt .Errorf (failureWhileScanningRow , err )
315327 }
316328
317- storedValue , err := s .getStoredValueFromRawDoc (rawDoc )
329+ storedValue , err := s .getValueFromRawDoc (rawDoc , payloadFieldKey )
318330 if err != nil {
319331 return nil , fmt .Errorf ("failed to get payload from raw document: %w" , err )
320332 }
321333
322- return storedValue , nil
334+ return [] byte ( storedValue ) , nil
323335}
324336
325337// GetTags fetches all tags associated with the given key.
@@ -359,11 +371,66 @@ func (s *Store) GetBulk(keys ...string) ([][]byte, error) {
359371// If TagValue is not provided, then all data associated with the TagName will be returned.
360372// For now, expression can only be a single tag Name + Value pair.
361373// If no options are provided, then defaults will be used.
374+ // For improved performance, ensure that the tag name you are querying is included in the store config, as this
375+ // will ensure that it's indexed in CouchDB.
376+ // TODO (#44) Should we make the store config mandatory?
362377func (s * Store ) Query (expression string , options ... newstorage.QueryOption ) (newstorage.Iterator , error ) {
363- return & couchDBResultsIterator {}, errors .New ("not implemented" )
378+ if expression == "" {
379+ return & couchDBResultsIterator {}, errInvalidQueryExpressionFormat
380+ }
381+
382+ queryOptions := getQueryOptions (options )
383+
384+ expressionSplit := strings .Split (expression , ":" )
385+
386+ switch len (expressionSplit ) {
387+ case expressionTagNameOnlyLength :
388+ expressionTagName := expressionSplit [0 ]
389+
390+ findQuery := fmt .Sprintf (`{"selector":{"%s":{"$exists":true}},"limit":%d}` ,
391+ expressionTagName , queryOptions .PageSize )
392+
393+ resultRows , err := s .db .Find (context .Background (), findQuery )
394+ if err != nil {
395+ return nil , fmt .Errorf (failSendRequestToFindEndpoint , err )
396+ }
397+
398+ queryWithPageSizeAndBookmarkPlaceholders := `{"selector":{"` +
399+ expressionTagName + `":{"$exists":true}},"limit":%d,"bookmark":"%s"}`
400+
401+ return & couchDBResultsIterator {
402+ store : s ,
403+ resultRows : resultRows ,
404+ pageSize : queryOptions .PageSize ,
405+ queryWithPageSizeAndBookmarkPlaceholders : queryWithPageSizeAndBookmarkPlaceholders ,
406+ }, nil
407+ case expressionTagNameAndValueLength :
408+ expressionTagName := expressionSplit [0 ]
409+ expressionTagValue := expressionSplit [1 ]
410+
411+ findQuery := fmt .Sprintf (`{"selector":{"%s":"%s"},"limit":%d}` ,
412+ expressionTagName , expressionTagValue , queryOptions .PageSize )
413+
414+ queryWithPageSizeAndBookmarkPlaceholders := `{"selector":{"` +
415+ expressionTagName + `":"` + expressionTagValue + `"},"limit":%d,"bookmark":"%s"}`
416+
417+ resultRows , err := s .db .Find (context .Background (), findQuery )
418+ if err != nil {
419+ return nil , fmt .Errorf (failSendRequestToFindEndpoint , err )
420+ }
421+
422+ return & couchDBResultsIterator {
423+ store : s ,
424+ resultRows : resultRows ,
425+ pageSize : queryOptions .PageSize ,
426+ queryWithPageSizeAndBookmarkPlaceholders : queryWithPageSizeAndBookmarkPlaceholders ,
427+ }, nil
428+ default :
429+ return & couchDBResultsIterator {}, errInvalidQueryExpressionFormat
430+ }
364431}
365432
366- // Delete deletes the key + value pair (and all tags) associated with key .
433+ // Delete deletes the key + value pair (and all tags) associated with k .
367434func (s * Store ) Delete (k string ) error {
368435 if k == "" {
369436 return errors .New ("key is mandatory" )
@@ -412,7 +479,7 @@ func (s *Store) put(k string, value []byte) error {
412479 }
413480
414481 if revID != "" {
415- value = []byte (`{"_rev ":"` + revID + `",` + string (value [1 :]))
482+ value = []byte (`{"` + revIDFieldKey + ` ":"` + revID + `",` + string (value [1 :]))
416483 }
417484
418485 _ , err = s .db .Put (context .Background (), k , value )
@@ -456,61 +523,161 @@ func (s *Store) getRevID(k string) (string, error) {
456523 return "" , err
457524 }
458525
459- revID , ok := rawDoc ["_rev" ]
460- if ! ok {
461- return "" , errors .New ("revision ID was missing from the raw document" )
462- }
463-
464- revIDString , ok := revID .(string )
465- if ! ok {
466- return "" , errors .New ("unable to assert revision ID as a string" )
526+ revID , err := s .getValueFromRawDoc (rawDoc , revIDFieldKey )
527+ if err != nil {
528+ return "" , fmt .Errorf ("failed to get revision ID from the raw document: %w" , err )
467529 }
468530
469- return revIDString , nil
531+ return revID , nil
470532}
471533
472- func (s * Store ) getStoredValueFromRawDoc (rawDoc map [string ]interface {}) ([] byte , error ) {
473- storedValue , ok := rawDoc [payloadFieldKey ]
534+ func (s * Store ) getValueFromRawDoc (rawDoc map [string ]interface {}, rawDocKey string ) (string , error ) {
535+ value , ok := rawDoc [rawDocKey ]
474536 if ! ok {
475- return nil , errors . New ( "payload was unexpectedly missing from raw document" )
537+ return "" , fmt . Errorf ( `"%s" is missing from the raw document` , rawDocKey )
476538 }
477539
478- storedValueString , ok := storedValue .(string )
540+ valueString , ok := value .(string )
479541 if ! ok {
480- return nil , errors .New ("stored value could not be asserted as a string" )
542+ return "" ,
543+ fmt .Errorf (`value associated with the "%s" key in the raw document ` +
544+ `could not be asserted as a string` , rawDocKey )
481545 }
482546
483- return [] byte ( storedValueString ) , nil
547+ return valueString , nil
484548}
485549
486550type couchDBResultsIterator struct {
551+ store * Store
552+ resultRows * kivik.Rows
553+ pageSize int
554+ queryWithPageSizeAndBookmarkPlaceholders string
555+ numDocumentsReturnedInThisPage int
487556}
488557
489558// Next moves the pointer to the next value in the iterator. It returns false if the iterator is exhausted.
490559// Note that the Kivik library automatically closes the kivik.Rows iterator if the iterator is exhausted.
491560func (i * couchDBResultsIterator ) Next () (bool , error ) {
492- return false , errors .New ("not implemented" )
561+ nextCallResult := i .resultRows .Next ()
562+
563+ // If no applicable index could be found to speed up the query, then we will receive a warning here.
564+ // This most likely reasons for no index being found is that either the Provider's StoreConfiguration
565+ // was never set, or it was set but was missing the queried tag name.
566+ // This value is only set by Kivik on the final iteration (once all the rows have been iterated through).
567+ logAnyWarning (i )
568+
569+ err := i .resultRows .Err ()
570+ if err != nil {
571+ return nextCallResult , fmt .Errorf ("failure during iteration of result rows: %w" , err )
572+ }
573+
574+ if nextCallResult {
575+ i .numDocumentsReturnedInThisPage ++
576+ } else {
577+ if i .numDocumentsReturnedInThisPage < i .pageSize {
578+ // All documents have been returned - no need to attempt fetching any more pages.
579+ return false , nil
580+ }
581+
582+ err := i .resultRows .Close ()
583+ if err != nil {
584+ return false , fmt .Errorf ("failed to close result rows before fetching new page: %w" , err )
585+ }
586+
587+ // Try fetching another page of documents.
588+ // Kivik only sets the bookmark value after all result rows have been enumerated via the Next call.
589+ // Note that the presence of a bookmark doesn't guarantee that there are more results.
590+ // It's necessary to instead compare the number of returned documents against the page size (done above)
591+ // See https://docs.couchdb.org/en/stable/api/database/find.html#pagination for more information.
592+ newPageNextCallResult , err := i .fetchAnotherPage ()
593+ if err != nil {
594+ return false , errors .New ("failure while fetching new page: %w" )
595+ }
596+
597+ return newPageNextCallResult , nil
598+ }
599+
600+ return nextCallResult , nil
493601}
494602
495603// Release releases associated resources. Release should always result in success
496604// and can be called multiple times without causing an error.
497605func (i * couchDBResultsIterator ) Release () error {
498- return errors .New ("not implemented" )
606+ err := i .resultRows .Close ()
607+ if err != nil {
608+ return fmt .Errorf ("failed to close result rows: %w" , err )
609+ }
610+
611+ return nil
499612}
500613
501614// Key returns the key of the current key-value pair.
502615// A nil error likely means that the key list is exhausted.
503616func (i * couchDBResultsIterator ) Key () (string , error ) {
504- return "" , errors .New ("not implemented" )
617+ rawDoc := make (map [string ]interface {})
618+
619+ err := i .resultRows .ScanDoc (& rawDoc )
620+ if err != nil {
621+ return "" , fmt .Errorf (failWhileScanResultRows , err )
622+ }
623+
624+ key , err := i .store .getValueFromRawDoc (rawDoc , "_id" )
625+ if err != nil {
626+ return "" , fmt .Errorf ("failure while getting key from the raw document: %w" , err )
627+ }
628+
629+ return key , nil
505630}
506631
507632// Value returns the value of the current key-value pair.
508633func (i * couchDBResultsIterator ) Value () ([]byte , error ) {
509- return nil , errors .New ("not implemented" )
634+ rawDoc := make (map [string ]interface {})
635+
636+ err := i .resultRows .ScanDoc (& rawDoc )
637+ if err != nil {
638+ return nil , fmt .Errorf (failWhileScanResultRows , err )
639+ }
640+
641+ value , err := i .store .getValueFromRawDoc (rawDoc , payloadFieldKey )
642+ if err != nil {
643+ return nil , fmt .Errorf ("failure while getting value from the raw document: %w" , err )
644+ }
645+
646+ return []byte (value ), nil
510647}
511648
512649func (i * couchDBResultsIterator ) Tags () ([]newstorage.Tag , error ) {
513- return nil , errors .New ("not implemented" )
650+ rawDoc := make (map [string ]interface {})
651+
652+ err := i .resultRows .ScanDoc (& rawDoc )
653+ if err != nil {
654+ return nil , fmt .Errorf (failWhileScanResultRows , err )
655+ }
656+
657+ tags , err := getTagsFromRawDoc (rawDoc )
658+ if err != nil {
659+ return nil , fmt .Errorf (failGetTagsFromRawDoc , err )
660+ }
661+
662+ return tags , nil
663+ }
664+
665+ func (i * couchDBResultsIterator ) fetchAnotherPage () (bool , error ) {
666+ var err error
667+
668+ i .resultRows , err = i .store .db .Find (context .Background (),
669+ fmt .Sprintf (i .queryWithPageSizeAndBookmarkPlaceholders , i .pageSize , i .resultRows .Bookmark ()))
670+ if err != nil {
671+ return false , fmt .Errorf ("failure while sending request to CouchDB find endpoint: %w" , err )
672+ }
673+
674+ followupNextCallResult := i .resultRows .Next ()
675+
676+ if followupNextCallResult {
677+ i .numDocumentsReturnedInThisPage = 1
678+ }
679+
680+ return followupNextCallResult , nil
514681}
515682
516683func validateTagNames (config newstorage.StoreConfiguration ) error {
@@ -581,12 +748,26 @@ func createIndexes(db *kivik.DB, tagNamesNeedIndexCreation []string) error {
581748 return nil
582749}
583750
751+ func getQueryOptions (options []newstorage.QueryOption ) newstorage.QueryOptions {
752+ var queryOptions newstorage.QueryOptions
753+
754+ for _ , option := range options {
755+ option (& queryOptions )
756+ }
757+
758+ if queryOptions .PageSize == 0 {
759+ queryOptions .PageSize = 25
760+ }
761+
762+ return queryOptions
763+ }
764+
584765func getTagsFromRawDoc (rawDoc map [string ]interface {}) ([]newstorage.Tag , error ) {
585766 var tags []newstorage.Tag
586767
587768 for key , value := range rawDoc {
588769 // Any key that isn't one of the reserved keywords below must be a tag.
589- if key != "_id" && key != "_rev" && key != payloadFieldKey {
770+ if key != "_id" && key != revIDFieldKey && key != payloadFieldKey {
590771 valueString , ok := value .(string )
591772 if ! ok {
592773 return nil , errors .New ("failed to assert tag value as string" )
@@ -601,3 +782,11 @@ func getTagsFromRawDoc(rawDoc map[string]interface{}) ([]newstorage.Tag, error)
601782
602783 return tags , nil
603784}
785+
786+ func logAnyWarning (i * couchDBResultsIterator ) {
787+ warningMsg := i .resultRows .Warning ()
788+
789+ if warningMsg != "" {
790+ i .store .logger .Warnf (warningMsg )
791+ }
792+ }
0 commit comments