Skip to content

Commit 7f81daf

Browse files
authored
chore(spanner): support mutation only operation for read-write mux (#11342)
* chore(spanner): support mutation only operation for read-write mux * incorporate changes
1 parent e41a153 commit 7f81daf

File tree

7 files changed

+381
-89
lines changed

7 files changed

+381
-89
lines changed

spanner/client.go

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -107,20 +107,19 @@ func parseDatabaseName(db string) (project, instance, database string, err error
107107
// Client is a client for reading and writing data to a Cloud Spanner database.
108108
// A client is safe to use concurrently, except for its Close method.
109109
type Client struct {
110-
sc *sessionClient
111-
idleSessions *sessionPool
112-
logger *log.Logger
113-
qo QueryOptions
114-
ro ReadOptions
115-
ao []ApplyOption
116-
txo TransactionOptions
117-
bwo BatchWriteOptions
118-
ct *commonTags
119-
disableRouteToLeader bool
120-
enableMultiplexedSessionForRW bool
121-
dro *sppb.DirectedReadOptions
122-
otConfig *openTelemetryConfig
123-
metricsTracerFactory *builtinMetricsTracerFactory
110+
sc *sessionClient
111+
idleSessions *sessionPool
112+
logger *log.Logger
113+
qo QueryOptions
114+
ro ReadOptions
115+
ao []ApplyOption
116+
txo TransactionOptions
117+
bwo BatchWriteOptions
118+
ct *commonTags
119+
disableRouteToLeader bool
120+
dro *sppb.DirectedReadOptions
121+
otConfig *openTelemetryConfig
122+
metricsTracerFactory *builtinMetricsTracerFactory
124123
}
125124

126125
// DatabaseName returns the full name of a database, e.g.,
@@ -548,20 +547,19 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf
548547
}
549548

550549
c = &Client{
551-
sc: sc,
552-
idleSessions: sp,
553-
logger: config.Logger,
554-
qo: getQueryOptions(config.QueryOptions),
555-
ro: config.ReadOptions,
556-
ao: config.ApplyOptions,
557-
txo: config.TransactionOptions,
558-
bwo: config.BatchWriteOptions,
559-
ct: getCommonTags(sc),
560-
disableRouteToLeader: config.DisableRouteToLeader,
561-
dro: config.DirectedReadOptions,
562-
otConfig: otConfig,
563-
metricsTracerFactory: metricsTracerFactory,
564-
enableMultiplexedSessionForRW: config.enableMultiplexedSessionForRW,
550+
sc: sc,
551+
idleSessions: sp,
552+
logger: config.Logger,
553+
qo: getQueryOptions(config.QueryOptions),
554+
ro: config.ReadOptions,
555+
ao: config.ApplyOptions,
556+
txo: config.TransactionOptions,
557+
bwo: config.BatchWriteOptions,
558+
ct: getCommonTags(sc),
559+
disableRouteToLeader: config.DisableRouteToLeader,
560+
dro: config.DirectedReadOptions,
561+
otConfig: otConfig,
562+
metricsTracerFactory: metricsTracerFactory,
565563
}
566564
return c, nil
567565
}
@@ -1025,7 +1023,7 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
10251023
err error
10261024
)
10271025
if sh == nil || sh.getID() == "" || sh.getClient() == nil {
1028-
if c.enableMultiplexedSessionForRW {
1026+
if c.idleSessions.isMultiplexedSessionForRWEnabled() {
10291027
sh, err = c.idleSessions.takeMultiplexed(ctx)
10301028
} else {
10311029
// Session handle hasn't been allocated or has been destroyed.
@@ -1044,7 +1042,7 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
10441042
// Note that the t.begin(ctx) call could change the session that is being used by the transaction, as the
10451043
// BeginTransaction RPC invocation will be retried on a new session if it returns SessionNotFound.
10461044
t.txReadOnly.sh = sh
1047-
if err = t.begin(ctx); err != nil {
1045+
if err = t.begin(ctx, nil); err != nil {
10481046
trace.TracePrintf(ctx, nil, "Error while BeginTransaction during retrying a ReadWrite transaction: %v", ToSpannerError(err))
10491047
return ToSpannerError(err)
10501048
}
@@ -1072,7 +1070,7 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
10721070
return err
10731071
})
10741072
if isUnimplementedErrorForMultiplexedRW(err) {
1075-
c.enableMultiplexedSessionForRW = false
1073+
c.idleSessions.disableMultiplexedSessionForRW()
10761074
}
10771075
return resp, err
10781076
}

spanner/internal/testutil/inmem_spanner_server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,6 +1078,9 @@ func (s *inMemSpannerServer) BeginTransaction(ctx context.Context, req *spannerp
10781078
}
10791079
s.updateSessionLastUseTime(session.Name)
10801080
tx := s.beginTransaction(session, req.Options)
1081+
if session.Multiplexed && req.MutationKey != nil {
1082+
tx.PrecommitToken = s.getPreCommitToken(string(tx.Id), "TransactionPrecommitToken")
1083+
}
10811084
return tx, nil
10821085
}
10831086

spanner/mutation.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ limitations under the License.
1717
package spanner
1818

1919
import (
20+
"math/rand"
2021
"reflect"
22+
"time"
2123

2224
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
2325
"google.golang.org/grpc/codes"
@@ -427,24 +429,50 @@ func (m Mutation) proto() (*sppb.Mutation, error) {
427429

428430
// mutationsProto turns a spanner.Mutation array into a sppb.Mutation array,
429431
// it is convenient for sending batch mutations to Cloud Spanner.
430-
func mutationsProto(ms []*Mutation) ([]*sppb.Mutation, error) {
432+
func mutationsProto(ms []*Mutation) ([]*sppb.Mutation, *sppb.Mutation, error) {
433+
var selectedMutation *Mutation
434+
var nonInsertMutations []*Mutation
435+
431436
l := make([]*sppb.Mutation, 0, len(ms))
432437
for _, m := range ms {
438+
if m.op != opInsert {
439+
nonInsertMutations = append(nonInsertMutations, m)
440+
}
441+
if selectedMutation == nil {
442+
selectedMutation = m
443+
}
444+
// Track the INSERT mutation with the highest number of values if only INSERT mutation were found
445+
if selectedMutation.op == opInsert && m.op == opInsert && len(m.values) > len(selectedMutation.values) {
446+
selectedMutation = m
447+
}
448+
449+
// Convert the mutation to sppb.Mutation and add to the list
433450
pb, err := m.proto()
434451
if err != nil {
435-
return nil, err
452+
return nil, nil, err
436453
}
437454
l = append(l, pb)
438455
}
439-
return l, nil
456+
if len(nonInsertMutations) > 0 {
457+
selectedMutation = nonInsertMutations[rand.New(rand.NewSource(time.Now().UnixNano())).Intn(len(nonInsertMutations))]
458+
}
459+
if selectedMutation != nil {
460+
m, err := selectedMutation.proto()
461+
if err != nil {
462+
return nil, nil, err
463+
}
464+
return l, m, nil
465+
}
466+
467+
return l, nil, nil
440468
}
441469

442470
// mutationGroupsProto turns a spanner.MutationGroup array into a
443471
// sppb.BatchWriteRequest_MutationGroup array, in preparation to send RPCs.
444472
func mutationGroupsProto(mgs []*MutationGroup) ([]*sppb.BatchWriteRequest_MutationGroup, error) {
445473
gs := make([]*sppb.BatchWriteRequest_MutationGroup, 0, len(mgs))
446474
for _, mg := range mgs {
447-
ms, err := mutationsProto(mg.Mutations)
475+
ms, _, err := mutationsProto(mg.Mutations)
448476
if err != nil {
449477
return nil, err
450478
}

0 commit comments

Comments
 (0)