Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add ingestion management APIs for pause, resume and get ingestion state ([#17631](https://github.com/opensearch-project/OpenSearch/pull/17631))
- [Security Manager Replacement] Enhance Java Agent to intercept System::exit ([#17746](https://github.com/opensearch-project/OpenSearch/pull/17746))
- Support AutoExpand for SearchReplica ([#17741](https://github.com/opensearch-project/OpenSearch/pull/17741))
- Added time_in_execution attribute to /_cluster/pending_tasks response ([#17779](https://github.com/opensearch-project/OpenSearch/pull/17779))

### Changed
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(10));
assertThat(pendingClusterTasks.get(0).getSource().string(), equalTo("1"));
assertThat(pendingClusterTasks.get(0).isExecuting(), equalTo(true));
assertThat(pendingClusterTasks.get(0).getTimeInExecutionInMillis(), greaterThan(0L));
assertThat(pendingClusterTasks.get(1).getTimeInExecutionInMillis(), equalTo(0L));
for (PendingClusterTask task : pendingClusterTasks) {
controlSources.remove(task.getSource().string());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public String toString() {
.append(pendingClusterTask.getSource())
.append("/")
.append(pendingClusterTask.getTimeInQueue())
.append("/")
.append(pendingClusterTask.getTimeInExecution())
.append("\n");
}
return sb.toString();
Expand All @@ -108,6 +110,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(Fields.EXECUTING, pendingClusterTask.isExecuting());
builder.field(Fields.TIME_IN_QUEUE_MILLIS, pendingClusterTask.getTimeInQueueInMillis());
builder.field(Fields.TIME_IN_QUEUE, pendingClusterTask.getTimeInQueue());
builder.field(Fields.TIME_IN_EXECUTION_MILLIS, pendingClusterTask.getTimeInExecutionInMillis());
builder.field(Fields.TIME_IN_EXECUTION, pendingClusterTask.getTimeInExecution());
builder.endObject();
}
builder.endArray();
Expand All @@ -129,6 +133,8 @@ static final class Fields {
static final String SOURCE = "source";
static final String TIME_IN_QUEUE_MILLIS = "time_in_queue_millis";
static final String TIME_IN_QUEUE = "time_in_queue";
static final String TIME_IN_EXECUTION_MILLIS = "time_in_execution_millis";
static final String TIME_IN_EXECUTION = "time_in_execution";

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,8 @@ public List<PendingClusterTask> pendingTasks() {
pending.priority,
new Text(task.source()),
task.getAgeInMillis(),
pending.executing
pending.executing,
pending.executionTimeInMillis
);
}).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,27 @@ public class PendingClusterTask implements Writeable {
private Text source;
private long timeInQueue;
private boolean executing;
private long timeInExecution;

public PendingClusterTask(StreamInput in) throws IOException {
insertOrder = in.readVLong();
priority = Priority.readFrom(in);
source = in.readText();
timeInQueue = in.readLong();
executing = in.readBoolean();
timeInExecution = in.readLong();
}

public PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue, boolean executing) {
public PendingClusterTask(long insertOrder, Priority priority, Text source, long timeInQueue, boolean executing, long timeInExecution) {
assert timeInQueue >= 0 : "got a negative timeInQueue [" + timeInQueue + "]";
assert insertOrder >= 0 : "got a negative insertOrder [" + insertOrder + "]";
assert timeInExecution >= 0 : "got a negative timeInExecution [" + timeInExecution + "]";
this.insertOrder = insertOrder;
this.priority = priority;
this.source = source;
this.timeInQueue = timeInQueue;
this.executing = executing;
this.timeInExecution = timeInExecution;
}

public long getInsertOrder() {
Expand All @@ -90,10 +94,18 @@ public long getTimeInQueueInMillis() {
return timeInQueue;
}

public long getTimeInExecutionInMillis() {
return timeInExecution;
}

public TimeValue getTimeInQueue() {
return new TimeValue(getTimeInQueueInMillis());
}

public TimeValue getTimeInExecution() {
return new TimeValue(getTimeInExecutionInMillis());
}

public boolean isExecuting() {
return executing;
}
Expand All @@ -105,5 +117,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeText(source);
out.writeLong(timeInQueue);
out.writeBoolean(executing);
out.writeLong(timeInExecution);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
* A prioritizing executor which uses a priority queue as a work queue. The jobs that will be submitted will be treated
Expand All @@ -63,6 +64,13 @@ public class PrioritizedOpenSearchThreadPoolExecutor extends OpenSearchThreadPoo
private final Queue<Runnable> current = ConcurrentCollections.newQueue();
private final ScheduledExecutorService timer;

/*
Track current executing task and its start time
We can keep a map if we decide to go with multitask execution in future
*/
private final AtomicReference<Runnable> currentTask = new AtomicReference<>();
private final AtomicLong currentTaskStartTimeNanos = new AtomicLong(0);

public PrioritizedOpenSearchThreadPoolExecutor(
String name,
int corePoolSize,
Expand Down Expand Up @@ -112,8 +120,17 @@ public TimeValue getMaxTaskWaitTime() {
return TimeValue.timeValueNanos(now - oldestCreationDateInNanos);
}

private long getExecutionTimeInMillis(Runnable r) {
long startTimeNanos = currentTaskStartTimeNanos.get();
if (r.equals(currentTask.get()) && startTimeNanos != 0) {
return TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos, TimeUnit.NANOSECONDS);
}
return 0; // Task is not currently executing
}

private void addPending(List<Runnable> runnables, List<Pending> pending, boolean executing) {
for (Runnable runnable : runnables) {
long executionTimeInMillis = executing ? getExecutionTimeInMillis(runnable) : 0;
if (runnable instanceof TieBreakingPrioritizedRunnable) {
TieBreakingPrioritizedRunnable t = (TieBreakingPrioritizedRunnable) runnable;
Runnable innerRunnable = t.runnable;
Expand All @@ -122,28 +139,34 @@ private void addPending(List<Runnable> runnables, List<Pending> pending, boolean
innerRunnable can be null if task is finished but not removed from executor yet,
see {@link TieBreakingPrioritizedRunnable#run} and {@link TieBreakingPrioritizedRunnable#runAndClean}
*/
pending.add(new Pending(super.unwrap(innerRunnable), t.priority(), t.insertionOrder, executing));
pending.add(new Pending(super.unwrap(innerRunnable), t.priority(), t.insertionOrder, executing, executionTimeInMillis));
}
} else if (runnable instanceof PrioritizedFutureTask) {
PrioritizedFutureTask t = (PrioritizedFutureTask) runnable;
Object task = t.task;
if (t.task instanceof Runnable) {
task = super.unwrap((Runnable) t.task);
}
pending.add(new Pending(task, t.priority, t.insertionOrder, executing));
pending.add(new Pending(task, t.priority, t.insertionOrder, executing, executionTimeInMillis));
}
}
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
current.add(r);
currentTask.set(r);
currentTaskStartTimeNanos.set(System.nanoTime());
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
current.remove(r);
if (r.equals(currentTask.get())) {
currentTask.set(null);
currentTaskStartTimeNanos.set(0);
}
}

public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {
Expand Down Expand Up @@ -211,12 +234,14 @@ public static class Pending {
public final Priority priority;
public final long insertionOrder;
public final boolean executing;
public final long executionTimeInMillis;

public Pending(Object task, Priority priority, long insertionOrder, boolean executing) {
public Pending(Object task, Priority priority, long insertionOrder, boolean executing, long executionTimeInMillis) {
this.task = task;
this.priority = priority;
this.insertionOrder = insertionOrder;
this.executing = executing;
this.executionTimeInMillis = executionTimeInMillis;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

public class PrioritizedExecutorsTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -231,6 +232,7 @@ public void testTimeout() throws Exception {
@Override
public void run() {
try {
Thread.sleep(50);
invoked.countDown();
block.await();
} catch (InterruptedException e) {
Expand All @@ -248,6 +250,7 @@ public String toString() {
assertThat(pending.length, equalTo(1));
assertThat(pending[0].task.toString(), equalTo("the blocking"));
assertThat(pending[0].executing, equalTo(true));
assertThat(pending[0].executionTimeInMillis, greaterThan(0L));

final AtomicBoolean executeCalled = new AtomicBoolean();
final CountDownLatch timedOut = new CountDownLatch(1);
Expand All @@ -272,8 +275,10 @@ public void run() {
assertThat(pending.length, equalTo(2));
assertThat(pending[0].task.toString(), equalTo("the blocking"));
assertThat(pending[0].executing, equalTo(true));
assertThat(pending[0].executionTimeInMillis, greaterThan(0L));
assertThat(pending[1].task.toString(), equalTo("the waiting"));
assertThat(pending[1].executing, equalTo(false));
assertThat(pending[1].executionTimeInMillis, equalTo(0L));

assertThat(timedOut.await(2, TimeUnit.SECONDS), equalTo(true));
block.countDown();
Expand Down
Loading