Skip to content

Commit c4cdc70

Browse files
committed
larger refactor of catalogd's structure
Signed-off-by: Joe Lanford <joe.lanford@gmail.com>
1 parent 12c2ca9 commit c4cdc70

33 files changed

+1117
-1609
lines changed

cmd/catalogd/main.go

Lines changed: 65 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -46,22 +46,21 @@ import (
4646
"sigs.k8s.io/controller-runtime/pkg/client"
4747
"sigs.k8s.io/controller-runtime/pkg/healthz"
4848
"sigs.k8s.io/controller-runtime/pkg/log"
49-
"sigs.k8s.io/controller-runtime/pkg/metrics"
5049
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
5150
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
5251
crwebhook "sigs.k8s.io/controller-runtime/pkg/webhook"
5352

5453
ocv1 "github.com/operator-framework/operator-controller/api/v1"
55-
corecontrollers "github.com/operator-framework/operator-controller/internal/catalogd/controllers/core"
54+
"github.com/operator-framework/operator-controller/internal/catalogd/controllers"
5655
"github.com/operator-framework/operator-controller/internal/catalogd/features"
5756
"github.com/operator-framework/operator-controller/internal/catalogd/garbagecollection"
58-
"github.com/operator-framework/operator-controller/internal/catalogd/handlers"
59-
catalogdmetrics "github.com/operator-framework/operator-controller/internal/catalogd/metrics"
60-
"github.com/operator-framework/operator-controller/internal/catalogd/serverutil"
57+
"github.com/operator-framework/operator-controller/internal/catalogd/handler"
58+
v1 "github.com/operator-framework/operator-controller/internal/catalogd/handler/api/v1"
6159
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
6260
"github.com/operator-framework/operator-controller/internal/catalogd/webhook"
6361
sharedcontrollers "github.com/operator-framework/operator-controller/internal/shared/controllers"
6462
fsutil "github.com/operator-framework/operator-controller/internal/shared/util/fs"
63+
http2 "github.com/operator-framework/operator-controller/internal/shared/util/http"
6564
imageutil "github.com/operator-framework/operator-controller/internal/shared/util/image"
6665
"github.com/operator-framework/operator-controller/internal/shared/util/pullsecretcache"
6766
sautil "github.com/operator-framework/operator-controller/internal/shared/util/sa"
@@ -328,61 +327,60 @@ func run(ctx context.Context) error {
328327
},
329328
}
330329

331-
var localStorage storage.Instance
332-
metrics.Registry.MustRegister(catalogdmetrics.RequestDurationMetric)
333-
334330
storeDir := filepath.Join(cfg.cacheDir, storageDir)
335331
if err := os.MkdirAll(storeDir, 0700); err != nil {
336332
setupLog.Error(err, "unable to create storage directory for catalogs")
337333
return err
338334
}
339335

340-
baseStorageURL, err := url.Parse(fmt.Sprintf("%s/catalogs/", cfg.externalAddr))
336+
const catalogsSubPath = "catalogs"
337+
baseCatalogsURL, err := url.Parse(fmt.Sprintf("%s/%s", cfg.externalAddr, catalogsSubPath))
341338
if err != nil {
342339
setupLog.Error(err, "unable to create base storage URL")
343340
return err
344341
}
345342

346-
indexer := storage.NewIndexer()
347-
handlersMap := map[string]http.Handler{
348-
"/all": handlers.V1AllHandler(indexer),
349-
}
350-
351-
if features.CatalogdFeatureGate.Enabled(features.APIV1MetasHandler) {
352-
handlersMap["/metas"] = handlers.V1MetasHandler(indexer)
353-
}
354-
355-
if features.CatalogdFeatureGate.Enabled(features.APIV1GraphQLHandler) {
356-
handlersMap["/graphql"] = handlers.V1GraphQLHandler()
357-
}
358-
359-
localStorage = &storage.LocalDirV1{
360-
Indexer: indexer,
361-
Handlers: handlersMap,
362-
RootDir: storeDir,
363-
RootURL: baseStorageURL,
364-
}
343+
storageInstances := configureStorage(storeDir)
344+
catalogdHandler := handler.NewStandardHandler(
345+
newAPIV1Handler(catalogsSubPath, storageInstances),
346+
)
365347

366348
// Config for the catalogd web server
367-
catalogServerConfig := serverutil.CatalogServerConfig{
368-
ExternalAddr: cfg.externalAddr,
369-
CatalogAddr: cfg.catalogServerAddr,
370-
CertFile: cfg.certFile,
371-
KeyFile: cfg.keyFile,
372-
LocalStorage: localStorage,
349+
catalogServerConfig := http2.ServerConfig{
350+
Name: "catalogs",
351+
OnlyServeWhenLeader: true,
352+
ListenAddr: cfg.catalogServerAddr,
353+
Server: &http.Server{
354+
Handler: catalogdHandler,
355+
ReadTimeout: 5 * time.Second,
356+
WriteTimeout: 5 * time.Minute,
357+
},
358+
}
359+
if cfg.certFile != "" && cfg.keyFile != "" {
360+
catalogServerConfig.TLSConfig = &tls.Config{
361+
GetCertificate: cw.GetCertificate,
362+
MinVersion: tls.VersionTLS12,
363+
}
373364
}
374365

375-
err = serverutil.AddCatalogServerToManager(mgr, catalogServerConfig, cw)
366+
catalogServer, err := http2.NewManagerServer(catalogServerConfig)
376367
if err != nil {
377368
setupLog.Error(err, "unable to configure catalog server")
378369
return err
379370
}
371+
if err := mgr.Add(catalogServer); err != nil {
372+
setupLog.Error(err, "unable to add catalog server to manager")
373+
return err
374+
}
380375

381-
if err = (&corecontrollers.ClusterCatalogReconciler{
376+
if err = (&controllers.ClusterCatalogReconciler{
382377
Client: mgr.GetClient(),
383378
ImageCache: imageCache,
384379
ImagePuller: imagePuller,
385-
Storage: localStorage,
380+
Storage: storageInstances,
381+
GetBaseURL: func(catalogName string) string {
382+
return fmt.Sprintf("%s/%s", baseCatalogsURL, catalogName)
383+
},
386384
}).SetupWithManager(mgr); err != nil {
387385
setupLog.Error(err, "unable to create controller", "controller", "ClusterCatalog")
388386
return err
@@ -452,3 +450,33 @@ func podNamespace() string {
452450
}
453451
return string(namespace)
454452
}
453+
454+
func configureStorage(storeDir string) *storage.Instances {
455+
metasEnabled := features.CatalogdFeatureGate.Enabled(features.APIV1MetasHandler)
456+
graphqlEnabled := features.CatalogdFeatureGate.Enabled(features.APIV1GraphQLHandler)
457+
needsIndices := metasEnabled || graphqlEnabled
458+
459+
// Setup storage instances
460+
storageInstances := storage.Instances{}
461+
storageInstances.Files = storage.NewFiles(storeDir)
462+
463+
if needsIndices {
464+
storageInstances.Indices = storage.NewIndices(storeDir)
465+
}
466+
if graphqlEnabled {
467+
storageInstances.GraphQLSchemas = storage.NewGraphQLSchemas()
468+
}
469+
470+
return &storageInstances
471+
}
472+
473+
func newAPIV1Handler(baseURLPath string, si *storage.Instances) *v1.APIV1Handler {
474+
metasEnabled := features.CatalogdFeatureGate.Enabled(features.APIV1MetasHandler)
475+
graphqlEnabled := features.CatalogdFeatureGate.Enabled(features.APIV1GraphQLHandler)
476+
477+
return v1.NewAPIV1Handler(baseURLPath, si,
478+
v1.WithAllHandler(true),
479+
v1.WithMetasHandler(metasEnabled),
480+
v1.WithGraphQLHandler(graphqlEnabled),
481+
)
482+
}

internal/catalogd/controllers/core/clustercatalog_controller.go renamed to internal/catalogd/controllers/clustercatalog_controller.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package core
17+
package controllers
1818

1919
import (
2020
"context" // #nosec
2121
"errors"
2222
"fmt"
23+
"io/fs"
24+
"iter"
2325
"slices"
2426
"sync"
2527
"time"
@@ -38,6 +40,8 @@ import (
3840
"sigs.k8s.io/controller-runtime/pkg/log"
3941
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4042

43+
"github.com/operator-framework/operator-registry/alpha/declcfg"
44+
4145
ocv1 "github.com/operator-framework/operator-controller/api/v1"
4246
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
4347
imageutil "github.com/operator-framework/operator-controller/internal/shared/util/image"
@@ -57,7 +61,8 @@ type ClusterCatalogReconciler struct {
5761
ImageCache imageutil.Cache
5862
ImagePuller imageutil.Puller
5963

60-
Storage storage.Instance
64+
Storage storage.Instance
65+
GetBaseURL func(catalogName string) string
6166

6267
finalizers crfinalizer.Finalizers
6368

@@ -224,7 +229,7 @@ func (r *ClusterCatalogReconciler) reconcile(ctx context.Context, catalog *ocv1.
224229
case !hasStoredCatalog:
225230
l.Info("unpack required: no cached catalog metadata found for this catalog")
226231
needsUnpack = true
227-
case !r.Storage.ContentExists(catalog.Name):
232+
case !r.Storage.Exists(catalog.Name):
228233
l.Info("unpack required: no stored content found for this catalog")
229234
needsUnpack = true
230235
case !equality.Semantic.DeepEqual(catalog.Status, *expectedStatus):
@@ -265,12 +270,12 @@ func (r *ClusterCatalogReconciler) reconcile(ctx context.Context, catalog *ocv1.
265270
// TODO: We should check to see if the unpacked result has the same content
266271
// as the already unpacked content. If it does, we should skip this rest
267272
// of the unpacking steps.
268-
if err := r.Storage.Store(ctx, catalog.Name, fsys); err != nil {
273+
if err := r.Storage.Store(ctx, catalog.Name, walkMetasFSIterator(ctx, fsys)); err != nil {
269274
storageErr := fmt.Errorf("error storing fbc: %v", err)
270275
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), storageErr)
271276
return ctrl.Result{}, storageErr
272277
}
273-
baseURL := r.Storage.BaseURL(catalog.Name)
278+
baseURL := r.GetBaseURL(catalog.Name)
274279

275280
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), nil)
276281
updateStatusServing(&catalog.Status, canonicalRef, unpackTime, baseURL, catalog.GetGeneration())
@@ -296,8 +301,8 @@ func (r *ClusterCatalogReconciler) getCurrentState(catalog *ocv1.ClusterCatalog)
296301

297302
// Set expected status based on what we see in the stored catalog
298303
clearUnknownConditions(expectedStatus)
299-
if hasStoredCatalog && r.Storage.ContentExists(catalog.Name) {
300-
updateStatusServing(expectedStatus, storedCatalog.ref, storedCatalog.lastUnpack, r.Storage.BaseURL(catalog.Name), storedCatalog.observedGeneration)
304+
if hasStoredCatalog && r.Storage.Exists(catalog.Name) {
305+
updateStatusServing(expectedStatus, storedCatalog.ref, storedCatalog.lastUnpack, r.GetBaseURL(catalog.Name), storedCatalog.observedGeneration)
301306
updateStatusProgressing(expectedStatus, storedCatalog.observedGeneration, nil)
302307
}
303308

@@ -458,7 +463,7 @@ func (r *ClusterCatalogReconciler) deleteStoredCatalog(catalogName string) {
458463
}
459464

460465
func (r *ClusterCatalogReconciler) deleteCatalogCache(ctx context.Context, catalog *ocv1.ClusterCatalog) error {
461-
if err := r.Storage.Delete(catalog.Name); err != nil {
466+
if err := r.Storage.Delete(ctx, catalog.Name); err != nil {
462467
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), err)
463468
return err
464469
}
@@ -470,3 +475,12 @@ func (r *ClusterCatalogReconciler) deleteCatalogCache(ctx context.Context, catal
470475
r.deleteStoredCatalog(catalog.Name)
471476
return nil
472477
}
478+
479+
func walkMetasFSIterator(ctx context.Context, fsys fs.FS) iter.Seq2[*declcfg.Meta, error] {
480+
return func(yield func(*declcfg.Meta, error) bool) {
481+
_ = declcfg.WalkMetasFS(ctx, fsys, func(path string, meta *declcfg.Meta, err error) error {
482+
yield(meta, err)
483+
return nil
484+
}, declcfg.WithConcurrency(1))
485+
}
486+
}

internal/catalogd/controllers/core/clustercatalog_controller_test.go renamed to internal/catalogd/controllers/clustercatalog_controller_test.go

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
package core
1+
package controllers
22

33
import (
44
"context"
55
"errors"
66
"fmt"
7-
"io/fs"
8-
"net/http"
7+
"iter"
98
"testing"
109
"testing/fstest"
1110
"time"
@@ -21,6 +20,8 @@ import (
2120
ctrl "sigs.k8s.io/controller-runtime"
2221
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2322

23+
"github.com/operator-framework/operator-registry/alpha/declcfg"
24+
2425
ocv1 "github.com/operator-framework/operator-controller/api/v1"
2526
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
2627
imageutil "github.com/operator-framework/operator-controller/internal/shared/util/image"
@@ -32,29 +33,21 @@ type MockStore struct {
3233
shouldError bool
3334
}
3435

35-
func (m MockStore) Store(_ context.Context, _ string, _ fs.FS) error {
36+
func (m MockStore) Store(_ context.Context, _ string, _ iter.Seq2[*declcfg.Meta, error]) error {
3637
if m.shouldError {
3738
return errors.New("mockstore store error")
3839
}
3940
return nil
4041
}
4142

42-
func (m MockStore) Delete(_ string) error {
43+
func (m MockStore) Delete(_ context.Context, _ string) error {
4344
if m.shouldError {
4445
return errors.New("mockstore delete error")
4546
}
4647
return nil
4748
}
4849

49-
func (m MockStore) BaseURL(_ string) string {
50-
return "URL"
51-
}
52-
53-
func (m MockStore) StorageServerHandler() http.Handler {
54-
panic("not needed")
55-
}
56-
57-
func (m MockStore) ContentExists(_ string) bool {
50+
func (m MockStore) Exists(_ string) bool {
5851
return true
5952
}
6053

@@ -807,6 +800,7 @@ func TestCatalogdControllerReconcile(t *testing.T) {
807800
ImagePuller: tt.puller,
808801
ImageCache: tt.cache,
809802
Storage: tt.store,
803+
GetBaseURL: func(catalogName string) string { return "URL" },
810804
storedCatalogs: map[string]storedCatalogData{},
811805
}
812806
if reconciler.ImageCache == nil {
@@ -915,7 +909,8 @@ func TestPollingRequeue(t *testing.T) {
915909
ImageFS: &fstest.MapFS{},
916910
Ref: ref,
917911
},
918-
Storage: &MockStore{},
912+
Storage: &MockStore{},
913+
GetBaseURL: func(catalogName string) string { return "URL" },
919914
storedCatalogs: map[string]storedCatalogData{
920915
tc.catalog.Name: {
921916
ref: ref,
@@ -1140,6 +1135,7 @@ func TestPollingReconcilerUnpack(t *testing.T) {
11401135
Client: nil,
11411136
ImagePuller: &imageutil.MockPuller{Error: errors.New("mockpuller error")},
11421137
Storage: &MockStore{},
1138+
GetBaseURL: func(catalogName string) string { return "URL" },
11431139
storedCatalogs: scd,
11441140
}
11451141
require.NoError(t, reconciler.setupFinalizers())
Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,36 @@
1-
package handlers
1+
package v1
22

33
import (
4-
"github.com/operator-framework/operator-controller/internal/catalogd/handlers/internal/handlerutil"
54
"net/http"
65

76
"k8s.io/klog/v2"
87

8+
"github.com/operator-framework/operator-controller/internal/catalogd/handler/internal/handlerutil"
99
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
1010
)
1111

12-
func V1AllHandler(indexer *storage.Indexer) http.Handler {
13-
return handlerutil.AllowedMethodsHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
12+
func apiV1AllHandler(files *storage.Files) http.Handler {
13+
allHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1414
catalog := r.PathValue("catalog")
1515
logger := klog.FromContext(r.Context()).WithValues("catalog", catalog)
1616

17-
idx, err := indexer.GetIndex(catalog)
17+
catalogFile, err := files.Get(catalog)
1818
if err != nil {
19-
logger.Error(err, "error getting index")
19+
logger.Error(err, "error getting catalog file")
2020
http.Error(w, "Not found", http.StatusNotFound)
2121
return
2222
}
23+
defer catalogFile.Close()
2324

24-
catalogFile := idx.All()
25-
catalogStat, err := idx.Stat()
25+
catalogStat, err := catalogFile.Stat()
2626
if err != nil {
27-
logger.Error(err, "error stat-ing index")
27+
logger.Error(err, "error stat-ing catalog file")
2828
http.Error(w, "Internal server error", http.StatusInternalServerError)
2929
return
3030
}
3131

3232
w.Header().Add("Content-Type", "application/jsonl")
3333
http.ServeContent(w, r, "", catalogStat.ModTime(), catalogFile)
34-
}), http.MethodGet, http.MethodHead)
34+
})
35+
return handlerutil.AllowedMethodsHandler(allHandler, http.MethodGet, http.MethodHead)
3536
}

0 commit comments

Comments
 (0)