Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
GOOS := $(if $(GOOS),$(GOOS),linux)
GOARCH := $(if $(GOARCH),$(GOARCH),amd64)
GO=GO15VENDOREXPERIMENT="1" CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) GO111MODULE=on go
GO=GO15VENDOREXPERIMENT="1" CGO_ENABLED=0 GOARCH=$(GOARCH) GO111MODULE=on go

PACKAGE_LIST := go list ./...| grep -vE "cmd"
PACKAGES := $$($(PACKAGE_LIST))
FILES_TO_FMT := $(shell find . -path -prune -o -name '*.go' -print)

LDFLAGS += -X "github.com/pingcap/go-tpc/pkg/util.ReleaseVersion=$(shell git describe --tags --dirty --always)"
LDFLAGS += -X "github.com/pingcap/go-tpc/pkg/util.BuildTS=$(shell date -u '+%Y-%m-%d %I:%M:%S')"
LDFLAGS += -X "github.com/pingcap/go-tpc/pkg/util.BuildHash=$(shell git rev-parse HEAD)"

GOBUILD=$(GO) build -ldflags '$(LDFLAGS)'

# Image URL to use all building/pushing image targets
IMG ?= go-tpc:latest

Expand All @@ -22,7 +27,7 @@ test:
go test ./... -cover $(PACKAGES)

build: mod
go build -o ./bin/go-tpc cmd/go-tpc/*
$(GOBUILD) -o ./bin/go-tpc cmd/go-tpc/*

vet:
go vet ./...
Expand Down
6 changes: 3 additions & 3 deletions ch/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ func (w Workloader) Prepare(ctx context.Context, threadID int) error {
return err
}
sqlLoader := map[dbgen.Table]dbgen.Loader{
dbgen.TSupp: tpch.NewSuppLoader(ctx, s.Conn),
dbgen.TNation: tpch.NewNationLoader(ctx, s.Conn),
dbgen.TRegion: tpch.NewRegionLoader(ctx, s.Conn),
dbgen.TSupp: tpch.NewSuppLoader(ctx, w.db),
dbgen.TNation: tpch.NewNationLoader(ctx, w.db),
dbgen.TRegion: tpch.NewRegionLoader(ctx, w.db),
}
dbgen.InitDbGen(1)
if err := dbgen.DbGen(sqlLoader, []dbgen.Table{dbgen.TNation, dbgen.TRegion, dbgen.TSupp}); err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/go-tpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func main() {

cobra.EnablePrefixMatching = true

registerVersionInfo(rootCmd)
registerTpcc(rootCmd)
registerTpch(rootCmd)
registerCHBenchmark(rootCmd)
Expand Down
3 changes: 3 additions & 0 deletions cmd/go-tpc/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ func executeWorkload(ctx context.Context, w workload.Workloader, threads int, ac
go func(index int) {
defer wg.Done()
if err := execute(ctx, w, action, threads, index); err != nil {
if action == "prepare" {
panic(fmt.Sprintf("a fatal occurred when preparing data: %v", err))
}
fmt.Printf("execute %s failed, err %v\n", action, err)
return
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/go-tpc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
_ "net/http/pprof"
"os"
"runtime"
"time"

"github.com/pingcap/go-tpc/pkg/measurement"
"github.com/pingcap/go-tpc/pkg/workload"
Expand Down Expand Up @@ -90,6 +91,8 @@ func registerTpcc(root *cobra.Command) {
cmdPrepare.PersistentFlags().StringVar(&tpccConfig.OutputDir, "output-dir", "", "Output directory for generating file if specified")
cmdPrepare.PersistentFlags().StringVar(&tpccConfig.SpecifiedTables, "tables", "", "Specified tables for "+
"generating file, separated by ','. Valid only if output is set. If this flag is not set, generate all tables by default")
cmdPrepare.PersistentFlags().IntVar(&tpccConfig.PrepareReCommitCount, "retry-count", 50, "Retry count when errors occur")
cmdPrepare.PersistentFlags().DurationVar(&tpccConfig.PrepareReCommitDuration, "retry-duration", 10*time.Second, "The duration for each retry")

var cmdRun = &cobra.Command{
Use: "run",
Expand Down
25 changes: 25 additions & 0 deletions cmd/go-tpc/versioninfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package main

import (
"fmt"

"github.com/spf13/cobra"

"github.com/pingcap/go-tpc/pkg/util"
)

func printVersion() {
fmt.Println("Git Commit Hash:", util.BuildHash)
fmt.Println("UTC Build Time:", util.BuildTS)
fmt.Println("Release version:", util.ReleaseVersion)
}

func registerVersionInfo(root *cobra.Command) {
cmd := &cobra.Command{
Use: "version",
Run: func(cmd *cobra.Command, args []string) {
printVersion()
},
}
root.AddCommand(cmd)
}
35 changes: 29 additions & 6 deletions pkg/load/batch_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import (
"context"
"database/sql"
"encoding/csv"
"fmt"
"os"
"strings"
"time"
)

const (
Expand All @@ -20,17 +23,23 @@ type BatchLoader interface {
// SQLBatchLoader helps us insert in batch
type SQLBatchLoader struct {
insertHint string
conn *sql.Conn
db *sql.DB
buf bytes.Buffer
count int

// loader retry
retryCount int
retryDuration time.Duration
}

// NewSQLBatchLoader creates a batch loader for database connection
func NewSQLBatchLoader(conn *sql.Conn, hint string) *SQLBatchLoader {
func NewSQLBatchLoader(db *sql.DB, hint string, retryCount int, retryDuration time.Duration) *SQLBatchLoader {
return &SQLBatchLoader{
count: 0,
insertHint: hint,
conn: conn,
count: 0,
insertHint: hint,
db: db,
retryCount: retryCount,
retryDuration: retryDuration,
}
}

Expand Down Expand Up @@ -59,7 +68,21 @@ func (b *SQLBatchLoader) Flush(ctx context.Context) error {
return nil
}

_, err := b.conn.ExecContext(ctx, b.buf.String())
var err error
for i := 0; i < 1+b.retryCount; i++ {
_, err = b.db.ExecContext(ctx, b.buf.String())
if err == nil || (strings.Contains(err.Error(), "Error 1062: Duplicate entry") && i == 0) {
break
}
if i < b.retryCount {
fmt.Printf("exec statement error: %v, may try again later...\n", err)
time.Sleep(b.retryDuration)
}
}
if err != nil {
return fmt.Errorf("exec statement error: %v", err)
}

b.count = 0
b.buf.Reset()

Expand Down
8 changes: 8 additions & 0 deletions pkg/util/versioninfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package util

// Version information
var (
ReleaseVersion string
BuildTS string
BuildHash string
)
18 changes: 9 additions & 9 deletions tpcc/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (w *Workloader) loadItem(ctx context.Context) error {
s := getTPCCState(ctx)
hint := "INSERT INTO item (i_id, i_im_id, i_name, i_price, i_data) VALUES "

l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

for i := 0; i < maxItems; i++ {
s.Buf.Reset()
Expand All @@ -50,7 +50,7 @@ func (w *Workloader) loadWarehouse(ctx context.Context, warehouse int) error {
s := getTPCCState(ctx)
hint := "INSERT INTO warehouse (w_id, w_name, w_street_1, w_street_2, w_city, w_state, w_zip, w_tax, w_ytd) VALUES "

l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

wName := randChars(s.R, s.Buf, 6, 10)
wStree1 := randChars(s.R, s.Buf, 10, 20)
Expand Down Expand Up @@ -80,7 +80,7 @@ func (w *Workloader) loadStock(ctx context.Context, warehouse int) error {
s_dist_01, s_dist_02, s_dist_03, s_dist_04, s_dist_05, s_dist_06,
s_dist_07, s_dist_08, s_dist_09, s_dist_10, s_ytd, s_order_cnt, s_remote_cnt, s_data) VALUES `

l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

for i := 0; i < stockPerWarehouse; i++ {
s.Buf.Reset()
Expand Down Expand Up @@ -122,7 +122,7 @@ func (w *Workloader) loadDistrict(ctx context.Context, warehouse int) error {
hint := `INSERT INTO district (d_id, d_w_id, d_name, d_street_1, d_street_2,
d_city, d_state, d_zip, d_tax, d_ytd, d_next_o_id) VALUES `

l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

for i := 0; i < districtPerWarehouse; i++ {
s.Buf.Reset()
Expand Down Expand Up @@ -158,7 +158,7 @@ func (w *Workloader) loadCustomer(ctx context.Context, warehouse int, district i
c_street_1, c_street_2, c_city, c_state, c_zip, c_phone, c_since, c_credit, c_credit_lim,
c_discount, c_balance, c_ytd_payment, c_payment_cnt, c_delivery_cnt, c_data) VALUES `

l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

for i := 0; i < customerPerDistrict; i++ {
s.Buf.Reset()
Expand Down Expand Up @@ -212,7 +212,7 @@ func (w *Workloader) loadHistory(ctx context.Context, warehouse int, district in
s := getTPCCState(ctx)

hint := `INSERT INTO history (h_c_id, h_c_d_id, h_c_w_id, h_d_id, h_w_id, h_date, h_amount, h_data) VALUES `
l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

// 1 customer has 1 row
for i := 0; i < customerPerDistrict; i++ {
Expand Down Expand Up @@ -245,7 +245,7 @@ func (w *Workloader) loadOrder(ctx context.Context, warehouse int, district int)
hint := `INSERT INTO orders (o_id, o_d_id, o_w_id, o_c_id, o_entry_d,
o_carrier_id, o_ol_cnt, o_all_local) VALUES `

l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

cids := rand.Perm(orderPerDistrict)
s.R.Shuffle(len(cids), func(i, j int) {
Expand Down Expand Up @@ -285,7 +285,7 @@ func (w *Workloader) loadNewOrder(ctx context.Context, warehouse int, district i

hint := `INSERT INTO new_order (no_o_id, no_d_id, no_w_id) VALUES `

l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

for i := 0; i < newOrderPerDistrict; i++ {
s.Buf.Reset()
Expand All @@ -312,7 +312,7 @@ func (w *Workloader) loadOrderLine(ctx context.Context, warehouse int, district
hint := `INSERT INTO order_line (ol_o_id, ol_d_id, ol_w_id, ol_number,
ol_i_id, ol_supply_w_id, ol_delivery_d, ol_quantity, ol_amount, ol_dist_info) VALUES `

l := load.NewSQLBatchLoader(s.Conn, hint)
l := load.NewSQLBatchLoader(w.db, hint, w.cfg.PrepareReCommitCount, w.cfg.PrepareReCommitDuration)

for i := 0; i < orderPerDistrict; i++ {
for j := 0; j < olCnts[i]; j++ {
Expand Down
2 changes: 1 addition & 1 deletion tpcc/new_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (w *Workloader) runNewOrder(ctx context.Context, thread int) error {

// Process 1
if err := s.newOrderStmts[newOrderSelectCustomer].QueryRowContext(ctx, d.wID, d.dID, d.cID).Scan(&d.cDiscount, &d.cLast, &d.cCredit, &d.wTax); err != nil {
return fmt.Errorf("exec %s failed %v", newOrderSelectCustomer, err)
return fmt.Errorf("exec %s(wID=%d,dID=%d,cID=%d) failed %v", newOrderSelectCustomer, d.wID, d.dID, d.cID, err)
}

// Process 2
Expand Down
4 changes: 4 additions & 0 deletions tpcc/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ type Config struct {
OutputType string
OutputDir string
SpecifiedTables string

// connection, retry count when commiting statement fails, default 0
PrepareReCommitCount int
PrepareReCommitDuration time.Duration
}

// Workloader is TPCC workload
Expand Down
49 changes: 25 additions & 24 deletions tpch/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"

"github.com/pingcap/go-tpc/pkg/load"
"github.com/pingcap/go-tpc/tpch/dbgen"
)
Expand Down Expand Up @@ -173,43 +174,43 @@ func (r *regionLoader) Load(item interface{}) error {
return r.InsertValue(v)
}

func NewOrderLoader(ctx context.Context, conn *sql.Conn) *orderLoader {
return &orderLoader{sqlLoader{load.NewSQLBatchLoader(conn,
`INSERT INTO orders (O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT) VALUES `),
func NewOrderLoader(ctx context.Context, db *sql.DB) *orderLoader {
return &orderLoader{sqlLoader{load.NewSQLBatchLoader(db,
`INSERT INTO orders (O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK, O_SHIPPRIORITY, O_COMMENT) VALUES `, 0, 0),
ctx}}
}
func NewLineItemLoader(ctx context.Context, conn *sql.Conn) *lineItemloader {
return &lineItemloader{sqlLoader{load.NewSQLBatchLoader(conn,
`INSERT INTO lineitem (L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT) VALUES `),
func NewLineItemLoader(ctx context.Context, db *sql.DB) *lineItemloader {
return &lineItemloader{sqlLoader{load.NewSQLBatchLoader(db,
`INSERT INTO lineitem (L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT) VALUES `, 0, 0),
ctx}}
}
func NewCustLoader(ctx context.Context, conn *sql.Conn) *custLoader {
return &custLoader{sqlLoader{load.NewSQLBatchLoader(conn,
`INSERT INTO customer (C_CUSTKEY, C_NAME, C_ADDRESS, C_NATIONKEY, C_PHONE, C_ACCTBAL, C_MKTSEGMENT, C_COMMENT) VALUES `),
func NewCustLoader(ctx context.Context, db *sql.DB) *custLoader {
return &custLoader{sqlLoader{load.NewSQLBatchLoader(db,
`INSERT INTO customer (C_CUSTKEY, C_NAME, C_ADDRESS, C_NATIONKEY, C_PHONE, C_ACCTBAL, C_MKTSEGMENT, C_COMMENT) VALUES `, 0, 0),
ctx}}
}
func NewPartLoader(ctx context.Context, conn *sql.Conn) *partLoader {
return &partLoader{sqlLoader{load.NewSQLBatchLoader(conn,
`INSERT INTO part (P_PARTKEY, P_NAME, P_MFGR, P_BRAND, P_TYPE, P_SIZE, P_CONTAINER, P_RETAILPRICE, P_COMMENT) VALUES `),
func NewPartLoader(ctx context.Context, db *sql.DB) *partLoader {
return &partLoader{sqlLoader{load.NewSQLBatchLoader(db,
`INSERT INTO part (P_PARTKEY, P_NAME, P_MFGR, P_BRAND, P_TYPE, P_SIZE, P_CONTAINER, P_RETAILPRICE, P_COMMENT) VALUES `, 0, 0),
ctx}}
}
func NewPartSuppLoader(ctx context.Context, conn *sql.Conn) *partSuppLoader {
return &partSuppLoader{sqlLoader{load.NewSQLBatchLoader(conn,
`INSERT INTO partsupp (PS_PARTKEY, PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST, PS_COMMENT) VALUES `),
func NewPartSuppLoader(ctx context.Context, db *sql.DB) *partSuppLoader {
return &partSuppLoader{sqlLoader{load.NewSQLBatchLoader(db,
`INSERT INTO partsupp (PS_PARTKEY, PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST, PS_COMMENT) VALUES `, 0, 0),
ctx}}
}
func NewSuppLoader(ctx context.Context, conn *sql.Conn) *suppLoader {
return &suppLoader{sqlLoader{load.NewSQLBatchLoader(conn,
`INSERT INTO supplier (S_SUPPKEY, S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE, S_ACCTBAL, S_COMMENT) VALUES `),
func NewSuppLoader(ctx context.Context, db *sql.DB) *suppLoader {
return &suppLoader{sqlLoader{load.NewSQLBatchLoader(db,
`INSERT INTO supplier (S_SUPPKEY, S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE, S_ACCTBAL, S_COMMENT) VALUES `, 0, 0),
ctx}}
}
func NewNationLoader(ctx context.Context, conn *sql.Conn) *nationLoader {
return &nationLoader{sqlLoader{load.NewSQLBatchLoader(conn,
`INSERT INTO nation (N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT) VALUES `),
func NewNationLoader(ctx context.Context, db *sql.DB) *nationLoader {
return &nationLoader{sqlLoader{load.NewSQLBatchLoader(db,
`INSERT INTO nation (N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT) VALUES `, 0, 0),
ctx}}
}
func NewRegionLoader(ctx context.Context, conn *sql.Conn) *regionLoader {
return &regionLoader{sqlLoader{load.NewSQLBatchLoader(conn,
`INSERT INTO region (R_REGIONKEY, R_NAME, R_COMMENT) VALUES `),
func NewRegionLoader(ctx context.Context, db *sql.DB) *regionLoader {
return &regionLoader{sqlLoader{load.NewSQLBatchLoader(db,
`INSERT INTO region (R_REGIONKEY, R_NAME, R_COMMENT) VALUES `, 0, 0),
ctx}}
}
18 changes: 8 additions & 10 deletions tpch/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,18 @@ func (w Workloader) Prepare(ctx context.Context, threadID int) error {
if threadID != 0 {
return nil
}
s := w.getState(ctx)

if err := w.createTables(ctx); err != nil {
return err
}
sqlLoader := map[dbgen.Table]dbgen.Loader{
dbgen.TOrder: NewOrderLoader(ctx, s.Conn),
dbgen.TLine: NewLineItemLoader(ctx, s.Conn),
dbgen.TPart: NewPartLoader(ctx, s.Conn),
dbgen.TPsupp: NewPartSuppLoader(ctx, s.Conn),
dbgen.TSupp: NewSuppLoader(ctx, s.Conn),
dbgen.TCust: NewCustLoader(ctx, s.Conn),
dbgen.TNation: NewNationLoader(ctx, s.Conn),
dbgen.TRegion: NewRegionLoader(ctx, s.Conn),
dbgen.TOrder: NewOrderLoader(ctx, w.db),
dbgen.TLine: NewLineItemLoader(ctx, w.db),
dbgen.TPart: NewPartLoader(ctx, w.db),
dbgen.TPsupp: NewPartSuppLoader(ctx, w.db),
dbgen.TSupp: NewSuppLoader(ctx, w.db),
dbgen.TCust: NewCustLoader(ctx, w.db),
dbgen.TNation: NewNationLoader(ctx, w.db),
dbgen.TRegion: NewRegionLoader(ctx, w.db),
}
dbgen.InitDbGen(int64(w.cfg.ScaleFactor))
if err := dbgen.DbGen(sqlLoader, []dbgen.Table{dbgen.TNation, dbgen.TRegion, dbgen.TCust, dbgen.TSupp, dbgen.TPartPsupp, dbgen.TOrderLine}); err != nil {
Expand Down