Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE;
import static org.elasticsearch.client.RequestConverters.createEntity;
import static org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest.FORCE;
import static org.elasticsearch.client.dataframe.GetDataFrameTransformRequest.ALLOW_NO_MATCH;

final class DataFrameRequestConverters {
Expand Down Expand Up @@ -71,12 +72,16 @@ static Request getDataFrameTransform(GetDataFrameTransformRequest getRequest) {
return request;
}

static Request deleteDataFrameTransform(DeleteDataFrameTransformRequest request) {
static Request deleteDataFrameTransform(DeleteDataFrameTransformRequest deleteRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_data_frame", "transforms")
.addPathPart(request.getId())
.addPathPart(deleteRequest.getId())
.build();
return new Request(HttpDelete.METHOD_NAME, endpoint);
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
if (deleteRequest.getForce() != null) {
request.addParameter(FORCE, Boolean.toString(deleteRequest.getForce()));
}
return request;
}

static Request startDataFrameTransform(StartDataFrameTransformRequest startRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
*/
public class DeleteDataFrameTransformRequest implements Validatable {

public static final String FORCE = "force";

private final String id;
private Boolean force;

public DeleteDataFrameTransformRequest(String id) {
this.id = id;
Expand All @@ -41,6 +44,14 @@ public String getId() {
return id;
}

public Boolean getForce() {
return force;
}

public void setForce(boolean force) {
this.force = force;
}

@Override
public Optional<ValidationException> validate() {
if (id == null) {
Expand All @@ -54,7 +65,7 @@ public Optional<ValidationException> validate() {

@Override
public int hashCode() {
return Objects.hash(id);
return Objects.hash(id, force);
}

@Override
Expand All @@ -67,6 +78,6 @@ public boolean equals(Object obj) {
return false;
}
DeleteDataFrameTransformRequest other = (DeleteDataFrameTransformRequest) obj;
return Objects.equals(id, other.id);
return Objects.equals(id, other.id) && Objects.equals(force, other.force);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.not;

public class DataFrameRequestConvertersTests extends ESTestCase {

Expand Down Expand Up @@ -82,6 +84,13 @@ public void testDeleteDataFrameTransform() {

assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertThat(request.getEndpoint(), equalTo("/_data_frame/transforms/foo"));

assertThat(request.getParameters(), not(hasKey("force")));

deleteRequest.setForce(true);
request = DataFrameRequestConverters.deleteDataFrameTransform(deleteRequest);

assertThat(request.getParameters(), hasEntry("force", "true"));
}

public void testStartDataFrameTransform() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ public void testDeleteDataFrameTransform() throws IOException, InterruptedExcept
// tag::delete-data-frame-transform-request
DeleteDataFrameTransformRequest request =
new DeleteDataFrameTransformRequest("mega-transform"); // <1>
request.setForce(false); // <2>
// end::delete-data-frame-transform-request

// tag::delete-data-frame-transform-execute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ A +{request}+ object requires a non-null `id`.
include-tagged::{doc-tests-file}[{api}-request]
---------------------------------------------------
<1> Constructing a new request referencing an existing {dataframe-transform}
<2> Sets the optional argument `force`. When `true`, the {dataframe-transform}
is deleted regardless of its current state. The default value is `false`,
meaning that only `stopped` {dataframe-transforms} can be deleted.

include::../execution.asciidoc[]

Expand Down
7 changes: 7 additions & 0 deletions docs/reference/data-frames/apis/delete-transform.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ see {stack-ov}/security-privileges.html[Security privileges] and
`<data_frame_transform_id>`::
(Required, string) Identifier for the {dataframe-transform}.

[[delete-data-frame-transform-query-parms]]
==== {api-query-parms-title}

`force`::
(Optional, boolean) When `true`, the {dataframe-transform} is deleted regardless of its
current state. The default value is `false`, meaning that the {dataframe-transform} must be
`stopped` before it can be deleted.

[[delete-data-frame-transform-examples]]
==== {api-examples-title}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.dataframe.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand All @@ -27,25 +28,39 @@ private DeleteDataFrameTransformAction() {
}

public static class Request extends MasterNodeRequest<Request> {
private String id;
private final String id;
private final boolean force;

public Request(String id) {
public Request(String id, boolean force) {
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
this.force = force;
}

public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
if (in.getVersion().onOrAfter(Version.CURRENT)) {
force = in.readBoolean();
} else {
force = false;
}
}

public String getId() {
return id;
}

public boolean isForce() {
return force;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeBoolean(force);
}
}

@Override
Expand All @@ -55,7 +70,7 @@ public ActionRequestValidationException validate() {

@Override
public int hashCode() {
return Objects.hash(id);
return Objects.hash(id, force);
}

@Override
Expand All @@ -68,7 +83,7 @@ public boolean equals(Object obj) {
return false;
}
Request other = (Request) obj;
return Objects.equals(id, other.id);
return Objects.equals(id, other.id) && force == other.force;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
public class DeleteDataFrameTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomAlphaOfLengthBetween(1, 20));
return new Request(randomAlphaOfLengthBetween(1, 20), randomBoolean());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand All @@ -24,24 +25,31 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Request;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;

import java.io.IOException;

import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class TransportDeleteDataFrameTransformAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {

private final DataFrameTransformsConfigManager transformsConfigManager;
private final DataFrameAuditor auditor;
private final Client client;

@Inject
public TransportDeleteDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
DataFrameTransformsConfigManager transformsConfigManager, DataFrameAuditor auditor) {
DataFrameTransformsConfigManager transformsConfigManager, DataFrameAuditor auditor,
Client client) {
super(DeleteDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters,
Request::new, indexNameExpressionResolver);
this.transformsConfigManager = transformsConfigManager;
this.auditor = auditor;
this.client = client;
}

@Override
Expand All @@ -56,19 +64,34 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException {

@Override
protected void masterOperation(Task task, Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
ActionListener<AcknowledgedResponse> listener) {
final PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null && request.isForce() == false) {
listener.onFailure(new ElasticsearchStatusException("Cannot delete data frame [" + request.getId() +
"] as the task is running. Stop the task first", RestStatus.CONFLICT));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding "return;" after this line and getting rid of the "else" branch in line 72 would benefit readability IMO.

} else {
// Task is not running, delete the configuration document
transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(
r -> {
auditor.info(request.getId(), "Deleted data frame transform.");
listener.onResponse(new AcknowledgedResponse(r));
},
listener::onFailure));
ActionListener<Void> stopTransformActionListener = ActionListener.wrap(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[optional] IMO more readable code could be achieved by:

CheckedRunnable deleteTransform = () -> transformsConfigManager.deleteTransform(..., ...);
if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
  executeAsyncWithOrigin(
    ...,
    ActionListener.wrap(r -> deleteTransform.run(), listener::onFailure));
} else {
    deleteTransform.run();
}

I see 2 advantages here:

  1. variable "deleteTransform" clearly describes what it does and is not tied to some other action's listener.
  2. no need to pass unused parameters to it
    WDYT?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to try and keep callbacks uniformed. Our callback stacks can get very complicated and to me its just easier to reason about if they are ActionListener + and always within the flow (if possible)

stopResponse -> transformsConfigManager.deleteTransform(request.getId(),
ActionListener.wrap(
r -> {
auditor.info(request.getId(), "Deleted data frame transform.");
listener.onResponse(new AcknowledgedResponse(r));
},
listener::onFailure)),
listener::onFailure
);

if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
executeAsyncWithOrigin(client,
DATA_FRAME_ORIGIN,
StopDataFrameTransformAction.INSTANCE,
new StopDataFrameTransformAction.Request(request.getId(), true, true, null, true),
ActionListener.wrap(
r -> stopTransformActionListener.onResponse(null),
stopTransformActionListener::onFailure));
} else {
stopTransformActionListener.onResponse(null);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;

import java.io.IOException;

public class RestDeleteDataFrameTransformAction extends BaseRestHandler {

public RestDeleteDataFrameTransformAction(Settings settings, RestController controller) {
Expand All @@ -25,13 +23,14 @@ public RestDeleteDataFrameTransformAction(Settings settings, RestController cont
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
if (restRequest.hasContent()) {
throw new IllegalArgumentException("delete data frame transforms requests can not have a request body");
}

String id = restRequest.param(DataFrameField.ID.getPreferredName());
DeleteDataFrameTransformAction.Request request = new DeleteDataFrameTransformAction.Request(id);
boolean force = restRequest.paramAsBoolean(DataFrameField.FORCE.getPreferredName(), false);
DeleteDataFrameTransformAction.Request request = new DeleteDataFrameTransformAction.Request(id, force);

return channel -> client.execute(DeleteDataFrameTransformAction.INSTANCE, request,
new RestToXContentListener<>(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@
"required": true,
"description": "The id of the transform to delete"
}
},
"params": {
"force": {
"type": "boolean",
"required": false,
"description": "When `true`, the transform is deleted regardless of its current state. The default value is `false`, meaning that the transform must be `stopped` before it can be deleted."
}
}
},
"body": null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,3 +558,47 @@ setup:
"description": "yaml test transform on airline-data",
"version": "7.3.0"
}
---
"Test force deleting a running transform":
- do:
data_frame.put_data_frame_transform:
transform_id: "airline-transform-start-delete"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline-start-delete" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"sync": {
"time": {
"field": "time",
"delay": "90m"
}
}
}
- match: { acknowledged: true }
- do:
data_frame.start_data_frame_transform:
transform_id: "airline-transform-start-delete"
- match: { acknowledged: true }

- do:
data_frame.get_data_frame_transform_stats:
transform_id: "airline-transform-start-delete"
- match: { count: 1 }
- match: { transforms.0.id: "airline-transform-start-delete" }
- match: { transforms.0.state.indexer_state: "/started|indexing/" }
- match: { transforms.0.state.task_state: "started" }

- do:
catch: /Cannot delete data frame \[airline-transform-start-delete\] as the task is running/
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-start-delete"

- do:
data_frame.delete_data_frame_transform:
transform_id: "airline-transform-start-delete"
force: true
- match: { acknowledged: true }
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ teardown:
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this config on purpose. I noticed while writing the yaml test for this PR, that this particular test was saying that its transform was continuous, but it actually wasn't. So, I just fixed it.

"sync": {
"time": {
"field": "time",
"delay": "90m"
}
}
}

Expand Down