Skip to content

Commit cb7b0e7

Browse files
dimitris-athanasioudavidkyle
authored andcommitted
[ML] Job in index: delete filter action (elastic#34642)
This changes the delete filter action to search for jobs using the filter to be deleted in the index rather than the cluster state.
1 parent 7fd4b9d commit cb7b0e7

2 files changed

Lines changed: 42 additions & 30 deletions

File tree

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteFilterAction.java

Lines changed: 41 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,21 @@
1616
import org.elasticsearch.action.support.WriteRequest;
1717
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1818
import org.elasticsearch.client.Client;
19-
import org.elasticsearch.cluster.ClusterState;
20-
import org.elasticsearch.cluster.service.ClusterService;
2119
import org.elasticsearch.common.inject.Inject;
2220
import org.elasticsearch.common.settings.Settings;
2321
import org.elasticsearch.rest.RestStatus;
2422
import org.elasticsearch.tasks.Task;
2523
import org.elasticsearch.transport.TransportService;
2624
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
27-
import org.elasticsearch.xpack.core.ml.MlMetadata;
2825
import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction;
2926
import org.elasticsearch.xpack.core.ml.job.config.Detector;
3027
import org.elasticsearch.xpack.core.ml.job.config.Job;
3128
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
3229
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
30+
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
3331

3432
import java.util.ArrayList;
3533
import java.util.List;
36-
import java.util.Map;
3734
import java.util.function.Supplier;
3835

3936
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@@ -42,25 +39,39 @@
4239
public class TransportDeleteFilterAction extends HandledTransportAction<DeleteFilterAction.Request, AcknowledgedResponse> {
4340

4441
private final Client client;
45-
private final ClusterService clusterService;
42+
private final JobConfigProvider jobConfigProvider;
4643

4744
@Inject
4845
public TransportDeleteFilterAction(Settings settings, TransportService transportService,
49-
ActionFilters actionFilters, ClusterService clusterService, Client client) {
46+
ActionFilters actionFilters, Client client,
47+
JobConfigProvider jobConfigProvider) {
5048
super(settings, DeleteFilterAction.NAME, transportService, actionFilters,
5149
(Supplier<DeleteFilterAction.Request>) DeleteFilterAction.Request::new);
52-
this.clusterService = clusterService;
5350
this.client = client;
51+
this.jobConfigProvider = jobConfigProvider;
5452
}
5553

5654
@Override
5755
protected void doExecute(Task task, DeleteFilterAction.Request request, ActionListener<AcknowledgedResponse> listener) {
58-
5956
final String filterId = request.getFilterId();
60-
ClusterState state = clusterService.state();
61-
Map<String, Job> jobs = MlMetadata.getMlMetadata(state).getJobs();
57+
jobConfigProvider.findJobsWithCustomRules(ActionListener.wrap(
58+
jobs-> {
59+
List<String> currentlyUsedBy = findJobsUsingFilter(jobs, filterId);
60+
if (!currentlyUsedBy.isEmpty()) {
61+
listener.onFailure(ExceptionsHelper.conflictStatusException(
62+
"Cannot delete filter, currently used by jobs: " + currentlyUsedBy));
63+
} else {
64+
deleteFilter(filterId, listener);
65+
}
66+
},
67+
listener::onFailure
68+
)
69+
);
70+
}
71+
72+
private static List<String> findJobsUsingFilter(List<Job> jobs, String filterId) {
6273
List<String> currentlyUsedBy = new ArrayList<>();
63-
for (Job job : jobs.values()) {
74+
for (Job job : jobs) {
6475
List<Detector> detectors = job.getAnalysisConfig().getDetectors();
6576
for (Detector detector : detectors) {
6677
if (detector.extractReferencedFilters().contains(filterId)) {
@@ -69,31 +80,31 @@ protected void doExecute(Task task, DeleteFilterAction.Request request, ActionLi
6980
}
7081
}
7182
}
72-
if (!currentlyUsedBy.isEmpty()) {
73-
throw ExceptionsHelper.conflictStatusException("Cannot delete filter, currently used by jobs: "
74-
+ currentlyUsedBy);
75-
}
83+
return currentlyUsedBy;
84+
}
7685

77-
DeleteRequest deleteRequest = new DeleteRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, MlFilter.documentId(filterId));
86+
private void deleteFilter(String filterId, ActionListener<AcknowledgedResponse> listener) {
87+
DeleteRequest deleteRequest = new DeleteRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE,
88+
MlFilter.documentId(filterId));
7889
BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
7990
bulkRequestBuilder.add(deleteRequest);
8091
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
8192
executeAsyncWithOrigin(client, ML_ORIGIN, BulkAction.INSTANCE, bulkRequestBuilder.request(),
82-
new ActionListener<BulkResponse>() {
83-
@Override
84-
public void onResponse(BulkResponse bulkResponse) {
85-
if (bulkResponse.getItems()[0].status() == RestStatus.NOT_FOUND) {
86-
listener.onFailure(new ResourceNotFoundException("Could not delete filter with ID [" + filterId
87-
+ "] because it does not exist"));
88-
} else {
89-
listener.onResponse(new AcknowledgedResponse(true));
90-
}
93+
new ActionListener<BulkResponse>() {
94+
@Override
95+
public void onResponse(BulkResponse bulkResponse) {
96+
if (bulkResponse.getItems()[0].status() == RestStatus.NOT_FOUND) {
97+
listener.onFailure(new ResourceNotFoundException("Could not delete filter with ID [" + filterId
98+
+ "] because it does not exist"));
99+
} else {
100+
listener.onResponse(new AcknowledgedResponse(true));
91101
}
102+
}
92103

93-
@Override
94-
public void onFailure(Exception e) {
95-
listener.onFailure(ExceptionsHelper.serverError("Could not delete filter with ID [" + filterId + "]", e));
96-
}
97-
});
104+
@Override
105+
public void onFailure(Exception e) {
106+
listener.onFailure(ExceptionsHelper.serverError("Could not delete filter with ID [" + filterId + "]", e));
107+
}
108+
});
98109
}
99110
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public void putJob(Job job, ActionListener<IndexResponse> listener) {
107107
ElasticsearchMappings.DOC_TYPE, Job.documentId(job.getId()))
108108
.setSource(source)
109109
.setOpType(DocWriteRequest.OpType.CREATE)
110+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
110111
.request();
111112

112113
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(

0 commit comments

Comments
 (0)