Skip to content

Commit 5274205

Browse files
committed
handle multi source applications
this also removes the ability to reference a schemas dir relative to the root repo, but I'm hoping it's unused, as it's complicated to reason about or manage. a central schemas repo makes more sense.
1 parent 2d7b205 commit 5274205

File tree

7 files changed

+104
-87
lines changed

7 files changed

+104
-87
lines changed

pkg/app_watcher/app_watcher.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,14 +152,14 @@ func canProcessApp(obj interface{}) (*appv1alpha1.Application, bool) {
152152
return nil, false
153153
}
154154

155-
for _, src := range app.Spec.Sources {
155+
if src := app.Spec.Source; src != nil {
156156
if isGitRepo(src.RepoURL) {
157157
return app, true
158158
}
159159
}
160160

161-
if app.Spec.Source != nil {
162-
if isGitRepo(app.Spec.Source.RepoURL) {
161+
for _, src := range app.Spec.Sources {
162+
if isGitRepo(src.RepoURL) {
163163
return app, true
164164
}
165165
}

pkg/appdir/vcstoargomap.go

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -79,37 +79,39 @@ func (v2a VcsToArgoMap) WalkKustomizeApps(cloneURL string, fs fs.FS) *AppDirecto
7979
return result
8080
}
8181

82-
func (v2a VcsToArgoMap) AddApp(app *v1alpha1.Application) {
83-
if app.Spec.Source == nil {
84-
log.Warn().Msgf("%s/%s: no source, skipping", app.Namespace, app.Name)
85-
return
82+
func (v2a VcsToArgoMap) processApp(app v1alpha1.Application, fn func(*AppDirectory)) {
83+
84+
if src := app.Spec.Source; src != nil {
85+
appDirectory := v2a.GetAppsInRepo(src.RepoURL)
86+
fn(appDirectory)
8687
}
8788

88-
appDirectory := v2a.GetAppsInRepo(app.Spec.Source.RepoURL)
89-
appDirectory.ProcessApp(*app)
89+
for _, src := range app.Spec.Sources {
90+
appDirectory := v2a.GetAppsInRepo(src.RepoURL)
91+
fn(appDirectory)
92+
}
9093
}
9194

92-
func (v2a VcsToArgoMap) UpdateApp(old *v1alpha1.Application, new *v1alpha1.Application) {
93-
if new.Spec.Source == nil {
94-
log.Warn().Msgf("%s/%s: no source, skipping", new.Namespace, new.Name)
95-
return
96-
}
95+
func (v2a VcsToArgoMap) AddApp(app *v1alpha1.Application) {
96+
v2a.processApp(*app, func(directory *AppDirectory) {
97+
directory.AddApp(*app)
98+
})
99+
}
97100

98-
oldAppDirectory := v2a.GetAppsInRepo(old.Spec.Source.RepoURL)
99-
oldAppDirectory.RemoveApp(*old)
101+
func (v2a VcsToArgoMap) UpdateApp(old *v1alpha1.Application, new *v1alpha1.Application) {
102+
v2a.processApp(*old, func(directory *AppDirectory) {
103+
directory.RemoveApp(*old)
104+
})
100105

101-
newAppDirectory := v2a.GetAppsInRepo(new.Spec.Source.RepoURL)
102-
newAppDirectory.ProcessApp(*new)
106+
v2a.processApp(*new, func(directory *AppDirectory) {
107+
directory.AddApp(*new)
108+
})
103109
}
104110

105111
func (v2a VcsToArgoMap) DeleteApp(app *v1alpha1.Application) {
106-
if app.Spec.Source == nil {
107-
log.Warn().Msgf("%s/%s: no source, skipping", app.Namespace, app.Name)
108-
return
109-
}
110-
111-
oldAppDirectory := v2a.GetAppsInRepo(app.Spec.Source.RepoURL)
112-
oldAppDirectory.RemoveApp(*app)
112+
v2a.processApp(*app, func(directory *AppDirectory) {
113+
directory.RemoveApp(*app)
114+
})
113115
}
114116

115117
func (v2a VcsToArgoMap) GetVcsRepos() []string {

pkg/checks/kubeconform/check.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,5 @@ import (
88
)
99

1010
func Check(ctx context.Context, request checks.Request) (msg.Result, error) {
11-
return argoCdAppValidate(
12-
ctx, request.Container, request.AppName, request.KubernetesVersion, request.Repo.Directory,
13-
request.YamlManifests,
14-
)
11+
return argoCdAppValidate(ctx, request.Container, request.AppName, request.KubernetesVersion, request.YamlManifests)
1512
}

pkg/checks/kubeconform/validate.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"io"
77
"os"
8-
"path/filepath"
98
"strings"
109

1110
"github.com/pkg/errors"
@@ -20,7 +19,7 @@ import (
2019

2120
var tracer = otel.Tracer("pkg/checks/kubeconform")
2221

23-
func getSchemaLocations(ctx context.Context, ctr container.Container, tempRepoPath string) []string {
22+
func getSchemaLocations(ctr container.Container) []string {
2423
cfg := ctr.Config
2524

2625
locations := []string{
@@ -33,10 +32,6 @@ func getSchemaLocations(ctx context.Context, ctr container.Container, tempRepoPa
3332
if strings.HasPrefix(schemasLocation, "http://") || strings.HasPrefix(schemasLocation, "https://") {
3433
locations = append(locations, schemasLocation)
3534
} else {
36-
if !filepath.IsAbs(schemasLocation) {
37-
schemasLocation = filepath.Join(tempRepoPath, schemasLocation)
38-
}
39-
4035
if _, err := os.Stat(schemasLocation); err != nil {
4136
log.Warn().
4237
Err(err).
@@ -65,7 +60,7 @@ func getSchemaLocations(ctx context.Context, ctr container.Container, tempRepoPa
6560
return locations
6661
}
6762

68-
func argoCdAppValidate(ctx context.Context, ctr container.Container, appName, targetKubernetesVersion, tempRepoPath string, appManifests []string) (msg.Result, error) {
63+
func argoCdAppValidate(ctx context.Context, ctr container.Container, appName, targetKubernetesVersion string, appManifests []string) (msg.Result, error) {
6964
_, span := tracer.Start(ctx, "ArgoCdAppValidate")
7065
defer span.End()
7166

@@ -92,7 +87,7 @@ func argoCdAppValidate(ctx context.Context, ctr container.Container, appName, ta
9287

9388
var (
9489
outputString []string
95-
schemaLocations = getSchemaLocations(ctx, ctr, tempRepoPath)
90+
schemaLocations = getSchemaLocations(ctr)
9691
)
9792

9893
log.Debug().Msgf("cache location: %s", vOpts.Cache)

pkg/checks/kubeconform/validate_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package kubeconform
22

33
import (
4-
"context"
54
"fmt"
65
"os"
76
"strings"
@@ -16,17 +15,15 @@ import (
1615
)
1716

1817
func TestDefaultGetSchemaLocations(t *testing.T) {
19-
ctx := context.TODO()
2018
ctr := container.Container{}
21-
schemaLocations := getSchemaLocations(ctx, ctr, "/some/other/path")
19+
schemaLocations := getSchemaLocations(ctr)
2220

2321
// default schema location is "./schemas"
2422
assert.Len(t, schemaLocations, 1)
2523
assert.Equal(t, "default", schemaLocations[0])
2624
}
2725

2826
func TestGetRemoteSchemaLocations(t *testing.T) {
29-
ctx := context.TODO()
3027
ctr := container.Container{}
3128

3229
if os.Getenv("CI") == "" {
@@ -39,7 +36,7 @@ func TestGetRemoteSchemaLocations(t *testing.T) {
3936

4037
// t.Setenv("KUBECHECKS_SCHEMAS_LOCATION", fixture.URL) // doesn't work because viper needs to initialize from root, which doesn't happen
4138
viper.Set("schemas-location", []string{fixture.URL})
42-
schemaLocations := getSchemaLocations(ctx, ctr, "/some/other/path")
39+
schemaLocations := getSchemaLocations(ctr)
4340
hasTmpDirPrefix := strings.HasPrefix(schemaLocations[0], "/tmp/schemas")
4441
assert.Equal(t, hasTmpDirPrefix, true, "invalid schemas location. Schema location should have prefix /tmp/schemas but has %s", schemaLocations[0])
4542
}

pkg/events/runner.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/zapier/kubechecks/pkg"
1212
"github.com/zapier/kubechecks/pkg/checks"
1313
"github.com/zapier/kubechecks/pkg/container"
14-
"github.com/zapier/kubechecks/pkg/git"
1514
"github.com/zapier/kubechecks/pkg/msg"
1615
"github.com/zapier/kubechecks/telemetry"
1716
)
@@ -22,11 +21,7 @@ type Runner struct {
2221
wg sync.WaitGroup
2322
}
2423

25-
func newRunner(
26-
ctr container.Container, app v1alpha1.Application, repo *git.Repo,
27-
appName, k8sVersion string, jsonManifests, yamlManifests []string,
28-
logger zerolog.Logger, note *msg.Message, queueApp, removeApp func(application v1alpha1.Application),
29-
) *Runner {
24+
func newRunner(ctr container.Container, app v1alpha1.Application, appName, k8sVersion string, jsonManifests, yamlManifests []string, logger zerolog.Logger, note *msg.Message, queueApp, removeApp func(application v1alpha1.Application)) *Runner {
3025
return &Runner{
3126
Request: checks.Request{
3227
App: app,
@@ -38,7 +33,6 @@ func newRunner(
3833
Note: note,
3934
QueueApp: queueApp,
4035
RemoveApp: removeApp,
41-
Repo: repo,
4236
YamlManifests: yamlManifests,
4337
},
4438
}

pkg/events/worker.go

Lines changed: 70 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,32 @@ func (w *worker) run(ctx context.Context) {
4545
}
4646
}
4747

48+
type pathAndRepoUrl struct {
49+
Path, RepoURL, TargetRevision string
50+
}
51+
52+
func getAppSources(app v1alpha1.Application) []pathAndRepoUrl {
53+
var items []pathAndRepoUrl
54+
55+
if src := app.Spec.Source; src != nil {
56+
items = append(items, pathAndRepoUrl{
57+
Path: src.Path,
58+
RepoURL: src.RepoURL,
59+
TargetRevision: src.TargetRevision,
60+
})
61+
}
62+
63+
for _, src := range app.Spec.Sources {
64+
items = append(items, pathAndRepoUrl{
65+
Path: src.Path,
66+
RepoURL: src.RepoURL,
67+
TargetRevision: src.TargetRevision,
68+
})
69+
}
70+
71+
return items
72+
}
73+
4874
// processApp is a function that validates and processes a given application manifest against various checks,
4975
// such as ArgoCD schema validation, diff generation, conftest policy validation, and pre-upgrade checks using kubepug.
5076
// It takes a context (ctx), application name (app), directory (dir) as input and returns an error if any check fails.
@@ -54,27 +80,22 @@ func (w *worker) processApp(ctx context.Context, app v1alpha1.Application) {
5480
var (
5581
err error
5682

57-
appName = app.Name
58-
appSrc = app.Spec.Source
59-
appPath = appSrc.Path
60-
appRepoUrl = appSrc.RepoURL
83+
appName = app.Name
6184

62-
logger = w.logger.With().
63-
Str("app_name", appName).
64-
Str("app_path", appPath).
65-
Logger()
85+
rootLogger = w.logger.With().
86+
Str("app_name", appName).
87+
Logger()
6688
)
6789

6890
ctx, span := tracer.Start(ctx, "processApp", trace.WithAttributes(
6991
attribute.String("app", appName),
70-
attribute.String("dir", appPath),
7192
))
7293
defer span.End()
7394

7495
atomic.AddInt32(&inFlight, 1)
7596
defer atomic.AddInt32(&inFlight, -1)
7697

77-
logger.Info().Msg("Processing app")
98+
rootLogger.Info().Msg("Processing app")
7899

79100
// Build a new section for this app in the parent comment
80101
w.vcsNote.AddNewApp(ctx, appName)
@@ -94,47 +115,58 @@ func (w *worker) processApp(ctx context.Context, app v1alpha1.Application) {
94115
}
95116
}()
96117

97-
repo, err := w.getRepo(ctx, w.ctr.VcsClient, appRepoUrl, appSrc.TargetRevision)
98-
if err != nil {
99-
logger.Error().Err(err).Msg("Unable to clone repository")
100-
w.vcsNote.AddToAppMessage(ctx, appName, msg.Result{
101-
State: pkg.StateError,
102-
Summary: "failed to clone repo",
103-
Details: fmt.Sprintf("Clone URL: `%s`\nTarget Revision: `%s`\n```\n%s\n```", appRepoUrl, appSrc.TargetRevision, err.Error()),
104-
})
105-
return
106-
}
107-
repoPath := repo.Directory
118+
var jsonManifests []string
119+
sources := getAppSources(app)
120+
for _, appSrc := range sources {
121+
var (
122+
appPath = appSrc.Path
123+
appRepoUrl = appSrc.RepoURL
124+
logger = rootLogger.With().
125+
Str("app_path", appPath).
126+
Logger()
127+
)
128+
129+
repo, err := w.getRepo(ctx, w.ctr.VcsClient, appRepoUrl, appSrc.TargetRevision)
130+
if err != nil {
131+
logger.Error().Err(err).Msg("Unable to clone repository")
132+
w.vcsNote.AddToAppMessage(ctx, appName, msg.Result{
133+
State: pkg.StateError,
134+
Summary: "failed to clone repo",
135+
Details: fmt.Sprintf("Clone URL: `%s`\nTarget Revision: `%s`\n```\n%s\n```", appRepoUrl, appSrc.TargetRevision, err.Error()),
136+
})
137+
return
138+
}
139+
repoPath := repo.Directory
140+
141+
logger.Debug().Str("repo_path", repoPath).Msg("Getting manifests")
142+
someJsonManifests, err := w.ctr.ArgoClient.GetManifestsLocal(ctx, appName, repoPath, appPath, app)
143+
if err != nil {
144+
logger.Error().Err(err).Msg("Unable to get manifests")
145+
w.vcsNote.AddToAppMessage(ctx, appName, msg.Result{
146+
State: pkg.StateError,
147+
Summary: "Unable to get manifests",
148+
Details: fmt.Sprintf("```\n%s\n```", cleanupGetManifestsError(err, repo.Directory)),
149+
})
150+
return
151+
}
108152

109-
logger.Debug().Str("repo_path", repoPath).Msg("Getting manifests")
110-
jsonManifests, err := w.ctr.ArgoClient.GetManifestsLocal(ctx, appName, repoPath, appPath, app)
111-
if err != nil {
112-
logger.Error().Err(err).Msg("Unable to get manifests")
113-
w.vcsNote.AddToAppMessage(ctx, appName, msg.Result{
114-
State: pkg.StateError,
115-
Summary: "Unable to get manifests",
116-
Details: fmt.Sprintf("```\n%s\n```", cleanupGetManifestsError(err, repo.Directory)),
117-
})
118-
return
153+
jsonManifests = append(jsonManifests, someJsonManifests...)
119154
}
120155

121156
// Argo diff logic wants unformatted manifests but everything else wants them as YAML, so we prepare both
122157
yamlManifests := argo_client.ConvertJsonToYamlManifests(jsonManifests)
123-
logger.Trace().Msgf("Manifests:\n%+v\n", yamlManifests)
158+
rootLogger.Trace().Msgf("Manifests:\n%+v\n", yamlManifests)
124159

125160
k8sVersion, err := w.ctr.ArgoClient.GetKubernetesVersionByApplication(ctx, app)
126161
if err != nil {
127-
logger.Error().Err(err).Msg("Error retrieving the Kubernetes version")
162+
rootLogger.Error().Err(err).Msg("Error retrieving the Kubernetes version")
128163
k8sVersion = w.ctr.Config.FallbackK8sVersion
129164
} else {
130165
k8sVersion = fmt.Sprintf("%s.0", k8sVersion)
131-
logger.Info().Msgf("Kubernetes version: %s", k8sVersion)
166+
rootLogger.Info().Msgf("Kubernetes version: %s", k8sVersion)
132167
}
133168

134-
runner := newRunner(
135-
w.ctr, app, repo, appName, k8sVersion, jsonManifests, yamlManifests, logger, w.vcsNote,
136-
w.queueApp, w.removeApp,
137-
)
169+
runner := newRunner(w.ctr, app, appName, k8sVersion, jsonManifests, yamlManifests, rootLogger, w.vcsNote, w.queueApp, w.removeApp)
138170

139171
for _, processor := range w.processors {
140172
runner.Run(ctx, processor.Name, processor.Processor, processor.WorstState)

0 commit comments

Comments
 (0)