-
Notifications
You must be signed in to change notification settings - Fork 413
lakectl: support async commit and merge #9764
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
itaigilo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good.
The only thing is, you wrote:
Verified backwards and forwards compatibility with existing tests
How about testing the added functionality?
No tests added?
cmd/lakectl/cmd/async.go
Outdated
|
|
||
| const ( | ||
| initialInterval = 1 * time.Second // initial interval for exponential backoff | ||
| MaxInterval = 10 * time.Second // max interval for exponential backoff |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maxInterval?
cmd/lakectl/cmd/merge.go
Outdated
| DieOnErrorOrUnexpectedStatusCode(resp, err, http.StatusOK) | ||
| if resp.JSON200 == nil { | ||
| Die("Bad response from server", 1) | ||
| // # try asynchronous merge first |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // # try asynchronous merge first | |
| // try asynchronous merge first |
cmd/lakectl/cmd/commit.go
Outdated
| err error | ||
| ) | ||
|
|
||
| // # try asynchronous commit first |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // # try asynchronous commit first | |
| // try asynchronous commit first |
Whatever we can test here is already covered in existing tests |
zubron
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few minor comments, but otherwise looks good!
cmd/lakectl/cmd/commit.go
Outdated
| } | ||
| var ( | ||
| commit *apigen.Commit | ||
| err error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This pre-declaration of err can be removed since we assign directly below.
cmd/lakectl/cmd/merge.go
Outdated
| } | ||
| var result *apigen.MergeResult | ||
| taskID := startResp.JSON202.Id | ||
| err := pollAsyncOperationStatus(ctx, taskID, "commit", func() (*apigen.AsyncTaskStatus, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/commit/merge
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this comment is regarding the operation should be "merge" right?
cmd/lakectl/cmd/merge.go
Outdated
| ctx := cmd.Context() | ||
| var ( | ||
| ref string | ||
| err error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above
Annaseli
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost finished reviewing
| resp, err := client.CommitWithResponse(cmd.Context(), branchURI.Repository, branchURI.Ref, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{ | ||
| Message: message, | ||
| Metadata: &metadata, | ||
| body := apigen.CommitJSONRequestBody{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are you wrapping it with apigen.CommitJSONRequestBody?
When calling client.CommitAsyncWithResponse, the body is already wrapped using apigen.CommitAsyncJSONRequestBody.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand the question.
I'm using this body for both sync and async operations.
Since the are of different types with the same underlying type I need to cast it one way or another.
In this case I'm using apigen.CommitJSONRequestBody and casting it to apigen.CommitAsyncJSONRequestBody in the async request
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added this comment because i was thinkung that we should only have the apigen.CommitAsyncJSONRequestBody without the apigen.CommitJSONRequestBody because they both rely on the CommitCreation struct
|
|
||
| defaultPollInterval = 3 * time.Second // default interval while pulling tasks status | ||
| minimumPollInterval = time.Second // minimum interval while pulling tasks status | ||
| defaultPollTimeout = time.Hour // default expiry for pull status with no update |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn’t one hour too long? For example, on the server I used a 20-minute timeout, as Barak suggested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no timeout on the client side. This is a parameter that was moved from a different file and is being used by another command
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so in this PR you are not planning to add to the client timeout, right? since last time we talked about it you said we need to add timeout to the client
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No - we don't need a timeout. Once we have a cancel operation user will simply break execution to cancel the operation
| // TODO (niro): We will need to implement timeout and cancel logic here | ||
| func pollAsyncOperationStatus(ctx context.Context, taskID string, operation string, cb asyncStatusCallback) error { | ||
| var bo backoff.BackOff = backoff.NewExponentialBackOff( | ||
| backoff.WithInitialInterval(initialInterval), backoff.WithMaxInterval(maxInterval)) // No timeout |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So you won’t be using the defaultPollTimeout parameter you defined for handling the timeout here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not defined, just moved
cmd/lakectl/cmd/async.go
Outdated
|
|
||
| type StatusResponse struct { | ||
| Completed bool | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you use it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not used - removed
| if resp.JSON200 == nil { | ||
| Die("Bad response from server", 1) | ||
| // try asynchronous merge first | ||
| startResp, err := client.MergeIntoBranchAsyncWithResponse(ctx, destinationRef.Repository, sourceRef.Ref, destinationRef.Ref, apigen.MergeIntoBranchAsyncJSONRequestBody(body)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as in the commit regarding the body
cmd/lakectl/cmd/merge.go
Outdated
| } | ||
| var result *apigen.MergeResult | ||
| taskID := startResp.JSON202.Id | ||
| err := pollAsyncOperationStatus(ctx, taskID, "commit", func() (*apigen.AsyncTaskStatus, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this comment is regarding the operation should be "merge" right?
cmd/lakectl/cmd/merge.go
Outdated
| Result: resp.JSON200, | ||
| Result: &apigen.MergeResult{ | ||
| Reference: ref, | ||
| }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the commit you pass the commit var you created as the result but here you create the &apigen.MergeResult{} struct - maybe we should align them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks!
Closes #9747
Change Description
Background
Support async operations in lakectl with fallback
Testing Details
Verified backwards and forwards compatibility with existing tests
Breaking Change?
No