diff --git a/src/main/java/io/odpf/sink/connectors/bigquery/BigQuerySink.java b/src/main/java/io/odpf/sink/connectors/bigquery/BigQuerySink.java
index 855707c7..1bc20f80 100644
--- a/src/main/java/io/odpf/sink/connectors/bigquery/BigQuerySink.java
+++ b/src/main/java/io/odpf/sink/connectors/bigquery/BigQuerySink.java
@@ -3,7 +3,7 @@
 import com.google.cloud.bigquery.InsertAllRequest;
 import com.google.cloud.bigquery.InsertAllResponse;
 import io.odpf.sink.connectors.bigquery.handler.MessageRecordConverterCache;
-import io.odpf.sink.connectors.bigquery.error.ErrorHandler;
+import io.odpf.sink.connectors.bigquery.handler.ErrorHandler;
 import io.odpf.sink.connectors.error.ErrorInfo;
 import io.odpf.sink.connectors.message.OdpfMessage;
 import io.odpf.sink.connectors.OdpfSink;
@@ -64,7 +64,7 @@ public OdpfSinkResponse pushToSink(List<OdpfMessage> messageList) {
         if (response.hasErrors()) {
             Map<Long, ErrorInfo> errorInfoMap = BigQueryResponseParser.parseAndFillOdpfSinkResponse(records.getValidRecords(), response, bigQueryMetrics, instrumentation);
             errorInfoMap.forEach(odpfSinkResponse::addErrors);
-            errorHandler.handle(errorInfoMap, records.getValidRecords());
+            errorHandler.handle(response.getInsertErrors(), records.getValidRecords());
         }
     }
     return odpfSinkResponse;
diff --git a/src/main/java/io/odpf/sink/connectors/bigquery/BigQuerySinkFactory.java b/src/main/java/io/odpf/sink/connectors/bigquery/BigQuerySinkFactory.java
index 624122c3..f1ec87a2 100644
--- a/src/main/java/io/odpf/sink/connectors/bigquery/BigQuerySinkFactory.java
+++ b/src/main/java/io/odpf/sink/connectors/bigquery/BigQuerySinkFactory.java
@@ -1,12 +1,13 @@
 package io.odpf.sink.connectors.bigquery;
 
 import io.odpf.sink.connectors.OdpfSink;
-import io.odpf.sink.connectors.bigquery.handler.MessageRecordConverterCache;
-import io.odpf.sink.connectors.bigquery.error.ErrorHandler;
 import io.odpf.sink.connectors.bigquery.handler.BigQueryClient;
 import io.odpf.sink.connectors.bigquery.handler.BigQueryRow;
 import io.odpf.sink.connectors.bigquery.handler.BigQueryRowWithInsertId;
 import io.odpf.sink.connectors.bigquery.handler.BigQueryRowWithoutInsertId;
+import io.odpf.sink.connectors.bigquery.handler.ErrorHandler;
+import io.odpf.sink.connectors.bigquery.handler.ErrorHandlerFactory;
+import io.odpf.sink.connectors.bigquery.handler.MessageRecordConverterCache;
 import io.odpf.sink.connectors.stencil.OdpfStencilUpdateListener;
 import io.odpf.sink.connectors.config.BigQuerySinkConfig;
 import io.odpf.sink.connectors.message.OdpfMessageParser;
@@ -43,6 +44,7 @@ public void init() {
         this.bigQueryClient = new BigQueryClient(sinkConfig, bigQueryMetrics, new Instrumentation(statsDReporter, BigQueryClient.class));
         this.bigQueryMetrics = new BigQueryMetrics(sinkConfig);
         this.recordConverterWrapper = new MessageRecordConverterCache();
+        this.errorHandler = ErrorHandlerFactory.create(sinkConfig, bigQueryClient);
         OdpfStencilUpdateListener odpfStencilUpdateListener = BigqueryStencilUpdateListenerFactory.create(sinkConfig, bigQueryClient, recordConverterWrapper);
         OdpfMessageParser odpfMessageParser = OdpfMessageParserFactory.getParser(sinkConfig, statsDReporter, odpfStencilUpdateListener);
         odpfStencilUpdateListener.setOdpfMessageParser(odpfMessageParser);
@@ -57,6 +59,7 @@ public void init() {
         }
     }
 
+
     public OdpfSink create() {
         return new BigQuerySink(
                 bigQueryClient,
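With this change, pushToSink hands the handler the raw per-row BigQueryErrors from InsertAllResponse.getInsertErrors() instead of the already-parsed ErrorInfo map, and BigQuerySinkFactory.init() wires the handler through ErrorHandlerFactory. A minimal sketch of a custom implementation of the relocated interface (the class name and logging are illustrative, not part of this change):

    package io.odpf.sink.connectors.bigquery.handler;

    import com.google.cloud.bigquery.BigQueryError;
    import io.odpf.sink.connectors.bigquery.models.Record;

    import java.util.List;
    import java.util.Map;

    // Hypothetical handler that only logs failed rows; any implementation
    // receives the row index -> errors map plus the batch of valid records.
    public class LoggingErrorHandler implements ErrorHandler {
        @Override
        public void handle(Map<Long, List<BigQueryError>> insertErrors, List<Record> records) {
            insertErrors.forEach((rowIndex, errors) ->
                    errors.forEach(error ->
                            System.err.printf("row %d failed: %s (%s)%n",
                                    rowIndex, error.getMessage(), error.getReason())));
        }
    }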
diff --git a/src/main/java/io/odpf/sink/connectors/bigquery/error/ErrorHandler.java b/src/main/java/io/odpf/sink/connectors/bigquery/error/ErrorHandler.java
deleted file mode 100644
index a737db44..00000000
--- a/src/main/java/io/odpf/sink/connectors/bigquery/error/ErrorHandler.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package io.odpf.sink.connectors.bigquery.error;
-
-import io.odpf.sink.connectors.bigquery.models.Record;
-import io.odpf.sink.connectors.error.ErrorInfo;
-
-import java.util.List;
-import java.util.Map;
-
-public interface ErrorHandler {
-    void handle(Map<Long, ErrorInfo> errorInfoMap, List<Record> records);
-}
diff --git a/src/main/java/io/odpf/sink/connectors/bigquery/handler/ErrorHandler.java b/src/main/java/io/odpf/sink/connectors/bigquery/handler/ErrorHandler.java
new file mode 100644
index 00000000..ea29b916
--- /dev/null
+++ b/src/main/java/io/odpf/sink/connectors/bigquery/handler/ErrorHandler.java
@@ -0,0 +1,11 @@
+package io.odpf.sink.connectors.bigquery.handler;
+
+import com.google.cloud.bigquery.BigQueryError;
+import io.odpf.sink.connectors.bigquery.models.Record;
+
+import java.util.List;
+import java.util.Map;
+
+public interface ErrorHandler {
+    void handle(Map<Long, List<BigQueryError>> errorInfoMap, List<Record> records);
+}
diff --git a/src/main/java/io/odpf/sink/connectors/bigquery/handler/ErrorHandlerFactory.java b/src/main/java/io/odpf/sink/connectors/bigquery/handler/ErrorHandlerFactory.java
new file mode 100644
index 00000000..c47fea9e
--- /dev/null
+++ b/src/main/java/io/odpf/sink/connectors/bigquery/handler/ErrorHandlerFactory.java
@@ -0,0 +1,15 @@
+package io.odpf.sink.connectors.bigquery.handler;
+
+import io.odpf.sink.connectors.config.BigQuerySinkConfig;
+import io.odpf.sink.connectors.config.enums.InputSchemaDataType;
+
+public class ErrorHandlerFactory {
+    public static ErrorHandler create(BigQuerySinkConfig sinkConfig, BigQueryClient bigQueryClient) {
+        if (InputSchemaDataType.JSON == sinkConfig.getSinkConnectorSchemaDataTye()) {
+            return new JsonErrorHandler(
+                    bigQueryClient,
+                    sinkConfig);
+        }
+        return new NoopErrorHandler();
+    }
+}
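ErrorHandlerFactory is the single switch point: JSON-typed sinks get the self-healing JsonErrorHandler, everything else gets NoopErrorHandler. A hedged sketch of that dispatch rule (the helper class is hypothetical, for illustration only):

    package io.odpf.sink.connectors.bigquery.handler;

    import io.odpf.sink.connectors.config.BigQuerySinkConfig;
    import io.odpf.sink.connectors.config.enums.InputSchemaDataType;

    // Hypothetical illustration of the factory's dispatch rule.
    class ErrorHandlerFactoryUsage {
        static ErrorHandler pick(BigQuerySinkConfig sinkConfig, BigQueryClient bigQueryClient) {
            ErrorHandler handler = ErrorHandlerFactory.create(sinkConfig, bigQueryClient);
            // JSON input schema -> JsonErrorHandler (repairs the table schema);
            // any other InputSchemaDataType -> NoopErrorHandler (errors stay reported-only).
            boolean isJson = sinkConfig.getSinkConnectorSchemaDataTye() == InputSchemaDataType.JSON;
            assert isJson ? handler instanceof JsonErrorHandler : handler instanceof NoopErrorHandler;
            return handler;
        }
    }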
diff --git a/src/main/java/io/odpf/sink/connectors/bigquery/handler/JsonErrorHandler.java b/src/main/java/io/odpf/sink/connectors/bigquery/handler/JsonErrorHandler.java
new file mode 100644
index 00000000..9e9622f5
--- /dev/null
+++ b/src/main/java/io/odpf/sink/connectors/bigquery/handler/JsonErrorHandler.java
@@ -0,0 +1,106 @@
+package io.odpf.sink.connectors.bigquery.handler;
+
+import com.google.cloud.bigquery.BigQueryError;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.FieldList;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import io.odpf.sink.connectors.bigquery.models.Record;
+import io.odpf.sink.connectors.config.BigQuerySinkConfig;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/*
+ * The job of this class is to handle unknown-field insert errors and then update
+ * the BigQuery table schema; this happens in cases where the schema is inferred
+ * from the incoming data.
+ */
+public class JsonErrorHandler implements ErrorHandler {
+
+    private final BigQueryClient bigQueryClient;
+    private final String tablePartitionKey;
+    private final boolean castAllColumnsToStringDataType;
+
+    public JsonErrorHandler(BigQueryClient bigQueryClient, BigQuerySinkConfig bigQuerySinkConfig) {
+        this.bigQueryClient = bigQueryClient;
+        this.tablePartitionKey = bigQuerySinkConfig.getTablePartitionKey();
+        this.castAllColumnsToStringDataType = bigQuerySinkConfig.getOutputDefaultDatatypeStringEnable();
+    }
+
+    public void handle(Map<Long, List<BigQueryError>> insertErrors, List<Record> records) {
+        Schema schema = bigQueryClient.getSchema();
+        FieldList existingFieldList = schema.getFields();
+        List<Entry<Long, List<BigQueryError>>> unknownFieldBqErrors = getUnknownFieldBqErrors(insertErrors);
+        if (!unknownFieldBqErrors.isEmpty()) {
+            // collect the columns of every record that failed with an unknown-field
+            // error, drop the ones the table already has, and map the rest to fields
+            Set<Field> missingFields = unknownFieldBqErrors
+                    .parallelStream()
+                    .map(x -> getColumnNamesForRecordsWhichHadUnknownBqFieldErrors(records, x))
+                    .flatMap(keys -> keys.stream())
+                    .filter(key -> filterExistingFields(existingFieldList, key))
+                    .map(key -> getField(key))
+                    .collect(Collectors.toSet());
+            ArrayList<Field> bqSchemaFields = new ArrayList<>(missingFields);
+            existingFieldList.iterator().forEachRemaining(bqSchemaFields::add);
+            bigQueryClient.upsertTable(bqSchemaFields);
+        }
+    }
+
+    private Set<String> getColumnNamesForRecordsWhichHadUnknownBqFieldErrors(List<Record> records, Entry<Long, List<BigQueryError>> errorEntry) {
+        // the entry key is the row index into the inserted batch
+        Integer recordKey = errorEntry.getKey().intValue();
+        return records.get(recordKey).getColumns().keySet();
+    }
+
+    private List<Entry<Long, List<BigQueryError>>> getUnknownFieldBqErrors(Map<Long, List<BigQueryError>> insertErrors) {
+        return insertErrors.entrySet()
+                .parallelStream()
+                .filter(x -> !getBqErrorsWithNoSuchFields(x.getValue()).isEmpty())
+                .collect(Collectors.toList());
+    }
+
+    private List<BigQueryError> getBqErrorsWithNoSuchFields(List<BigQueryError> errors) {
+        return errors.stream()
+                .filter(bigQueryError -> "invalid".equals(bigQueryError.getReason())
+                        && bigQueryError.getMessage().contains("no such field"))
+                .collect(Collectors.toList());
+    }
+
+    private Field getField(String key) {
+        if (tablePartitionKey != null && tablePartitionKey.equals(key)) {
+            return Field.of(key, LegacySQLTypeName.TIMESTAMP);
+        }
+        if (!castAllColumnsToStringDataType) {
+            throw new UnsupportedOperationException("only string data type is supported for fields other than the partition key");
+        }
+        return Field.of(key, LegacySQLTypeName.STRING);
+    }
+
+    private boolean filterExistingFields(FieldList existingFieldList, String key) {
+        // FieldList.get(String) throws IllegalArgumentException for unknown names,
+        // so an exception here means the field is missing and must be added
+        try {
+            existingFieldList.get(key);
+            return false;
+        } catch (IllegalArgumentException ex) {
+            return true;
+        }
+    }
+}
diff --git a/src/main/java/io/odpf/sink/connectors/bigquery/handler/NoopErrorHandler.java b/src/main/java/io/odpf/sink/connectors/bigquery/handler/NoopErrorHandler.java
new file mode 100644
index 00000000..1039f4d4
--- /dev/null
+++ b/src/main/java/io/odpf/sink/connectors/bigquery/handler/NoopErrorHandler.java
@@ -0,0 +1,16 @@
+package io.odpf.sink.connectors.bigquery.handler;
+
+import com.google.cloud.bigquery.BigQueryError;
+import io.odpf.sink.connectors.bigquery.models.Record;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class NoopErrorHandler implements ErrorHandler {
+    @Override
+    public void handle(Map<Long, List<BigQueryError>> errorInfoMap, List<Record> records) {
+        // no-op: non-JSON schemas do not repair the table on insert errors
+    }
+}
diff --git a/src/main/java/io/odpf/sink/connectors/bigquery/models/Record.java b/src/main/java/io/odpf/sink/connectors/bigquery/models/Record.java
index 4f2ab3e3..cbe5ae89 100644
--- a/src/main/java/io/odpf/sink/connectors/bigquery/models/Record.java
+++ b/src/main/java/io/odpf/sink/connectors/bigquery/models/Record.java
@@ -2,6 +2,7 @@
 import io.odpf.sink.connectors.error.ErrorInfo;
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 
@@ -10,6 +11,7 @@
 @AllArgsConstructor
 @Getter
 @EqualsAndHashCode
+@Builder
 public class Record {
     private final Map<String, Object> metadata;
     private final Map<String, Object> columns;
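JsonErrorHandler treats a BigQueryError with reason "invalid" and a message containing "no such field" as schema drift: it gathers the column names of every failing record, filters out columns the table already has, and upserts the union of missing and existing fields. A hedged walkthrough of one repair cycle (the helper class and names are illustrative):

    package io.odpf.sink.connectors.bigquery.handler;

    import com.google.cloud.bigquery.BigQueryError;
    import io.odpf.sink.connectors.bigquery.models.Record;

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    // Hypothetical walkthrough of a single repair cycle.
    class JsonErrorHandlerWalkthrough {
        static void repair(JsonErrorHandler handler, Record recordWithNewColumn) {
            // BigQuery flags an unknown column with reason "invalid" and a
            // "no such field" message; the map key is the row index in the batch.
            BigQueryError unknownField =
                    new BigQueryError("invalid", "first_name", "no such field: first_name");
            Map<Long, List<BigQueryError>> insertErrors =
                    Collections.singletonMap(0L, Collections.singletonList(unknownField));
            // handle() diffs the failing record's columns against the live table
            // schema and calls BigQueryClient.upsertTable() with the union.
            handler.handle(insertErrors, Collections.singletonList(recordWithNewColumn));
        }
    }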
diff --git a/src/main/java/io/odpf/sink/connectors/config/BigQuerySinkConfig.java b/src/main/java/io/odpf/sink/connectors/config/BigQuerySinkConfig.java
index fdee35ca..be67075c 100644
--- a/src/main/java/io/odpf/sink/connectors/config/BigQuerySinkConfig.java
+++ b/src/main/java/io/odpf/sink/connectors/config/BigQuerySinkConfig.java
@@ -73,5 +73,8 @@ public interface BigQuerySinkConfig extends OdpfSinkConfig {
     @Separator(ConfToListConverter.ELEMENT_SEPARATOR)
     List getMetadataColumnsTypes();
 
+    @DefaultValue("true")
+    @Key("SINK_BIGQUERY_JSON_OUTPUT_DEFAULT_DATATYPE_STRING_ENABLE")
+    boolean getOutputDefaultDatatypeStringEnable();
 
 }
diff --git a/src/main/java/io/odpf/sink/connectors/error/ErrorInfo.java b/src/main/java/io/odpf/sink/connectors/error/ErrorInfo.java
index 6cea305f..074657b0 100644
--- a/src/main/java/io/odpf/sink/connectors/error/ErrorInfo.java
+++ b/src/main/java/io/odpf/sink/connectors/error/ErrorInfo.java
@@ -1,10 +1,12 @@
 package io.odpf.sink.connectors.error;
 
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 
 @AllArgsConstructor
 @Data
+@Builder
 public class ErrorInfo {
     private Exception exception;
     private ErrorType errorType;
diff --git a/src/test/java/io/odpf/sink/connectors/bigquery/BigQuerySinkTest.java b/src/test/java/io/odpf/sink/connectors/bigquery/BigQuerySinkTest.java
index 68c6f418..fea670e9 100644
--- a/src/test/java/io/odpf/sink/connectors/bigquery/BigQuerySinkTest.java
+++ b/src/test/java/io/odpf/sink/connectors/bigquery/BigQuerySinkTest.java
@@ -5,12 +5,12 @@
 import com.google.cloud.bigquery.InsertAllResponse;
 import com.google.cloud.bigquery.TableId;
 import io.odpf.sink.connectors.OdpfSinkResponse;
-import io.odpf.sink.connectors.bigquery.handler.MessageRecordConverter;
-import io.odpf.sink.connectors.bigquery.handler.MessageRecordConverterCache;
-import io.odpf.sink.connectors.bigquery.error.ErrorHandler;
 import io.odpf.sink.connectors.bigquery.handler.BigQueryClient;
 import io.odpf.sink.connectors.bigquery.handler.BigQueryRow;
 import io.odpf.sink.connectors.bigquery.handler.BigQueryRowWithInsertId;
+import io.odpf.sink.connectors.bigquery.handler.ErrorHandler;
+import io.odpf.sink.connectors.bigquery.handler.MessageRecordConverter;
+import io.odpf.sink.connectors.bigquery.handler.MessageRecordConverterCache;
 import io.odpf.sink.connectors.bigquery.models.Record;
 import io.odpf.sink.connectors.bigquery.models.Records;
 import io.odpf.sink.connectors.error.ErrorInfo;
@@ -173,7 +173,7 @@ public void shouldReturnInvalidMessagesWithFailedInsertMessages() throws Excepti
         OdpfSinkResponse response = sink.pushToSink(messages);
         Mockito.verify(client, Mockito.times(1)).insertAll(rows);
-
+        Mockito.verify(errorHandler, Mockito.times(1)).handle(Mockito.eq(insertErrorsMap), Mockito.any());
         Assert.assertEquals(4, response.getErrors().size());
         Assert.assertEquals(ErrorType.SINK_UNKNOWN_ERROR, response.getErrors().get(0L).get(0).getErrorType());
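The new flag defaults to true, so every inferred column is added as STRING, except the configured partition key, which is always created as TIMESTAMP; setting it to false makes JsonErrorHandler throw instead of adding non-partition columns. A sketch of building such a config the way the tests below do (via owner's ConfigFactory; the helper class is illustrative):

    import io.odpf.sink.connectors.config.BigQuerySinkConfig;
    import org.aeonbits.owner.ConfigFactory;

    import java.util.HashMap;
    import java.util.Map;

    // Hedged sketch: a config with the string-default behaviour disabled.
    class ConfigSketch {
        static BigQuerySinkConfig stringDefaultDisabled() {
            Map<String, String> env = new HashMap<>();
            env.put("SINK_BIGQUERY_JSON_OUTPUT_DEFAULT_DATATYPE_STRING_ENABLE", "false");
            // With this set, JsonErrorHandler.getField() throws
            // UnsupportedOperationException for any new non-partition-key column.
            return ConfigFactory.create(BigQuerySinkConfig.class, env);
        }
    }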
diff --git a/src/test/java/io/odpf/sink/connectors/bigquery/handler/JsonErrorHandlerTest.java b/src/test/java/io/odpf/sink/connectors/bigquery/handler/JsonErrorHandlerTest.java
new file mode 100644
index 00000000..f3b4c28f
--- /dev/null
+++ b/src/test/java/io/odpf/sink/connectors/bigquery/handler/JsonErrorHandlerTest.java
@@ -0,0 +1,345 @@
+package io.odpf.sink.connectors.bigquery.handler;
+
+import com.google.cloud.bigquery.BigQueryError;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import io.odpf.sink.connectors.bigquery.models.Record;
+import io.odpf.sink.connectors.config.BigQuerySinkConfig;
+import org.aeonbits.owner.ConfigFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Arrays.asList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class JsonErrorHandlerTest {
+
+    private final Schema emptyTableSchema = Schema.of();
+
+    private final BigQuerySinkConfig bigQuerySinkConfig = ConfigFactory.create(BigQuerySinkConfig.class, Collections.emptyMap());
+
+    @Mock
+    private BigQueryClient bigQueryClient;
+
+    @Captor
+    private ArgumentCaptor<List<Field>> fieldsArgumentCaptor;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void shouldUpdateTableFieldsOnSchemaError() {
+        when(bigQueryClient.getSchema()).thenReturn(emptyTableSchema);
+
+        Map<Long, List<BigQueryError>> insertErrors = new HashMap<>();
+        BigQueryError bigQueryError = new BigQueryError("invalid", "first_name", "no such field: first_name");
+        insertErrors.put(0L, asList(bigQueryError));
+
+        Map<String, Object> columnsMap = new HashMap<>();
+        columnsMap.put("first_name", "john doe");
+        Record validRecord = Record.builder().columns(columnsMap).build();
+
+        List<Record> records = new ArrayList<>();
+        records.add(validRecord);
+
+        JsonErrorHandler jsonErrorHandler = new JsonErrorHandler(bigQueryClient, bigQuerySinkConfig);
+        jsonErrorHandler.handle(insertErrors, records);
+
+        verify(bigQueryClient, times(1)).upsertTable(fieldsArgumentCaptor.capture());
+
+        Field firstName = Field.of("first_name", LegacySQLTypeName.STRING);
+        List<Field> actualFields = fieldsArgumentCaptor.getValue();
+        assertThat(actualFields, containsInAnyOrder(firstName));
+    }
+
+    @Test
+    public void shouldNotUpdateTableWhenNoSchemaError() {
+        when(bigQueryClient.getSchema()).thenReturn(emptyTableSchema);
+
+        Map<Long, List<BigQueryError>> insertErrors = new HashMap<>();
+        BigQueryError serverError = new BigQueryError("otherreasons", "planet earth", "server error");
+        BigQueryError anotherError = new BigQueryError("otherreasons", "planet earth", "server error");
+        insertErrors.put(0L, asList(serverError, anotherError));
+
+        Map<String, Object> columnsMap = new HashMap<>();
+        columnsMap.put("first_name", "john doe");
+        Record validRecord = Record.builder().columns(columnsMap).build();
+
+        List<Record> records = new ArrayList<>();
+        records.add(validRecord);
+
+        JsonErrorHandler jsonErrorHandler = new JsonErrorHandler(bigQueryClient, bigQuerySinkConfig);
+        jsonErrorHandler.handle(insertErrors, records);
+
+        verify(bigQueryClient, never()).upsertTable(any());
+    }
BigQueryError("invalid", "first_name", "no such field: last_name"); + errorInfoMap.put(0L, asList(firstNameNotFoundError, anotherError)); + errorInfoMap.put(1L, asList(lastNameNotFoundError)); + + + Map columnsMap = new HashMap<>(); + columnsMap.put("first_name", "john doe"); + Record validRecordWithFirstName = Record.builder().columns(columnsMap).build(); + + Map columnsMapWithLastName = new HashMap<>(); + columnsMapWithLastName.put("last_name", "john carmack"); + Record validRecordWithLastName = Record.builder().columns(columnsMapWithLastName).build(); + + List validRecords = new ArrayList<>(); + validRecords.add(validRecordWithFirstName); + validRecords.add(validRecordWithLastName); + + JsonErrorHandler jsonErrorHandler = new JsonErrorHandler(bigQueryClient, bigQuerySinkConfig); + jsonErrorHandler.handle(errorInfoMap, validRecords); + + + verify(bigQueryClient, times(1)).upsertTable((List) fieldsArgumentCaptor.capture()); + + Field firstName = Field.of("first_name", LegacySQLTypeName.STRING); + Field lastName = Field.of("last_name", LegacySQLTypeName.STRING); + List actualFields = fieldsArgumentCaptor.getValue(); + assertThat(actualFields, containsInAnyOrder(firstName, lastName)); + } + + @Test + public void shouldIngoreRecordsWhichHaveOtherErrors() { + when(bigQueryClient.getSchema()).thenReturn(emptyTableSchema); + + Map> errorInfoMap = new HashMap<>(); + BigQueryError noSuchFieldError = new BigQueryError("invalid", "first_name", "no such field: first_name"); + BigQueryError otherError = new BigQueryError("otherresons", "planet eart", "server error"); + errorInfoMap.put(1L, asList(noSuchFieldError, otherError)); + errorInfoMap.put(0L, asList(otherError)); + + Map columnsMap = new HashMap<>(); + columnsMap.put("first_name", "john doe"); + Record validRecordWithFirstName = Record.builder().columns(columnsMap).build(); + + Map columnsMapWithLastName = new HashMap<>(); + columnsMapWithLastName.put("last_name", "john carmack"); + Record validRecordWithLastName = Record.builder().columns(columnsMapWithLastName).build(); + + List validRecords = new ArrayList<>(); + validRecords.add(validRecordWithFirstName); + validRecords.add(validRecordWithLastName); + + JsonErrorHandler jsonErrorHandler = new JsonErrorHandler(bigQueryClient, bigQuerySinkConfig); + jsonErrorHandler.handle(errorInfoMap, validRecords); + + verify(bigQueryClient, times(1)).upsertTable((List) fieldsArgumentCaptor.capture()); + + Field lastName = Field.of("last_name", LegacySQLTypeName.STRING); + List actualFields = fieldsArgumentCaptor.getValue(); + assertThat(actualFields, containsInAnyOrder(lastName)); + } + + @Test + public void shouldIngoreRecordsWithNoErrors() { + when(bigQueryClient.getSchema()).thenReturn(emptyTableSchema); + + + Map> errorInfoMap = new HashMap<>(); + BigQueryError noSuchFieldError = new BigQueryError("invalid", "first_name", "no such field: first_name"); + errorInfoMap.put(1L, asList(noSuchFieldError)); + + Map columnsMap = new HashMap<>(); + columnsMap.put("first_name", "john doe"); + Record validRecordWithFirstName = Record.builder().columns(columnsMap).build(); + + Map columnsMapWithLastName = new HashMap<>(); + columnsMapWithLastName.put("last_name", "john carmack"); + Record validRecordWithLastName = Record.builder().columns(columnsMapWithLastName).build(); + + List validRecords = new ArrayList<>(); + validRecords.add(validRecordWithFirstName); + validRecords.add(validRecordWithLastName); + + JsonErrorHandler jsonErrorHandler = new JsonErrorHandler(bigQueryClient, bigQuerySinkConfig); + 
+
+    @Test
+    public void shouldUpdateOnlyUniqueFields() {
+        when(bigQueryClient.getSchema()).thenReturn(emptyTableSchema);
+
+        Map<Long, List<BigQueryError>> errorInfoMap = new HashMap<>();
+        BigQueryError noSuchFieldError = new BigQueryError("invalid", "first_name", "no such field: first_name");
+        errorInfoMap.put(0L, asList(noSuchFieldError));
+        errorInfoMap.put(1L, asList(noSuchFieldError));
+        errorInfoMap.put(2L, asList(noSuchFieldError));
+
+        Map<String, Object> columnsMap = new HashMap<>();
+        columnsMap.put("first_name", "john doe");
+        Record validRecordWithFirstName = Record.builder().columns(columnsMap).build();
+
+        Map<String, Object> columnsMapWithLastName = new HashMap<>();
+        columnsMapWithLastName.put("last_name", "john carmack");
+        Record validRecordWithLastName = Record.builder().columns(columnsMapWithLastName).build();
+        Record anotherRecordWithLastName = Record.builder().columns(columnsMapWithLastName).build();
+
+        List<Record> validRecords = new ArrayList<>();
+        validRecords.add(validRecordWithFirstName);
+        validRecords.add(validRecordWithLastName);
+        validRecords.add(anotherRecordWithLastName);
+
+        JsonErrorHandler jsonErrorHandler = new JsonErrorHandler(bigQueryClient, bigQuerySinkConfig);
+        jsonErrorHandler.handle(errorInfoMap, validRecords);
+
+        verify(bigQueryClient, times(1)).upsertTable(fieldsArgumentCaptor.capture());
+
+        Field lastName = Field.of("last_name", LegacySQLTypeName.STRING);
+        Field firstName = Field.of("first_name", LegacySQLTypeName.STRING);
+        List<Field> actualFields = fieldsArgumentCaptor.getValue();
+        assertThat(actualFields, containsInAnyOrder(firstName, lastName));
+    }
+
+    @Test
+    public void shouldUpdateWithBothMissingFieldsAndExistingTableFields() {
+        // existing table fields
+        Field lastName = Field.of("last_name", LegacySQLTypeName.STRING);
+        Field firstName = Field.of("first_name", LegacySQLTypeName.STRING);
+        Schema nonEmptyTableSchema = Schema.of(firstName, lastName);
+        when(bigQueryClient.getSchema()).thenReturn(nonEmptyTableSchema);
+
+        Map<Long, List<BigQueryError>> errorInfoMap = new HashMap<>();
+        BigQueryError noSuchFieldError = new BigQueryError("invalid", "first_name", "no such field: first_name");
+        errorInfoMap.put(0L, asList(noSuchFieldError));
+        errorInfoMap.put(1L, asList(noSuchFieldError));
+        errorInfoMap.put(2L, asList(noSuchFieldError));
+
+        Map<String, Object> columnsMapWithFirstName = new HashMap<>();
+        columnsMapWithFirstName.put("first_name", "john doe");
+        columnsMapWithFirstName.put("newFieldAddress", "planet earth");
+        Record validRecordWithFirstName = Record.builder().columns(columnsMapWithFirstName).build();
+
+        Map<String, Object> columnsMapWithNewFieldDog = new HashMap<>();
+        columnsMapWithNewFieldDog.put("newFieldDog", "golden retriever");
+        Record validRecordWithLastName = Record.builder().columns(columnsMapWithNewFieldDog).build();
+        Record anotherRecordWithLastName = Record.builder().columns(columnsMapWithNewFieldDog).build();
+
+        List<Record> validRecords = new ArrayList<>();
+        validRecords.add(validRecordWithFirstName);
+        validRecords.add(validRecordWithLastName);
+        validRecords.add(anotherRecordWithLastName);
+
+        JsonErrorHandler jsonErrorHandler = new JsonErrorHandler(bigQueryClient, bigQuerySinkConfig);
+        jsonErrorHandler.handle(errorInfoMap, validRecords);
+
+        verify(bigQueryClient, times(1)).upsertTable(fieldsArgumentCaptor.capture());
+
+        // missing fields
+        Field newFieldDog = Field.of("newFieldDog", LegacySQLTypeName.STRING);
+        Field newFieldAddress = Field.of("newFieldAddress", LegacySQLTypeName.STRING);
+
+        List<Field> actualFields = fieldsArgumentCaptor.getValue();
+        assertThat(actualFields, containsInAnyOrder(firstName, lastName, newFieldDog, newFieldAddress));
+    }
+
+    @Test
+    public void shouldUpsertTableWithPartitionKeyTimestampField() {
+        when(bigQueryClient.getSchema()).thenReturn(emptyTableSchema);
+
+        Map<Long, List<BigQueryError>> errorInfoMap = new HashMap<>();
+        BigQueryError noSuchFieldError = new BigQueryError("invalid", "first_name", "no such field: first_name");
+        errorInfoMap.put(0L, asList(noSuchFieldError));
+        errorInfoMap.put(1L, asList(noSuchFieldError));
+
+        Map<String, Object> columnsMap = new HashMap<>();
+        columnsMap.put("first_name", "john doe");
+        Record validRecordWithFirstName = Record.builder().columns(columnsMap).build();
+
+        Map<String, Object> columnsMapWithTimestamp = new HashMap<>();
+        columnsMapWithTimestamp.put("last_name", "john carmack");
+        columnsMapWithTimestamp.put("event_timestamp_partition", "today's date");
+        Record validRecordWithLastName = Record.builder().columns(columnsMapWithTimestamp).build();
+
+        List<Record> validRecords = new ArrayList<>();
+        validRecords.add(validRecordWithFirstName);
+        validRecords.add(validRecordWithLastName);
+
+        Map<String, String> envMap = new HashMap<String, String>() {{
+            put("SINK_BIGQUERY_TABLE_PARTITION_KEY", "event_timestamp_partition");
+        }};
+        BigQuerySinkConfig partitionKeyConfig = ConfigFactory.create(BigQuerySinkConfig.class, envMap);
+        JsonErrorHandler jsonErrorHandler = new JsonErrorHandler(bigQueryClient, partitionKeyConfig);
+        jsonErrorHandler.handle(errorInfoMap, validRecords);
+
+        verify(bigQueryClient, times(1)).upsertTable(fieldsArgumentCaptor.capture());
+
+        Field firstName = Field.of("first_name", LegacySQLTypeName.STRING);
+        Field lastName = Field.of("last_name", LegacySQLTypeName.STRING);
+        Field eventTimestamp = Field.of("event_timestamp_partition", LegacySQLTypeName.TIMESTAMP);
+        List<Field> actualFields = fieldsArgumentCaptor.getValue();
+        assertThat(actualFields, containsInAnyOrder(firstName, lastName, eventTimestamp));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenCastFieldsToStringNotTrue() {
+        when(bigQueryClient.getSchema()).thenReturn(emptyTableSchema);
+
+        Map<Long, List<BigQueryError>> errorInfoMap = new HashMap<>();
+        BigQueryError noSuchFieldError = new BigQueryError("invalid", "first_name", "no such field: first_name");
+        errorInfoMap.put(0L, asList(noSuchFieldError));
+
+        Map<String, Object> columnsMap = new HashMap<>();
+        columnsMap.put("first_name", "john doe");
+        Record validRecord = Record.builder().columns(columnsMap).build();
+
+        List<Record> records = new ArrayList<>();
+        records.add(validRecord);
+
+        Map<String, String> envMap = new HashMap<String, String>() {{
+            put("SINK_BIGQUERY_JSON_OUTPUT_DEFAULT_DATATYPE_STRING_ENABLE", "false");
+        }};
+        BigQuerySinkConfig stringDisableConfig = ConfigFactory.create(BigQuerySinkConfig.class, envMap);
+        JsonErrorHandler jsonErrorHandler = new JsonErrorHandler(bigQueryClient, stringDisableConfig);
+        assertThrows(UnsupportedOperationException.class, () -> jsonErrorHandler.handle(errorInfoMap, records));
+
+        verify(bigQueryClient, never()).upsertTable(any());
+    }
+}
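End to end, none of this needs manual wiring: BigQuerySinkFactory.init() picks the handler and create() hands it to the sink. A hedged sketch (the factory's constructor arguments are outside this diff, so a pre-built instance is assumed):

    import io.odpf.sink.connectors.OdpfSink;
    import io.odpf.sink.connectors.bigquery.BigQuerySinkFactory;

    // Hypothetical bootstrap helper; assumes an already-constructed factory.
    class SinkBootstrap {
        static OdpfSink bootstrap(BigQuerySinkFactory factory) {
            factory.init();           // wires ErrorHandlerFactory.create(sinkConfig, bigQueryClient)
            return factory.create();  // BigQuerySink now receives the selected ErrorHandler
        }
    }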