Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/go/rpk/pkg/serde/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// generated while compiling the schema.
const inMemFileName = "tmp.proto"

// newAvroEncoder will generate a serializer function using the specified
// newProtoEncoder will generate a serializer function using the specified
// schema. It utilizes the message type identified by protoFQN. If the schema
// includes references, it retrieves them using the supplied client. The
// generated function returns the record encoded in the protobuf wire format.
Expand All @@ -42,12 +42,14 @@ func newProtoEncoder(compiledFiles linker.Files, protoFQN string, schemaID int)
if !ok {
return nil, fmt.Errorf("unable to process message with name %q", protoFQN)
}
message := dynamicpb.NewMessage(msgDescriptor)

o := protojson.UnmarshalOptions{
Resolver: compiledFiles.AsResolver(),
}
return func(record []byte) ([]byte, error) {
// Create a new message instance for each encode call to avoid concurrent map write issues. //
message := dynamicpb.NewMessage(msgDescriptor)

// Unmarshal the record into the proto message. //
err := o.Unmarshal(record, message)
if err != nil {
Expand Down
48 changes: 48 additions & 0 deletions src/go/rpk/pkg/serde/proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
Expand Down Expand Up @@ -552,6 +553,53 @@ func protoReferenceHandler() http.HandlerFunc {
}
}

// Tests that the proto encoder is thread-safe by running multiple goroutines
// that concurrently encode different protobuf records. This test verifies that
// concurrent calls to EncodeRecord don't cause race conditions or panics.
func Test_encodeProtoRecordConcurrency(t *testing.T) {
const testSchema = `syntax = "proto3";

message Person {
string name = 1;
int32 id = 2;
bool isAdmin = 3;
optional string email = 4;
}`

noopCl, _ := sr.NewClient()
schema := sr.Schema{
Schema: testSchema,
Type: sr.TypeProtobuf,
}

serde, err := NewSerde(context.Background(), noopCl, &schema, 1, "Person")
require.NoError(t, err)

// Number of goroutines to run concurrently
numGoroutines := 10
numIterations := 100

// Use a sync.WaitGroup to wait for all goroutines
var wg sync.WaitGroup

// Launch multiple goroutines that encode records concurrently
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()

// Each goroutine encodes the same record multiple times to simulate concurrent access.
for j := 0; j < numIterations; j++ {
_, err := serde.EncodeRecord([]byte(`{"name":"igor","id":123,"isAdmin":true,"email":"test@redpanda.com"}`))
require.NoError(t, err)
}
}(i)
}

// Wait for all goroutines to complete
wg.Wait()
}

// Long message, including all well-known types baked in rpk that we can
// encode/decode.
const messageAllWellKnown = `{
Expand Down