Skip to content

Commit 2381e5d

Browse files
authored
Correctly handling download_database_on_pipeline_creation within a pipeline processor within a default or final pipeline (#131236)
1 parent c1c7218 commit 2381e5d

File tree

3 files changed

+252
-14
lines changed

3 files changed

+252
-14
lines changed

docs/changelog/131236.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 131236
2+
summary: Correctly handling `download_database_on_pipeline_creation` within a pipeline
3+
processor within a default or final pipeline
4+
area: Ingest Node
5+
type: bug
6+
issues: []

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

Lines changed: 151 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.cluster.ClusterChangedEvent;
2222
import org.elasticsearch.cluster.ClusterStateListener;
2323
import org.elasticsearch.cluster.metadata.IndexAbstraction;
24+
import org.elasticsearch.cluster.metadata.IndexMetadata;
2425
import org.elasticsearch.cluster.metadata.ProjectId;
2526
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2627
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -47,6 +48,7 @@
4748
import org.elasticsearch.transport.RemoteTransportException;
4849

4950
import java.util.Collections;
51+
import java.util.HashMap;
5052
import java.util.HashSet;
5153
import java.util.List;
5254
import java.util.Map;
@@ -280,11 +282,14 @@ static boolean hasAtLeastOneGeoipProcessor(ProjectMetadata projectMetadata) {
280282
return false;
281283
}
282284

283-
return projectMetadata.indices().values().stream().anyMatch(indexMetadata -> {
285+
for (IndexMetadata indexMetadata : projectMetadata.indices().values()) {
284286
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings());
285287
String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings());
286-
return checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline);
287-
});
288+
if (checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline)) {
289+
return true;
290+
}
291+
}
292+
return false;
288293
}
289294

290295
/**
@@ -297,12 +302,26 @@ static boolean hasAtLeastOneGeoipProcessor(ProjectMetadata projectMetadata) {
297302
@SuppressWarnings("unchecked")
298303
private static Set<String> pipelinesWithGeoIpProcessor(ProjectMetadata projectMetadata, boolean downloadDatabaseOnPipelineCreation) {
299304
List<PipelineConfiguration> configurations = IngestService.getPipelines(projectMetadata);
305+
Map<String, PipelineConfiguration> pipelineConfigById = HashMap.newHashMap(configurations.size());
306+
for (PipelineConfiguration configuration : configurations) {
307+
pipelineConfigById.put(configuration.getId(), configuration);
308+
}
309+
// this map is used to keep track of pipelines that have already been checked
310+
Map<String, Boolean> pipelineHasGeoProcessorById = HashMap.newHashMap(configurations.size());
300311
Set<String> ids = new HashSet<>();
301312
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
302313
for (PipelineConfiguration configuration : configurations) {
303314
List<Map<String, Object>> processors = (List<Map<String, Object>>) configuration.getConfig().get(Pipeline.PROCESSORS_KEY);
304-
if (hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation)) {
305-
ids.add(configuration.getId());
315+
String pipelineName = configuration.getId();
316+
if (pipelineHasGeoProcessorById.containsKey(pipelineName) == false) {
317+
if (hasAtLeastOneGeoipProcessor(
318+
processors,
319+
downloadDatabaseOnPipelineCreation,
320+
pipelineConfigById,
321+
pipelineHasGeoProcessorById
322+
)) {
323+
ids.add(pipelineName);
324+
}
306325
}
307326
}
308327
return Collections.unmodifiableSet(ids);
@@ -312,13 +331,27 @@ private static Set<String> pipelinesWithGeoIpProcessor(ProjectMetadata projectMe
312331
* Check if a list of processors contains at least one geoip processor.
313332
* @param processors List of processors.
314333
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
334+
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
335+
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
336+
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
337+
* out (null).
315338
* @return true if a geoip processor is found in the processor list.
316339
*/
317-
private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors, boolean downloadDatabaseOnPipelineCreation) {
340+
private static boolean hasAtLeastOneGeoipProcessor(
341+
List<Map<String, Object>> processors,
342+
boolean downloadDatabaseOnPipelineCreation,
343+
Map<String, PipelineConfiguration> pipelineConfigById,
344+
Map<String, Boolean> pipelineHasGeoProcessorById
345+
) {
318346
if (processors != null) {
319347
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
320348
for (Map<String, Object> processor : processors) {
321-
if (hasAtLeastOneGeoipProcessor(processor, downloadDatabaseOnPipelineCreation)) {
349+
if (hasAtLeastOneGeoipProcessor(
350+
processor,
351+
downloadDatabaseOnPipelineCreation,
352+
pipelineConfigById,
353+
pipelineHasGeoProcessorById
354+
)) {
322355
return true;
323356
}
324357
}
@@ -330,10 +363,19 @@ private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> pro
330363
* Check if a processor config is a geoip processor or contains at least one geoip processor.
331364
* @param processor Processor config.
332365
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
366+
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
367+
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
368+
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
369+
* out (null).
333370
* @return true if the processor is a geoip processor or contains one.
334371
*/
335372
@SuppressWarnings("unchecked")
336-
private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
373+
private static boolean hasAtLeastOneGeoipProcessor(
374+
Map<String, Object> processor,
375+
boolean downloadDatabaseOnPipelineCreation,
376+
Map<String, PipelineConfiguration> pipelineConfigById,
377+
Map<String, Boolean> pipelineHasGeoProcessorById
378+
) {
337379
if (processor == null) {
338380
return false;
339381
}
@@ -352,27 +394,51 @@ private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor
352394
}
353395
}
354396

355-
return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation)
356-
|| isForeachProcessorWithGeoipProcessor(processor, downloadDatabaseOnPipelineCreation);
397+
return isProcessorWithOnFailureGeoIpProcessor(
398+
processor,
399+
downloadDatabaseOnPipelineCreation,
400+
pipelineConfigById,
401+
pipelineHasGeoProcessorById
402+
)
403+
|| isForeachProcessorWithGeoipProcessor(
404+
processor,
405+
downloadDatabaseOnPipelineCreation,
406+
pipelineConfigById,
407+
pipelineHasGeoProcessorById
408+
)
409+
|| isPipelineProcessorWithGeoIpProcessor(
410+
processor,
411+
downloadDatabaseOnPipelineCreation,
412+
pipelineConfigById,
413+
pipelineHasGeoProcessorById
414+
);
357415
}
358416

359417
/**
360418
* Check if a processor config has an on_failure clause containing at least one geoip processor.
361419
* @param processor Processor config.
362420
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
421+
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
422+
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
423+
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
424+
* out (null).
363425
* @return true if a geoip processor is found in the processor list.
364426
*/
365427
@SuppressWarnings("unchecked")
366428
private static boolean isProcessorWithOnFailureGeoIpProcessor(
367429
Map<String, Object> processor,
368-
boolean downloadDatabaseOnPipelineCreation
430+
boolean downloadDatabaseOnPipelineCreation,
431+
Map<String, PipelineConfiguration> pipelineConfigById,
432+
Map<String, Boolean> pipelineHasGeoProcessorById
369433
) {
370434
// note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
371435
for (Object value : processor.values()) {
372436
if (value instanceof Map
373437
&& hasAtLeastOneGeoipProcessor(
374438
((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
375-
downloadDatabaseOnPipelineCreation
439+
downloadDatabaseOnPipelineCreation,
440+
pipelineConfigById,
441+
pipelineHasGeoProcessorById
376442
)) {
377443
return true;
378444
}
@@ -384,13 +450,84 @@ && hasAtLeastOneGeoipProcessor(
384450
* Check if a processor is a foreach processor containing at least one geoip processor.
385451
* @param processor Processor config.
386452
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
453+
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
454+
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
455+
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
456+
* out (null).
387457
* @return true if the processor is a foreach processor whose nested processor is, or contains, a geoip processor.
388458
*/
389459
@SuppressWarnings("unchecked")
390-
private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
460+
private static boolean isForeachProcessorWithGeoipProcessor(
461+
Map<String, Object> processor,
462+
boolean downloadDatabaseOnPipelineCreation,
463+
Map<String, PipelineConfiguration> pipelineConfigById,
464+
Map<String, Boolean> pipelineHasGeoProcessorById
465+
) {
391466
final Map<String, Object> processorConfig = (Map<String, Object>) processor.get("foreach");
392467
return processorConfig != null
393-
&& hasAtLeastOneGeoipProcessor((Map<String, Object>) processorConfig.get("processor"), downloadDatabaseOnPipelineCreation);
468+
&& hasAtLeastOneGeoipProcessor(
469+
(Map<String, Object>) processorConfig.get("processor"),
470+
downloadDatabaseOnPipelineCreation,
471+
pipelineConfigById,
472+
pipelineHasGeoProcessorById
473+
);
474+
}
475+
476+
/**
477+
* Check if a processor is a pipeline processor containing at least a geoip processor. This method also updates
478+
* pipelineHasGeoProcessorById with a result for any pipelines it looks at.
479+
* @param processor Processor config.
480+
* @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
481+
* @param pipelineConfigById A Map of pipeline id to PipelineConfiguration
482+
* @param pipelineHasGeoProcessorById A Map of pipeline id to Boolean, indicating whether the pipeline references a geoip processor
483+
* (true), does not reference a geoip processor (false), or we are currently trying to figure that
484+
* out (null).
485+
* @return true if a geoip processor is found in the processors of this processor if this processor is a pipeline processor.
486+
*/
487+
@SuppressWarnings("unchecked")
488+
private static boolean isPipelineProcessorWithGeoIpProcessor(
489+
Map<String, Object> processor,
490+
boolean downloadDatabaseOnPipelineCreation,
491+
Map<String, PipelineConfiguration> pipelineConfigById,
492+
Map<String, Boolean> pipelineHasGeoProcessorById
493+
) {
494+
final Map<String, Object> processorConfig = (Map<String, Object>) processor.get("pipeline");
495+
if (processorConfig != null) {
496+
String pipelineName = (String) processorConfig.get("name");
497+
if (pipelineName != null) {
498+
if (pipelineHasGeoProcessorById.containsKey(pipelineName)) {
499+
if (pipelineHasGeoProcessorById.get(pipelineName) == null) {
500+
/*
501+
* If the value is null here, it indicates that this method has been called recursively with the same pipeline name.
502+
* This will cause a runtime error when the pipeline is executed, but we're avoiding changing existing behavior at
503+
* server startup time. Instead, we just bail out as quickly as possible. It is possible that this could lead to a
504+
* geo database not being downloaded for the pipeline, but it doesn't really matter since the pipeline was going to
505+
* fail anyway.
506+
*/
507+
pipelineHasGeoProcessorById.put(pipelineName, false);
508+
}
509+
} else {
510+
List<Map<String, Object>> childProcessors = null;
511+
PipelineConfiguration config = pipelineConfigById.get(pipelineName);
512+
if (config != null) {
513+
childProcessors = (List<Map<String, Object>>) config.getConfig().get(Pipeline.PROCESSORS_KEY);
514+
}
515+
// We initialize this to null so that we know it's in progress and can use it to avoid stack overflow errors:
516+
pipelineHasGeoProcessorById.put(pipelineName, null);
517+
pipelineHasGeoProcessorById.put(
518+
pipelineName,
519+
hasAtLeastOneGeoipProcessor(
520+
childProcessors,
521+
downloadDatabaseOnPipelineCreation,
522+
pipelineConfigById,
523+
pipelineHasGeoProcessorById
524+
)
525+
);
526+
}
527+
return pipelineHasGeoProcessorById.get(pipelineName);
528+
}
529+
}
530+
return false;
394531
}
395532

396533
// starts GeoIP downloader task for a single project

0 commit comments

Comments
 (0)