Skip to content

Commit f1f1574

Browse files
authored
1148 improve performance of npm package ingester (#1149)
The NPM package ingester is more performant and avoids cases where a deadlock can happen.
1 parent 2a6918c commit f1f1574

File tree

4 files changed

+139
-12
lines changed

4 files changed

+139
-12
lines changed

lunatrace/bsl/ingest-worker/cmd/ingestworker/vulnerability/vulnerability.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,17 @@ func NewCommand(p Params) clifx.CommandResult {
101101
Usage: "[file or directory]",
102102
Flags: []cli.Flag{
103103
&cli.StringFlag{
104-
Name: "source",
105-
Usage: "Where the vulnerabilities have been sourced from.",
106-
Required: true,
104+
Name: "source",
105+
Usage: "Where the vulnerabilities have been sourced from.",
107106
},
108107
&cli.StringFlag{
109108
Name: "source-relative-path",
110109
Usage: "Relative path from within the source to where advisories are located.",
111110
},
111+
&cli.StringFlag{
112+
Name: "vuln-id",
113+
Usage: "ID of a vulnerability to make sure all related information is ingested (ex. affected packages).",
114+
},
112115
&cli.BoolFlag{
113116
Name: "ingest-affected",
114117
Usage: "Ensure for every affected package that all metadata is collected.",
@@ -119,9 +122,17 @@ func NewCommand(p Params) clifx.CommandResult {
119122
advisoryLocation := ctx.Args().First()
120123

121124
source := ctx.String("source")
125+
vulnID := ctx.String("vuln-id")
122126
sourceRelativePath := ctx.String("source-relative-path")
123127
ingestAffected := ctx.Bool("ingest-affected")
124128

129+
if vulnID != "" {
130+
log.Info().
131+
Str("vulnerability", vulnID).
132+
Msg("processing vulnerability")
133+
return p.AffectedIngester.Ingest(ctx.Context, vulnID)
134+
}
135+
125136
log.Info().
126137
Str("source", source).
127138
Msg("starting vulnerability ingestion")

lunatrace/bsl/ingest-worker/pkg/metadata/ingester/sql.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,13 @@ func (s *packageSqlIngester) mapMaintainers(ctx context.Context, packageId uuid.
163163
}
164164

165165
func (s *packageSqlIngester) mapMaintainer(ctx context.Context, pm metadata.Maintainer) (uuid.UUID, error) {
166-
return upsertMaintainer(ctx, s.deps.DB, model.Maintainer{
166+
maintainer := model.Maintainer{
167167
PackageManager: mapper.NpmV,
168168
Email: pm.Email,
169169
Name: util.Ptr(pm.Name),
170-
})
170+
}
171+
172+
return upsertMaintainer(ctx, s.deps.DB, maintainer)
171173
}
172174

173175
func (s *packageSqlIngester) Ingest(ctx context.Context, pkg *metadata.PackageMetadata) (string, error) {

lunatrace/bsl/ingest-worker/pkg/metadata/ingester/sqlstms.go

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,26 @@ import (
2222
)
2323

2424
func upsertPackage(ctx context.Context, db *sql.DB, p model.Package) (id uuid.UUID, err error) {
25+
selectReleaseDependencyPackage := Package.SELECT(
26+
Package.Name,
27+
Package.PackageManager,
28+
Package.CustomRegistry,
29+
Package.Description,
30+
).WHERE(
31+
Package.Name.EQ(postgres.String(p.Name)).
32+
AND(Package.PackageManager.EQ(postgres.NewEnumValue(string(p.PackageManager)))).
33+
AND(Package.CustomRegistry.EQ(postgres.String(p.CustomRegistry))),
34+
)
35+
36+
var releaseDependencyPackage model.Package
37+
err = selectReleaseDependencyPackage.QueryContext(ctx, db, &releaseDependencyPackage)
38+
if err == nil {
39+
// If the package already exists with the same description, we don't need to do anything
40+
if releaseDependencyPackage.Description == p.Description {
41+
return releaseDependencyPackage.ID, nil
42+
}
43+
}
44+
2545
packageInsert := Package.INSERT(
2646
Package.Name,
2747
Package.PackageManager,
@@ -48,6 +68,31 @@ func upsertPackage(ctx context.Context, db *sql.DB, p model.Package) (id uuid.UU
4868
}
4969

5070
func upsertRelease(ctx context.Context, db *sql.DB, r model.Release) (id uuid.UUID, err error) {
71+
selectRelease := Release.SELECT(
72+
Release.AllColumns,
73+
).WHERE(
74+
Release.PackageID.EQ(postgres.UUID(r.PackageID)).
75+
AND(Release.Version.EQ(postgres.String(r.Version))),
76+
)
77+
78+
var release model.Release
79+
err = selectRelease.QueryContext(ctx, db, &release)
80+
if err == nil {
81+
// TODO (cthompson) not easy to compare jsonb, so we're skipping this for now
82+
// release.UpstreamData == r.UpstreamData &&
83+
84+
// If the release already exists, we don't need to do anything
85+
if release.PublishingMaintainerID == r.PublishingMaintainerID &&
86+
release.ReleaseTime == r.ReleaseTime &&
87+
release.BlobHash == r.BlobHash &&
88+
release.UpstreamBlobURL == r.UpstreamBlobURL {
89+
90+
// TODO (cthompson) update FetchedTime
91+
92+
return release.ID, nil
93+
}
94+
}
95+
5196
releaseInsert := Release.INSERT(
5297
Release.PackageID,
5398
Release.PublishingMaintainerID,
@@ -82,6 +127,23 @@ func upsertRelease(ctx context.Context, db *sql.DB, r model.Release) (id uuid.UU
82127
}
83128

84129
func upsertReleaseDependencyPackage(ctx context.Context, db *sql.DB, p model.Package) (id uuid.UUID, err error) {
130+
selectReleaseDependencyPackage := Package.SELECT(
131+
Package.Name,
132+
Package.PackageManager,
133+
Package.CustomRegistry,
134+
).WHERE(
135+
Package.Name.EQ(postgres.String(p.Name)).
136+
AND(Package.PackageManager.EQ(postgres.NewEnumValue(string(p.PackageManager)))).
137+
AND(Package.CustomRegistry.EQ(postgres.String(p.CustomRegistry))),
138+
)
139+
140+
var releaseDependencyPackage model.Package
141+
err = selectReleaseDependencyPackage.QueryContext(ctx, db, &releaseDependencyPackage)
142+
if err == nil {
143+
// If the release dependency package already exists, we don't need to do anything
144+
return releaseDependencyPackage.ID, nil
145+
}
146+
85147
insertPackage := Package.INSERT(
86148
Package.Name,
87149
Package.PackageManager,
@@ -105,6 +167,26 @@ func upsertReleaseDependencyPackage(ctx context.Context, db *sql.DB, p model.Pac
105167
}
106168

107169
func upsertReleaseDependency(ctx context.Context, db *sql.DB, r model.ReleaseDependency) (id uuid.UUID, err error) {
170+
selectReleaseDependency := ReleaseDependency.SELECT(
171+
ReleaseDependency.ID,
172+
ReleaseDependency.ReleaseID,
173+
ReleaseDependency.IsDev,
174+
).WHERE(
175+
ReleaseDependency.ReleaseID.EQ(postgres.UUID(r.ReleaseID)).
176+
AND(ReleaseDependency.PackageName.EQ(postgres.String(r.PackageName))).
177+
AND(ReleaseDependency.PackageVersionQuery.EQ(postgres.String(r.PackageVersionQuery))),
178+
)
179+
180+
var releaseDependency model.ReleaseDependency
181+
err = selectReleaseDependency.QueryContext(ctx, db, &releaseDependency)
182+
if err == nil {
183+
// If the release dependency already exists, we don't need to do anything
184+
if releaseDependency.ReleaseID == r.ReleaseID &&
185+
releaseDependency.IsDev == r.IsDev {
186+
return releaseDependency.ID, nil
187+
}
188+
}
189+
108190
insertReleaseDependency := ReleaseDependency.INSERT(
109191
ReleaseDependency.ReleaseID,
110192
ReleaseDependency.DependencyPackageID,
@@ -131,18 +213,52 @@ func upsertReleaseDependency(ctx context.Context, db *sql.DB, r model.ReleaseDep
131213
}
132214

133215
func upsertPackageMaintainer(ctx context.Context, db *sql.DB, p model.PackageMaintainer) error {
216+
selectPackageMaintainer := PackageMaintainer.SELECT(
217+
PackageMaintainer.AllColumns,
218+
).WHERE(
219+
PackageMaintainer.PackageID.EQ(postgres.UUID(p.PackageID)).
220+
AND(PackageMaintainer.MaintainerID.EQ(postgres.UUID(p.MaintainerID))),
221+
)
222+
223+
var packageMaintainer model.PackageMaintainer
224+
err := selectPackageMaintainer.QueryContext(ctx, db, &packageMaintainer)
225+
if err == nil {
226+
// If the package maintainer already exists, we don't need to do anything
227+
if packageMaintainer.PackageID == p.PackageID &&
228+
packageMaintainer.MaintainerID == p.MaintainerID {
229+
return nil
230+
}
231+
}
232+
134233
insertPackageMaintainer := PackageMaintainer.INSERT(
135234
PackageMaintainer.PackageID,
136235
PackageMaintainer.MaintainerID,
137236
).MODEL(p).
138237
ON_CONFLICT().
139238
ON_CONSTRAINT("package_maintainer_package_id_maintainer_id_idx").
140239
DO_NOTHING()
141-
_, err := insertPackageMaintainer.ExecContext(ctx, db)
240+
_, err = insertPackageMaintainer.ExecContext(ctx, db)
142241
return err
143242
}
144243

145244
func upsertMaintainer(ctx context.Context, db *sql.DB, m model.Maintainer) (id uuid.UUID, err error) {
245+
selectMaintainer := Maintainer.SELECT(
246+
Maintainer.AllColumns,
247+
).WHERE(
248+
Maintainer.Email.EQ(postgres.String(m.Email)),
249+
)
250+
251+
var maintainer model.Maintainer
252+
err = selectMaintainer.QueryContext(ctx, db, &maintainer)
253+
if err == nil {
254+
// If the maintainer already exists, we don't need to do anything
255+
if maintainer.Email == m.Email &&
256+
maintainer.Name == m.Name &&
257+
maintainer.PackageManager == m.PackageManager {
258+
return maintainer.ID, nil
259+
}
260+
}
261+
146262
insertMaintainer := Maintainer.INSERT(
147263
Maintainer.Email,
148264
Maintainer.Name,

lunatrace/bsl/ingest-worker/pkg/metadata/replicator/npm/api.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,8 @@ func (s *npmAPIReplicator) ReplicatePackages(packages []string, resolvePackage b
172172
table.PackageDownloadCount.Downloads,
173173
table.PackageDownloadCount.PackageID,
174174
).MODELS(packageDownloadCounts).
175-
ON_CONFLICT(table.PackageDownloadCount.Name, table.PackageDownloadCount.Day).
175+
ON_CONFLICT().
176+
ON_CONSTRAINT("package_download_count_name_day_key").
176177
DO_UPDATE(
177178
postgres.SET(
178179
table.PackageDownloadCount.Downloads.SET(
@@ -278,11 +279,8 @@ func (s *npmAPIReplicator) ReplicateVersionDownloadCounts(packageName string, re
278279
table.PackageVersionDownloadCount.Downloads,
279280
table.PackageVersionDownloadCount.Day,
280281
).MODELS(models).
281-
ON_CONFLICT(
282-
table.PackageVersionDownloadCount.Name,
283-
table.PackageVersionDownloadCount.Version,
284-
table.PackageVersionDownloadCount.Day,
285-
).
282+
ON_CONFLICT().
283+
ON_CONSTRAINT("package_version_download_count_name_version_day_key").
286284
DO_UPDATE(
287285
postgres.SET(
288286
table.PackageVersionDownloadCount.Downloads.SET(

0 commit comments

Comments
 (0)