Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,33 @@
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Map;
import java.util.Objects;

public class LifecyclePolicyMetadata extends AbstractDiffable<LifecyclePolicyMetadata>
implements ToXContentObject, Diffable<LifecyclePolicyMetadata> {

public static final ParseField POLICY = new ParseField("policy");
public static final ParseField HEADERS = new ParseField("headers");
static final ParseField POLICY = new ParseField("policy");
static final ParseField HEADERS = new ParseField("headers");
static final ParseField VERSION = new ParseField("version");
static final ParseField MODIFIED_DATE = new ParseField("modified_date");
static final ParseField MODIFIED_DATE_STRING = new ParseField("modified_date_string");

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<LifecyclePolicyMetadata, String> PARSER = new ConstructingObjectParser<>("policy_metadata",
a -> {
LifecyclePolicy policy = (LifecyclePolicy) a[0];
return new LifecyclePolicyMetadata(policy, (Map<String, String>) a[1]);
return new LifecyclePolicyMetadata(policy, (Map<String, String>) a[1], (long) a[2], (long) a[3]);
});
static {
PARSER.declareObject(ConstructingObjectParser.constructorArg(), LifecyclePolicy::parse, POLICY);
PARSER.declareField(ConstructingObjectParser.constructorArg(), XContentParser::mapStrings, HEADERS, ValueType.OBJECT);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), VERSION);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), MODIFIED_DATE);
PARSER.declareString(ConstructingObjectParser.constructorArg(), MODIFIED_DATE_STRING);
}

public static LifecyclePolicyMetadata parse(XContentParser parser, String name) {
Expand All @@ -43,16 +53,22 @@ public static LifecyclePolicyMetadata parse(XContentParser parser, String name)

private final LifecyclePolicy policy;
private final Map<String, String> headers;
private final long version;
private final long modifiedDate;

public LifecyclePolicyMetadata(LifecyclePolicy policy, Map<String, String> headers) {
public LifecyclePolicyMetadata(LifecyclePolicy policy, Map<String, String> headers, long version, long modifiedDate) {
this.policy = policy;
this.headers = headers;
this.version = version;
this.modifiedDate = modifiedDate;
}

@SuppressWarnings("unchecked")
public LifecyclePolicyMetadata(StreamInput in) throws IOException {
this.policy = new LifecyclePolicy(in);
this.headers = (Map<String, String>) in.readGenericValue();
this.version = in.readVLong();
this.modifiedDate = in.readVLong();
}

public Map<String, String> getHeaders() {
Expand All @@ -67,11 +83,27 @@ public String getName() {
return policy.getName();
}

public long getVersion() {
return version;
}

public long getModifiedDate() {
return modifiedDate;
}

public String getModifiedDateString() {
ZonedDateTime modifiedDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(modifiedDate), ZoneOffset.UTC);
return modifiedDateTime.toString();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(POLICY.getPreferredName(), policy);
builder.field(HEADERS.getPreferredName(), headers);
builder.field(VERSION.getPreferredName(), version);
builder.field(MODIFIED_DATE.getPreferredName(), modifiedDate);
builder.field(MODIFIED_DATE_STRING.getPreferredName(), getModifiedDateString());
builder.endObject();
return builder;
}
Expand All @@ -80,11 +112,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
public void writeTo(StreamOutput out) throws IOException {
policy.writeTo(out);
out.writeGenericValue(headers);
out.writeVLong(version);
out.writeVLong(modifiedDate);
}

@Override
public int hashCode() {
return Objects.hash(policy, headers);
return Objects.hash(policy, headers, version, modifiedDate);
}

@Override
Expand All @@ -97,7 +131,9 @@ public boolean equals(Object obj) {
}
LifecyclePolicyMetadata other = (LifecyclePolicyMetadata) obj;
return Objects.equals(policy, other.policy) &&
Objects.equals(headers, other.headers);
Objects.equals(headers, other.headers) &&
Objects.equals(version, other.version) &&
Objects.equals(modifiedDate, other.modifiedDate);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
Expand All @@ -37,32 +38,36 @@ public Response newResponse() {

public static class Response extends ActionResponse implements ToXContentObject {

private List<LifecyclePolicy> policies;
private List<LifecyclePolicyResponseItem> policies;

public Response() {
}

public Response(List<LifecyclePolicy> policies) {
public Response(List<LifecyclePolicyResponseItem> policies) {
this.policies = policies;
}

public List<LifecyclePolicy> getPolicies() {
public List<LifecyclePolicyResponseItem> getPolicies() {
return policies;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
for (LifecyclePolicy policy : policies) {
builder.field(policy.getName(), policy);
for (LifecyclePolicyResponseItem item : policies) {
builder.startObject(item.getLifecyclePolicy().getName());
builder.field("version", item.getVersion());
builder.field("modified_date", item.getModifiedDate());
builder.field("policy", item.getLifecyclePolicy());
builder.endObject();
}
builder.endObject();
return builder;
}

@Override
public void readFrom(StreamInput in) throws IOException {
policies = in.readList(LifecyclePolicy::new);
this.policies = in.readList(LifecyclePolicyResponseItem::new);
}

@Override
Expand Down Expand Up @@ -148,4 +153,61 @@ public boolean equals(Object obj) {

}

public static class LifecyclePolicyResponseItem implements Writeable {
private final LifecyclePolicy lifecyclePolicy;
private final long version;
private final String modifiedDate;

public LifecyclePolicyResponseItem(LifecyclePolicy lifecyclePolicy, long version, String modifiedDate) {
this.lifecyclePolicy = lifecyclePolicy;
this.version = version;
this.modifiedDate = modifiedDate;
}

LifecyclePolicyResponseItem(StreamInput in) throws IOException {
this.lifecyclePolicy = new LifecyclePolicy(in);
this.version = in.readVLong();
this.modifiedDate = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
lifecyclePolicy.writeTo(out);
out.writeVLong(version);
out.writeString(modifiedDate);
}

public LifecyclePolicy getLifecyclePolicy() {
return lifecyclePolicy;
}

public long getVersion() {
return version;
}

public String getModifiedDate() {
return modifiedDate;
}

@Override
public int hashCode() {
return Objects.hash(lifecyclePolicy, version, modifiedDate);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != getClass()) {
return false;
}
LifecyclePolicyResponseItem other = (LifecyclePolicyResponseItem) obj;
return Objects.equals(lifecyclePolicy, other.lifecyclePolicy) &&
Objects.equals(version, other.version) &&
Objects.equals(modifiedDate, other.modifiedDate);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ protected LifecyclePolicyMetadata createTestInstance() {
for (int i = 0; i < numberHeaders; i++) {
headers.put(randomAlphaOfLength(10), randomAlphaOfLength(10));
}
return new LifecyclePolicyMetadata(LifecyclePolicyTests.randomTimeseriesLifecyclePolicy(lifecycleName), headers);
return new LifecyclePolicyMetadata(LifecyclePolicyTests.randomTimeseriesLifecyclePolicy(lifecycleName), headers,
randomNonNegativeLong(), randomNonNegativeLong());
}

@Override
Expand All @@ -87,7 +88,9 @@ protected Reader<LifecyclePolicyMetadata> instanceReader() {
protected LifecyclePolicyMetadata mutateInstance(LifecyclePolicyMetadata instance) throws IOException {
LifecyclePolicy policy = instance.getPolicy();
Map<String, String> headers = instance.getHeaders();
switch (between(0, 1)) {
long version = instance.getVersion();
long creationDate = instance.getModifiedDate();
switch (between(0, 3)) {
case 0:
policy = new LifecyclePolicy(TimeseriesLifecycleType.INSTANCE, policy.getName() + randomAlphaOfLengthBetween(1, 5),
policy.getPhases());
Expand All @@ -96,10 +99,16 @@ protected LifecyclePolicyMetadata mutateInstance(LifecyclePolicyMetadata instanc
headers = new HashMap<>(headers);
headers.put(randomAlphaOfLength(11), randomAlphaOfLength(11));
break;
case 2:
version++;
break;
case 3:
creationDate++;
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return new LifecyclePolicyMetadata(policy, headers);
return new LifecyclePolicyMetadata(policy, headers, version, creationDate);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType;
import org.elasticsearch.xpack.core.indexlifecycle.MockAction;
import org.elasticsearch.xpack.core.indexlifecycle.TestLifecycleType;
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction.LifecyclePolicyResponseItem;
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction.Response;

import java.util.ArrayList;
Expand All @@ -25,11 +25,12 @@ public class GetLifecycleResponseTests extends AbstractStreamableTestCase<GetLif
@Override
protected Response createTestInstance() {
String randomPrefix = randomAlphaOfLength(5);
List<LifecyclePolicy> policies = new ArrayList<>();
List<LifecyclePolicyResponseItem> responseItems = new ArrayList<>();
for (int i = 0; i < randomIntBetween(0, 2); i++) {
policies.add(randomTestLifecyclePolicy(randomPrefix + i));
responseItems.add(new LifecyclePolicyResponseItem(randomTestLifecyclePolicy(randomPrefix + i),
randomNonNegativeLong(), randomAlphaOfLength(8)));
}
return new Response(policies);
return new Response(responseItems);
}

@Override
Expand All @@ -45,16 +46,18 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() {

@Override
protected Response mutateInstance(Response response) {
List<LifecyclePolicy> policies = new ArrayList<>(response.getPolicies());
if (policies.size() > 0) {
List<LifecyclePolicyResponseItem> responseItems = new ArrayList<>(response.getPolicies());
if (responseItems.size() > 0) {
if (randomBoolean()) {
policies.add(randomTestLifecyclePolicy(randomAlphaOfLength(5)));
responseItems.add(new LifecyclePolicyResponseItem(randomTestLifecyclePolicy(randomAlphaOfLength(5)),
randomNonNegativeLong(), randomAlphaOfLength(4)));
} else {
policies.remove(policies.size() - 1);
responseItems.remove(0);
}
} else {
policies.add(randomTestLifecyclePolicy(randomAlphaOfLength(2)));
responseItems.add(new LifecyclePolicyResponseItem(randomTestLifecyclePolicy(randomAlphaOfLength(2)),
randomNonNegativeLong(), randomAlphaOfLength(4)));
}
return new Response(policies);
return new Response(responseItems);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction;
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction.LifecyclePolicyResponseItem;
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction.Request;
import org.elasticsearch.xpack.core.indexlifecycle.action.GetLifecycleAction.Response;

Expand Down Expand Up @@ -49,24 +50,29 @@ protected Response newResponse() {
}

@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener) {
IndexLifecycleMetadata metadata = clusterService.state().metaData().custom(IndexLifecycleMetadata.TYPE);
if (metadata == null) {
listener.onFailure(new ResourceNotFoundException("Lifecycle policy not found: {}", Arrays.toString(request.getPolicyNames())));
} else {
List<LifecyclePolicy> requestedPolicies;
List<LifecyclePolicyResponseItem> requestedPolicies;
// if no policies explicitly provided, behave as if `*` was specified
if (request.getPolicyNames().length == 0) {
requestedPolicies = new ArrayList<>(metadata.getPolicies().values());
requestedPolicies = new ArrayList<>(metadata.getPolicyMetadatas().size());
for (LifecyclePolicyMetadata policyMetadata : metadata.getPolicyMetadatas().values()) {
requestedPolicies.add(new LifecyclePolicyResponseItem(policyMetadata.getPolicy(),
policyMetadata.getVersion(), policyMetadata.getModifiedDateString()));
}
} else {
requestedPolicies = new ArrayList<>(request.getPolicyNames().length);
for (String name : request.getPolicyNames()) {
LifecyclePolicy policy = metadata.getPolicies().get(name);
if (policy == null) {
LifecyclePolicyMetadata policyMetadata = metadata.getPolicyMetadatas().get(name);
if (policyMetadata == null) {
listener.onFailure(new ResourceNotFoundException("Lifecycle policy not found: {}", name));
return;
}
requestedPolicies.add(policy);
requestedPolicies.add(new LifecyclePolicyResponseItem(policyMetadata.getPolicy(),
policyMetadata.getVersion(), policyMetadata.getModifiedDateString()));
}
}
listener.onResponse(new Response(requestedPolicies));
Expand All @@ -77,4 +83,4 @@ protected void masterOperation(Request request, ClusterState state, ActionListen
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction.Request;
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction.Response;

import java.time.Instant;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
Expand Down Expand Up @@ -79,9 +80,12 @@ public ClusterState execute(ClusterState currentState) throws Exception {
if (currentMetadata == null) { // first time using index-lifecycle feature, bootstrap metadata
currentMetadata = IndexLifecycleMetadata.EMPTY;
}
// NORELEASE Check if current step exists in new policy and if not move to next available step
LifecyclePolicyMetadata existingPolicyMetadata = currentMetadata.getPolicyMetadatas()
.get(request.getPolicy().getName());
long nextVersion = (existingPolicyMetadata == null) ? 1L : existingPolicyMetadata.getVersion() + 1L;
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata(request.getPolicy(), filteredHeaders);
LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata(request.getPolicy(), filteredHeaders,
nextVersion, Instant.now().toEpochMilli());
newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata);
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, OperationMode.RUNNING);
newState.metaData(MetaData.builder(currentState.getMetaData())
Expand Down
Loading