[#1373][part-1] feat: partition write to multi servers leveraging from reassignment mechanism #1445
            + " of concurrent tasks."))
        .createWithDefault(RssClientConfig.RSS_ESTIMATE_SERVER_ASSIGNMENT_ENABLED_DEFAULT_VALUE);

public static final ConfigEntry<Boolean> RSS_DYNAMIC_SERVER_ASSIGNMENT_ENABLED =
Please use the config style like this:
public static final ConfigOption<Boolean> RSS_TASK_FAILED_CALLBACK_ENABLED =
ConfigOptions.key("rss.task.failed.callback.enable")
.booleanType()
.defaultValue(true);
Got it.
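As a rough illustration of the suggested style applied to the new option, here is a minimal, self-contained sketch. `ConfigOption` and `ConfigOptions` below are simplified stand-ins written for this example, not the project's real classes; they model only the three builder calls quoted in the review comment. The key string comes from the PR description, while the default of `false` and the helper `isEnabled` are assumptions.

```java
import java.util.Map;

// Hypothetical stand-ins for the project's ConfigOption/ConfigOptions classes,
// reduced to the builder calls quoted in the review comment.
final class ConfigOption<T> {
  final String key;
  final T defaultValue;

  ConfigOption(String key, T defaultValue) {
    this.key = key;
    this.defaultValue = defaultValue;
  }
}

final class ConfigOptions {
  private ConfigOptions() {}

  static Builder key(String key) {
    return new Builder(key);
  }

  static final class Builder {
    private final String key;

    Builder(String key) {
      this.key = key;
    }

    BooleanBuilder booleanType() {
      return new BooleanBuilder(key);
    }
  }

  static final class BooleanBuilder {
    private final String key;

    BooleanBuilder(String key) {
      this.key = key;
    }

    ConfigOption<Boolean> defaultValue(boolean value) {
      return new ConfigOption<>(key, value);
    }
  }
}

final class DynamicAssignConfigSketch {
  // Key taken from the PR description; the default of false is an assumption.
  static final ConfigOption<Boolean> RSS_DYNAMIC_SERVER_ASSIGNMENT_ENABLED =
      ConfigOptions.key("rss.server.dynamic.assign.enabled")
          .booleanType()
          .defaultValue(false);

  // Hypothetical helper: reads the option from raw string settings,
  // falling back to the declared default when the key is absent.
  static boolean isEnabled(Map<String, String> settings) {
    String raw = settings.get(RSS_DYNAMIC_SERVER_ASSIGNMENT_ENABLED.key);
    return raw == null
        ? RSS_DYNAMIC_SERVER_ASSIGNMENT_ENABLED.defaultValue
        : Boolean.parseBoolean(raw);
  }
}
```

Declaring the key, type, and default in one place keeps the default next to the option itself instead of in a separate constants class.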
    int shuffleId, Set<String> partitionIds, String faultyShuffleServerId) {
  ShuffleServerInfo newShuffleServerInfo;
  synchronized (this) {
    if (getReassignedFaultyServers().containsKey(faultyShuffleServerId)) {
How about extracting the result of getReassignedFaultyServers into a variable?
fixed.
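One way to read this suggestion is sketched below: call the getter once inside the synchronized block and reuse the local reference. This is an illustration under assumptions, not the code that was merged. A `String` stands in for `ShuffleServerInfo`, and `replacementPicker` is a hypothetical callback for whatever actually selects the new server.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

final class ReassignCacheSketch {
  // Faulty server id -> replacement server id (String stands in for ShuffleServerInfo).
  private final Map<String, String> reassignedFaultyServers = new ConcurrentHashMap<>();

  Map<String, String> getReassignedFaultyServers() {
    return reassignedFaultyServers;
  }

  // Returns the cached replacement for a faulty server, consulting the
  // (hypothetical) picker only the first time that server is reported.
  String reassign(String faultyShuffleServerId, Supplier<String> replacementPicker) {
    synchronized (this) {
      // Fetch the map once and reuse the local reference.
      Map<String, String> reassigned = getReassignedFaultyServers();
      String replacement = reassigned.get(faultyShuffleServerId);
      if (replacement == null) {
        replacement = replacementPicker.get();
        // ConcurrentHashMap rejects null values, so only cache real results.
        if (replacement != null) {
          reassigned.put(faultyShuffleServerId, replacement);
        }
      }
      return replacement;
    }
  }
}
```

A second report of the same faulty server returns the cached replacement, so every caller agrees on the same new server.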
proto/src/main/proto/Rss.proto
Outdated
  repeated string faultyServerIds = 12;
}

message ReassignShuffleServerRequest {
Is this unused?
I'll remove it.
  return newShuffleServerInfo;
}

private Map<String, ShuffleServerInfo> reassignedFaultyServers = Maps.newConcurrentMap();
Could you move this variable definition to the top of the class?
Got it.
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
if (dynamicShuffleServer == null) {
  dynamicShuffleServer =
      reAssignFaultyShuffleServer(partitionIds, t.getKey().getId());
  if (dynamicShuffleServer == null) {
Throw an exception directly? Otherwise the failed blocks will be ignored.
good catch.
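The fail-fast behavior discussed in this thread could look roughly like the sketch below. It is a self-contained illustration, not the merged code. `reassigner` is a hypothetical stand-in for `reAssignFaultyShuffleServer`, a `String` stands in for the server info type, and `IllegalStateException` is used only because the project's own exception type is not shown here.

```java
import java.util.Set;
import java.util.function.BiFunction;

final class ReassignOrFailSketch {
  // Returns the replacement server id, or throws when none could be assigned,
  // so that failed blocks are never silently dropped.
  static String reassignOrFail(
      Set<String> partitionIds,
      String faultyServerId,
      BiFunction<Set<String>, String, String> reassigner) {
    String replacement = reassigner.apply(partitionIds, faultyServerId);
    if (replacement == null) {
      throw new IllegalStateException(
          "No replacement shuffle server for faulty server " + faultyServerId
              + ", partitions " + partitionIds);
    }
    return replacement;
  }
}
```

Throwing here surfaces the failure to the task instead of letting the write appear to succeed with blocks missing.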
  }
}

private boolean needReAssignShuffleServer(Set<Long> failedBlockIds) {
I think we should check whether any blocks that meet the requirements exist; the current impl looks wrong.
Got it. I will modify it. Thanks.
client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
I thought of an overlooked issue: reportShuffleResult should be rewritten.
b35b34e to f586a24
Is this ready for review?
f586a24 to e2aad86
@zuston The review comments have been addressed. Please take a look again.
Oh, it's a big PR. I will finish the review this week.
zuston left a comment
Overall LGTM. Left some comments. Thanks for your effort in advance @xumanbu
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
Outdated
client/src/main/java/org/apache/uniffle/client/impl/FailedBlockSendTracker.java
Outdated
client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
Outdated
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
Outdated
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
Outdated
client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
Outdated
Test Results: 2 429 files, 2 429 suites, 4h 41m 54s. Results for commit 368dc2b. This comment has been updated with latest results.
zuston left a comment
LGTM. Wait 1 day for @jerqi's review.
I will review this PR today.
}

message RssReassignFaultyShuffleServerRequest{
  int32 shuffleId = 1;
Should we use int as shuffle id? Maybe we should use this for Tez, too. @zhengchenyu @lifeSo Could you take a look?
In Tez, it is OK to use int as the shuffle id; it will not overflow. Here is the shuffle id calculation:
private static final int SHUFFLE_ID_MAGIC = 1000;

public static int computeShuffleId(int tezDagID, int upVertexId, int downVertexId) {
  int shuffleId =
      tezDagID * (SHUFFLE_ID_MAGIC * SHUFFLE_ID_MAGIC) + upVertexId * SHUFFLE_ID_MAGIC + downVertexId;
  LOG.info("Compute Shuffle Id:{}, up vertex id:{}, down vertex id:{}", shuffleId, upVertexId, downVertexId);
  return shuffleId;
}
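To make the arithmetic concrete, the sketch below reproduces the same formula with overflow-checked operations. The class name `ShuffleIdSketch` and the method `computeShuffleIdChecked` are hypothetical, added only for this illustration. Assuming both vertex ids stay below 1000, the result fits in an `int` for DAG ids up to 2146, since 2146 * 1,000,000 + 999 * 1000 + 999 = 2,146,999,999, which is below `Integer.MAX_VALUE` (2,147,483,647).

```java
final class ShuffleIdSketch {
  private static final int SHUFFLE_ID_MAGIC = 1000;

  // Same formula as the quoted computeShuffleId, but using checked arithmetic
  // so an out-of-range id throws ArithmeticException instead of wrapping around.
  static int computeShuffleIdChecked(int tezDagId, int upVertexId, int downVertexId) {
    int dagPart = Math.multiplyExact(tezDagId, SHUFFLE_ID_MAGIC * SHUFFLE_ID_MAGIC);
    int upPart = Math.multiplyExact(upVertexId, SHUFFLE_ID_MAGIC);
    return Math.addExact(Math.addExact(dagPart, upPart), downVertexId);
  }

  public static void main(String[] args) {
    // 1 * 1_000_000 + 2 * 1_000 + 3
    System.out.println(computeShuffleIdChecked(1, 2, 3)); // prints 1002003
  }
}
```

A DAG id of 2148 or more would exceed the `int` range, so the checked variant throws there rather than returning a wrapped value.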
Use int as shuffleId following the definitions of other messages.
Yes
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.StatusCode;

public class TrackBlockStatus {
TrackBlockStatus -> TrackingBlockStatus?
TrackingBlockStatus is better. I'll rename it.
cc @jerqi.
Do you have any plan to implement the read-side change? @xumanbu
In progress. I'll submit the next PR once it's ready.
Let me have a try.
It's great.
What changes were proposed in this pull request?
(Please outline the changes and how this PR fixes the issue.)
Why are the changes needed?
Fix: #1373
Does this PR introduce any user-facing change?
1. Add the config rss.server.dynamic.assign.enabled, which controls whether to reassign a faulty shuffle server.
2. Support reassigning a new shuffle server for blocks that failed to send.
3. ShuffleReader reading a partition from multiple servers will be implemented in the next PR.
How was this patch tested?
Waiting for follow-up.