Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
builder.field(FIELD_NAME_NAME.getPreferredName(), datasource.getName());
builder.field(FIELD_NAME_STATE.getPreferredName(), datasource.getState());
builder.field(FIELD_NAME_ENDPOINT.getPreferredName(), datasource.getEndpoint());
builder.field(FIELD_NAME_UPDATE_INTERVAL.getPreferredName(), datasource.getSchedule().getInterval());
builder.field(FIELD_NAME_UPDATE_INTERVAL.getPreferredName(), datasource.getUserSchedule().getInterval());
builder.timeField(
FIELD_NAME_NEXT_UPDATE_AT.getPreferredName(),
FIELD_NAME_NEXT_UPDATE_AT_READABLE.getPreferredName(),
datasource.getSchedule().getNextExecutionTime(Instant.now()).toEpochMilli()
datasource.getUserSchedule().getNextExecutionTime(Instant.now()).toEpochMilli()
);
builder.field(FIELD_NAME_DATABASE.getPreferredName(), datasource.getDatabase());
builder.field(FIELD_NAME_UPDATE_STATS.getPreferredName(), datasource.getUpdateStats());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,12 @@ private void updateIfChanged(final UpdateDatasourceRequest request, final Dataso
}

if (isUpdateIntervalChanged(request, datasource)) {
datasource.setSchedule(
new IntervalSchedule(datasource.getSchedule().getStartTime(), (int) request.getUpdateInterval().getDays(), ChronoUnit.DAYS)
datasource.setUserSchedule(
new IntervalSchedule(
datasource.getUserSchedule().getStartTime(),
(int) request.getUpdateInterval().getDays(),
ChronoUnit.DAYS
)
);
isChanged = true;
}
Expand Down Expand Up @@ -163,7 +167,7 @@ private void validateUpdateIntervalIsLessThanValidForInDays(final UpdateDatasour

long updateInterval = isUpdateIntervalChanged(request, datasource)
? request.getUpdateInterval().days()
: datasource.getSchedule().getInterval();
: datasource.getUserSchedule().getInterval();

if (updateInterval >= validForInDays) {
throw new InvalidParameterException(
Expand All @@ -177,6 +181,7 @@ private boolean isEndpointChanged(final UpdateDatasourceRequest request, final D
}

private boolean isUpdateIntervalChanged(final UpdateDatasourceRequest request, final Datasource datasource) {
return request.getUpdateInterval() != null && (int) request.getUpdateInterval().days() != datasource.getSchedule().getInterval();
return request.getUpdateInterval() != null
&& (int) request.getUpdateInterval().days() != datasource.getUserSchedule().getInterval();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,28 @@ public class Datasource implements Writeable, ScheduledJobParameter {
private static final ParseField ENABLED_FIELD = new ParseField("update_enabled");
private static final ParseField LAST_UPDATE_TIME_FIELD = new ParseField("last_update_time");
private static final ParseField LAST_UPDATE_TIME_FIELD_READABLE = new ParseField("last_update_time_field");
private static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
/**
* Schedule that user set
*/
private static final ParseField USER_SCHEDULE_FIELD = new ParseField("user_schedule");
/**
* System schedule which will be used by job scheduler
*
* If datasource is going to get expired before next update, we want to run clean up task before the next update
* by changing system schedule.
*
* If datasource is restored from snapshot, we want to run clean up task immediately to handle expired datasource
* by changing system schedule.
*
* For every task run, we revert system schedule back to user schedule.
*/
private static final ParseField SYSTEM_SCHEDULE_FIELD = new ParseField("system_schedule");
/**
* {@link DatasourceTask} that DatasourceRunner will execute in next run
*
* For every task run, we revert task back to {@link DatasourceTask#ALL}
*/
private static final ParseField TASK_FIELD = new ParseField("task");
private static final ParseField ENABLED_TIME_FIELD = new ParseField("enabled_time");
private static final ParseField ENABLED_TIME_FIELD_READABLE = new ParseField("enabled_time_field");

Expand Down Expand Up @@ -97,10 +118,22 @@ public class Datasource implements Writeable, ScheduledJobParameter {
*/
private boolean isEnabled;
/**
* @param schedule Schedule for a GeoIP data update
* @return Schedule for the job scheduler
* @param userSchedule Schedule that user provided
* @return Schedule that user provided
*/
private IntervalSchedule userSchedule;

/**
* @param systemSchedule Schedule that job scheduler use
* @return Schedule that job scheduler use
*/
private IntervalSchedule schedule;
private IntervalSchedule systemSchedule;

/**
* @param task Task that {@link DatasourceRunner} will execute
* @return Task that {@link DatasourceRunner} will execute
*/
private DatasourceTask task;

/**
* Additional variables for datasource
Expand Down Expand Up @@ -143,18 +176,22 @@ public class Datasource implements Writeable, ScheduledJobParameter {
Instant lastUpdateTime = Instant.ofEpochMilli((long) args[1]);
Instant enabledTime = args[2] == null ? null : Instant.ofEpochMilli((long) args[2]);
boolean isEnabled = (boolean) args[3];
IntervalSchedule schedule = (IntervalSchedule) args[4];
String endpoint = (String) args[5];
DatasourceState state = DatasourceState.valueOf((String) args[6]);
List<String> indices = (List<String>) args[7];
Database database = (Database) args[8];
UpdateStats updateStats = (UpdateStats) args[9];
IntervalSchedule userSchedule = (IntervalSchedule) args[4];
IntervalSchedule systemSchedule = (IntervalSchedule) args[5];
DatasourceTask task = DatasourceTask.valueOf((String) args[6]);
String endpoint = (String) args[7];
DatasourceState state = DatasourceState.valueOf((String) args[8]);
List<String> indices = (List<String>) args[9];
Database database = (Database) args[10];
UpdateStats updateStats = (UpdateStats) args[11];
Datasource parameter = new Datasource(
name,
lastUpdateTime,
enabledTime,
isEnabled,
schedule,
userSchedule,
systemSchedule,
task,
endpoint,
state,
indices,
Expand All @@ -170,7 +207,9 @@ public class Datasource implements Writeable, ScheduledJobParameter {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_UPDATE_TIME_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), ENABLED_TIME_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ScheduleParser.parse(p), SCHEDULE_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ScheduleParser.parse(p), USER_SCHEDULE_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ScheduleParser.parse(p), SYSTEM_SCHEDULE_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), TASK_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), ENDPOINT_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_FIELD);
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD);
Expand All @@ -190,6 +229,8 @@ public Datasource(final String name, final IntervalSchedule schedule, final Stri
null,
false,
schedule,
schedule,
DatasourceTask.ALL,
endpoint,
DatasourceState.CREATING,
new ArrayList<>(),
Expand All @@ -203,7 +244,9 @@ public Datasource(final StreamInput in) throws IOException {
lastUpdateTime = toInstant(in.readVLong());
enabledTime = toInstant(in.readOptionalVLong());
isEnabled = in.readBoolean();
schedule = new IntervalSchedule(in);
userSchedule = new IntervalSchedule(in);
systemSchedule = new IntervalSchedule(in);
task = DatasourceTask.valueOf(in.readString());
endpoint = in.readString();
state = DatasourceState.valueOf(in.readString());
indices = in.readStringList();
Expand All @@ -217,7 +260,9 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeVLong(lastUpdateTime.toEpochMilli());
out.writeOptionalVLong(enabledTime == null ? null : enabledTime.toEpochMilli());
out.writeBoolean(isEnabled);
schedule.writeTo(out);
userSchedule.writeTo(out);
systemSchedule.writeTo(out);
out.writeString(task.name());
out.writeString(endpoint);
out.writeString(state.name());
out.writeStringCollection(indices);
Expand All @@ -242,7 +287,9 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
);
}
builder.field(ENABLED_FIELD.getPreferredName(), isEnabled);
builder.field(SCHEDULE_FIELD.getPreferredName(), schedule);
builder.field(USER_SCHEDULE_FIELD.getPreferredName(), userSchedule);
builder.field(SYSTEM_SCHEDULE_FIELD.getPreferredName(), systemSchedule);
builder.field(TASK_FIELD.getPreferredName(), task.name());
builder.field(ENDPOINT_FIELD.getPreferredName(), endpoint);
builder.field(STATE_FIELD.getPreferredName(), state.name());
builder.field(INDICES_FIELD.getPreferredName(), indices);
Expand All @@ -269,7 +316,7 @@ public Instant getEnabledTime() {

@Override
public IntervalSchedule getSchedule() {
return schedule;
return systemSchedule;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.jobscheduler;

/**
* Task that {@link DatasourceRunner} will run
*/
public enum DatasourceTask {
/**
* Do everything
*/
ALL,

/**
* Only delete unused indices
*/
DELETE_UNUSED_INDICES
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.geospatial.ip2geo.common.Ip2GeoProcessorFacade;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
import org.opensearch.ingest.IngestMetadata;
Expand Down Expand Up @@ -143,6 +144,13 @@ protected DatasourceState randomState() {
.get(Randomness.createSecure().nextInt(DatasourceState.values().length - 1));
}

protected DatasourceTask randomTask() {
return Arrays.stream(DatasourceTask.values())
.sequential()
.collect(Collectors.toList())
.get(Randomness.createSecure().nextInt(DatasourceTask.values().length - 1));
}

protected String randomIpAddress() {
return String.format(
Locale.ROOT,
Expand Down Expand Up @@ -183,7 +191,9 @@ protected Datasource randomDatasource() {
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Datasource datasource = new Datasource();
datasource.setName(GeospatialTestHelper.randomLowerCaseString());
datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(28) + 1, ChronoUnit.DAYS));
datasource.setUserSchedule(new IntervalSchedule(now, Randomness.get().nextInt(28) + 1, ChronoUnit.DAYS));
datasource.setSystemSchedule(datasource.getUserSchedule());
datasource.setTask(randomTask());
datasource.setState(randomState());
datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString()));
datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ public void testToXContent_whenValidInput_thenSucceed() throws Exception {
assertTrue(json.contains(String.format(Locale.ROOT, "\"name\":\"%s\"", datasource.getName())));
assertTrue(json.contains(String.format(Locale.ROOT, "\"state\":\"%s\"", datasource.getState())));
assertTrue(json.contains(String.format(Locale.ROOT, "\"endpoint\":\"%s\"", datasource.getEndpoint())));
assertTrue(json.contains(String.format(Locale.ROOT, "\"update_interval_in_days\":%d", datasource.getSchedule().getInterval())));
assertTrue(
json.contains(String.format(Locale.ROOT, "\"update_interval_in_days\":%d", datasource.getUserSchedule().getInterval()))
);
assertTrue(json.contains(String.format(Locale.ROOT, "\"next_update_at_in_epoch_millis\"")));
assertTrue(json.contains(String.format(Locale.ROOT, "\"provider\":\"%s\"", datasource.getDatabase().getProvider())));
assertTrue(json.contains(String.format(Locale.ROOT, "\"sha256_hash\":\"%s\"", datasource.getDatabase().getSha256Hash())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void testInternalDoExecute_whenValidInput_thenSucceed() {
verify(datasourceFacade).putDatasource(datasourceCaptor.capture(), actionListenerCaptor.capture());
assertEquals(request.getName(), datasourceCaptor.getValue().getName());
assertEquals(request.getEndpoint(), datasourceCaptor.getValue().getEndpoint());
assertEquals(request.getUpdateInterval().days(), datasourceCaptor.getValue().getSchedule().getInterval());
assertEquals(request.getUpdateInterval().days(), datasourceCaptor.getValue().getUserSchedule().getInterval());

// Run next listener.onResponse
actionListenerCaptor.getValue().onResponse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testDoExecute_whenValidInput_thenUpdate() {
verify(datasourceFacade).updateDatasource(datasource);
verify(datasourceUpdateService).getHeaderFields(request.getEndpoint());
assertEquals(request.getEndpoint(), datasource.getEndpoint());
assertEquals(request.getUpdateInterval().days(), datasource.getSchedule().getInterval());
assertEquals(request.getUpdateInterval().days(), datasource.getUserSchedule().getInterval());
verify(listener).onResponse(new AcknowledgedResponse(true));
verify(ip2GeoLockService).releaseLock(eq(lockModel));
}
Expand All @@ -119,7 +119,7 @@ public void testDoExecute_whenValidInput_thenUpdate() {
public void testDoExecute_whenNoChangesInValues_thenNoUpdate() {
Datasource datasource = randomDatasource();
UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName());
request.setUpdateInterval(TimeValue.timeValueDays(datasource.getSchedule().getInterval()));
request.setUpdateInterval(TimeValue.timeValueDays(datasource.getUserSchedule().getInterval()));
request.setEndpoint(datasource.getEndpoint());

Task task = mock(Task.class);
Expand Down