Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import io.odpf.sink.connectors.bigquery.handler.MessageRecordConverterCache;
import io.odpf.sink.connectors.bigquery.error.ErrorHandler;
import io.odpf.sink.connectors.bigquery.handler.ErrorHandler;
import io.odpf.sink.connectors.error.ErrorInfo;
import io.odpf.sink.connectors.message.OdpfMessage;
import io.odpf.sink.connectors.OdpfSink;
Expand Down Expand Up @@ -64,7 +64,7 @@ public OdpfSinkResponse pushToSink(List<OdpfMessage> messageList) {
if (response.hasErrors()) {
Map<Long, ErrorInfo> errorInfoMap = BigQueryResponseParser.parseAndFillOdpfSinkResponse(records.getValidRecords(), response, bigQueryMetrics, instrumentation);
errorInfoMap.forEach(odpfSinkResponse::addErrors);
errorHandler.handle(errorInfoMap, records.getValidRecords());
errorHandler.handle(response.getInsertErrors(), records.getValidRecords());
}
}
return odpfSinkResponse;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package io.odpf.sink.connectors.bigquery;

import io.odpf.sink.connectors.OdpfSink;
import io.odpf.sink.connectors.bigquery.handler.MessageRecordConverterCache;
import io.odpf.sink.connectors.bigquery.error.ErrorHandler;
import io.odpf.sink.connectors.bigquery.handler.BigQueryClient;
import io.odpf.sink.connectors.bigquery.handler.BigQueryRow;
import io.odpf.sink.connectors.bigquery.handler.BigQueryRowWithInsertId;
import io.odpf.sink.connectors.bigquery.handler.BigQueryRowWithoutInsertId;
import io.odpf.sink.connectors.bigquery.handler.ErrorHandler;
import io.odpf.sink.connectors.bigquery.handler.ErrorHandlerFactory;
import io.odpf.sink.connectors.bigquery.handler.MessageRecordConverterCache;
import io.odpf.sink.connectors.stencil.OdpfStencilUpdateListener;
import io.odpf.sink.connectors.config.BigQuerySinkConfig;
import io.odpf.sink.connectors.message.OdpfMessageParser;
Expand Down Expand Up @@ -43,6 +44,7 @@ public void init() {
this.bigQueryClient = new BigQueryClient(sinkConfig, bigQueryMetrics, new Instrumentation(statsDReporter, BigQueryClient.class));
this.bigQueryMetrics = new BigQueryMetrics(sinkConfig);
this.recordConverterWrapper = new MessageRecordConverterCache();
this.errorHandler = ErrorHandlerFactory.create(sinkConfig, bigQueryClient);
OdpfStencilUpdateListener odpfStencilUpdateListener = BigqueryStencilUpdateListenerFactory.create(sinkConfig, bigQueryClient, recordConverterWrapper);
OdpfMessageParser odpfMessageParser = OdpfMessageParserFactory.getParser(sinkConfig, statsDReporter, odpfStencilUpdateListener);
odpfStencilUpdateListener.setOdpfMessageParser(odpfMessageParser);
Expand All @@ -57,6 +59,7 @@ public void init() {
}
}


public OdpfSink create() {
return new BigQuerySink(
bigQueryClient,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.odpf.sink.connectors.bigquery.handler;

import com.google.cloud.bigquery.BigQueryError;
import io.odpf.sink.connectors.bigquery.models.Record;

import java.util.List;
import java.util.Map;

/**
 * Handles per-row insertion errors returned by BigQuery after a batch insert.
 * <p>
 * Implementations receive the errors reported by BigQuery, keyed by the index
 * of the failed row in the insert request, together with the records that made
 * up that request, and may take corrective action (e.g. updating the table
 * schema when rows failed with unknown-field errors).
 */
public interface ErrorHandler {

    /**
     * Reacts to the errors reported for a batch insert.
     *
     * @param insertErrors BigQuery errors keyed by the index of the failed row
     *                     in the insert request
     * @param records      the valid records of the insert request; the keys of
     *                     {@code insertErrors} index into this list
     */
    void handle(Map<Long, List<BigQueryError>> insertErrors, List<Record> records);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.odpf.sink.connectors.bigquery.handler;

import io.odpf.sink.connectors.config.BigQuerySinkConfig;
import io.odpf.sink.connectors.config.enums.InputSchemaDataType;

/**
 * Creates the {@link ErrorHandler} appropriate for the configured input schema type.
 * <p>
 * JSON sinks infer the table schema from incoming data, so they receive a
 * {@link JsonErrorHandler} capable of patching the BigQuery table schema when
 * unknown-field errors occur; every other schema type receives a
 * {@link NoopErrorHandler}.
 */
public class ErrorHandlerFactory {

    // Utility class: static factory only, never instantiated.
    private ErrorHandlerFactory() {
    }

    /**
     * @param sinkConfig     sink configuration, used to read the input schema data type
     *                       and passed on to handlers that need further settings
     * @param bigQueryClient client handed to handlers that update the table schema
     * @return a {@link JsonErrorHandler} for JSON input, otherwise a {@link NoopErrorHandler}
     */
    public static ErrorHandler create(BigQuerySinkConfig sinkConfig, BigQueryClient bigQueryClient) {
        if (InputSchemaDataType.JSON == sinkConfig.getSinkConnectorSchemaDataTye()) {
            return new JsonErrorHandler(bigQueryClient, sinkConfig);
        }
        return new NoopErrorHandler();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package io.odpf.sink.connectors.bigquery.handler;

import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import io.odpf.sink.connectors.bigquery.models.Record;
import io.odpf.sink.connectors.config.BigQuerySinkConfig;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;

/*
the job of the class is to handle unknown field errors and then update the bq table schema,
this happens incase of where schema is inferred from incoming data
*/
public class JsonErrorHandler implements ErrorHandler {

private final BigQueryClient bigQueryClient;
private final String tablePartitionKey;
private final boolean castAllColumnsToStringDataType;

public JsonErrorHandler(BigQueryClient bigQueryClient, BigQuerySinkConfig bigQuerySinkConfig) {

this.bigQueryClient = bigQueryClient;
this.tablePartitionKey = bigQuerySinkConfig.getTablePartitionKey();
this.castAllColumnsToStringDataType = bigQuerySinkConfig.getOutputDefaultDatatypeStringEnable();
}

public void handle(Map<Long, List<BigQueryError>> insertErrors, List<Record> records) {

Schema schema = bigQueryClient.getSchema();
FieldList existingFieldList = schema.getFields();
List<Entry<Long, List<BigQueryError>>> unknownFieldBqErrors = getUnknownFieldBqErrors(insertErrors);
if (!unknownFieldBqErrors.isEmpty()) {
Set<Field> missingFields = unknownFieldBqErrors
.parallelStream()
.map(x -> getColumnNamesForRecordsWhichHadUknownBqFieldErrors(records, x))
.flatMap(keys -> keys.stream())
.filter(key -> filterExistingFields(existingFieldList, key))
.map(key -> getField(key))
.collect(Collectors.toSet());
ArrayList<Field> bqSchemaFields = new ArrayList<>(missingFields);
existingFieldList.iterator().forEachRemaining(bqSchemaFields::add);
bigQueryClient.upsertTable(bqSchemaFields);
}
}

private Set<String> getColumnNamesForRecordsWhichHadUknownBqFieldErrors(List<Record> records, Entry<Long, List<BigQueryError>> x) {
Integer recordKey = x.getKey().intValue();
return records.get(recordKey).getColumns().keySet();
}


private List<Entry<Long, List<BigQueryError>>> getUnknownFieldBqErrors(Map<Long, List<BigQueryError>> insertErrors) {
List<Entry<Long, List<BigQueryError>>> unkownFieldFieldBqError = insertErrors.entrySet().parallelStream()
.filter((x) -> {
List<BigQueryError> value = x.getValue();
List<BigQueryError> bqErrorsWithNoSuchFields = getBqErrorsWithNoSuchFields(value);
if (!bqErrorsWithNoSuchFields.isEmpty()) {
return true;
}
return false;

}).collect(Collectors.toList());
return unkownFieldFieldBqError;
}

private List<BigQueryError> getBqErrorsWithNoSuchFields(List<BigQueryError> value) {
return value.stream().filter(
bigQueryError -> {
if (bigQueryError.getReason().equals("invalid")
&& bigQueryError.getMessage().contains("no such field")) {
return true;
}
return false;
}
).collect(Collectors.toList());
}


private Field getField(String key) {
if (tablePartitionKey != null && tablePartitionKey.equals(key)) {
return Field.of(key, LegacySQLTypeName.TIMESTAMP);
}
if (!castAllColumnsToStringDataType) {
throw new UnsupportedOperationException("only string data type is supported for fields other than partition key");
}
return Field.of(key, LegacySQLTypeName.STRING);
}

private boolean filterExistingFields(FieldList existingFieldList, String key) {
try {
existingFieldList.get(key);
return false;
} catch (IllegalArgumentException ex) {
return true;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.odpf.sink.connectors.bigquery.handler;

import com.google.cloud.bigquery.BigQueryError;
import io.odpf.sink.connectors.bigquery.models.Record;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Map;

@Slf4j
public class NoopErrorHandler implements ErrorHandler {
    /**
     * Intentionally a no-op: used for schema types where BigQuery insert
     * errors require no corrective action (see {@link ErrorHandlerFactory},
     * which returns this handler for all non-JSON input schema types).
     */
    @Override
    public void handle(Map<Long, List<BigQueryError>> errorInfoMap, List<Record> records) {

    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.odpf.sink.connectors.error.ErrorInfo;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;

Expand All @@ -10,6 +11,7 @@
@AllArgsConstructor
@Getter
@EqualsAndHashCode
@Builder
public class Record {
private final Map<String, Object> metadata;
private final Map<String, Object> columns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,8 @@ public interface BigQuerySinkConfig extends OdpfSinkConfig {
@Separator(ConfToListConverter.ELEMENT_SEPARATOR)
List<TupleString> getMetadataColumnsTypes();

@DefaultValue("true")
@Key("SINK_BIGQUERY_JSON_OUTPUT_DEFAULT_DATATYPE_STRING_ENABLE")
boolean getOutputDefaultDatatypeStringEnable();
}

2 changes: 2 additions & 0 deletions src/main/java/io/odpf/sink/connectors/error/ErrorInfo.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.odpf.sink.connectors.error;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

@AllArgsConstructor
@Data
@Builder
public class ErrorInfo {
private Exception exception;
private ErrorType errorType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import io.odpf.sink.connectors.OdpfSinkResponse;
import io.odpf.sink.connectors.bigquery.handler.MessageRecordConverter;
import io.odpf.sink.connectors.bigquery.handler.MessageRecordConverterCache;
import io.odpf.sink.connectors.bigquery.error.ErrorHandler;
import io.odpf.sink.connectors.bigquery.handler.BigQueryClient;
import io.odpf.sink.connectors.bigquery.handler.BigQueryRow;
import io.odpf.sink.connectors.bigquery.handler.BigQueryRowWithInsertId;
import io.odpf.sink.connectors.bigquery.handler.ErrorHandler;
import io.odpf.sink.connectors.bigquery.handler.MessageRecordConverter;
import io.odpf.sink.connectors.bigquery.handler.MessageRecordConverterCache;
import io.odpf.sink.connectors.bigquery.models.Record;
import io.odpf.sink.connectors.bigquery.models.Records;
import io.odpf.sink.connectors.error.ErrorInfo;
Expand Down Expand Up @@ -173,7 +173,7 @@ public void shouldReturnInvalidMessagesWithFailedInsertMessages() throws Excepti

OdpfSinkResponse response = sink.pushToSink(messages);
Mockito.verify(client, Mockito.times(1)).insertAll(rows);

Mockito.verify(errorHandler, Mockito.times(1)).handle(Mockito.eq(insertErrorsMap), Mockito.any());
Assert.assertEquals(4, response.getErrors().size());

Assert.assertEquals(ErrorType.SINK_UNKNOWN_ERROR, response.getErrors().get(0L).get(0).getErrorType());
Expand Down
Loading