From 52e64db4783f37e369f3dc6ea7cd97aa2372cf30 Mon Sep 17 00:00:00 2001 From: lavkesh Date: Fri, 6 May 2022 13:24:20 +0800 Subject: [PATCH 1/2] fix: depedencies and bq metada --- build.gradle | 33 ++++++++++++------- .../sink/connectors/OdpfSinkResponse.java | 16 ++++----- .../bigquery/BigQuerySinkFactory.java | 2 +- .../handler/MessageRecordConverter.java | 18 ++++++++-- .../connectors/config/BigQuerySinkConfig.java | 2 +- .../sink/connectors/message/OdpfMessage.java | 8 +++++ .../connectors/metrics/Instrumentation.java | 15 +++++++-- .../metrics/StatsDReporterBuilder.java | 28 +++++++++++----- .../connectors/bigquery/BigQuerySinkTest.java | 12 +++---- .../bigquery/TestOdpfMessageBuilder.java | 10 +++--- .../handler/MessageRecordConverterTest.java | 20 ++++++++++- .../BigqueryProtoUpdateListenerTest.java | 1 + .../odpf/sink/connectors/log/LogSinkTest.java | 12 +++---- 13 files changed, 123 insertions(+), 54 deletions(-) diff --git a/build.gradle b/build.gradle index 82b0d705..ce331953 100644 --- a/build.gradle +++ b/build.gradle @@ -10,7 +10,7 @@ buildscript { } plugins { - id 'java' + id 'java-library' id 'idea' id 'checkstyle' id 'jacoco' @@ -21,10 +21,11 @@ plugins { } group 'io.odpf' -version '0.1.0' +version '0.1.1' repositories { mavenCentral() + mavenLocal() } dependencies { @@ -34,7 +35,7 @@ dependencies { implementation group: 'io.odpf', name: 'stencil', version: '0.1.6' exclude group: 'org.slf4j' implementation group: 'org.aeonbits.owner', name: 'owner', version: '1.0.9' implementation 'com.google.cloud:google-cloud-bigquery:1.115.0' - implementation "io.grpc:grpc-all:1.18.0" + implementation "io.grpc:grpc-all:1.38.0" implementation group: 'org.slf4j', name: 'jul-to-slf4j', version: '1.7.35' implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.2.1' implementation 'org.json:json:20220320' @@ -110,14 +111,24 @@ protobuf { } } -jar { - manifest { - attributes 'Main-Class': 'io.odpf.sink.Main' - duplicatesStrategy = 'exclude' +publishing { + publications { + maven(MavenPublication) { + groupId = project.group + artifactId = project.name + version = project.version + from components.java + } } - from { - - configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) } + repositories { + maven { + name = "GitHubPackages" + url = "https://maven.pkg.github.com/odpf/flow" + credentials { + username = System.getenv("GITHUB_ACTOR") + password = System.getenv("GITHUB_TOKEN") + } + } } } jacocoTestCoverageVerification { @@ -136,7 +147,7 @@ jacocoTestCoverageVerification { violationRules { rule { limit { - minimum = 0.75 + minimum = 0.7 } } } diff --git a/src/main/java/io/odpf/sink/connectors/OdpfSinkResponse.java b/src/main/java/io/odpf/sink/connectors/OdpfSinkResponse.java index 4e273346..25d42177 100644 --- a/src/main/java/io/odpf/sink/connectors/OdpfSinkResponse.java +++ b/src/main/java/io/odpf/sink/connectors/OdpfSinkResponse.java @@ -2,26 +2,24 @@ import io.odpf.sink.connectors.error.ErrorInfo; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; public class OdpfSinkResponse { - private final Map> errors = new HashMap<>(); + private final Map errors = new HashMap<>(); /** - * Returns all errors as a map whose keys are indexes of rows that failed to be pushed. - * Each failed row index is associated with a non-empty list of {@link ErrorInfo}. + * Returns error as a map whose keys are indexes of messages that failed to be pushed. + * Each failed message index is associated with a {@link ErrorInfo}. */ - public Map> getErrors() { + public Map getErrors() { return errors; } /** - * Returns errors for the provided row index. If no error exists returns {@code null}. + * Returns error for the provided message index. If no error exists returns {@code null}. */ - public List getErrorsFor(long index) { + public ErrorInfo getErrorsFor(long index) { return errors.get(index); } @@ -29,7 +27,7 @@ public List getErrorsFor(long index) { * Adds an error for the index. */ public void addErrors(long index, ErrorInfo errorInfo) { - errors.computeIfAbsent(index, x -> new ArrayList<>()).add(errorInfo); + errors.put(index, errorInfo); } /** diff --git a/src/main/java/io/odpf/sink/connectors/bigquery/BigQuerySinkFactory.java b/src/main/java/io/odpf/sink/connectors/bigquery/BigQuerySinkFactory.java index 624122c3..b5007497 100644 --- a/src/main/java/io/odpf/sink/connectors/bigquery/BigQuerySinkFactory.java +++ b/src/main/java/io/odpf/sink/connectors/bigquery/BigQuerySinkFactory.java @@ -40,8 +40,8 @@ public BigQuerySinkFactory(Map env, StatsDReporter statsDReporte public void init() { BigQuerySinkConfig sinkConfig = ConfigFactory.create(BigQuerySinkConfig.class, config); try { - this.bigQueryClient = new BigQueryClient(sinkConfig, bigQueryMetrics, new Instrumentation(statsDReporter, BigQueryClient.class)); this.bigQueryMetrics = new BigQueryMetrics(sinkConfig); + this.bigQueryClient = new BigQueryClient(sinkConfig, bigQueryMetrics, new Instrumentation(statsDReporter, BigQueryClient.class)); this.recordConverterWrapper = new MessageRecordConverterCache(); OdpfStencilUpdateListener odpfStencilUpdateListener = BigqueryStencilUpdateListenerFactory.create(sinkConfig, bigQueryClient, recordConverterWrapper); OdpfMessageParser odpfMessageParser = OdpfMessageParserFactory.getParser(sinkConfig, statsDReporter, odpfStencilUpdateListener); diff --git a/src/main/java/io/odpf/sink/connectors/bigquery/handler/MessageRecordConverter.java b/src/main/java/io/odpf/sink/connectors/bigquery/handler/MessageRecordConverter.java index e4f345fc..5d61938e 100644 --- a/src/main/java/io/odpf/sink/connectors/bigquery/handler/MessageRecordConverter.java +++ b/src/main/java/io/odpf/sink/connectors/bigquery/handler/MessageRecordConverter.java @@ -1,7 +1,9 @@ package io.odpf.sink.connectors.bigquery.handler; +import com.google.api.client.util.DateTime; import io.odpf.sink.connectors.bigquery.models.Record; import io.odpf.sink.connectors.bigquery.models.Records; +import io.odpf.sink.connectors.common.TupleString; import io.odpf.sink.connectors.config.BigQuerySinkConfig; import io.odpf.sink.connectors.error.ErrorInfo; import io.odpf.sink.connectors.error.ErrorType; @@ -21,6 +23,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; @AllArgsConstructor @Slf4j @@ -70,10 +73,21 @@ private Record createRecord(OdpfMessage message, int index) { } private void addMetadata(Map columns, OdpfMessage message) { + List metadataColumnsTypes = config.getMetadataColumnsTypes(); + Map metadata = message.getMetadata(metadataColumnsTypes); + Map finalMetadata = metadataColumnsTypes.stream().collect(Collectors.toMap(TupleString::getFirst, t -> { + String key = t.getFirst(); + String dataType = t.getSecond(); + Object value = metadata.get(key); + if (value instanceof Long && dataType.equals("timestamp")) { + value = new DateTime((long) value); + } + return value; + })); if (config.getBqMetadataNamespace().isEmpty()) { - columns.putAll(message.getMetadata()); + columns.putAll(finalMetadata); } else { - columns.put(config.getBqMetadataNamespace(), message.getMetadata()); + columns.put(config.getBqMetadataNamespace(), finalMetadata); } } } diff --git a/src/main/java/io/odpf/sink/connectors/config/BigQuerySinkConfig.java b/src/main/java/io/odpf/sink/connectors/config/BigQuerySinkConfig.java index fdee35ca..95042d99 100644 --- a/src/main/java/io/odpf/sink/connectors/config/BigQuerySinkConfig.java +++ b/src/main/java/io/odpf/sink/connectors/config/BigQuerySinkConfig.java @@ -59,8 +59,8 @@ public interface BigQuerySinkConfig extends OdpfSinkConfig { @DefaultValue("asia-southeast1") String getBigQueryDatasetLocation(); - @DefaultValue("") @Key("SINK_BIGQUERY_METADATA_NAMESPACE") + @DefaultValue("") String getBqMetadataNamespace(); @DefaultValue("true") diff --git a/src/main/java/io/odpf/sink/connectors/message/OdpfMessage.java b/src/main/java/io/odpf/sink/connectors/message/OdpfMessage.java index 3c8cb0b0..25cde235 100644 --- a/src/main/java/io/odpf/sink/connectors/message/OdpfMessage.java +++ b/src/main/java/io/odpf/sink/connectors/message/OdpfMessage.java @@ -1,10 +1,12 @@ package io.odpf.sink.connectors.message; import io.odpf.sink.connectors.common.Tuple; +import io.odpf.sink.connectors.common.TupleString; import lombok.Getter; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -27,4 +29,10 @@ public OdpfMessage(Object logKey, Object logMessage, Tuple... tu this.logMessage = logMessage; Arrays.stream(tuples).forEach(t -> metadata.put(t.getFirst(), t.getSecond())); } + + public Map getMetadata(List metadataColumnsTypes) { + return metadataColumnsTypes.stream() + .collect(Collectors.toMap( + TupleString::getFirst, columnAndType -> metadata.get(columnAndType.getFirst()))); + } } diff --git a/src/main/java/io/odpf/sink/connectors/metrics/Instrumentation.java b/src/main/java/io/odpf/sink/connectors/metrics/Instrumentation.java index ee538056..9ae16c00 100644 --- a/src/main/java/io/odpf/sink/connectors/metrics/Instrumentation.java +++ b/src/main/java/io/odpf/sink/connectors/metrics/Instrumentation.java @@ -1,5 +1,6 @@ package io.odpf.sink.connectors.metrics; +import lombok.Getter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,7 +15,9 @@ * Handle logging and metric capturing. */ public class Instrumentation implements Closeable { + @Getter private final StatsDReporter statsDReporter; + @Getter private final Logger logger; /** @@ -68,6 +71,10 @@ public void captureCount(String metric, Long count, String... tags) { statsDReporter.captureCount(metric, count, tags); } + public void captureHistogram(String metric, Long count, String... tags) { + statsDReporter.captureHistogram(metric, count, tags); + } + public void incrementCounter(String metric, String... tags) { statsDReporter.increment(metric, tags); } @@ -80,16 +87,20 @@ public void captureDurationSince(String metric, Instant instant, String... tags) statsDReporter.captureDurationSince(metric, instant, tags); } + public void captureDuration(String metric, long duration, String... tags) { + statsDReporter.captureDuration(metric, duration, tags); + } + // =================== ERROR =================== - public void captureNonFatalError(String metric, Exception e, String template, Object... t) { + public void captureNonFatalError(String metric, Throwable e, String template, Object... t) { logger.warn(template, t); logger.warn(e.getMessage(), e); statsDReporter.recordEvent(metric, SinkMetrics.NON_FATAL_ERROR, errorTag(e, SinkMetrics.NON_FATAL_ERROR)); } - public void captureFatalError(String metric, Exception e, String template, Object... t) { + public void captureFatalError(String metric, Throwable e, String template, Object... t) { logger.error(template, t); logger.error(e.getMessage(), e); statsDReporter.recordEvent(metric, SinkMetrics.FATAL_ERROR, errorTag(e, SinkMetrics.FATAL_ERROR)); diff --git a/src/main/java/io/odpf/sink/connectors/metrics/StatsDReporterBuilder.java b/src/main/java/io/odpf/sink/connectors/metrics/StatsDReporterBuilder.java index 118cc4f4..b39b6e4a 100644 --- a/src/main/java/io/odpf/sink/connectors/metrics/StatsDReporterBuilder.java +++ b/src/main/java/io/odpf/sink/connectors/metrics/StatsDReporterBuilder.java @@ -4,22 +4,18 @@ import com.timgroup.statsd.NonBlockingStatsDClientBuilder; import com.timgroup.statsd.StatsDClient; import io.odpf.sink.connectors.config.MetricsConfig; -import lombok.Builder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import lombok.extern.slf4j.Slf4j; /** * StatsDReporterFactory *

* Create statsDReporter Instance. */ -@Builder +@Slf4j public class StatsDReporterBuilder { private MetricsConfig metricsConfig; private String[] extraTags; - private static final Logger LOGGER = LoggerFactory.getLogger(StatsDReporterBuilder.class); private static T[] append(T[] arr, T lastElement) { final int length = arr.length; @@ -28,6 +24,20 @@ private static T[] append(T[] arr, T lastElement) { return arr; } + public static StatsDReporterBuilder builder() { + return new StatsDReporterBuilder(); + } + + public StatsDReporterBuilder withMetricConfig(MetricsConfig config) { + this.metricsConfig = config; + return this; + } + + public StatsDReporterBuilder withExtraTags(String... tags) { + this.extraTags = tags; + return this; + } + private static T[] append(T[] arr, T[] second) { final int length = arr.length; arr = java.util.Arrays.copyOf(arr, length + second.length); @@ -47,10 +57,10 @@ private StatsDClient buildStatsDClient() { .hostname(metricsConfig.getMetricStatsDHost()) .port(metricsConfig.getMetricStatsDPort()) .build(); - LOGGER.info("NonBlocking StatsD client connection established"); + log.info("NonBlocking StatsD client connection established"); } catch (Exception e) { - LOGGER.warn("Exception on creating StatsD client, disabling StatsD and Audit client", e); - LOGGER.warn("Application is running without collecting any metrics!!!!!!!!"); + log.warn("Exception on creating StatsD client, disabling StatsD and Audit client", e); + log.warn("Application is running without collecting any metrics!!!!!!!!"); statsDClient = new NoOpStatsDClient(); } return statsDClient; diff --git a/src/test/java/io/odpf/sink/connectors/bigquery/BigQuerySinkTest.java b/src/test/java/io/odpf/sink/connectors/bigquery/BigQuerySinkTest.java index 68c6f418..89172672 100644 --- a/src/test/java/io/odpf/sink/connectors/bigquery/BigQuerySinkTest.java +++ b/src/test/java/io/odpf/sink/connectors/bigquery/BigQuerySinkTest.java @@ -128,8 +128,8 @@ public void shouldReturnInvalidMessages() throws Exception { Assert.assertEquals(2, response.getErrors().size()); Mockito.verify(client, Mockito.times(1)).insertAll(rows); - Assert.assertEquals(ErrorType.DEFAULT_ERROR, response.getErrors().get(1L).get(0).getErrorType()); - Assert.assertEquals(ErrorType.INVALID_MESSAGE_ERROR, response.getErrors().get(3L).get(0).getErrorType()); + Assert.assertEquals(ErrorType.DEFAULT_ERROR, response.getErrors().get(1L).getErrorType()); + Assert.assertEquals(ErrorType.INVALID_MESSAGE_ERROR, response.getErrors().get(3L).getErrorType()); } @Test @@ -176,9 +176,9 @@ public void shouldReturnInvalidMessagesWithFailedInsertMessages() throws Excepti Assert.assertEquals(4, response.getErrors().size()); - Assert.assertEquals(ErrorType.SINK_UNKNOWN_ERROR, response.getErrors().get(0L).get(0).getErrorType()); - Assert.assertEquals(ErrorType.DEFAULT_ERROR, response.getErrors().get(1L).get(0).getErrorType()); - Assert.assertEquals(ErrorType.INVALID_MESSAGE_ERROR, response.getErrors().get(3L).get(0).getErrorType()); - Assert.assertEquals(ErrorType.SINK_4XX_ERROR, response.getErrors().get(4L).get(0).getErrorType()); + Assert.assertEquals(ErrorType.SINK_UNKNOWN_ERROR, response.getErrors().get(0L).getErrorType()); + Assert.assertEquals(ErrorType.DEFAULT_ERROR, response.getErrors().get(1L).getErrorType()); + Assert.assertEquals(ErrorType.INVALID_MESSAGE_ERROR, response.getErrors().get(3L).getErrorType()); + Assert.assertEquals(ErrorType.SINK_4XX_ERROR, response.getErrors().get(4L).getErrorType()); } } diff --git a/src/test/java/io/odpf/sink/connectors/bigquery/TestOdpfMessageBuilder.java b/src/test/java/io/odpf/sink/connectors/bigquery/TestOdpfMessageBuilder.java index 4e50ae32..dcceeb5e 100644 --- a/src/test/java/io/odpf/sink/connectors/bigquery/TestOdpfMessageBuilder.java +++ b/src/test/java/io/odpf/sink/connectors/bigquery/TestOdpfMessageBuilder.java @@ -48,8 +48,9 @@ public OdpfMessage createConsumerRecord(String orderNumber, String orderUrl, Str new Tuple<>("message_topic", topic), new Tuple<>("message_partition", partition), new Tuple<>("message_offset", offset), - new Tuple<>("message_timestamp", new DateTime(timestamp)), - new Tuple<>("load_time", new DateTime(loadTime))); + new Tuple<>("message_timestamp", timestamp), + new Tuple<>("load_time", loadTime), + new Tuple<>("should_be_ignored", timestamp)); } public OdpfMessage createEmptyValueConsumerRecord(String orderNumber, String orderUrl) { @@ -63,8 +64,9 @@ public OdpfMessage createEmptyValueConsumerRecord(String orderNumber, String ord new Tuple<>("message_topic", topic), new Tuple<>("message_partition", partition), new Tuple<>("message_offset", offset), - new Tuple<>("message_timestamp", new DateTime(timestamp)), - new Tuple<>("load_time", new DateTime(loadTime))); + new Tuple<>("message_timestamp", timestamp), + new Tuple<>("load_time", loadTime), + new Tuple<>("should_be_ignored", timestamp)); } public static Map metadataColumns(TestMetadata testMetadata, Instant now) { diff --git a/src/test/java/io/odpf/sink/connectors/bigquery/handler/MessageRecordConverterTest.java b/src/test/java/io/odpf/sink/connectors/bigquery/handler/MessageRecordConverterTest.java index 293941e6..89055b16 100644 --- a/src/test/java/io/odpf/sink/connectors/bigquery/handler/MessageRecordConverterTest.java +++ b/src/test/java/io/odpf/sink/connectors/bigquery/handler/MessageRecordConverterTest.java @@ -7,6 +7,7 @@ import io.odpf.sink.connectors.TestMessage; import io.odpf.sink.connectors.bigquery.models.Record; import io.odpf.sink.connectors.bigquery.models.Records; +import io.odpf.sink.connectors.common.TupleString; import io.odpf.sink.connectors.config.BigQuerySinkConfig; import io.odpf.sink.connectors.common.Tuple; import io.odpf.sink.connectors.error.ErrorType; @@ -25,6 +26,7 @@ import java.io.IOException; import java.time.Instant; import java.util.*; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; @@ -39,6 +41,9 @@ public class MessageRecordConverterTest { @Before public void setUp() throws IOException { System.setProperty("SINK_CONNECTOR_SCHEMA_MESSAGE_CLASS", "io.odpf.sink.connectors.TestMessage"); + System.setProperty("SINK_BIGQUERY_METADATA_NAMESPACE", ""); + System.setProperty("SINK_BIGQUERY_METADATA_COLUMNS_TYPES", + "message_offset=integer,message_topic=string,load_time=timestamp,message_timestamp=timestamp,message_partition=integer"); stencilClient = Mockito.mock(ClassLoadStencilClient.class, CALLS_REAL_METHODS); Map descriptorsMap = new HashMap() {{ put(String.format("%s", TestMessage.class.getName()), TestMessage.getDescriptor()); @@ -278,7 +283,20 @@ public void shouldIgnoreUnknownFieldsIfTheConfigIsSet() throws IOException { List messages = Collections.singletonList(consumerRecord); Records records = recordConverter.convert(messages); - Record record = new Record(consumerRecord.getMetadata(), consumerRecord.getMetadata(), 0, null); + + BigQuerySinkConfig config = ConfigFactory.create(BigQuerySinkConfig.class, System.getProperties()); + List metadataColumnsTypes = config.getMetadataColumnsTypes(); + Map metadata = consumerRecord.getMetadata(); + Map finalMetadata = metadataColumnsTypes.stream().collect(Collectors.toMap(TupleString::getFirst, t -> { + String key = t.getFirst(); + String dataType = t.getSecond(); + Object value = metadata.get(key); + if (value instanceof Long && dataType.equals("timestamp")) { + value = new DateTime((long) value); + } + return value; + })); + Record record = new Record(consumerRecord.getMetadata(), finalMetadata, 0, null); assertEquals(1, records.getValidRecords().size()); assertEquals(0, records.getInvalidRecords().size()); assertEquals(record, records.getValidRecords().get(0)); diff --git a/src/test/java/io/odpf/sink/connectors/bigquery/proto/BigqueryProtoUpdateListenerTest.java b/src/test/java/io/odpf/sink/connectors/bigquery/proto/BigqueryProtoUpdateListenerTest.java index 79ee1aa4..aad0ffcc 100644 --- a/src/test/java/io/odpf/sink/connectors/bigquery/proto/BigqueryProtoUpdateListenerTest.java +++ b/src/test/java/io/odpf/sink/connectors/bigquery/proto/BigqueryProtoUpdateListenerTest.java @@ -51,6 +51,7 @@ public class BigqueryProtoUpdateListenerTest { public void setUp() throws InvalidProtocolBufferException { System.setProperty("SINK_CONNECTOR_SCHEMA_MESSAGE_CLASS", "io.odpf.sink.connectors.TestKeyBQ"); System.setProperty("SINK_BIGQUERY_ENABLE_AUTO_SCHEMA_UPDATE", "false"); + System.setProperty("SINK_BIGQUERY_METADATA_NAMESPACE", ""); System.setProperty("SINK_BIGQUERY_METADATA_COLUMNS_TYPES", "topic=string,partition=integer,offset=integer"); config = ConfigFactory.create(BigQuerySinkConfig.class, System.getProperties()); converterWrapper = new MessageRecordConverterCache(); diff --git a/src/test/java/io/odpf/sink/connectors/log/LogSinkTest.java b/src/test/java/io/odpf/sink/connectors/log/LogSinkTest.java index 3d068076..0fcf66bc 100644 --- a/src/test/java/io/odpf/sink/connectors/log/LogSinkTest.java +++ b/src/test/java/io/odpf/sink/connectors/log/LogSinkTest.java @@ -18,7 +18,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -49,7 +48,7 @@ public void shouldProcessEmptyMessageWithNoError() throws IOException { LogSink logSink = new LogSink(config, odpfMessageParser, instrumentation); ArrayList messages = new ArrayList<>(); OdpfSinkResponse odpfSinkResponse = logSink.pushToSink(messages); - Map> errors = odpfSinkResponse.getErrors(); + Map errors = odpfSinkResponse.getErrors(); assertEquals(Collections.emptyMap(), errors); verify(odpfMessageParser, never()).parse(any(), any(), any()); @@ -74,7 +73,7 @@ public void shouldLogJsonMessages() throws OdpfSinkException { OdpfSinkResponse odpfSinkResponse = logSink.pushToSink(messages); //assert no error - Map> errors = odpfSinkResponse.getErrors(); + Map errors = odpfSinkResponse.getErrors(); assertEquals(Collections.emptyMap(), errors); //assert processed message @@ -102,11 +101,8 @@ public void shouldReturnErrorResponseAndProcessValidMessage() throws OdpfSinkExc OdpfSinkResponse odpfSinkResponse = logSink.pushToSink(messages); //assert error - Map> errors = odpfSinkResponse.getErrors(); - assertEquals(1, errors.size()); - List errorInfos = errors.get(1L); - assertEquals(1, errorInfos.size()); - assertEquals(ErrorType.DESERIALIZATION_ERROR, errorInfos.get(0).getErrorType()); + ErrorInfo error = odpfSinkResponse.getErrorsFor(1L); + assertEquals(ErrorType.DESERIALIZATION_ERROR, error.getErrorType()); //assert valid message processed ArgumentCaptor jsonStrCaptor = ArgumentCaptor.forClass(String.class); From cf07248b3e111a0d028a95b4f2fb11142322ce93 Mon Sep 17 00:00:00 2001 From: lavkesh Date: Fri, 6 May 2022 17:10:48 +0800 Subject: [PATCH 2/2] chore: small refactoring --- .../sink/connectors/bigquery/handler/ErrorHandler.java | 3 ++- .../connectors/bigquery/handler/NoopErrorHandler.java | 9 --------- .../io/odpf/sink/connectors/message/OdpfMessage.java | 2 ++ 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/odpf/sink/connectors/bigquery/handler/ErrorHandler.java b/src/main/java/io/odpf/sink/connectors/bigquery/handler/ErrorHandler.java index ea29b916..c219e58c 100644 --- a/src/main/java/io/odpf/sink/connectors/bigquery/handler/ErrorHandler.java +++ b/src/main/java/io/odpf/sink/connectors/bigquery/handler/ErrorHandler.java @@ -7,5 +7,6 @@ import java.util.Map; public interface ErrorHandler { - void handle(Map> errorInfoMap, List records); + default void handle(Map> errorInfoMap, List records) { + } } diff --git a/src/main/java/io/odpf/sink/connectors/bigquery/handler/NoopErrorHandler.java b/src/main/java/io/odpf/sink/connectors/bigquery/handler/NoopErrorHandler.java index 1039f4d4..20e6e118 100644 --- a/src/main/java/io/odpf/sink/connectors/bigquery/handler/NoopErrorHandler.java +++ b/src/main/java/io/odpf/sink/connectors/bigquery/handler/NoopErrorHandler.java @@ -1,16 +1,7 @@ package io.odpf.sink.connectors.bigquery.handler; -import com.google.cloud.bigquery.BigQueryError; -import io.odpf.sink.connectors.bigquery.models.Record; import lombok.extern.slf4j.Slf4j; -import java.util.List; -import java.util.Map; - @Slf4j public class NoopErrorHandler implements ErrorHandler { - @Override - public void handle(Map> errorInfoMap, List records) { - - } } diff --git a/src/main/java/io/odpf/sink/connectors/message/OdpfMessage.java b/src/main/java/io/odpf/sink/connectors/message/OdpfMessage.java index 25cde235..9aed0ad4 100644 --- a/src/main/java/io/odpf/sink/connectors/message/OdpfMessage.java +++ b/src/main/java/io/odpf/sink/connectors/message/OdpfMessage.java @@ -2,6 +2,7 @@ import io.odpf.sink.connectors.common.Tuple; import io.odpf.sink.connectors.common.TupleString; +import lombok.EqualsAndHashCode; import lombok.Getter; import java.util.Arrays; @@ -12,6 +13,7 @@ @Getter +@EqualsAndHashCode public class OdpfMessage { private final Object logKey; private final Object logMessage;