Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ protected DiscoveryNetwork<?> buildNetwork(
dataColumnSidecarSubnetTopicProvider,
dataColumnSidecarSubnetService,
config.getTargetSubnetSubscriberCount(),
config.getTargetPerSubnetSubscriberCount(),
subnetPeerCountGauge),
reputationManager,
Collections::shuffle))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class P2PConfig {

public static final boolean DEFAULT_PEER_ALL_TOPIC_FILTER_ENABLED = true;
public static final int DEFAULT_P2P_TARGET_SUBNET_SUBSCRIBER_COUNT = 2;
public static final int DEFAULT_P2P_TARGET_PER_SUBNET_SUBSCRIBER_COUNT = -1;
public static final boolean DEFAULT_SUBSCRIBE_ALL_SUBNETS_ENABLED = false;
public static final boolean DEFAULT_GOSSIP_SCORING_ENABLED = true;
public static final boolean DEFAULT_GOSSIP_BLOBS_AFTER_BLOCK_ENABLED = true;
Expand Down Expand Up @@ -77,6 +78,7 @@ public class P2PConfig {

private final GossipEncoding gossipEncoding;
private final int targetSubnetSubscriberCount;
private final OptionalInt targetPerSubnetSubscriberCount;
private final boolean subscribeAllSubnetsEnabled;
private final int custodyGroupCountOverride;
private final OptionalInt dasPublishWithholdColumnsEverySlots;
Expand Down Expand Up @@ -107,6 +109,7 @@ private P2PConfig(
final GossipConfigurator gossipConfigurator,
final GossipEncoding gossipEncoding,
final int targetSubnetSubscriberCount,
final OptionalInt targetPerSubnetSubscriberCount,
final boolean subscribeAllSubnetsEnabled,
final int custodyGroupCountOverride,
final OptionalInt dasPublishWithholdColumnsEverySlots,
Expand Down Expand Up @@ -135,6 +138,7 @@ private P2PConfig(
this.gossipConfigurator = gossipConfigurator;
this.gossipEncoding = gossipEncoding;
this.targetSubnetSubscriberCount = targetSubnetSubscriberCount;
this.targetPerSubnetSubscriberCount = targetPerSubnetSubscriberCount;
this.subscribeAllSubnetsEnabled = subscribeAllSubnetsEnabled;
this.custodyGroupCountOverride = custodyGroupCountOverride;
this.dasPublishWithholdColumnsEverySlots = dasPublishWithholdColumnsEverySlots;
Expand Down Expand Up @@ -188,6 +192,10 @@ public int getTargetSubnetSubscriberCount() {
return targetSubnetSubscriberCount;
}

public OptionalInt getTargetPerSubnetSubscriberCount() {
return targetPerSubnetSubscriberCount;
}

public boolean isSubscribeAllSubnetsEnabled() {
return subscribeAllSubnetsEnabled;
}
Expand Down Expand Up @@ -292,6 +300,7 @@ public static class Builder {
private Boolean isGossipScoringEnabled = DEFAULT_GOSSIP_SCORING_ENABLED;
private final GossipEncoding gossipEncoding = GossipEncoding.SSZ_SNAPPY;
private Integer targetSubnetSubscriberCount = DEFAULT_P2P_TARGET_SUBNET_SUBSCRIBER_COUNT;
private Integer targetPerSubnetSubscriberCount = DEFAULT_P2P_TARGET_PER_SUBNET_SUBSCRIBER_COUNT;
private Boolean subscribeAllSubnetsEnabled = DEFAULT_SUBSCRIBE_ALL_SUBNETS_ENABLED;
private Boolean subscribeAllCustodySubnetsEnabled = DEFAULT_SUBSCRIBE_ALL_SUBNETS_ENABLED;
private int custodyGroupCountOverride = DEFAULT_CUSTODY_GROUP_COUNT_OVERRIDE;
Expand Down Expand Up @@ -363,6 +372,10 @@ public P2PConfig build() {
dasPublishWithholdColumnsEverySlotsOptional =
OptionalInt.of(dasPublishWithholdColumnsEverySlots);
}
final OptionalInt targetPerSubnetSubscriberCountOptional =
targetPerSubnetSubscriberCount >= 0
? OptionalInt.of(targetPerSubnetSubscriberCount)
: OptionalInt.empty();

return new P2PConfig(
spec,
Expand All @@ -371,6 +384,7 @@ public P2PConfig build() {
gossipConfigurator,
gossipEncoding,
targetSubnetSubscriberCount,
targetPerSubnetSubscriberCountOptional,
subscribeAllSubnetsEnabled,
custodyGroupCountOverride,
dasPublishWithholdColumnsEverySlotsOptional,
Expand Down Expand Up @@ -431,6 +445,12 @@ public Builder targetSubnetSubscriberCount(final Integer targetSubnetSubscriberC
return this;
}

public Builder targetPerSubnetSubscriberCount(final Integer targetPerSubnetSubscriberCount) {
checkNotNull(targetPerSubnetSubscriberCount);
this.targetPerSubnetSubscriberCount = targetPerSubnetSubscriberCount;
return this;
}

public Builder subscribeAllSubnetsEnabled(final Boolean subscribeAllSubnetsEnabled) {
checkNotNull(subscribeAllSubnetsEnabled);
this.subscribeAllSubnetsEnabled = subscribeAllSubnetsEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,21 @@ public class PeerSubnetSubscriptions {
private final NodeIdToDataColumnSidecarSubnetsCalculator
nodeIdToDataColumnSidecarSubnetsCalculator;
private final int targetSubnetSubscriberCount;
private final OptionalInt targetPerSubnetSubscriberCount;

private PeerSubnetSubscriptions(
final SubnetSubscriptions attestationSubnetSubscriptions,
final SubnetSubscriptions syncCommitteeSubnetSubscriptions,
final SubnetSubscriptions dataColumnSidecarSubnetSubscriptions,
final NodeIdToDataColumnSidecarSubnetsCalculator nodeIdToDataColumnSidecarSubnetsCalculator,
final int targetSubnetSubscriberCount) {
final int targetSubnetSubscriberCount,
final OptionalInt targetPerSubnetSubscriberCount) {
this.attestationSubnetSubscriptions = attestationSubnetSubscriptions;
this.syncCommitteeSubnetSubscriptions = syncCommitteeSubnetSubscriptions;
this.dataColumnSidecarSubnetSubscriptions = dataColumnSidecarSubnetSubscriptions;
this.nodeIdToDataColumnSidecarSubnetsCalculator = nodeIdToDataColumnSidecarSubnetsCalculator;
this.targetSubnetSubscriberCount = targetSubnetSubscriberCount;
this.targetPerSubnetSubscriberCount = targetPerSubnetSubscriberCount;
}

public static PeerSubnetSubscriptions create(
Expand All @@ -72,6 +75,7 @@ public static PeerSubnetSubscriptions create(
final DataColumnSidecarSubnetTopicProvider dataColumnSidecarSubnetTopicProvider,
final SubnetSubscriptionService dataColumnSidecarSubnetService,
final int targetSubnetSubscriberCount,
final OptionalInt targetPerSubnetSubscriberCount,
final SettableLabelledGauge subnetPeerCountGauge) {
final Map<String, Collection<NodeId>> subscribersByTopic = network.getSubscribersByTopic();

Expand All @@ -87,6 +91,7 @@ public static PeerSubnetSubscriptions create(
final PeerSubnetSubscriptions subscriptions =
builder(currentSchemaDefinitions, SszBitvectorSchema.create(dataColumnSidecarSubnetCount))
.targetSubnetSubscriberCount(targetSubnetSubscriberCount)
.targetPerSubnetSubscriberCount(targetPerSubnetSubscriberCount)
.nodeIdToDataColumnSidecarSubnetsCalculator(nodeIdToDataColumnSidecarSubnetsCalculator)
.attestationSubnetSubscriptions(
b ->
Expand Down Expand Up @@ -244,11 +249,16 @@ public PeerScorer createScorer() {
}

public int getSubscribersRequired() {
OptionalInt count = getMinSubscriberCount();
if (count.isPresent()) {
return Math.max(targetSubnetSubscriberCount - count.getAsInt(), 0);
final OptionalInt minSubscribersCount = getMinSubscriberCount();
final OptionalInt targetSubscribersShortage = getTargetSubscribersShortage();
final int allSubnetSubscribersShortage =
minSubscribersCount.isPresent()
? Math.max(targetSubnetSubscriberCount - minSubscribersCount.getAsInt(), 0)
: 0;
if (targetSubscribersShortage.isPresent()) {
return Math.max(targetSubscribersShortage.getAsInt(), allSubnetSubscribersShortage);
} else {
return 0;
return allSubnetSubscribersShortage;
}
}

Expand All @@ -260,6 +270,38 @@ private OptionalInt getMinSubscriberCount() {
dataColumnSidecarSubnetSubscriptions.getMinSubscriberCount()));
}

private OptionalInt getTargetSubscribersShortage() {
if (targetPerSubnetSubscriberCount.isEmpty()) {
return OptionalInt.empty();
}

final int attestationSubnetSubscribersShortage =
calcSubnetSubscribersShortage(
attestationSubnetSubscriptions, targetPerSubnetSubscriberCount.getAsInt());
final int syncCommitteeSubnetSubscribersShortage =
calcSubnetSubscribersShortage(
syncCommitteeSubnetSubscriptions, targetPerSubnetSubscriberCount.getAsInt());
final int dataColumnSidecarSubnetSubscribersShortage =
calcSubnetSubscribersShortage(
dataColumnSidecarSubnetSubscriptions, targetPerSubnetSubscriberCount.getAsInt());

return OptionalInt.of(
attestationSubnetSubscribersShortage
+ syncCommitteeSubnetSubscribersShortage
+ dataColumnSidecarSubnetSubscribersShortage);
}

protected int calcSubnetSubscribersShortage(
final SubnetSubscriptions subnetSubscriptions, final int perSubnetTarget) {
final long shortage =
perSubnetTarget * subnetSubscriptions.streamRelevantSubnets().count()
- subnetSubscriptions
.streamRelevantSubnets()
.mapToLong(subnetSubscriptions::getSubscriberCountForSubnet)
.sum();
return (int) Math.max(shortage, 0);
}

private static OptionalInt optionalMin(final List<OptionalInt> optionalInts) {
return optionalInts.stream().flatMapToInt(OptionalInt::stream).min();
}
Expand Down Expand Up @@ -366,6 +408,7 @@ public static class Builder {
private final SubnetSubscriptions.Builder dataColumnSidecarSubnetSubscriptions;
private NodeIdToDataColumnSidecarSubnetsCalculator nodeIdToDataColumnSidecarSubnetsCalculator;
private int targetSubnetSubscriberCount = 2;
private OptionalInt targetPerSubnetSubscriberCount = OptionalInt.empty();

private Builder(
final SchemaDefinitionsSupplier currentSchemaDefinitions,
Expand All @@ -384,7 +427,8 @@ public PeerSubnetSubscriptions build() {
syncCommitteeSubnetSubscriptions.build(),
dataColumnSidecarSubnetSubscriptions.build(),
nodeIdToDataColumnSidecarSubnetsCalculator,
targetSubnetSubscriberCount);
targetSubnetSubscriberCount,
targetPerSubnetSubscriberCount);
}

public Builder targetSubnetSubscriberCount(final int targetSubnetSubscriberCount) {
Expand All @@ -396,6 +440,12 @@ public Builder targetSubnetSubscriberCount(final int targetSubnetSubscriberCount
return this;
}

public Builder targetPerSubnetSubscriberCount(
final OptionalInt targetPerSubnetSubscriberCount) {
this.targetPerSubnetSubscriberCount = targetPerSubnetSubscriberCount;
return this;
}

public Builder attestationSubnetSubscriptions(
final Consumer<SubnetSubscriptions.Builder> consumer) {
consumer.accept(attestationSubnetSubscriptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.IntStream;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -151,6 +152,49 @@ public void getSubscribersRequired_allSubnetsAreJustBelowTarget() {
assertThat(requiredPeers).isEqualTo(TARGET_SUBSCRIBER_COUNT - 1);
}

@Test
public void getSubscribersRequiredWithTargetPerSubnetSubscribers() {
// Set up some sync committee subnets we should be participating in
syncnetSubscriptions.setSubscriptions(IntList.of(0, 1));
dataColumnSubscriptions.setSubscriptions(IntList.of(0, 1));

// Set up subscribers
final List<NodeId> subscribers1 = new ArrayList<>();
IntStream.range(0, 1).mapToObj(MockNodeId::new).forEach(subscribers1::add);
final List<NodeId> subscribers2 = new ArrayList<>();
IntStream.range(0, 2).mapToObj(MockNodeId::new).forEach(subscribers2::add);
// Set up attestation topic subscriptions
final Map<String, Collection<NodeId>> subscribersByTopic = new HashMap<>();
// 10 attestation subnets x2 peers
IntStream.range(0, 10)
.forEach(
subnetId ->
subscribersByTopic.put(
attestationTopicProvider.getTopicForSubnet(subnetId), subscribers2));
// 54 attestation subnets x1 peer
IntStream.range(10, spec.getNetworkingConfig().getAttestationSubnetCount())
.forEach(
subnetId ->
subscribersByTopic.put(
attestationTopicProvider.getTopicForSubnet(subnetId), subscribers1));
// 2 sync committee subnets x2 peers
syncnetSubscriptions
.getSubnets()
.forEach(
subnetId ->
subscribersByTopic.put(
syncCommitteeTopicProvider.getTopicForSubnet(subnetId), subscribers2));
// all peers for data column sidecars 0 topic, no for 1
subscribersByTopic.put(dataColumnSidecarSubnetTopicProvider.getTopicForSubnet(0), subscribers2);

when(gossipNetwork.getSubscribersByTopic()).thenReturn(subscribersByTopic);

final int requiredPeers = createPeerSubnetSubscriptions(2).getSubscribersRequired();
// Missing: 0 for sync committees, 54 subnets x 1 peers for attestations, 1 subnet x 2 peers for
// sidecars
assertThat(requiredPeers).isEqualTo(56);
}

@Test
public void getSubscribersRequired_allSubnetsHaveExactlyEnoughSubscribers() {
// Set up some sync committee subnets we should be participating in
Expand Down Expand Up @@ -257,6 +301,23 @@ private PeerSubnetSubscriptions createPeerSubnetSubscriptions() {
dataColumnSidecarSubnetTopicProvider,
dataColumnSubscriptions,
TARGET_SUBSCRIBER_COUNT,
OptionalInt.empty(),
subnetPeerCountGauge);
}

private PeerSubnetSubscriptions createPeerSubnetSubscriptions(
final int targetPerSubnetSubscribers) {
return PeerSubnetSubscriptions.create(
currentSpecVersionSupplier.get(),
NodeIdToDataColumnSidecarSubnetsCalculator.NOOP,
gossipNetwork,
attestationTopicProvider,
syncCommitteeTopicProvider,
syncnetSubscriptions,
dataColumnSidecarSubnetTopicProvider,
dataColumnSubscriptions,
TARGET_SUBSCRIBER_COUNT,
OptionalInt.of(targetPerSubnetSubscribers),
subnetPeerCountGauge);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -354,6 +355,7 @@ protected Eth2P2PNetwork buildNetwork(final P2PConfig config) {
dataColumnSidecarSubnetTopicProvider,
dataColumnSidecarSubnetService,
config.getTargetSubnetSubscriberCount(),
OptionalInt.empty(),
subnetPeerCountGauge),
reputationManager,
Collections::shuffle))
Expand Down
13 changes: 12 additions & 1 deletion teku/src/main/java/tech/pegasys/teku/cli/options/P2POptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,21 @@ The network interface(s) on which the node listens for P2P communication.
@Option(
names = {"--Xp2p-target-subnet-subscriber-count"},
paramLabel = "<INTEGER>",
description = "Target number of peers subscribed to each attestation subnet",
description = "Target number of peers subscribed to all subnets.",
arity = "1",
hidden = true)
private int p2pTargetSubnetSubscriberCount = P2PConfig.DEFAULT_P2P_TARGET_SUBNET_SUBSCRIBER_COUNT;

@Option(
names = {"--Xp2p-target-per-subnet-subscriber-count"},
paramLabel = "<INTEGER>",
description =
"Target number of peers subscribed to each subnet including attestation, sync committee, data column sidecars",
arity = "1",
hidden = true)
private int p2pTargetPerSubnetSubscriberCount =
P2PConfig.DEFAULT_P2P_TARGET_PER_SUBNET_SUBSCRIBER_COUNT;

@Option(
names = {"--Xp2p-minimum-randomly-selected-peer-count"},
paramLabel = "<INTEGER>",
Expand Down Expand Up @@ -683,6 +693,7 @@ public void configure(final TekuConfiguration.Builder builder) {
.batchVerifyMaxBatchSize(batchVerifyMaxBatchSize)
.batchVerifyStrictThreadLimitEnabled(batchVerifyStrictThreadLimitEnabled)
.targetSubnetSubscriberCount(p2pTargetSubnetSubscriberCount)
.targetPerSubnetSubscriberCount(p2pTargetPerSubnetSubscriberCount)
.isGossipScoringEnabled(gossipScoringEnabled)
.peerBlocksRateLimit(peerBlocksRateLimit)
.peerBlobSidecarsRateLimit(peerBlobSidecarsRateLimit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.List;
import java.util.OptionalInt;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.assertj.core.api.Fail;
Expand Down Expand Up @@ -71,6 +72,7 @@ public void shouldReadFromConfigurationFile() {

final P2PConfig p2pConfig = tekuConfig.p2p();
assertThat(p2pConfig.getTargetSubnetSubscriberCount()).isEqualTo(5);
assertThat(p2pConfig.getTargetPerSubnetSubscriberCount()).isEqualTo(OptionalInt.of(2));
assertThat(p2pConfig.getPeerBlocksRateLimit()).isEqualTo(100);
assertThat(p2pConfig.getPeerBlobSidecarsRateLimit()).isEqualTo(400);
assertThat(p2pConfig.getPeerRequestLimit()).isEqualTo(101);
Expand Down
1 change: 1 addition & 0 deletions teku/src/test/resources/P2POptions_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ p2p-static-peers: "127.1.0.1,127.1.1.1"
p2p-peer-lower-bound: 70
p2p-peer-upper-bound: 85
Xp2p-target-subnet-subscriber-count: 5
Xp2p-target-per-subnet-subscriber-count: 2
Xp2p-minimum-randomly-selected-peer-count: 1
Xpeer-blocks-rate-limit: 100
Xpeer-blob-sidecars-rate-limit: 400
Expand Down
Loading