Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,55 @@ protected LifecyclePolicy createTestInstance() {
return randomTimeseriesLifecyclePolicy(lifecycleName);
}

/**
* The same as {@link #randomTimeseriesLifecyclePolicy(String)} but ensures
* that the resulting policy has all valid phases and all valid actions.
*/
public static LifecyclePolicy randomTimeseriesLifecyclePolicyWithAllPhases(@Nullable String lifecycleName) {
List<String> phaseNames = TimeseriesLifecycleType.VALID_PHASES;
Map<String, Phase> phases = new HashMap<>(phaseNames.size());
Function<String, Set<String>> validActions = (phase) -> {
switch (phase) {
case "hot":
return TimeseriesLifecycleType.VALID_HOT_ACTIONS;
case "warm":
return TimeseriesLifecycleType.VALID_WARM_ACTIONS;
case "cold":
return TimeseriesLifecycleType.VALID_COLD_ACTIONS;
case "delete":
return TimeseriesLifecycleType.VALID_DELETE_ACTIONS;
default:
throw new IllegalArgumentException("invalid phase [" + phase + "]");
}};
Function<String, LifecycleAction> randomAction = (action) -> {
switch (action) {
case AllocateAction.NAME:
return AllocateActionTests.randomInstance();
case DeleteAction.NAME:
return new DeleteAction();
case ForceMergeAction.NAME:
return ForceMergeActionTests.randomInstance();
case ReadOnlyAction.NAME:
return new ReadOnlyAction();
case RolloverAction.NAME:
return RolloverActionTests.randomInstance();
case ShrinkAction.NAME:
return ShrinkActionTests.randomInstance();
default:
throw new IllegalArgumentException("invalid action [" + action + "]");
}};
for (String phase : phaseNames) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we're not ready for this yet but don't we want a random subset of the phases and actions?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now this made it easier to pick a random step for the entire policy because it guaranteed that I could pick a random phase that would have a random step. For this particular test it also tests (through randomization) that all phases and steps can be serialized/deserialized to/from JSON in the metadata

TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after");
Map<String, LifecycleAction> actions = new HashMap<>();
Set<String> actionNames = validActions.apply(phase);
for (String action : actionNames) {
actions.put(action, randomAction.apply(action));
}
phases.put(phase, new Phase(phase, after, actions));
}
return new LifecyclePolicy(TimeseriesLifecycleType.INSTANCE, lifecycleName, phases);
}

public static LifecyclePolicy randomTimeseriesLifecyclePolicy(@Nullable String lifecycleName) {
List<String> phaseNames = randomSubsetOf(TimeseriesLifecycleType.VALID_PHASES);
Map<String, Phase> phases = new HashMap<>(phaseNames.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public ClusterState execute(ClusterState currentState) throws IOException {
// This index doesn't exist any more, there's nothing to execute currently
return currentState;
}
Step registeredCurrentStep = IndexLifecycleRunner.getCurrentStep(policyStepsRegistry, policy, index,
Step registeredCurrentStep = IndexLifecycleRunner.getCurrentStep(policyStepsRegistry, policy, indexMetaData,
indexMetaData.getSettings());
if (currentStep.equals(registeredCurrentStep)) {
// We can do cluster state steps all together until we
Expand Down Expand Up @@ -118,7 +118,7 @@ public ClusterState execute(ClusterState currentState) throws IOException {
if (currentStep.getKey().getPhase().equals(currentStep.getNextStepKey().getPhase()) == false) {
return currentState;
}
currentStep = policyStepsRegistry.getStep(index, currentStep.getNextStepKey());
currentStep = policyStepsRegistry.getStep(indexMetaData, currentStep.getNextStepKey());
}
return currentState;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void runPolicy(String policy, IndexMetaData indexMetaData, ClusterState c
+ LifecycleSettings.LIFECYCLE_SKIP + "== true");
return;
}
Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData.getIndex(), indexSettings);
Step currentStep = getCurrentStep(stepRegistry, policy, indexMetaData, indexSettings);
if (currentStep == null) {
// This may happen in the case that there is invalid ilm-step index settings or the stepRegistry is out of
// sync with the current cluster state
Expand Down Expand Up @@ -197,12 +197,12 @@ public static StepKey getCurrentStepKey(Settings indexSettings) {
}
}

static Step getCurrentStep(PolicyStepsRegistry stepRegistry, String policy, Index index, Settings indexSettings) {
static Step getCurrentStep(PolicyStepsRegistry stepRegistry, String policy, IndexMetaData indexMetaData, Settings indexSettings) {
Copy link
Copy Markdown
Contributor

@AthenaEryma AthenaEryma Sep 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to cause a (not very big, I think?) merge conflict with #33783. Not sure which side that should be handled on, but fair warning.

StepKey currentStepKey = getCurrentStepKey(indexSettings);
if (currentStepKey == null) {
return stepRegistry.getFirstStep(policy);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell, this only needed to onboard an index. Can we move this (as a follow up) to a dedicated place? on boarding is an exceptional case - we should put it in it's own method.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah as a followup definitely, we'd need to remove it if we want to remove PolicyStepsRegistry too

} else {
return stepRegistry.getStep(index, currentStepKey);
return stepRegistry.getStep(indexMetaData, currentStepKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,6 @@ public void clusterChanged(ClusterChangedEvent event) {
public void applyClusterState(ClusterChangedEvent event) {
if (event.localNodeMaster()) { // only act if we are master, otherwise
// keep idle until elected
// Since indices keep their current phase's details even if the policy changes, it's possible for a deleted index to have a
// policy, and then be re-created with the same name, so here we remove indices that have been delete so they don't waste memory
if (event.indicesDeleted().isEmpty() == false) {
policyRegistry.removeIndices(event.indicesDeleted());
}
if (event.state().metaData().custom(IndexLifecycleMetadata.TYPE) != null) {
policyRegistry.update(event.state());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
*/
package org.elasticsearch.xpack.indexlifecycle;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
Expand All @@ -19,6 +19,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -53,26 +54,18 @@ public class PolicyStepsRegistry {
private final Map<String, Step> firstStepMap;
// keeps track of a mapping from policy/step-name to respective Step, the key is policy name
private final Map<String, Map<Step.StepKey, Step>> stepMap;
// A map of index to a list of compiled steps for the current phase
private final Map<Index, List<Step>> indexPhaseSteps;
private final NamedXContentRegistry xContentRegistry;

public PolicyStepsRegistry(NamedXContentRegistry xContentRegistry, Client client) {
this.lifecyclePolicyMap = new TreeMap<>();
this.firstStepMap = new HashMap<>();
this.stepMap = new HashMap<>();
this.indexPhaseSteps = new HashMap<>();
this.xContentRegistry = xContentRegistry;
this.client = client;
this(new TreeMap<>(), new HashMap<>(), new HashMap<>(), xContentRegistry, client);
}

PolicyStepsRegistry(SortedMap<String, LifecyclePolicyMetadata> lifecyclePolicyMap,
Map<String, Step> firstStepMap, Map<String, Map<Step.StepKey, Step>> stepMap,
Map<Index, List<Step>> indexPhaseSteps, NamedXContentRegistry xContentRegistry, Client client) {
NamedXContentRegistry xContentRegistry, Client client) {
this.lifecyclePolicyMap = lifecyclePolicyMap;
this.firstStepMap = firstStepMap;
this.stepMap = stepMap;
this.indexPhaseSteps = indexPhaseSteps;
this.xContentRegistry = xContentRegistry;
this.client = client;
}
Expand All @@ -89,17 +82,6 @@ Map<String, Map<Step.StepKey, Step>> getStepMap() {
return stepMap;
}

/**
* Remove phase step lists for indices that have been deleted
* @param indices a list of indices that have been deleted
*/
public void removeIndices(List<Index> indices) {
indices.forEach(index -> {
logger.trace("removing cached phase steps for deleted index [{}]", index.getName());
indexPhaseSteps.remove(index);
});
}

@SuppressWarnings({ "unchecked", "rawtypes" })
public void update(ClusterState clusterState) {
final IndexLifecycleMetadata meta = clusterState.metaData().custom(IndexLifecycleMetadata.TYPE);
Expand Down Expand Up @@ -146,103 +128,86 @@ public LifecyclePolicyMetadata read(StreamInput in, String key) {
assert ErrorStep.NAME.equals(step.getKey().getName()) == false : "unexpected error step in policy";
stepMapForPolicy.put(step.getKey(), step);
}
logger.trace("updating cached steps for [{}] policy, new steps: {}",
policyMetadata.getName(), stepMapForPolicy.keySet());
stepMap.put(policyMetadata.getName(), stepMapForPolicy);
}
}
}
}

for (ObjectCursor<IndexMetaData> imd : clusterState.metaData().getIndices().values()) {
final Index index = imd.value.getIndex();
final String policy = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
if (policy == null || lifecyclePolicyMap.containsKey(policy) == false) {
indexPhaseSteps.remove(index);
} else {
final List<Step> currentSteps = indexPhaseSteps.get(index);
// Get the current steps' phase, if there are steps stored
final String existingPhase = (currentSteps == null || currentSteps.size() == 0) ?
"_none_" : currentSteps.get(0).getKey().getPhase();
// Retrieve the current phase, defaulting to "new" if no phase is set
final String currentPhase = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE,
InitializePolicyContextStep.INITIALIZATION_PHASE);

if (existingPhase.equals(currentPhase) == false) {
logger.debug("index [{}] has transitioned phases [{} -> {}], rebuilding step list",
index, existingPhase, currentPhase);
// parse existing phase steps from the phase definition in the index settings
String phaseDef = imd.value.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE_DEFINITION,
InitializePolicyContextStep.INITIALIZATION_PHASE);
final PhaseExecutionInfo phaseExecutionInfo;
LifecyclePolicy currentPolicy = lifecyclePolicyMap.get(policy).getPolicy();
final LifecyclePolicy policyToExecute;
if (InitializePolicyContextStep.INITIALIZATION_PHASE.equals(phaseDef)
|| TerminalPolicyStep.COMPLETED_PHASE.equals(phaseDef)) {
// It is ok to re-use potentially modified policy here since we are in an initialization or completed phase
policyToExecute = currentPolicy;
} else {
// if the current phase definition describes an internal step/phase, do not parse
try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) {
phaseExecutionInfo = PhaseExecutionInfo.parse(parser, currentPhase);
} catch (IOException e) {
logger.error("failed to configure phase [" + currentPhase + "] for index [" + index.getName() + "]", e);
indexPhaseSteps.remove(index);
continue;
}
Map<String, Phase> phaseMap = new HashMap<>(currentPolicy.getPhases());
if (phaseExecutionInfo.getPhase() != null) {
phaseMap.put(currentPhase, phaseExecutionInfo.getPhase());
}
policyToExecute = new LifecyclePolicy(currentPolicy.getType(), currentPolicy.getName(), phaseMap);
}
LifecyclePolicySecurityClient policyClient = new LifecyclePolicySecurityClient(client,
ClientHelper.INDEX_LIFECYCLE_ORIGIN, lifecyclePolicyMap.get(policy).getHeaders());
final List<Step> steps = policyToExecute.toSteps(policyClient);
// Build a list of steps that correspond with the phase the index is currently in
final List<Step> phaseSteps;
if (steps == null) {
phaseSteps = new ArrayList<>();
} else {
phaseSteps = steps.stream()
.filter(e -> e.getKey().getPhase().equals(currentPhase))
.collect(Collectors.toList());
}
indexPhaseSteps.put(index, phaseSteps);
}
private List<Step> parseStepsFromPhase(String policy, String currentPhase, String phaseDef) throws IOException {
final PhaseExecutionInfo phaseExecutionInfo;
LifecyclePolicy currentPolicy = lifecyclePolicyMap.get(policy).getPolicy();
final LifecyclePolicy policyToExecute;
if (InitializePolicyContextStep.INITIALIZATION_PHASE.equals(phaseDef)
|| TerminalPolicyStep.COMPLETED_PHASE.equals(phaseDef)) {
// It is ok to re-use potentially modified policy here since we are in an initialization or completed phase
policyToExecute = currentPolicy;
} else {
// if the current phase definition describes an internal step/phase, do not parse
try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) {
phaseExecutionInfo = PhaseExecutionInfo.parse(parser, currentPhase);
}
Map<String, Phase> phaseMap = new HashMap<>(currentPolicy.getPhases());
if (phaseExecutionInfo.getPhase() != null) {
phaseMap.put(currentPhase, phaseExecutionInfo.getPhase());
}
policyToExecute = new LifecyclePolicy(currentPolicy.getType(), currentPolicy.getName(), phaseMap);
}
LifecyclePolicySecurityClient policyClient = new LifecyclePolicySecurityClient(client,
ClientHelper.INDEX_LIFECYCLE_ORIGIN, lifecyclePolicyMap.get(policy).getHeaders());
final List<Step> steps = policyToExecute.toSteps(policyClient);
// Build a list of steps that correspond with the phase the index is currently in
final List<Step> phaseSteps;
if (steps == null) {
phaseSteps = new ArrayList<>();
} else {
phaseSteps = steps.stream()
.filter(e -> e.getKey().getPhase().equals(currentPhase))
.collect(Collectors.toList());
}
logger.trace("parsed steps for policy [{}] in phase [{}], definition: [{}], steps: [{}]",
policy, currentPhase, phaseDef, phaseSteps);
return phaseSteps;
}

/**
* returns the {@link Step} that matches the index name and
* stepkey specified. This is used by {@link ClusterState}
* readers that know the current policy and step by name
* as String values in the cluster state.
* @param index the index to get the step for
* @param stepKey the key to the requested {@link Step}
* @return the step for the given stepkey or null if the step was not found
*/
@Nullable
public Step getStep(final Index index, final Step.StepKey stepKey) {
public Step getStep(final IndexMetaData indexMetaData, final Step.StepKey stepKey) {
if (ErrorStep.NAME.equals(stepKey.getName())) {
return new ErrorStep(new Step.StepKey(stepKey.getPhase(), stepKey.getAction(), ErrorStep.NAME));
}

if (indexPhaseSteps.get(index) == null) {
return null;
final String phase = stepKey.getPhase();
final String policyName = indexMetaData.getSettings().get(LifecycleSettings.LIFECYCLE_NAME);
final Index index = indexMetaData.getIndex();

if (policyName == null) {
throw new IllegalArgumentException("failed to retrieve step " + stepKey + " as index [" + index.getName() + "] has no policy");
}

if (logger.isTraceEnabled()) {
logger.trace("[{}]: retrieving step [{}], found: [{}]\nall steps for this phase: [{}]", index, stepKey,
indexPhaseSteps.get(index).stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null),
indexPhaseSteps.get(index));
} else if (logger.isDebugEnabled()) {
logger.debug("[{}]: retrieving step [{}], found: [{}]", index, stepKey,
indexPhaseSteps.get(index).stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null));

// parse phase steps from the phase definition in the index settings
final String phaseJson = indexMetaData.getSettings().get(LifecycleSettings.LIFECYCLE_PHASE_DEFINITION,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will also need to be updated for #33783, depending on which gets merged first.

InitializePolicyContextStep.INITIALIZATION_PHASE);

final List<Step> phaseSteps;
try {
phaseSteps = parseStepsFromPhase(policyName, phase, phaseJson);
} catch (IOException e) {
throw new ElasticsearchException("failed to load cached steps for " + stepKey, e);
} catch (XContentParseException parseErr) {
throw new XContentParseException(parseErr.getLocation(),
"failed to load cached steps for " + stepKey + " from [" + phaseJson + "]", parseErr);
}
assert indexPhaseSteps.get(index).stream().allMatch(step -> step.getKey().getPhase().equals(stepKey.getPhase())) :
"expected all steps for [" + index + "] to be in phase [" + stepKey.getPhase() +
"] but they were not, steps: " + indexPhaseSteps.get(index);
return indexPhaseSteps.get(index).stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null);

assert phaseSteps.stream().allMatch(step -> step.getKey().getPhase().equals(phase)) :
"expected phase steps loaded from phase definition for [" + index.getName() + "] to be in phase [" + phase +
"] but they were not, steps: " + phaseSteps;

// Return the step that matches the given stepKey or else null if we couldn't find it
return phaseSteps.stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null);
}

/**
Expand Down
Loading