Skip to content

Conversation

@xumanbu
Copy link
Contributor

@xumanbu xumanbu commented Jan 12, 2024

What changes were proposed in this pull request?

(Please outline the changes and how this PR fixes the issue.)

Why are the changes needed?

Fix: #1373

Does this PR introduce any user-facing change?

1、add config rss.server.dynamic.assign.enabled for whether to reassign a faulty shuffle server.
2、support reassign a new shuffle server for send failed blocks
3、ShuffleReader read partition in muitl server implement will in next pr

How was this patch tested?

wait follow-up

@xumanbu xumanbu changed the title [#1373][part-1] partition write to multi server by reassign a faulty server [#1373][part-2] partition write to multi server by reassign a faulty server Jan 12, 2024
@codecov-commenter
Copy link

codecov-commenter commented Jan 12, 2024

Codecov Report

Attention: 78 lines in your changes are missing coverage. Please review.

Comparison is base (ca0f0ac) 0.00% compared to head (368dc2b) 55.15%.
Report is 31 commits behind head on master.

Files Patch % Lines
...he/uniffle/client/impl/ShuffleWriteClientImpl.java 21.73% 18 Missing ⚠️
...fle/shuffle/manager/ShuffleManagerGrpcService.java 0.00% 14 Missing ⚠️
...request/RssReassignFaultyShuffleServerRequest.java 0.00% 13 Missing ⚠️
...sponse/RssReassignFaultyShuffleServerResponse.java 0.00% 11 Missing ⚠️
...he/uniffle/client/impl/FailedBlockSendTracker.java 52.63% 9 Missing ⚠️
...fle/client/impl/grpc/ShuffleManagerGrpcClient.java 0.00% 5 Missing ⚠️
...pache/uniffle/client/impl/TrackingBlockStatus.java 62.50% 3 Missing ⚠️
...va/org/apache/spark/shuffle/ShuffleHandleInfo.java 0.00% 2 Missing ⚠️
...va/org/apache/spark/shuffle/writer/DataPusher.java 85.71% 0 Missing and 1 partial ⚠️
...uniffle/client/response/SendShuffleDataResult.java 75.00% 1 Missing ⚠️
... and 1 more
Additional details and impacted files
@@              Coverage Diff              @@
##             master    #1445       +/-   ##
=============================================
+ Coverage          0   55.15%   +55.15%     
- Complexity        0     2820     +2820     
=============================================
  Files             0      414      +414     
  Lines             0    22136    +22136     
  Branches          0     2074     +2074     
=============================================
+ Hits              0    12210    +12210     
- Misses            0     9167     +9167     
- Partials          0      759      +759     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@xumanbu
Copy link
Contributor Author

xumanbu commented Jan 12, 2024

@zuston @jerqi PTAL

+ " of concurrent tasks."))
.createWithDefault(RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE);

public static final ConfigEntry<Boolean> RSS_DYNAMIC_SERVER_ASSIGNMENT_ENABLED =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the config style like this:


public static final ConfigOption<Boolean> RSS_TASK_FAILED_CALLBACK_ENABLED =
      ConfigOptions.key("rss.task.failed.callback.enable")
          .booleanType()
          .defaultValue(true);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get it.

int shuffleId, Set<String> partitionIds, String faultyShuffleServerId) {
ShuffleServerInfo newShuffleServerInfo;
synchronized (this) {
if (getReassignedFaultyServers().containsKey(faultyShuffleServerId)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about extract the param of getReassignedFaultyServers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

repeated string faultyServerIds = 12;
}

message ReassignShuffleServerRequest {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this unused?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove it.

return newShuffleServerInfo;
}

private Map<String, ShuffleServerInfo> reassignedFaultyServers = Maps.newConcurrentMap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you put this var defined in front of the class file ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get it.

if (dynamicShuffleServer == null) {
dynamicShuffleServer =
reAssignFaultyShuffleServer(partitionIds, t.getKey().getId());
if (dynamicShuffleServer == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw exception directly? Otherwise the failed blocks will be ignored.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch.

}
}

private boolean needReAssignShuffleServer(Set<Long> failedBlockIds) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should check the blocks existence that meet the requirements, current impl looks wrong.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got it. I will modify it. thinks.

@xumanbu
Copy link
Contributor Author

xumanbu commented Jan 23, 2024

@jerqi @zuston could you help me take a look again.

@xumanbu
Copy link
Contributor Author

xumanbu commented Jan 25, 2024

I came up with an overlooked question:reportShuffleResult should rewrite. partitionToBlockIds report to primary server and replica server. but the data structure partitionToBlockIds may not be enough for record blockId -> parititionId-> serverId.

@jerqi
Copy link
Contributor

jerqi commented Jan 29, 2024

Is this ready for review?

@xumanbu
Copy link
Contributor Author

xumanbu commented Jan 29, 2024

Is this ready for review?

@zuston tips comments have been corrected. please take a look again.

@zuston
Copy link
Member

zuston commented Jan 31, 2024

Is this ready for review?

@zuston tips comments have been corrected. please take a look again.

Oh It's a big PR, I will finish review in this week.

Copy link
Member

@zuston zuston left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. Left some comments. Thanks for your effort in advance @xumanbu

@github-actions
Copy link

github-actions bot commented Feb 6, 2024

Test Results

2 429 files  2 429 suites   4h 41m 54s ⏱️
  819 tests   818 ✅  1 💤 0 ❌
9 713 runs  9 699 ✅ 14 💤 0 ❌

Results for commit 368dc2b.

♻️ This comment has been updated with latest results.

zuston
zuston previously approved these changes Feb 8, 2024
Copy link
Member

@zuston zuston left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Wait 1 day for @jerqi 's review

@zuston zuston changed the title [#1373][part-2] partition write to multi server by reassign a faulty server [#1373][part-1] feat: partition write to multi servers leveraging from reassignment mechanism Feb 8, 2024
@jerqi
Copy link
Contributor

jerqi commented Feb 8, 2024

LGTM. Wait 1 day for @jerqi 's review

I'm will review this pr today.

}

message RssReassignFaultyShuffleServerRequest{
int32 shuffleId = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use int as shuffle id? Maybe we should use this for Tez, too. @zhengchenyu @lifeSo Could you take a look?

Copy link
Collaborator

@lifeSo lifeSo Feb 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In tez, it is ok to use int as shuffle id, and will not over-flow, Here is the shuffle id calc:

private static final int SHUFFLE_ID_MAGIC = 1000;

  public static int computeShuffleId(int tezDagID, int upVertexId, int downVertexId) {
    int shuffleId = tezDagID * (SHUFFLE_ID_MAGIC * SHUFFLE_ID_MAGIC)  + upVertexId * SHUFFLE_ID_MAGIC + downVertexId;
    LOG.info("Compute Shuffle Id:{}, up vertex id:{}, down vertex id:{}", shuffleId, upVertexId, downVertexId);
    return shuffleId;
  }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use int as shuffleId following the definitions of other messages.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.StatusCode;

public class TrackBlockStatus {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TrackBlockStatus -> TrackingBlockStatus?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TrackingBlockStatus is better. rename it.

@zuston
Copy link
Member

zuston commented Feb 20, 2024

cc @jerqi .

@zuston
Copy link
Member

zuston commented Feb 20, 2024

Do you have any plan to implement the read side change? @xumanbu

@xumanbu
Copy link
Contributor Author

xumanbu commented Feb 20, 2024

Do you have any plan to implement the read side change? @xumanbu

in progress.I'll submit next PR once ready.

@zuston zuston merged commit 59aa30d into apache:master Feb 21, 2024
@dingshun3016
Copy link
Contributor

dingshun3016 commented Feb 21, 2024

Do you have any plan to implement the read side change? @xumanbu

in progress.I'll submit next PR once ready.

let me have try

@xumanbu
Copy link
Contributor Author

xumanbu commented Feb 21, 2024

Do you have any plan to implement the read side change? @xumanbu

in progress.I'll submit next PR once ready.

let me have try

it's great.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FEATURE] support shuffle partition data write to multiple shuffle servers

6 participants