Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion source-mysql/.snapshots/TestGeneric-MissingTable
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@
# ================================
# Captures Terminated With Errors
# ================================
error updating capture state: table "test.generic_missingtable_two" is a configured binding of this capture, but doesn't exist or isn't visible with current permissions
the capture cannot run due to the following error(s):
- user "flow_capture" cannot read from table "test.generic_missingtable_two"

10 changes: 10 additions & 0 deletions source-mysql/.snapshots/TestPrerequisites-captureAB
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# ================================
# Collection "test.Prerequisites_aaa": 2 Documents
# ================================
{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"Prerequisites_aaa"}},"data":"hello","id":0}
{"_meta":{"op":"c","source":{"schema":"test","snapshot":true,"table":"Prerequisites_aaa"}},"data":"world","id":1}
# ================================
# Final State Checkpoint
# ================================
{"cursor":"binlog.000123:56789","streams":{"test.prerequisites_aaa":{"key_columns":["id"],"metadata":{"schema":{"columns":["id","data"],"types":{"data":"text","id":"int"}}},"mode":"Active"},"test.prerequisites_bbb":{"key_columns":["id"],"metadata":{"schema":{"columns":["id","data"],"types":{"data":"text","id":"int"}}},"mode":"Active"}}}

10 changes: 10 additions & 0 deletions source-mysql/.snapshots/TestPrerequisites-captureABC-fails
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# ================================
# Final State Checkpoint
# ================================

# ================================
# Captures Terminated With Errors
# ================================
the capture cannot run due to the following error(s):
- user "flow_capture" cannot read from table "test.prerequisites_ccc"

1 change: 1 addition & 0 deletions source-mysql/.snapshots/TestPrerequisites-validateAB
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
no error
2 changes: 2 additions & 0 deletions source-mysql/.snapshots/TestPrerequisites-validateABC-fails
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
the capture cannot run due to the following error(s):
- user "flow_capture" cannot read from table "test.prerequisites_ccc"
2 changes: 1 addition & 1 deletion source-mysql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func (db *mysqlDatabase) WriteWatermark(ctx context.Context, watermark string) e
var query = fmt.Sprintf(`REPLACE INTO %s (slot, watermark) VALUES (?,?);`, db.config.Advanced.WatermarksTable)
var results, err = db.conn.Execute(query, db.config.Advanced.NodeID, watermark)
if err != nil {
return fmt.Errorf("error upserting new watermark for slot %q: %w", db.config.Advanced.NodeID, err)
return fmt.Errorf("error upserting new watermark for slot %d: %w", db.config.Advanced.NodeID, err)
}
results.Close()
return nil
Expand Down
32 changes: 16 additions & 16 deletions source-mysql/datatype_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (

// TestDatatypes runs the discovery test on various datatypes.
func TestDatatypes(t *testing.T) {
var ctx = context.Background()
var tb, ctx = mysqlTestBackend(t), context.Background()

// Tell MySQL to act as though we're running in Chicago. This has an effect (in very
// different ways) on the processing of DATETIME and TIMESTAMP values.
TestBackend.Query(ctx, t, "SET GLOBAL time_zone = 'America/Chicago';")
TestBackend.Query(ctx, t, "SET SESSION time_zone = 'America/Chicago';")
tb.Query(ctx, t, "SET GLOBAL time_zone = 'America/Chicago';")
tb.Query(ctx, t, "SET SESSION time_zone = 'America/Chicago';")

tests.TestDatatypes(ctx, t, TestBackend, []tests.DatatypeTestCase{
tests.TestDatatypes(ctx, t, tb, []tests.DatatypeTestCase{
{ColumnType: "integer", ExpectType: `{"type":["integer","null"]}`, InputValue: 123, ExpectValue: `123`},
{ColumnType: "integer", ExpectType: `{"type":["integer","null"]}`, InputValue: nil, ExpectValue: `null`},
{ColumnType: "integer not null", ExpectType: `{"type":"integer"}`, InputValue: 123, ExpectValue: `123`},
Expand Down Expand Up @@ -112,13 +112,13 @@ func TestDatatypes(t *testing.T) {
}

func TestDatetimes(t *testing.T) {
var ctx = context.Background()
var tb, ctx = mysqlTestBackend(t), context.Background()

// In Chicago noon should map to 17:00 or 18:00 UTC in summer/winter respectively due to DST.
t.Run("chicago", func(t *testing.T) {
TestBackend.Query(ctx, t, "SET GLOBAL time_zone = 'America/Chicago';")
TestBackend.Query(ctx, t, "SET SESSION time_zone = 'America/Chicago';")
tests.TestDatatypes(ctx, t, TestBackend, []tests.DatatypeTestCase{
tb.Query(ctx, t, "SET GLOBAL time_zone = 'America/Chicago';")
tb.Query(ctx, t, "SET SESSION time_zone = 'America/Chicago';")
tests.TestDatatypes(ctx, t, tb, []tests.DatatypeTestCase{
{ColumnType: "datetime", ExpectType: `{"type":["string","null"],"format":"date-time"}`, InputValue: "1991-08-31 12:34:56", ExpectValue: `"1991-08-31T17:34:56Z"`},
{ColumnType: "datetime", ExpectType: `{"type":["string","null"],"format":"date-time"}`, InputValue: "1992-01-01 12:34:56", ExpectValue: `"1992-01-01T18:34:56Z"`},
{ColumnType: "timestamp", ExpectType: `{"type":["string","null"],"format":"date-time"}`, InputValue: "1991-08-31 12:34:56", ExpectValue: `"1991-08-31T17:34:56Z"`},
Expand All @@ -128,9 +128,9 @@ func TestDatetimes(t *testing.T) {

// In fixed offset UTC-6 noon should always map to 18:00 UTC regardless of the time of the year.
t.Run("utc_minus_6", func(t *testing.T) {
TestBackend.Query(ctx, t, "SET GLOBAL time_zone = '-6:00';") // Leading zero deliberately omitted to make sure MySQL normalizes it into something we can parse
TestBackend.Query(ctx, t, "SET SESSION time_zone = '-6:00';")
tests.TestDatatypes(ctx, t, TestBackend, []tests.DatatypeTestCase{
tb.Query(ctx, t, "SET GLOBAL time_zone = '-6:00';") // Leading zero deliberately omitted to make sure MySQL normalizes it into something we can parse
tb.Query(ctx, t, "SET SESSION time_zone = '-6:00';")
tests.TestDatatypes(ctx, t, tb, []tests.DatatypeTestCase{
{ColumnType: "datetime", ExpectType: `{"type":["string","null"],"format":"date-time"}`, InputValue: "1991-08-31 12:34:56", ExpectValue: `"1991-08-31T18:34:56Z"`},
{ColumnType: "datetime", ExpectType: `{"type":["string","null"],"format":"date-time"}`, InputValue: "1992-01-01 12:34:56", ExpectValue: `"1992-01-01T18:34:56Z"`},
{ColumnType: "timestamp", ExpectType: `{"type":["string","null"],"format":"date-time"}`, InputValue: "1991-08-31 12:34:56", ExpectValue: `"1991-08-31T18:34:56Z"`},
Expand All @@ -141,9 +141,9 @@ func TestDatetimes(t *testing.T) {
// In Manila noon should map to 04:00 UTC regardless of the time of the year, because Philippines
// Standard Time stopped observing DST in 1990.
t.Run("manila", func(t *testing.T) {
TestBackend.Query(ctx, t, "SET GLOBAL time_zone = 'Asia/Manila';")
TestBackend.Query(ctx, t, "SET SESSION time_zone = 'Asia/Manila';")
tests.TestDatatypes(ctx, t, TestBackend, []tests.DatatypeTestCase{
tb.Query(ctx, t, "SET GLOBAL time_zone = 'Asia/Manila';")
tb.Query(ctx, t, "SET SESSION time_zone = 'Asia/Manila';")
tests.TestDatatypes(ctx, t, tb, []tests.DatatypeTestCase{
{ColumnType: "datetime", ExpectType: `{"type":["string","null"],"format":"date-time"}`, InputValue: "1991-08-31 12:34:56", ExpectValue: `"1991-08-31T04:34:56Z"`},
{ColumnType: "datetime", ExpectType: `{"type":["string","null"],"format":"date-time"}`, InputValue: "1992-01-01 12:34:56", ExpectValue: `"1992-01-01T04:34:56Z"`},
{ColumnType: "timestamp", ExpectType: `{"type":["string","null"],"format":"date-time"}`, InputValue: "1991-08-31 12:34:56", ExpectValue: `"1991-08-31T04:34:56Z"`},
Expand All @@ -153,7 +153,7 @@ func TestDatetimes(t *testing.T) {
}

func TestScanKeyDatetimes(t *testing.T) {
var tb, ctx = TestBackend, context.Background()
var tb, ctx = mysqlTestBackend(t), context.Background()
var tableName = tb.CreateTable(ctx, t, "", "(ts DATETIME(3) PRIMARY KEY, data TEXT)")
tb.Insert(ctx, t, tableName, [][]interface{}{
{"1991-08-31 12:34:56.000", "aood"},
Expand All @@ -179,7 +179,7 @@ func TestScanKeyDatetimes(t *testing.T) {
}

func TestScanKeyTypes(t *testing.T) {
var tb, ctx = TestBackend, context.Background()
var tb, ctx = mysqlTestBackend(t), context.Background()
for _, tc := range []struct {
Name string
ColumnType string
Expand Down
10 changes: 4 additions & 6 deletions source-mysql/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ services:
target: /b.csv
- mysql_data:/var/lib/mysql
healthcheck:
test: ["CMD", "mysqladmin", "ping", "--silent"]
test: [ "CMD", "mysqladmin", "ping", "--silent" ]
environment:
MYSQL_DATABASE: test
MYSQL_ROOT_PASSWORD: flow
MYSQL_USER: flow
MYSQL_PASSWORD: flow
MYSQL_DATABASE: mysql
MYSQL_ROOT_PASSWORD: secret1234
networks:
- flow-test
cap_add:
- SYS_NICE # Prevents 'mbind: Operation not permitted' errors. In theory those are benign but it's better to not have them.
- SYS_NICE # Prevents 'mbind: Operation not permitted' errors. In theory those are benign but it's better to not have them.

networks:
flow-test:
Expand Down
12 changes: 8 additions & 4 deletions source-mysql/docker-entrypoint-initdb.d/init-user-db.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ echo "======================================="
echo "Initializing Database for Flow Captures"
echo "======================================="

mysql --user="root" --password="flow" --database="test" <<-EOSQL
CREATE DATABASE IF NOT EXISTS flow;
CREATE TABLE IF NOT EXISTS flow.watermarks (slot INTEGER PRIMARY KEY, watermark TEXT);
CREATE USER IF NOT EXISTS flow_capture IDENTIFIED BY 'secret';
mysql --user="root" --password="secret1234" --database="mysql" <<-EOSQL
CREATE USER IF NOT EXISTS flow_capture IDENTIFIED BY 'secret1234';

CREATE DATABASE IF NOT EXISTS test;

GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'flow_capture';
GRANT SELECT ON *.* TO 'flow_capture';

CREATE DATABASE IF NOT EXISTS flow;
CREATE TABLE IF NOT EXISTS flow.watermarks (slot INTEGER PRIMARY KEY, watermark TEXT);
GRANT INSERT, UPDATE, DELETE ON flow.watermarks TO 'flow_capture';
EOSQL
73 changes: 0 additions & 73 deletions source-mysql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,20 +250,6 @@ func (db *mysqlDatabase) connect(ctx context.Context) error {
return fmt.Errorf("error setting session time_zone: %w", err)
}

// Sanity-check binlog retention and error out if it's insufficiently long.
// By doing this during the Connect operation it will occur both during
// actual captures and when performing discovery/config validation, which
// is likely what we want.
if !db.config.Advanced.SkipBinlogRetentionCheck {
expiryTime, err := getBinlogExpiry(conn)
if err != nil {
return fmt.Errorf("error querying binlog expiry time: %w", err)
}
if expiryTime < minimumExpiryTime {
return fmt.Errorf("binlog retention period is too short (go.estuary.dev/PoMlNf): server reports %s but at least %s is required (and 30 days is preferred wherever possible)", expiryTime.String(), minimumExpiryTime.String())
}
}

return nil
}

Expand Down Expand Up @@ -299,36 +285,6 @@ func queryTimeZone(conn *client.Conn) (*time.Location, error) {
return nil, fmt.Errorf("unknown or invalid time_zone %q: %w", tzName, errDatabaseTimezoneUnknown)
}

func getBinlogExpiry(conn *client.Conn) (time.Duration, error) {
// When running on Amazon RDS MySQL there's an RDS-specific configuration
// for binlog retention, so that takes precedence if it exists.
rdsRetentionHours, err := queryNumericVariable(conn, `SELECT name, value FROM mysql.rds_configuration WHERE name = 'binlog retention hours';`)
if err == nil {
return time.Duration(rdsRetentionHours) * time.Hour, nil
}

// The newer 'binlog_expire_logs_seconds' variable takes priority if it exists and is nonzero.
expireLogsSeconds, err := queryNumericVariable(conn, `SHOW VARIABLES LIKE 'binlog_expire_logs_seconds';`)
if err == nil && expireLogsSeconds > 0 {
return time.Duration(expireLogsSeconds * float64(time.Second)), nil
}

// And as the final resort we'll check 'expire_logs_days' if 'seconds' was zero or nonexistent.
expireLogsDays, err := queryNumericVariable(conn, `SHOW VARIABLES LIKE 'expire_logs_days';`)
if err != nil {
return 0, err
}
if expireLogsDays > 0 {
return time.Duration(expireLogsDays) * 24 * time.Hour, nil
}

// If both 'binlog_expire_logs_seconds' and 'expire_logs_days' are set to zero
// MySQL will not automatically purge binlog segments. For simplicity we just
// represent that as a 'one year' expiry time, since all we need the value for
// is to make sure it's not too short.
return 365 * 24 * time.Hour, nil
}

func queryStringVariable(conn *client.Conn, query string) (string, error) {
var results, err = conn.Execute(query)
if err != nil {
Expand All @@ -344,35 +300,6 @@ func queryStringVariable(conn *client.Conn, query string) (string, error) {
return fmt.Sprintf("%s", value.Value()), nil
}

func queryNumericVariable(conn *client.Conn, query string) (float64, error) {
var results, err = conn.Execute(query)
if err != nil {
return 0, fmt.Errorf("error executing query %q: %w", query, err)
}
if len(results.Values) == 0 {
return 0, fmt.Errorf("no results from query %q", query)
}

// Return the second column of the first row. It has to be the second
// column because that's how the `SHOW VARIABLES LIKE` query does it.
var value = &results.Values[0][1]
switch value.Type {
case mysql.FieldValueTypeNull:
return 0, nil
case mysql.FieldValueTypeString:
var n, err = strconv.ParseFloat(string(value.AsString()), 64)
if err != nil {
return 0, fmt.Errorf("couldn't parse string value as number: %w", err)
}
return n, nil
case mysql.FieldValueTypeUnsigned:
return float64(value.AsUint64()), nil
case mysql.FieldValueTypeSigned:
return float64(value.AsInt64()), nil
}
return value.AsFloat64(), nil
}

func (db *mysqlDatabase) Close(ctx context.Context) error {
if err := db.conn.Close(); err != nil {
return fmt.Errorf("error closing database connection: %w", err)
Expand Down
Loading