6262import org .opensearch .common .collect .Tuple ;
6363import org .opensearch .common .metrics .OperationMetrics ;
6464import org .opensearch .common .regex .Regex ;
65+ import org .opensearch .common .settings .Setting ;
6566import org .opensearch .common .settings .Settings ;
6667import org .opensearch .common .unit .TimeValue ;
6768import org .opensearch .common .util .concurrent .AbstractRunnable ;
@@ -107,6 +108,18 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
107108
108109 public static final String INGEST_ORIGIN = "ingest" ;
109110
111+ /**
112+ * Defines the limit for the number of processors which can run on a given document during ingestion.
113+ */
114+ public static final Setting <Integer > MAX_NUMBER_OF_INGEST_PROCESSORS = Setting .intSetting (
115+ "cluster.ingest.max_number_processors" ,
116+ Integer .MAX_VALUE ,
117+ 1 ,
118+ Integer .MAX_VALUE ,
119+ Setting .Property .NodeScope ,
120+ Setting .Property .Dynamic
121+ );
122+
110123 private static final Logger logger = LogManager .getLogger (IngestService .class );
111124
112125 private final ClusterService clusterService ;
@@ -123,6 +136,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
123136 private final ClusterManagerTaskThrottler .ThrottlingKey putPipelineTaskKey ;
124137 private final ClusterManagerTaskThrottler .ThrottlingKey deletePipelineTaskKey ;
125138 private volatile ClusterState state ;
139+ private volatile int maxIngestProcessorCount ;
126140
127141 public IngestService (
128142 ClusterService clusterService ,
@@ -156,6 +170,12 @@ public IngestService(
156170 // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
157171 putPipelineTaskKey = clusterService .registerClusterManagerTask (ClusterManagerTaskKeys .PUT_PIPELINE_KEY , true );
158172 deletePipelineTaskKey = clusterService .registerClusterManagerTask (ClusterManagerTaskKeys .DELETE_PIPELINE_KEY , true );
173+ clusterService .getClusterSettings ().addSettingsUpdateConsumer (MAX_NUMBER_OF_INGEST_PROCESSORS , this ::setMaxIngestProcessorCount );
174+ setMaxIngestProcessorCount (clusterService .getClusterSettings ().get (MAX_NUMBER_OF_INGEST_PROCESSORS ));
175+ }
176+
177+ private void setMaxIngestProcessorCount (Integer maxIngestProcessorCount ) {
178+ this .maxIngestProcessorCount = maxIngestProcessorCount ;
159179 }
160180
161181 private static Map <String , Processor .Factory > processorFactories (List <IngestPlugin > ingestPlugins , Processor .Parameters parameters ) {
@@ -495,7 +515,7 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq
495515 Map <String , Object > pipelineConfig = XContentHelper .convertToMap (request .getSource (), false , request .getMediaType ()).v2 ();
496516 Pipeline pipeline = Pipeline .create (request .getId (), pipelineConfig , processorFactories , scriptService );
497517
498- IngestPipelineValidator . validateIngestPipeline (pipeline , clusterService );
518+ validateProcessorCountForIngestPipeline (pipeline );
499519
500520 List <Exception > exceptions = new ArrayList <>();
501521 for (Processor processor : pipeline .flattenAllProcessors ()) {
@@ -510,6 +530,20 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq
510530 ExceptionsHelper .rethrowAndSuppress (exceptions );
511531 }
512532
533+ public void validateProcessorCountForIngestPipeline (Pipeline pipeline ) {
534+ List <Processor > processors = pipeline .getCompoundProcessor ().getProcessors ();
535+
536+ if (processors .size () > maxIngestProcessorCount ) {
537+ throw new IllegalStateException (
538+ "Cannot use more than the maximum processors allowed. Number of processors being configured is ["
539+ + processors .size ()
540+ + "] which exceeds the maximum allowed configuration of ["
541+ + maxIngestProcessorCount
542+ + "] processors."
543+ );
544+ }
545+ }
546+
513547 public void executeBulkRequest (
514548 int numberOfActionRequests ,
515549 Iterable <DocWriteRequest <?>> actionRequests ,
@@ -1102,7 +1136,6 @@ void innerUpdatePipelines(IngestMetadata newIngestMetadata) {
11021136 processorFactories ,
11031137 scriptService
11041138 );
1105- IngestPipelineValidator .validateIngestPipeline (newPipeline , clusterService );
11061139 newPipelines .put (newConfiguration .getId (), new PipelineHolder (newConfiguration , newPipeline ));
11071140
11081141 if (previous == null ) {
0 commit comments