Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.amazon.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.parser.PipelineParser;
import org.opensearch.dataprepper.peerforwarder.server.PeerForwarderServer;
import org.opensearch.dataprepper.pipeline.Pipeline;
import org.opensearch.dataprepper.pipeline.server.DataPrepperServer;
import io.micrometer.core.instrument.util.StringUtils;
Expand All @@ -31,6 +32,7 @@ public class DataPrepper {
private static final String DEFAULT_SERVICE_NAME = "dataprepper";

private final PluginFactory pluginFactory;
private final PeerForwarderServer peerForwarderServer;
private Map<String, Pipeline> transformationPipelines;

// TODO: Remove DataPrepperServer dependency on DataPrepper
Expand All @@ -50,14 +52,16 @@ public static String getServiceNameForMetrics() {
@Inject
public DataPrepper(
final PipelineParser pipelineParser,
final PluginFactory pluginFactory
) {
final PluginFactory pluginFactory,
final PeerForwarderServer peerForwarderServer
) {
this.pluginFactory = pluginFactory;

transformationPipelines = pipelineParser.parseConfiguration();
if (transformationPipelines.size() == 0) {
throw new RuntimeException("No valid pipeline is available for execution, exiting");
}
this.peerForwarderServer = peerForwarderServer;
}

/**
Expand All @@ -66,6 +70,7 @@ public DataPrepper(
* @return true if execute successfully initiates the Data Prepper
*/
public boolean execute() {
peerForwarderServer.start();
transformationPipelines.forEach((name, pipeline) -> {
pipeline.execute();
});
Expand All @@ -84,10 +89,11 @@ public void shutdown() {
}

/**
* Triggers shutdown of the Data Prepper server.
* Triggers shutdown of the Data Prepper and Peer Forwarder server.
*/
public void shutdownDataPrepperServer() {
public void shutdownServers() {
dataPrepperServer.stop();
peerForwarderServer.stop();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void handle(HttpExchange exchange) throws IOException {
exchange.sendResponseHeaders(HttpURLConnection.HTTP_INTERNAL_ERROR, 0);
} finally {
exchange.getResponseBody().close();
dataPrepper.shutdownDataPrepperServer();
dataPrepper.shutdownServers();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.amazon.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.parser.PipelineParser;
import org.opensearch.dataprepper.peerforwarder.server.PeerForwarderServer;
import org.opensearch.dataprepper.pipeline.Pipeline;
import org.opensearch.dataprepper.pipeline.server.DataPrepperServer;
import org.hamcrest.Matchers;
Expand Down Expand Up @@ -38,6 +39,8 @@ public class DataPrepperTests {
private PluginFactory pluginFactory;
@Mock
private DataPrepperServer dataPrepperServer;
@Mock
private PeerForwarderServer peerForwarderServer;
@InjectMocks
private DataPrepper dataPrepper;

Expand Down Expand Up @@ -73,7 +76,7 @@ public void testGivenInvalidInputThenExceptionThrown() {

assertThrows(
RuntimeException.class,
() -> new DataPrepper(pipelineParser, pluginFactory),
() -> new DataPrepper(pipelineParser, pluginFactory, peerForwarderServer),
"Exception should be thrown if pipeline parser has no pipeline configuration");
}

Expand All @@ -93,6 +96,7 @@ public void testGivenValidPipelineParserWhenExecuteThenAllPipelinesExecuteAndSer

verify(pipeline).execute();
verify(dataPrepperServer).start();
verify(peerForwarderServer).start();
}

@Test
Expand All @@ -116,10 +120,11 @@ public void testDataPrepperShutdownNonExistentPipelineWithoutException() {
}

@Test
public void testShutdownDataPrepperServer() {
dataPrepper.shutdownDataPrepperServer();
public void testShutdownServers() {
dataPrepper.shutdownServers();

verify(dataPrepperServer).stop();
verify(peerForwarderServer).stop();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testWhenShutdownWithPostRequestThenResponseWritten() throws IOExcept
verify(responseBody, times(1))
.close();
verify(dataPrepper, times(1))
.shutdownDataPrepperServer();
.shutdownServers();
}

@ParameterizedTest
Expand Down