Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blobs.DataColumnSidecar;
import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier;
import tech.pegasys.teku.storage.api.SidecarQueryChannel;
import tech.pegasys.teku.storage.api.SidecarUpdateChannel;
import tech.pegasys.teku.storage.client.CombinedChainDataClient;

public interface DataColumnSidecarDB extends DataColumnSidecarCoreDB {

static DataColumnSidecarDB create(
final CombinedChainDataClient combinedChainDataClient,
final SidecarQueryChannel sidecarQueryChannel,
final SidecarUpdateChannel sidecarUpdateChannel) {
return new DataColumnSidecarDBImpl(combinedChainDataClient, sidecarUpdateChannel);
return new DataColumnSidecarDBImpl(sidecarQueryChannel, sidecarUpdateChannel);
}

// read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,37 @@
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blobs.DataColumnSidecar;
import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier;
import tech.pegasys.teku.storage.api.SidecarQueryChannel;
import tech.pegasys.teku.storage.api.SidecarUpdateChannel;
import tech.pegasys.teku.storage.client.CombinedChainDataClient;

class DataColumnSidecarDBImpl implements DataColumnSidecarDB {
private static final Logger LOG = LogManager.getLogger();

private final CombinedChainDataClient combinedChainDataClient;
private final SidecarQueryChannel sidecarQueryChannel;
private final SidecarUpdateChannel sidecarUpdateChannel;
private final DetailLogger detailLogger = new DetailLogger();

public DataColumnSidecarDBImpl(
final CombinedChainDataClient combinedChainDataClient,
final SidecarQueryChannel sidecarQueryChannel,
final SidecarUpdateChannel sidecarUpdateChannel) {
this.combinedChainDataClient = combinedChainDataClient;
this.sidecarQueryChannel = sidecarQueryChannel;
this.sidecarUpdateChannel = sidecarUpdateChannel;
}

@Override
public SafeFuture<Optional<UInt64>> getFirstCustodyIncompleteSlot() {
return combinedChainDataClient.getFirstCustodyIncompleteSlot();
return sidecarQueryChannel.getFirstCustodyIncompleteSlot();
}

@Override
public SafeFuture<Optional<DataColumnSidecar>> getSidecar(
final DataColumnSlotAndIdentifier identifier) {
return combinedChainDataClient.getSidecar(identifier);
return sidecarQueryChannel.getSidecar(identifier);
}

@Override
public SafeFuture<List<DataColumnSlotAndIdentifier>> getColumnIdentifiers(final UInt64 slot) {
return combinedChainDataClient.getDataColumnIdentifiers(slot);
return sidecarQueryChannel.getDataColumnIdentifiers(slot);
}

@Override
Expand All @@ -74,7 +74,7 @@ public SafeFuture<Void> addSidecar(final DataColumnSidecar sidecar) {

@Override
public SafeFuture<Optional<UInt64>> getEarliestAvailableDataColumnSlot() {
return combinedChainDataClient.getEarliestAvailableDataColumnSlot();
return sidecarQueryChannel.getEarliestAvailableDataColumnSlot();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@
import tech.pegasys.teku.storage.api.Eth1DepositStorageChannel;
import tech.pegasys.teku.storage.api.FinalizedCheckpointChannel;
import tech.pegasys.teku.storage.api.LateBlockReorgPreparationHandler;
import tech.pegasys.teku.storage.api.SidecarQueryChannel;
import tech.pegasys.teku.storage.api.SidecarUpdateChannel;
import tech.pegasys.teku.storage.api.StorageQueryChannel;
import tech.pegasys.teku.storage.api.StorageUpdateChannel;
Expand Down Expand Up @@ -908,7 +909,7 @@ protected void initDasCustody() {

final DataColumnSidecarDB sidecarDB =
DataColumnSidecarDB.create(
combinedChainDataClient,
eventChannels.getPublisher(SidecarQueryChannel.class, beaconAsyncRunner),
eventChannels.getPublisher(SidecarUpdateChannel.class, beaconAsyncRunner));
this.sidecarDB = Optional.of(sidecarDB);
final DataColumnSidecarDbAccessor dbAccessor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import tech.pegasys.teku.spec.networks.Eth2Network;
import tech.pegasys.teku.storage.api.CombinedStorageChannel;
import tech.pegasys.teku.storage.api.Eth1DepositStorageChannel;
import tech.pegasys.teku.storage.api.SidecarQueryChannel;
import tech.pegasys.teku.storage.api.SidecarUpdateChannel;
import tech.pegasys.teku.storage.api.VoteUpdateChannel;
import tech.pegasys.teku.storage.archive.BlobSidecarsArchiver;
Expand All @@ -47,6 +48,7 @@
import tech.pegasys.teku.storage.server.DatabaseVersion;
import tech.pegasys.teku.storage.server.DepositStorage;
import tech.pegasys.teku.storage.server.RetryingStorageUpdateChannel;
import tech.pegasys.teku.storage.server.SidecarStorageChannelSplitter;
import tech.pegasys.teku.storage.server.StorageConfiguration;
import tech.pegasys.teku.storage.server.VersionedDatabaseFactory;
import tech.pegasys.teku.storage.server.network.EphemeryException;
Expand Down Expand Up @@ -237,11 +239,18 @@ protected SafeFuture<?> doStart() {
chainStorage, serviceConfig.getTimeProvider()),
chainStorage));

final SidecarStorageChannelSplitter sidecarStorageChannelSplitter =
new SidecarStorageChannelSplitter(
serviceConfig.createAsyncRunner("sidecar_storage_query", 1),
chainStorage,
chainStorage);

eventChannels
.subscribe(Eth1DepositStorageChannel.class, depositStorage)
.subscribe(Eth1EventsChannel.class, depositStorage)
.subscribe(VoteUpdateChannel.class, batchingVoteUpdateChannel)
.subscribe(SidecarUpdateChannel.class, chainStorage);
.subscribe(SidecarUpdateChannel.class, sidecarStorageChannelSplitter)
.subscribe(SidecarQueryChannel.class, sidecarStorageChannelSplitter);
})
.thenCompose(
__ ->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright Consensys Software Inc., 2025
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.storage.api;

import java.util.List;
import java.util.Optional;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.events.ChannelInterface;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.kzg.KZGProof;
import tech.pegasys.teku.spec.datastructures.blobs.DataColumnSidecar;
import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier;

/** Query channel for data column sidecar storage operations. */
public interface SidecarQueryChannel extends ChannelInterface {

SafeFuture<Optional<UInt64>> getFirstCustodyIncompleteSlot();

SafeFuture<Optional<UInt64>> getEarliestAvailableDataColumnSlot();

SafeFuture<Optional<DataColumnSidecar>> getSidecar(DataColumnSlotAndIdentifier identifier);

SafeFuture<Optional<DataColumnSidecar>> getNonCanonicalSidecar(
DataColumnSlotAndIdentifier identifier);

SafeFuture<List<DataColumnSlotAndIdentifier>> getDataColumnIdentifiers(UInt64 slot);

SafeFuture<List<DataColumnSlotAndIdentifier>> getNonCanonicalDataColumnIdentifiers(UInt64 slot);

SafeFuture<List<DataColumnSlotAndIdentifier>> getDataColumnIdentifiers(
UInt64 startSlot, UInt64 endSlot, UInt64 limit);

SafeFuture<Optional<UInt64>> getEarliestDataColumnSidecarSlot();

SafeFuture<Optional<List<List<KZGProof>>>> getDataColumnSidecarsProofs(UInt64 slot);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import tech.pegasys.teku.spec.datastructures.util.SlotAndBlockRootAndBlobIndex;
import tech.pegasys.teku.storage.api.ChainStorageFacade;
import tech.pegasys.teku.storage.api.OnDiskStoreData;
import tech.pegasys.teku.storage.api.SidecarQueryChannel;
import tech.pegasys.teku.storage.api.SidecarUpdateChannel;
import tech.pegasys.teku.storage.api.StorageQueryChannel;
import tech.pegasys.teku.storage.api.StorageUpdate;
Expand All @@ -60,6 +61,7 @@ public class ChainStorage
StorageQueryChannel,
VoteUpdateChannel,
SidecarUpdateChannel,
SidecarQueryChannel,
ChainStorageFacade {
private static final Logger LOG = LogManager.getLogger();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright Consensys Software Inc., 2025
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.storage.server;

import java.util.List;
import java.util.Optional;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.kzg.KZGProof;
import tech.pegasys.teku.spec.datastructures.blobs.DataColumnSidecar;
import tech.pegasys.teku.spec.datastructures.util.DataColumnSlotAndIdentifier;
import tech.pegasys.teku.storage.api.SidecarQueryChannel;
import tech.pegasys.teku.storage.api.SidecarUpdateChannel;

/**
* Splits sidecar storage operations into separate {@link SidecarUpdateChannel} and {@link
* SidecarQueryChannel} components with updates being handled synchronously and queries being run
* asynchronously.
*
* <p>This guarantees that queries are only ever processed after the updates that were sent before
* them but without allowing queries to delay updates.
*
* <p>This splitter is isolated from {@link CombinedStorageChannelSplitter} to ensure data column
* operations don't interfere with block/state operations under high load.
*/
public class SidecarStorageChannelSplitter implements SidecarUpdateChannel, SidecarQueryChannel {

private final AsyncRunner asyncRunner;
private final SidecarUpdateChannel updateDelegate;
private final SidecarQueryChannel queryDelegate;

public SidecarStorageChannelSplitter(
final AsyncRunner asyncRunner,
final SidecarUpdateChannel updateDelegate,
final SidecarQueryChannel queryDelegate) {
this.asyncRunner = asyncRunner;
this.updateDelegate = updateDelegate;
this.queryDelegate = queryDelegate;
}

// === Update methods (synchronous) ===

@Override
public SafeFuture<Void> onFirstCustodyIncompleteSlot(final UInt64 slot) {
return updateDelegate.onFirstCustodyIncompleteSlot(slot);
}

@Override
public SafeFuture<Void> onEarliestAvailableDataColumnSlot(final UInt64 slot) {
return updateDelegate.onEarliestAvailableDataColumnSlot(slot);
}

@Override
public SafeFuture<Void> onNewSidecar(final DataColumnSidecar sidecar) {
return updateDelegate.onNewSidecar(sidecar);
}

@Override
public SafeFuture<Void> onNewNonCanonicalSidecar(final DataColumnSidecar sidecar) {
return updateDelegate.onNewNonCanonicalSidecar(sidecar);
}

// === Query methods (asynchronous via asyncRunner) ===

@Override
public SafeFuture<Optional<UInt64>> getFirstCustodyIncompleteSlot() {
return asyncRunner.runAsync(queryDelegate::getFirstCustodyIncompleteSlot);
}

@Override
public SafeFuture<Optional<UInt64>> getEarliestAvailableDataColumnSlot() {
return asyncRunner.runAsync(queryDelegate::getEarliestAvailableDataColumnSlot);
}

@Override
public SafeFuture<Optional<DataColumnSidecar>> getSidecar(
final DataColumnSlotAndIdentifier identifier) {
return asyncRunner.runAsync(() -> queryDelegate.getSidecar(identifier));
}

@Override
public SafeFuture<Optional<DataColumnSidecar>> getNonCanonicalSidecar(
final DataColumnSlotAndIdentifier identifier) {
return asyncRunner.runAsync(() -> queryDelegate.getNonCanonicalSidecar(identifier));
}

@Override
public SafeFuture<List<DataColumnSlotAndIdentifier>> getDataColumnIdentifiers(final UInt64 slot) {
return asyncRunner.runAsync(() -> queryDelegate.getDataColumnIdentifiers(slot));
}

@Override
public SafeFuture<List<DataColumnSlotAndIdentifier>> getNonCanonicalDataColumnIdentifiers(
final UInt64 slot) {
return asyncRunner.runAsync(() -> queryDelegate.getNonCanonicalDataColumnIdentifiers(slot));
}

@Override
public SafeFuture<List<DataColumnSlotAndIdentifier>> getDataColumnIdentifiers(
final UInt64 startSlot, final UInt64 endSlot, final UInt64 limit) {
return asyncRunner.runAsync(
() -> queryDelegate.getDataColumnIdentifiers(startSlot, endSlot, limit));
}

@Override
public SafeFuture<Optional<UInt64>> getEarliestDataColumnSidecarSlot() {
return asyncRunner.runAsync(queryDelegate::getEarliestDataColumnSidecarSlot);
}

@Override
public SafeFuture<Optional<List<List<KZGProof>>>> getDataColumnSidecarsProofs(final UInt64 slot) {
return asyncRunner.runAsync(() -> queryDelegate.getDataColumnSidecarsProofs(slot));
}
}
Loading