Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.17...2.x)
### Features
- Add synchronous execution option to workflow provisioning ([#990](https://github.com/opensearch-project/flow-framework/pull/990))
- Add ApiSpecFetcher for Fetching and Comparing API Specifications ([#651](https://github.com/opensearch-project/flow-framework/issues/651))
- Add optional config field to tool step ([#899](https://github.com/opensearch-project/flow-framework/pull/899))
- Add API Consistency Tests with ML-Common and Set Up Daily GitHub Action Trigger([#908](https://github.com/opensearch-project/flow-framework/issues/908))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ private CommonValue() {}
public static final String PROVISION_WORKFLOW = "provision";
/** The param name for update workflow field in create API */
public static final String UPDATE_WORKFLOW_FIELDS = "update_fields";
/** The param name for specifying the timeout duration in seconds to wait for workflow completion */
public static final String WAIT_FOR_COMPLETION_TIMEOUT = "wait_for_completion_timeout";
/** The field name for workflow steps. This field represents the name of the workflow steps to be fetched. */
public static final String WORKFLOW_STEP = "workflow_step";
/** The param name for default use case, used by the create workflow API */
Expand Down Expand Up @@ -186,6 +188,8 @@ private CommonValue() {}
public static final String SOURCE_INDEX = "source_index";
/** The destination index field for reindex */
public static final String DESTINATION_INDEX = "destination_index";
/** Provision Timeout field */
public static final String PROVISION_TIMEOUT_FIELD = "provision.timeout";
/*
* Constants associated with resource provisioning / state
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
Expand Down Expand Up @@ -43,6 +44,7 @@
import static org.opensearch.flowframework.common.CommonValue.UPDATE_WORKFLOW_FIELDS;
import static org.opensearch.flowframework.common.CommonValue.USE_CASE;
import static org.opensearch.flowframework.common.CommonValue.VALIDATION;
import static org.opensearch.flowframework.common.CommonValue.WAIT_FOR_COMPLETION_TIMEOUT;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
Expand Down Expand Up @@ -88,6 +90,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
boolean reprovision = request.paramAsBoolean(REPROVISION_WORKFLOW, false);
boolean updateFields = request.paramAsBoolean(UPDATE_WORKFLOW_FIELDS, false);
String useCase = request.param(USE_CASE);
TimeValue waitForCompletionTimeout = request.paramAsTime(WAIT_FOR_COMPLETION_TIMEOUT, TimeValue.MINUS_ONE);

// If provisioning, consume all other params and pass to provision transport action
Map<String, String> params = provision
Expand Down Expand Up @@ -145,6 +148,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
);
return processError(ffe, params, request);
}
// Ensure wait_for_completion is not set unless reprovision or provision is true
if (waitForCompletionTimeout != TimeValue.MINUS_ONE && !(reprovision || provision)) {
FlowFrameworkException ffe = new FlowFrameworkException(
"Request parameters 'wait_for_completion_timeout' are not allowed unless the 'provision' or 'reprovision' parameter is set to true.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}

try {
Template template;
Map<String, String> useCaseDefaultsMap = Collections.emptyMap();
Expand Down Expand Up @@ -219,7 +231,9 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
if (updateFields) {
params = Map.of(UPDATE_WORKFLOW_FIELDS, "true");
}

if (waitForCompletionTimeout != TimeValue.MINUS_ONE) {
params = Map.of(WAIT_FOR_COMPLETION_TIMEOUT, waitForCompletionTimeout.toString());
}
WorkflowRequest workflowRequest = new WorkflowRequest(
workflowId,
template,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
Expand All @@ -33,6 +34,7 @@
import java.util.stream.Collectors;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.WAIT_FOR_COMPLETION_TIMEOUT;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
Expand Down Expand Up @@ -73,6 +75,7 @@ public List<Route> routes() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String workflowId = request.param(WORKFLOW_ID);
TimeValue waitForCompletionTimeout = request.paramAsTime(WAIT_FOR_COMPLETION_TIMEOUT, TimeValue.MINUS_ONE);
try {
Map<String, String> params = parseParamsAndContent(request);
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
Expand All @@ -86,7 +89,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}
// Create request and provision
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, params);
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, params, waitForCompletionTimeout);
return channel -> client.execute(ProvisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD;
import static org.opensearch.flowframework.common.CommonValue.WAIT_FOR_COMPLETION_TIMEOUT;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FILTER_BY_BACKEND_ROLES;
import static org.opensearch.flowframework.util.ParseUtils.checkFilterByBackendRoles;
import static org.opensearch.flowframework.util.ParseUtils.getUserContext;
Expand Down Expand Up @@ -210,6 +211,16 @@ private void createExecute(WorkflowRequest request, User user, ActionListener<Wo
}
}
String workflowId = request.getWorkflowId();
TimeValue waitForTimeCompletion;
if (request.getParams().containsKey(WAIT_FOR_COMPLETION_TIMEOUT)) {
waitForTimeCompletion = TimeValue.parseTimeValue(
request.getParams().get(WAIT_FOR_COMPLETION_TIMEOUT),
WAIT_FOR_COMPLETION_TIMEOUT
);
} else {
// default to minus one indicate async execution
waitForTimeCompletion = TimeValue.MINUS_ONE;
}
if (workflowId == null) {
// This is a new workflow (POST)
// Throttle incoming requests
Expand Down Expand Up @@ -244,7 +255,8 @@ private void createExecute(WorkflowRequest request, User user, ActionListener<Wo
WorkflowRequest workflowRequest = new WorkflowRequest(
globalContextResponse.getId(),
null,
request.getParams()
request.getParams(),
waitForTimeCompletion
);
logger.info(
"Provisioning parameter is set, continuing to provision workflow {}",
Expand All @@ -254,7 +266,14 @@ private void createExecute(WorkflowRequest request, User user, ActionListener<Wo
ProvisionWorkflowAction.INSTANCE,
workflowRequest,
ActionListener.wrap(provisionResponse -> {
listener.onResponse(new WorkflowResponse(provisionResponse.getWorkflowId()));
listener.onResponse(
(workflowRequest.getWaitForCompletionTimeout() == TimeValue.MINUS_ONE)
? new WorkflowResponse(provisionResponse.getWorkflowId())
: new WorkflowResponse(
provisionResponse.getWorkflowId(),
provisionResponse.getWorkflowState()
)
);
}, exception -> {
String errorMessage = "Provisioning failed.";
logger.error(errorMessage, exception);
Expand Down Expand Up @@ -336,19 +355,26 @@ private void createExecute(WorkflowRequest request, User user, ActionListener<Wo
.build();

if (request.isReprovision()) {

// Reprovision request
ReprovisionWorkflowRequest reprovisionRequest = new ReprovisionWorkflowRequest(
getResponse.getId(),
existingTemplate,
template
template,
waitForTimeCompletion
);
logger.info("Reprovisioning parameter is set, continuing to reprovision workflow {}", getResponse.getId());
client.execute(
ReprovisionWorkflowAction.INSTANCE,
reprovisionRequest,
ActionListener.wrap(reprovisionResponse -> {
listener.onResponse(new WorkflowResponse(reprovisionResponse.getWorkflowId()));
listener.onResponse(
reprovisionRequest.getWaitForCompletionTimeout() == TimeValue.MINUS_ONE
? new WorkflowResponse(reprovisionResponse.getWorkflowId())
: new WorkflowResponse(
reprovisionResponse.getWorkflowId(),
reprovisionResponse.getWorkflowState()
)
);
}, exception -> {
String errorMessage = "Reprovisioning failed for workflow " + workflowId;
logger.error(errorMessage, exception);
Expand Down
Loading