-
Notifications
You must be signed in to change notification settings - Fork 25.9k
Rebuild step on PolicyStepsRegistry.getStep #33780
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -87,7 +87,7 @@ public void runPolicy(String policy, IndexMetaData indexMetaData, ClusterState c | |
| + LifecycleSettings.LIFECYCLE_SKIP + "== true"); | ||
| return; | ||
| } | ||
| Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData.getIndex(), indexSettings); | ||
| Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData, indexSettings); | ||
| if (currentStep == null) { | ||
| // This may happen in the case that there is invalid ilm-step index settings or the stepRegistry is out of | ||
| // sync with the current cluster state | ||
|
|
@@ -197,12 +197,12 @@ public static StepKey getCurrentStepKey(Settings indexSettings) { | |
| } | ||
| } | ||
|
|
||
| static Step getCurrentStep(PolicyStepsRegistry stepRegistry, String policy, Index index, Settings indexSettings) { | ||
| static Step getCurrentStep(PolicyStepsRegistry stepRegistry, String policy, IndexMetaData indexMetaData, Settings indexSettings) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is going to cause a (not very big, I think?) merge conflict with #33783. Not sure which side that should be handled on, but fair warning. |
||
| StepKey currentStepKey = getCurrentStepKey(indexSettings); | ||
| if (currentStepKey == null) { | ||
| return stepRegistry.getFirstStep(policy); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As far as I can tell, this only needed to onboard an index. Can we move this (as a follow up) to a dedicated place? on boarding is an exceptional case - we should put it in it's own method.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah as a followup definitely, we'd need to remove it if we want to remove |
||
| } else { | ||
| return stepRegistry.getStep(index, currentStepKey); | ||
| return stepRegistry.getStep(indexMetaData, currentStepKey); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,9 +5,9 @@ | |
| */ | ||
| package org.elasticsearch.xpack.indexlifecycle; | ||
|
|
||
| import com.carrotsearch.hppc.cursors.ObjectCursor; | ||
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
| import org.elasticsearch.ElasticsearchException; | ||
| import org.elasticsearch.client.Client; | ||
| import org.elasticsearch.cluster.ClusterState; | ||
| import org.elasticsearch.cluster.Diff; | ||
|
|
@@ -19,6 +19,7 @@ | |
| import org.elasticsearch.common.unit.TimeValue; | ||
| import org.elasticsearch.common.xcontent.DeprecationHandler; | ||
| import org.elasticsearch.common.xcontent.NamedXContentRegistry; | ||
| import org.elasticsearch.common.xcontent.XContentParseException; | ||
| import org.elasticsearch.common.xcontent.XContentParser; | ||
| import org.elasticsearch.common.xcontent.json.JsonXContent; | ||
| import org.elasticsearch.index.Index; | ||
|
|
@@ -53,26 +54,18 @@ public class PolicyStepsRegistry { | |
| private final Map<String, Step> firstStepMap; | ||
| // keeps track of a mapping from policy/step-name to respective Step, the key is policy name | ||
| private final Map<String, Map<Step.StepKey, Step>> stepMap; | ||
| // A map of index to a list of compiled steps for the current phase | ||
| private final Map<Index, List<Step>> indexPhaseSteps; | ||
| private final NamedXContentRegistry xContentRegistry; | ||
|
|
||
| public PolicyStepsRegistry(NamedXContentRegistry xContentRegistry, Client client) { | ||
| this.lifecyclePolicyMap = new TreeMap<>(); | ||
| this.firstStepMap = new HashMap<>(); | ||
| this.stepMap = new HashMap<>(); | ||
| this.indexPhaseSteps = new HashMap<>(); | ||
| this.xContentRegistry = xContentRegistry; | ||
| this.client = client; | ||
| this(new TreeMap<>(), new HashMap<>(), new HashMap<>(), xContentRegistry, client); | ||
| } | ||
|
|
||
| PolicyStepsRegistry(SortedMap<String, LifecyclePolicyMetadata> lifecyclePolicyMap, | ||
| Map<String, Step> firstStepMap, Map<String, Map<Step.StepKey, Step>> stepMap, | ||
| Map<Index, List<Step>> indexPhaseSteps, NamedXContentRegistry xContentRegistry, Client client) { | ||
| NamedXContentRegistry xContentRegistry, Client client) { | ||
| this.lifecyclePolicyMap = lifecyclePolicyMap; | ||
| this.firstStepMap = firstStepMap; | ||
| this.stepMap = stepMap; | ||
| this.indexPhaseSteps = indexPhaseSteps; | ||
| this.xContentRegistry = xContentRegistry; | ||
| this.client = client; | ||
| } | ||
|
|
@@ -89,17 +82,6 @@ Map<String, Map<Step.StepKey, Step>> getStepMap() { | |
| return stepMap; | ||
| } | ||
|
|
||
| /** | ||
| * Remove phase step lists for indices that have been deleted | ||
| * @param indices a list of indices that have been deleted | ||
| */ | ||
| public void removeIndices(List<Index> indices) { | ||
| indices.forEach(index -> { | ||
| logger.trace("removing cached phase steps for deleted index [{}]", index.getName()); | ||
| indexPhaseSteps.remove(index); | ||
| }); | ||
| } | ||
|
|
||
| @SuppressWarnings({ "unchecked", "rawtypes" }) | ||
| public void update(ClusterState clusterState) { | ||
| final IndexLifecycleMetadata meta = clusterState.metaData().custom(IndexLifecycleMetadata.TYPE); | ||
|
|
@@ -146,103 +128,86 @@ public LifecyclePolicyMetadata read(StreamInput in, String key) { | |
| assert ErrorStep.NAME.equals(step.getKey().getName()) == false : "unexpected error step in policy"; | ||
| stepMapForPolicy.put(step.getKey(), step); | ||
| } | ||
| logger.trace("updating cached steps for [{}] policy, new steps: {}", | ||
| policyMetadata.getName(), stepMapForPolicy.keySet()); | ||
| stepMap.put(policyMetadata.getName(), stepMapForPolicy); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| for (ObjectCursor<IndexMetaData> imd : clusterState.metaData().getIndices().values()) { | ||
| final Index index = imd.value.getIndex(); | ||
| final String policy = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_NAME); | ||
| if (policy == null || lifecyclePolicyMap.containsKey(policy) == false) { | ||
| indexPhaseSteps.remove(index); | ||
| } else { | ||
| final List<Step> currentSteps = indexPhaseSteps.get(index); | ||
| // Get the current steps' phase, if there are steps stored | ||
| final String existingPhase = (currentSteps == null || currentSteps.size() == 0) ? | ||
| "_none_" : currentSteps.get(0).getKey().getPhase(); | ||
| // Retrieve the current phase, defaulting to "new" if no phase is set | ||
| final String currentPhase = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE, | ||
| InitializePolicyContextStep.INITIALIZATION_PHASE); | ||
|
|
||
| if (existingPhase.equals(currentPhase) == false) { | ||
| logger.debug("index [{}] has transitioned phases [{} -> {}], rebuilding step list", | ||
| index, existingPhase, currentPhase); | ||
| // parse existing phase steps from the phase definition in the index settings | ||
| String phaseDef = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE_DEFINITION, | ||
| InitializePolicyContextStep.INITIALIZATION_PHASE); | ||
| final PhaseExecutionInfo phaseExecutionInfo; | ||
| LifecyclePolicy currentPolicy = lifecyclePolicyMap.get(policy).getPolicy(); | ||
| final LifecyclePolicy policyToExecute; | ||
| if (InitializePolicyContextStep.INITIALIZATION_PHASE.equals(phaseDef) | ||
| || TerminalPolicyStep.COMPLETED_PHASE.equals(phaseDef)) { | ||
| // It is ok to re-use potentially modified policy here since we are in an initialization or completed phase | ||
| policyToExecute = currentPolicy; | ||
| } else { | ||
| // if the current phase definition describes an internal step/phase, do not parse | ||
| try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry, | ||
| DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) { | ||
| phaseExecutionInfo = PhaseExecutionInfo.parse(parser, currentPhase); | ||
| } catch (IOException e) { | ||
| logger.error("failed to configure phase [" + currentPhase + "] for index [" + index.getName() + "]", e); | ||
| indexPhaseSteps.remove(index); | ||
| continue; | ||
| } | ||
| Map<String, Phase> phaseMap = new HashMap<>(currentPolicy.getPhases()); | ||
| if (phaseExecutionInfo.getPhase() != null) { | ||
| phaseMap.put(currentPhase, phaseExecutionInfo.getPhase()); | ||
| } | ||
| policyToExecute = new LifecyclePolicy(currentPolicy.getType(), currentPolicy.getName(), phaseMap); | ||
| } | ||
| LifecyclePolicySecurityClient policyClient = new LifecyclePolicySecurityClient(client, | ||
| ClientHelper.INDEX_LIFECYCLE_ORIGIN, lifecyclePolicyMap.get(policy).getHeaders()); | ||
| final List<Step> steps = policyToExecute.toSteps(policyClient); | ||
| // Build a list of steps that correspond with the phase the index is currently in | ||
| final List<Step> phaseSteps; | ||
| if (steps == null) { | ||
| phaseSteps = new ArrayList<>(); | ||
| } else { | ||
| phaseSteps = steps.stream() | ||
| .filter(e -> e.getKey().getPhase().equals(currentPhase)) | ||
| .collect(Collectors.toList()); | ||
| } | ||
| indexPhaseSteps.put(index, phaseSteps); | ||
| } | ||
| private List<Step> parseStepsFromPhase(String policy, String currentPhase, String phaseDef) throws IOException { | ||
| final PhaseExecutionInfo phaseExecutionInfo; | ||
| LifecyclePolicy currentPolicy = lifecyclePolicyMap.get(policy).getPolicy(); | ||
| final LifecyclePolicy policyToExecute; | ||
| if (InitializePolicyContextStep.INITIALIZATION_PHASE.equals(phaseDef) | ||
| || TerminalPolicyStep.COMPLETED_PHASE.equals(phaseDef)) { | ||
| // It is ok to re-use potentially modified policy here since we are in an initialization or completed phase | ||
| policyToExecute = currentPolicy; | ||
| } else { | ||
| // if the current phase definition describes an internal step/phase, do not parse | ||
| try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry, | ||
| DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) { | ||
| phaseExecutionInfo = PhaseExecutionInfo.parse(parser, currentPhase); | ||
| } | ||
| Map<String, Phase> phaseMap = new HashMap<>(currentPolicy.getPhases()); | ||
| if (phaseExecutionInfo.getPhase() != null) { | ||
| phaseMap.put(currentPhase, phaseExecutionInfo.getPhase()); | ||
| } | ||
| policyToExecute = new LifecyclePolicy(currentPolicy.getType(), currentPolicy.getName(), phaseMap); | ||
| } | ||
| LifecyclePolicySecurityClient policyClient = new LifecyclePolicySecurityClient(client, | ||
| ClientHelper.INDEX_LIFECYCLE_ORIGIN, lifecyclePolicyMap.get(policy).getHeaders()); | ||
| final List<Step> steps = policyToExecute.toSteps(policyClient); | ||
| // Build a list of steps that correspond with the phase the index is currently in | ||
| final List<Step> phaseSteps; | ||
| if (steps == null) { | ||
| phaseSteps = new ArrayList<>(); | ||
| } else { | ||
| phaseSteps = steps.stream() | ||
| .filter(e -> e.getKey().getPhase().equals(currentPhase)) | ||
| .collect(Collectors.toList()); | ||
| } | ||
| logger.trace("parsed steps for policy [{}] in phase [{}], definition: [{}], steps: [{}]", | ||
| policy, currentPhase, phaseDef, phaseSteps); | ||
| return phaseSteps; | ||
| } | ||
|
|
||
| /** | ||
| * returns the {@link Step} that matches the index name and | ||
| * stepkey specified. This is used by {@link ClusterState} | ||
| * readers that know the current policy and step by name | ||
| * as String values in the cluster state. | ||
| * @param index the index to get the step for | ||
| * @param stepKey the key to the requested {@link Step} | ||
| * @return the step for the given stepkey or null if the step was not found | ||
| */ | ||
| @Nullable | ||
| public Step getStep(final Index index, final Step.StepKey stepKey) { | ||
| public Step getStep(final IndexMetaData indexMetaData, final Step.StepKey stepKey) { | ||
| if (ErrorStep.NAME.equals(stepKey.getName())) { | ||
| return new ErrorStep(new Step.StepKey(stepKey.getPhase(), stepKey.getAction(), ErrorStep.NAME)); | ||
| } | ||
|
|
||
| if (indexPhaseSteps.get(index) == null) { | ||
| return null; | ||
| final String phase = stepKey.getPhase(); | ||
| final String policyName = indexMetaData.getSettings().get(LifecycleSettings.LIFECYCLE_NAME); | ||
| final Index index = indexMetaData.getIndex(); | ||
|
|
||
| if (policyName == null) { | ||
| throw new IllegalArgumentException("failed to retrieve step " + stepKey + " as index [" + index.getName() + "] has no policy"); | ||
| } | ||
|
|
||
| if (logger.isTraceEnabled()) { | ||
| logger.trace("[{}]: retrieving step [{}], found: [{}]\nall steps for this phase: [{}]", index, stepKey, | ||
| indexPhaseSteps.get(index).stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null), | ||
| indexPhaseSteps.get(index)); | ||
| } else if (logger.isDebugEnabled()) { | ||
| logger.debug("[{}]: retrieving step [{}], found: [{}]", index, stepKey, | ||
| indexPhaseSteps.get(index).stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null)); | ||
|
|
||
| // parse phase steps from the phase definition in the index settings | ||
| final String phaseJson = indexMetaData.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE_DEFINITION, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will also need to be updated for #33783, depending on which gets merged first. |
||
| InitializePolicyContextStep.INITIALIZATION_PHASE); | ||
|
|
||
| final List<Step> phaseSteps; | ||
| try { | ||
| phaseSteps = parseStepsFromPhase(policyName, phase, phaseJson); | ||
| } catch (IOException e) { | ||
| throw new ElasticsearchException("failed to load cached steps for " + stepKey, e); | ||
| } catch (XContentParseException parseErr) { | ||
| throw new XContentParseException(parseErr.getLocation(), | ||
| "failed to load cached steps for " + stepKey + " from [" + phaseJson + "]", parseErr); | ||
| } | ||
| assert indexPhaseSteps.get(index).stream().allMatch(step -> step.getKey().getPhase().equals(stepKey.getPhase())) : | ||
| "expected all steps for [" + index + "] to be in phase [" + stepKey.getPhase() + | ||
| "] but they were not, steps: " + indexPhaseSteps.get(index); | ||
| return indexPhaseSteps.get(index).stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null); | ||
|
|
||
| assert phaseSteps.stream().allMatch(step -> step.getKey().getPhase().equals(phase)) : | ||
| "expected phase steps loaded from phase definition for [" + index.getName() + "] to be in phase [" + phase + | ||
| "] but they were not, steps: " + phaseSteps; | ||
|
|
||
| // Return the step that matches the given stepKey or else null if we couldn't find it | ||
| return phaseSteps.stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null); | ||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we're not ready for this yet but don't we want a random subset of the phases and actions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now this made it easier to pick a random step for the entire policy because it guaranteed that I could pick a random phase that would have a random step. For this particular test it also tests (through randomization) that all phases and steps can be serialized/deserialized to/from JSON in the metadata