-
Notifications
You must be signed in to change notification settings - Fork 25.8k
[ML][Data Frame] adding force delete #44590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,7 @@ | |
| import org.elasticsearch.action.support.ActionFilters; | ||
| import org.elasticsearch.action.support.master.AcknowledgedResponse; | ||
| import org.elasticsearch.action.support.master.TransportMasterNodeAction; | ||
| import org.elasticsearch.client.Client; | ||
| import org.elasticsearch.cluster.ClusterState; | ||
| import org.elasticsearch.cluster.block.ClusterBlockException; | ||
| import org.elasticsearch.cluster.block.ClusterBlockLevel; | ||
|
|
@@ -24,24 +25,31 @@ | |
| import org.elasticsearch.transport.TransportService; | ||
| import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction; | ||
| import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Request; | ||
| import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction; | ||
| import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor; | ||
| import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager; | ||
|
|
||
| import java.io.IOException; | ||
|
|
||
| import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN; | ||
| import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; | ||
|
|
||
| public class TransportDeleteDataFrameTransformAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> { | ||
|
|
||
| private final DataFrameTransformsConfigManager transformsConfigManager; | ||
| private final DataFrameAuditor auditor; | ||
| private final Client client; | ||
|
|
||
| @Inject | ||
| public TransportDeleteDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool, | ||
| ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, | ||
| DataFrameTransformsConfigManager transformsConfigManager, DataFrameAuditor auditor) { | ||
| DataFrameTransformsConfigManager transformsConfigManager, DataFrameAuditor auditor, | ||
| Client client) { | ||
| super(DeleteDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters, | ||
| Request::new, indexNameExpressionResolver); | ||
| this.transformsConfigManager = transformsConfigManager; | ||
| this.auditor = auditor; | ||
| this.client = client; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -56,19 +64,34 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException { | |
|
|
||
| @Override | ||
| protected void masterOperation(Task task, Request request, ClusterState state, | ||
| ActionListener<AcknowledgedResponse> listener) throws Exception { | ||
| PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); | ||
| if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) { | ||
| ActionListener<AcknowledgedResponse> listener) { | ||
| final PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); | ||
| if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null && request.isForce() == false) { | ||
| listener.onFailure(new ElasticsearchStatusException("Cannot delete data frame [" + request.getId() + | ||
| "] as the task is running. Stop the task first", RestStatus.CONFLICT)); | ||
| } else { | ||
| // Task is not running, delete the configuration document | ||
| transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap( | ||
| r -> { | ||
| auditor.info(request.getId(), "Deleted data frame transform."); | ||
| listener.onResponse(new AcknowledgedResponse(r)); | ||
| }, | ||
| listener::onFailure)); | ||
| ActionListener<Void> stopTransformActionListener = ActionListener.wrap( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [optional] IMO more readable code could be achieved by: I see 2 advantages here:
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I prefer to try and keep callbacks uniformed. Our callback stacks can get very complicated and to me its just easier to reason about if they are |
||
| stopResponse -> transformsConfigManager.deleteTransform(request.getId(), | ||
| ActionListener.wrap( | ||
| r -> { | ||
| auditor.info(request.getId(), "Deleted data frame transform."); | ||
| listener.onResponse(new AcknowledgedResponse(r)); | ||
| }, | ||
| listener::onFailure)), | ||
| listener::onFailure | ||
| ); | ||
|
|
||
| if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) { | ||
| executeAsyncWithOrigin(client, | ||
| DATA_FRAME_ORIGIN, | ||
| StopDataFrameTransformAction.INSTANCE, | ||
| new StopDataFrameTransformAction.Request(request.getId(), true, true, null, true), | ||
| ActionListener.wrap( | ||
| r -> stopTransformActionListener.onResponse(null), | ||
| stopTransformActionListener::onFailure)); | ||
| } else { | ||
| stopTransformActionListener.onResponse(null); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -141,6 +141,12 @@ teardown: | |
| "pivot": { | ||
| "group_by": { "airline": {"terms": {"field": "airline"}}}, | ||
| "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} | ||
| }, | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I changed this config on purpose. I noticed while writing the |
||
| "sync": { | ||
| "time": { | ||
| "field": "time", | ||
| "delay": "90m" | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding "return;" after this line and getting rid of the "else" branch in line 72 would benefit readability IMO.