Skip to content

Commit 512d21d

Browse files
committed
feat(CLI): add --parallel flag to phrase pull for concurrent downloads
Download locale files using up to 4 concurrent requests (matching the Phrase API concurrency limit) via errgroup. Results are collected and printed in order after all downloads complete for clean output. A shared mutex coordinates rate-limit pauses across all workers. Only supported in sync mode; --parallel with --async warns and ignores.
1 parent 4f6cb6d commit 512d21d

5 files changed

Lines changed: 298 additions & 16 deletions

File tree

clients/cli/cmd/internal/pull.go

Lines changed: 149 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"path/filepath"
1111
"reflect"
1212
"strings"
13+
"sync"
1314
"time"
1415

1516
"github.com/phrase/phrase-cli/cmd/internal/paths"
@@ -18,6 +19,7 @@ import (
1819

1920
"github.com/antihax/optional"
2021
"github.com/phrase/phrase-go/v4"
22+
"golang.org/x/sync/errgroup"
2123
)
2224

2325
const (
@@ -26,13 +28,16 @@ const (
2628
asyncRetryCount = 360 // 30 minutes
2729
)
2830

31+
const maxParallelDownloads = 4 // Phrase API allows max 4 concurrent requests
32+
2933
var Config *phrase.Config
3034

3135
type PullCommand struct {
3236
phrase.Config
3337
Branch string
3438
UseLocalBranchName bool
3539
Async bool
40+
Parallel bool
3641
}
3742

3843
var Auth context.Context
@@ -82,7 +87,15 @@ func (cmd *PullCommand) Run(config *phrase.Config) error {
8287
}
8388

8489
for _, target := range targets {
85-
err := target.Pull(client, cmd.Async)
90+
var err error
91+
if cmd.Parallel && !cmd.Async {
92+
err = target.PullParallel(client)
93+
} else {
94+
if cmd.Parallel && cmd.Async {
95+
print.Warn("--parallel is not supported with --async, ignoring parallel")
96+
}
97+
err = target.Pull(client, cmd.Async)
98+
}
8699
if err != nil {
87100
return err
88101
}
@@ -146,25 +159,150 @@ func (target *Target) Pull(client *phrase.APIClient, async bool) error {
146159
return nil
147160
}
148161

149-
func (target *Target) DownloadAndWriteToFile(client *phrase.APIClient, localeFile *LocaleFile, async bool) error {
150-
localVarOptionals := phrase.LocaleDownloadOpts{}
162+
type downloadResult struct {
163+
message string
164+
path string
165+
errMsg string
166+
}
167+
168+
func (target *Target) PullParallel(client *phrase.APIClient) error {
169+
if err := target.CheckPreconditions(); err != nil {
170+
return err
171+
}
172+
173+
localeFiles, err := target.LocaleFiles()
174+
if err != nil {
175+
return err
176+
}
177+
178+
// Ensure all destination files/dirs exist before parallel downloads
179+
for _, lf := range localeFiles {
180+
if err := createFile(lf.Path); err != nil {
181+
return err
182+
}
183+
}
184+
185+
results := make([]downloadResult, len(localeFiles))
186+
var rateMu sync.RWMutex
187+
188+
ctx, cancel := context.WithTimeout(context.Background(), timeoutInMinutes)
189+
defer cancel()
190+
g, ctx := errgroup.WithContext(ctx)
191+
g.SetLimit(maxParallelDownloads)
192+
193+
for i, lf := range localeFiles {
194+
g.Go(func() error {
195+
if ctx.Err() != nil {
196+
return ctx.Err()
197+
}
198+
199+
opts, err := target.buildDownloadOpts(lf)
200+
if err != nil {
201+
err = fmt.Errorf("%s for %s", err, lf.Path)
202+
results[i] = downloadResult{errMsg: err.Error()}
203+
return err
204+
}
205+
206+
err = target.downloadWithRateGate(client, lf, opts, &rateMu)
207+
if err != nil {
208+
if openapiError, ok := err.(phrase.GenericOpenAPIError); ok {
209+
print.Warn("API response: %s", openapiError.Body())
210+
}
211+
err = fmt.Errorf("%s for %s", err, lf.Path)
212+
results[i] = downloadResult{errMsg: err.Error()}
213+
return err
214+
}
215+
216+
results[i] = downloadResult{
217+
message: lf.Message(),
218+
path: lf.RelPath(),
219+
}
220+
return nil
221+
})
222+
}
223+
224+
waitErr := g.Wait()
225+
226+
// Print results in original order: successes and failures
227+
var skipCount int
228+
for _, r := range results {
229+
if r.path != "" {
230+
print.Success("Downloaded %s to %s", r.message, r.path)
231+
} else if r.errMsg != "" {
232+
print.Failure("Failed %s", r.errMsg)
233+
} else {
234+
skipCount++
235+
}
236+
}
237+
if skipCount > 0 {
238+
print.Warn("%d download(s) skipped due to earlier failure", skipCount)
239+
}
240+
241+
return waitErr
242+
}
243+
244+
// downloadWithRateGate downloads a locale file with rate-limit coordination.
245+
// Uses RWMutex as a broadcast gate: workers take a read lock (cheap, concurrent),
246+
// and a rate-limited worker takes the write lock to pause everyone until reset.
247+
func (target *Target) downloadWithRateGate(client *phrase.APIClient, localeFile *LocaleFile, opts phrase.LocaleDownloadOpts, gate *sync.RWMutex) error {
248+
// Read-lock gate: blocks only when a writer (rate-limited worker) holds it
249+
gate.RLock()
250+
gate.RUnlock()
251+
252+
file, response, err := client.LocalesApi.LocaleDownload(Auth, target.ProjectID, localeFile.ID, &opts)
253+
if err != nil {
254+
if response != nil && response.Rate.Remaining == 0 {
255+
// TryLock ensures only one worker handles the rate limit pause.
256+
// Others will block on their next RLock until the pause is over.
257+
if gate.TryLock() {
258+
waitForRateLimit(response.Rate)
259+
gate.Unlock()
260+
} else {
261+
// Another worker is already pausing; wait for it
262+
gate.RLock()
263+
gate.RUnlock()
264+
}
265+
266+
file, _, err = client.LocalesApi.LocaleDownload(Auth, target.ProjectID, localeFile.ID, &opts)
267+
if err != nil {
268+
return err
269+
}
270+
} else {
271+
return err
272+
}
273+
}
274+
return copyToDestination(file, localeFile.Path)
275+
}
276+
277+
// buildDownloadOpts prepares the LocaleDownloadOpts for a locale file download.
278+
func (target *Target) buildDownloadOpts(localeFile *LocaleFile) (phrase.LocaleDownloadOpts, error) {
279+
opts := phrase.LocaleDownloadOpts{}
151280

152281
if target.Params != nil {
153-
localVarOptionals = target.Params.LocaleDownloadOpts
282+
opts = target.Params.LocaleDownloadOpts
154283
translationKeyPrefix, err := placeholders.ResolveTranslationKeyPrefix(target.Params.TranslationKeyPrefix, localeFile.Path)
155284
if err != nil {
156-
return err
285+
return opts, err
157286
}
158-
localVarOptionals.TranslationKeyPrefix = translationKeyPrefix
287+
opts.TranslationKeyPrefix = translationKeyPrefix
159288
}
160289

161-
if localVarOptionals.FileFormat.Value() == "" {
162-
localVarOptionals.FileFormat = optional.NewString(localeFile.FileFormat)
290+
if opts.FileFormat.Value() == "" {
291+
opts.FileFormat = optional.NewString(localeFile.FileFormat)
163292
}
164293

165294
if localeFile.Tag != "" {
166-
localVarOptionals.Tags = optional.NewString(localeFile.Tag)
167-
localVarOptionals.Tag = optional.EmptyString()
295+
opts.Tags = optional.NewString(localeFile.Tag)
296+
opts.Tag = optional.EmptyString()
297+
}
298+
299+
return opts, nil
300+
}
301+
302+
func (target *Target) DownloadAndWriteToFile(client *phrase.APIClient, localeFile *LocaleFile, async bool) error {
303+
localVarOptionals, err := target.buildDownloadOpts(localeFile)
304+
if err != nil {
305+
return err
168306
}
169307

170308
debugFprintln("Target file pattern:", target.File)
@@ -182,9 +320,8 @@ func (target *Target) DownloadAndWriteToFile(client *phrase.APIClient, localeFil
182320

183321
if async {
184322
return target.downloadAsynchronously(client, localeFile, localVarOptionals)
185-
} else {
186-
return target.downloadSynchronously(client, localeFile, localVarOptionals)
187323
}
324+
return target.downloadSynchronously(client, localeFile, localVarOptionals)
188325
}
189326

190327
func (target *Target) downloadAsynchronously(client *phrase.APIClient, localeFile *LocaleFile, downloadOpts phrase.LocaleDownloadOpts) error {
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package internal
2+
3+
import (
4+
"sync"
5+
"sync/atomic"
6+
"testing"
7+
"time"
8+
)
9+
10+
func TestRateGate_AllowsConcurrentReaders(t *testing.T) {
11+
var gate sync.RWMutex
12+
var concurrent int64
13+
var maxConcurrent int64
14+
15+
var wg sync.WaitGroup
16+
for i := 0; i < 10; i++ {
17+
wg.Add(1)
18+
go func() {
19+
defer wg.Done()
20+
21+
// Simulate the read-lock gate pattern from downloadWithRateGate
22+
gate.RLock()
23+
gate.RUnlock()
24+
25+
c := atomic.AddInt64(&concurrent, 1)
26+
for {
27+
old := atomic.LoadInt64(&maxConcurrent)
28+
if c <= old || atomic.CompareAndSwapInt64(&maxConcurrent, old, c) {
29+
break
30+
}
31+
}
32+
33+
time.Sleep(time.Millisecond)
34+
atomic.AddInt64(&concurrent, -1)
35+
}()
36+
}
37+
wg.Wait()
38+
39+
// RLock is shared, so all goroutines should run concurrently
40+
if maxConcurrent < 2 {
41+
t.Errorf("expected concurrent execution with RLock gate, max concurrent was %d", maxConcurrent)
42+
}
43+
}
44+
45+
func TestRateGate_WriteLockBlocksAllReaders(t *testing.T) {
46+
var gate sync.RWMutex
47+
ready := make(chan struct{}, 4)
48+
49+
// Simulate a rate-limited worker holding the write lock
50+
gate.Lock()
51+
52+
var wg sync.WaitGroup
53+
for i := 0; i < 4; i++ {
54+
wg.Add(1)
55+
go func() {
56+
defer wg.Done()
57+
ready <- struct{}{}
58+
gate.RLock()
59+
gate.RUnlock()
60+
}()
61+
}
62+
63+
// Wait for all goroutines to start
64+
for i := 0; i < 4; i++ {
65+
<-ready
66+
}
67+
time.Sleep(10 * time.Millisecond)
68+
69+
// Release the write lock (simulating rate limit wait done)
70+
gate.Unlock()
71+
wg.Wait()
72+
}
73+
74+
func TestRateGate_TryLockPreventsDoubleWait(t *testing.T) {
75+
var gate sync.RWMutex
76+
var waitCount int64
77+
78+
// Simulate two workers hitting rate limit simultaneously
79+
gate.Lock() // first worker takes the write lock
80+
81+
var wg sync.WaitGroup
82+
wg.Add(1)
83+
go func() {
84+
defer wg.Done()
85+
// Second worker tries TryLock, should fail
86+
if gate.TryLock() {
87+
atomic.AddInt64(&waitCount, 1)
88+
gate.Unlock()
89+
}
90+
}()
91+
92+
time.Sleep(10 * time.Millisecond)
93+
atomic.AddInt64(&waitCount, 1) // first worker counts
94+
gate.Unlock()
95+
wg.Wait()
96+
97+
// Only 1 worker should have done the wait (the first one)
98+
if atomic.LoadInt64(&waitCount) != 1 {
99+
t.Errorf("expected exactly 1 rate limit wait, got %d", waitCount)
100+
}
101+
}
102+
103+
func TestBuildDownloadOpts_DefaultFileFormat(t *testing.T) {
104+
target := &Target{
105+
File: "locales/<locale_name>.json",
106+
ProjectID: "proj1",
107+
}
108+
localeFile := &LocaleFile{
109+
FileFormat: "json",
110+
Tag: "",
111+
}
112+
113+
opts, err := target.buildDownloadOpts(localeFile)
114+
if err != nil {
115+
t.Fatalf("unexpected error: %v", err)
116+
}
117+
if opts.FileFormat.Value() != "json" {
118+
t.Errorf("expected file format 'json', got %q", opts.FileFormat.Value())
119+
}
120+
}
121+
122+
func TestBuildDownloadOpts_TagHandling(t *testing.T) {
123+
target := &Target{
124+
File: "locales/<locale_name>/<tag>.json",
125+
ProjectID: "proj1",
126+
}
127+
localeFile := &LocaleFile{
128+
FileFormat: "json",
129+
Tag: "web",
130+
}
131+
132+
opts, err := target.buildDownloadOpts(localeFile)
133+
if err != nil {
134+
t.Fatalf("unexpected error: %v", err)
135+
}
136+
if opts.Tags.Value() != "web" {
137+
t.Errorf("expected tags 'web', got %q", opts.Tags.Value())
138+
}
139+
if opts.Tag.Value() != "" {
140+
t.Errorf("expected tag to be empty string, got %q", opts.Tag.Value())
141+
}
142+
}

clients/cli/cmd/pull.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ func initPull() {
2121
Branch: params.GetString("branch"),
2222
UseLocalBranchName: params.GetBool("use-local-branch-name"),
2323
Async: params.GetBool("async"),
24+
Parallel: params.GetBool("parallel"),
2425
}
2526
err := cmdPull.Run(Config)
2627
if err != nil {
@@ -33,5 +34,6 @@ func initPull() {
3334
AddFlag(pullCmd, "string", "branch", "b", "branch", false)
3435
AddFlag(pullCmd, "bool", "use-local-branch-name", "", "use local branch name", false)
3536
AddFlag(pullCmd, "bool", "async", "a", "use asynchronous locale downloads (recommended for large number of keys)", false)
37+
AddFlag(pullCmd, "bool", "parallel", "p", "download locale files in parallel (max 4 concurrent requests)", false)
3638
params.BindPFlags(pullCmd.Flags())
3739
}

clients/cli/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,6 @@ require (
3737
github.com/pelletier/go-toml v1.2.0 // indirect
3838
github.com/spf13/jwalterweatherman v1.1.0 // indirect
3939
github.com/stretchr/testify v1.9.0 // indirect
40+
golang.org/x/sync v0.12.0
4041
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
4142
)

0 commit comments

Comments
 (0)