Skip to content

Commit c37f9ae

Browse files
authored
refactor(bigquery/storage/managedwriter): introduce send optimizers (#7323)
* refactor(bigquery/storage/managedwriter): introduce send optimizers This PR introduces a new optimizer abstraction for connection objects, but doesn't wire it in. The purpose of the optimizers is to leverage awareness of previous requests to reduce transferred bytes.
1 parent 0bf80d7 commit c37f9ae

File tree

2 files changed

+397
-0
lines changed

2 files changed

+397
-0
lines changed
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// Copyright 2023 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package managedwriter
16+
17+
import (
18+
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
19+
"google.golang.org/protobuf/proto"
20+
)
21+
22+
// sendOptimizer handles the general task of optimizing AppendRowsRequest messages sent to the backend.
//
// The basic premise is that by maintaining awareness of previous sends, individual messages can be made
// more efficient (smaller) by redacting redundant information.
type sendOptimizer interface {
	// signalReset is used to signal to the optimizer that the connection is freshly (re)opened.
	signalReset()

	// optimizeSend handles redactions for a given stream.
	optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, req *storagepb.AppendRowsRequest) error
}
33+
34+
// passthroughOptimizer is an optimizer that doesn't modify requests.
35+
type passthroughOptimizer struct {
36+
}
37+
38+
func (po *passthroughOptimizer) signalReset() {
39+
// we don't care, just here to satisfy the interface.
40+
}
41+
42+
func (po *passthroughOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, req *storagepb.AppendRowsRequest) error {
43+
return arc.Send(req)
44+
}
45+
46+
// simplexOptimizer is used for connections where there's only a single stream's data being transmitted.
47+
//
48+
// The optimizations here are straightforward: the first request on a stream is unmodified, all
49+
// subsequent requests can redact WriteStream, WriterSchema, and TraceID.
50+
//
51+
// TODO: this optimizer doesn't do schema evolution checkes, but relies on existing behavior that triggers reconnect
52+
// on schema change. Revisit this, as it may not be necessary once b/266946486 is resolved.
53+
type simplexOptimizer struct {
54+
haveSent bool
55+
}
56+
57+
func (eo *simplexOptimizer) signalReset() {
58+
eo.haveSent = false
59+
}
60+
61+
func (eo *simplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, req *storagepb.AppendRowsRequest) error {
62+
var resp error
63+
if eo.haveSent {
64+
// subsequent send, clone and redact.
65+
cp := proto.Clone(req).(*storagepb.AppendRowsRequest)
66+
cp.WriteStream = ""
67+
cp.GetProtoRows().WriterSchema = nil
68+
cp.TraceId = ""
69+
resp = arc.Send(cp)
70+
} else {
71+
// first request, send unmodified.
72+
resp = arc.Send(req)
73+
}
74+
eo.haveSent = resp == nil
75+
return resp
76+
}
77+
78+
// multiplexOptimizer is used for connections where requests for multiple streams are sent on a common connection.
//
// In this case, the optimizations are as follows:
// * We **must** send the WriteStream on all requests.
// * For sequential requests to the same stream, schema can be redacted after the first request.
// * Trace ID can be redacted from all requests after the first.
type multiplexOptimizer struct {
	// prev retains a redacted copy (rows removed) of the last successfully sent
	// request, used to decide which fields the next send may omit.
	prev *storagepb.AppendRowsRequest
}

// signalReset discards the retained previous request, so the next send is treated
// as the first on a fresh connection.
func (mo *multiplexOptimizer) signalReset() {
	mo.prev = nil
}

// optimizeSend transmits req on arc, redacting trace ID and (for sequential sends
// to the same stream with an unchanged schema) the writer schema.
func (mo *multiplexOptimizer) optimizeSend(arc storagepb.BigQueryWrite_AppendRowsClient, req *storagepb.AppendRowsRequest) error {
	var resp error
	// we'll need a copy
	cp := proto.Clone(req).(*storagepb.AppendRowsRequest)
	if mo.prev != nil {
		var swapOnSuccess bool
		// Clear trace ID. We use the _presence_ of a previous request for reasoning about TraceID, we don't compare
		// its value.
		cp.TraceId = ""
		// we have a previous send.
		if cp.GetWriteStream() != mo.prev.GetWriteStream() {
			// different stream, no further optimization.
			swapOnSuccess = true
		} else {
			// same stream
			if !proto.Equal(mo.prev.GetProtoRows().GetWriterSchema().GetProtoDescriptor(), cp.GetProtoRows().GetWriterSchema().GetProtoDescriptor()) {
				// Schema changed between sends, so this request must carry the
				// new schema; remember it as the new baseline on success.
				swapOnSuccess = true
			} else {
				// the redaction case, where we won't swap.
				cp.GetProtoRows().WriterSchema = nil
			}
		}
		resp = arc.Send(cp)
		if resp == nil && swapOnSuccess {
			// Retain the sent request (minus row data) as the comparison
			// baseline for the next send.
			cp.GetProtoRows().Rows = nil
			cp.MissingValueInterpretations = nil
			mo.prev = cp
		}
		if resp != nil {
			// Send failed; drop state so the next request goes out in full.
			mo.prev = nil
		}
		return resp
	}

	// no previous trace case.
	resp = arc.Send(req)
	if resp == nil {
		// copy the send as the previous.
		cp.GetProtoRows().Rows = nil
		cp.MissingValueInterpretations = nil
		mo.prev = cp
	}
	return resp
}
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
// Copyright 2023 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package managedwriter
16+
17+
import (
18+
"io"
19+
"testing"
20+
21+
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
22+
"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
23+
"github.com/google/go-cmp/cmp"
24+
"google.golang.org/protobuf/proto"
25+
"google.golang.org/protobuf/reflect/protodesc"
26+
"google.golang.org/protobuf/testing/protocmp"
27+
)
28+
29+
func TestSendOptimizer(t *testing.T) {
30+
31+
exampleReq := &storagepb.AppendRowsRequest{
32+
WriteStream: "foo",
33+
Rows: &storagepb.AppendRowsRequest_ProtoRows{
34+
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
35+
Rows: &storagepb.ProtoRows{
36+
SerializedRows: [][]byte{[]byte("row_data")},
37+
},
38+
WriterSchema: &storagepb.ProtoSchema{
39+
ProtoDescriptor: protodesc.ToDescriptorProto((&testdata.SimpleMessageProto2{}).ProtoReflect().Descriptor()),
40+
},
41+
},
42+
},
43+
TraceId: "trace_id",
44+
}
45+
46+
var testCases = []struct {
47+
description string
48+
optimizer sendOptimizer
49+
reqs []*storagepb.AppendRowsRequest
50+
sendResults []error
51+
wantReqs []*storagepb.AppendRowsRequest
52+
}{
53+
{
54+
description: "passthrough-optimizer",
55+
optimizer: &passthroughOptimizer{},
56+
reqs: []*storagepb.AppendRowsRequest{
57+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
58+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
59+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
60+
},
61+
sendResults: []error{
62+
nil,
63+
io.EOF,
64+
io.EOF,
65+
},
66+
wantReqs: []*storagepb.AppendRowsRequest{
67+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
68+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
69+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
70+
},
71+
},
72+
{
73+
description: "simplex no errors",
74+
optimizer: &simplexOptimizer{},
75+
reqs: []*storagepb.AppendRowsRequest{
76+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
77+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
78+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
79+
},
80+
sendResults: []error{
81+
nil,
82+
nil,
83+
nil,
84+
},
85+
wantReqs: func() []*storagepb.AppendRowsRequest {
86+
want := make([]*storagepb.AppendRowsRequest, 3)
87+
// first has no redactions.
88+
want[0] = proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
89+
req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest)
90+
req.GetProtoRows().WriterSchema = nil
91+
req.TraceId = ""
92+
req.WriteStream = ""
93+
want[1] = req
94+
// previous had errors, so unredacted.
95+
want[2] = req
96+
return want
97+
}(),
98+
},
99+
{
100+
description: "simplex w/partial errors",
101+
optimizer: &simplexOptimizer{},
102+
reqs: []*storagepb.AppendRowsRequest{
103+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
104+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
105+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
106+
},
107+
sendResults: []error{
108+
nil,
109+
io.EOF,
110+
nil,
111+
},
112+
wantReqs: func() []*storagepb.AppendRowsRequest {
113+
want := make([]*storagepb.AppendRowsRequest, 3)
114+
want[0] = proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
115+
req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest)
116+
req.GetProtoRows().WriterSchema = nil
117+
req.TraceId = ""
118+
req.WriteStream = ""
119+
want[1] = req
120+
want[2] = want[0]
121+
return want
122+
}(),
123+
},
124+
{
125+
description: "multiplex single all errors",
126+
optimizer: &multiplexOptimizer{},
127+
reqs: []*storagepb.AppendRowsRequest{
128+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
129+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
130+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
131+
},
132+
sendResults: []error{
133+
io.EOF,
134+
io.EOF,
135+
io.EOF,
136+
},
137+
wantReqs: []*storagepb.AppendRowsRequest{
138+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
139+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
140+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
141+
},
142+
},
143+
{
144+
description: "multiplex single no errors",
145+
optimizer: &multiplexOptimizer{},
146+
reqs: []*storagepb.AppendRowsRequest{
147+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
148+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
149+
proto.Clone(exampleReq).(*storagepb.AppendRowsRequest),
150+
},
151+
sendResults: []error{
152+
nil,
153+
nil,
154+
nil,
155+
},
156+
wantReqs: func() []*storagepb.AppendRowsRequest {
157+
want := make([]*storagepb.AppendRowsRequest, 3)
158+
want[0] = proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
159+
req := proto.Clone(want[0]).(*storagepb.AppendRowsRequest)
160+
req.GetProtoRows().WriterSchema = nil
161+
req.TraceId = ""
162+
want[1] = req
163+
want[2] = req
164+
return want
165+
}(),
166+
},
167+
{
168+
description: "multiplex interleave",
169+
optimizer: &multiplexOptimizer{},
170+
reqs: func() []*storagepb.AppendRowsRequest {
171+
reqs := make([]*storagepb.AppendRowsRequest, 10)
172+
reqA := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
173+
reqA.WriteStream = "alpha"
174+
175+
reqB := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
176+
reqB.WriteStream = "beta"
177+
reqB.GetProtoRows().GetWriterSchema().ProtoDescriptor = protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor())
178+
reqs[0] = reqA
179+
reqs[1] = reqA
180+
reqs[2] = reqB
181+
reqs[3] = reqA
182+
reqs[4] = reqB
183+
reqs[5] = reqB
184+
reqs[6] = reqB
185+
reqs[7] = reqB
186+
reqs[8] = reqA
187+
reqs[9] = reqA
188+
189+
return reqs
190+
}(),
191+
sendResults: []error{
192+
nil,
193+
nil,
194+
nil,
195+
nil,
196+
nil,
197+
io.EOF,
198+
nil,
199+
nil,
200+
nil,
201+
io.EOF,
202+
},
203+
wantReqs: func() []*storagepb.AppendRowsRequest {
204+
want := make([]*storagepb.AppendRowsRequest, 10)
205+
206+
wantReqAFull := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
207+
wantReqAFull.WriteStream = "alpha"
208+
209+
wantReqANoTrace := proto.Clone(wantReqAFull).(*storagepb.AppendRowsRequest)
210+
wantReqANoTrace.TraceId = ""
211+
212+
wantReqAOpt := proto.Clone(wantReqAFull).(*storagepb.AppendRowsRequest)
213+
wantReqAOpt.GetProtoRows().WriterSchema = nil
214+
wantReqAOpt.TraceId = ""
215+
216+
wantReqBFull := proto.Clone(exampleReq).(*storagepb.AppendRowsRequest)
217+
wantReqBFull.WriteStream = "beta"
218+
wantReqBFull.GetProtoRows().GetWriterSchema().ProtoDescriptor = protodesc.ToDescriptorProto((&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor())
219+
220+
wantReqBNoTrace := proto.Clone(wantReqBFull).(*storagepb.AppendRowsRequest)
221+
wantReqBNoTrace.TraceId = ""
222+
223+
wantReqBOpt := proto.Clone(wantReqBFull).(*storagepb.AppendRowsRequest)
224+
wantReqBOpt.GetProtoRows().WriterSchema = nil
225+
wantReqBOpt.TraceId = ""
226+
227+
want[0] = wantReqAFull
228+
want[1] = wantReqAOpt
229+
want[2] = wantReqBNoTrace
230+
want[3] = wantReqANoTrace
231+
want[4] = wantReqBNoTrace
232+
want[5] = wantReqBOpt
233+
want[6] = wantReqBFull
234+
want[7] = wantReqBOpt
235+
want[8] = wantReqANoTrace
236+
want[9] = wantReqAOpt
237+
238+
return want
239+
}(),
240+
},
241+
}
242+
243+
for _, tc := range testCases {
244+
testARC := &testAppendRowsClient{}
245+
testARC.sendF = func(req *storagepb.AppendRowsRequest) error {
246+
testARC.requests = append(testARC.requests, proto.Clone(req).(*storagepb.AppendRowsRequest))
247+
respErr := tc.sendResults[0]
248+
tc.sendResults = tc.sendResults[1:]
249+
return respErr
250+
}
251+
252+
for _, req := range tc.reqs {
253+
tc.optimizer.optimizeSend(testARC, req)
254+
}
255+
// now, compare.
256+
for k, wr := range tc.wantReqs {
257+
if diff := cmp.Diff(testARC.requests[k], wr, protocmp.Transform()); diff != "" {
258+
t.Errorf("%s (req %d) mismatch: -got, +want:\n%s", tc.description, k, diff)
259+
}
260+
}
261+
}
262+
}

0 commit comments

Comments
 (0)