Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/common/tagged.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ namespace kagome {
}
};

template <
typename T,
typename Tag,
typename Base = std::conditional_t<std::is_scalar_v<T>, Wrapper<T>, T>>
inline std::ostream &operator<<(std::ostream &os,
const Tagged<T, Tag, Base> &view) {
return os << (const T &)view;
}

} // namespace kagome

#endif // KAGOME_TAGGED
4 changes: 4 additions & 0 deletions core/network/types/collator_messages.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ namespace kagome::network {
};
using SignedStatement = IndexedAndSigned<Statement>;

inline std::ostream &operator<<(std::ostream &os, const SignedStatement &t) {
return os << "Statement (validator index:" << t.payload.ix << ')';
}

struct Seconded {
SCALE_TIE(2);

Expand Down
15 changes: 10 additions & 5 deletions core/parachain/backing/store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#ifndef KAGOME_PARACHAIN_BACKING_STORE_HPP
#define KAGOME_PARACHAIN_BACKING_STORE_HPP

#include "common/tagged.hpp"
#include "network/types/collator_messages.hpp"

namespace kagome::parachain {
Expand All @@ -28,13 +29,17 @@ namespace kagome::parachain {
uint64_t validity_votes;
};

using ValidityVoteIssued = Tagged<Statement, struct Issued>;
using ValidityVoteValid = Tagged<Statement, struct Valid>;
using ValidityVote = boost::variant<ValidityVoteIssued, ValidityVoteValid>;
Comment thread
iceseer marked this conversation as resolved.

using StatementInfo =
std::pair<network::ParachainId, std::map<ValidatorIndex, Statement>>;
std::pair<network::ParachainId, std::map<ValidatorIndex, ValidityVote>>;

virtual ~BackingStore() = default;

virtual std::optional<ImportResult> put(
std::unordered_map<ParachainId, std::vector<ValidatorIndex>> const
const std::unordered_map<ParachainId, std::vector<ValidatorIndex>>
&groups,
Statement statement) = 0;

Expand All @@ -47,10 +52,10 @@ namespace kagome::parachain {
BackedCandidate &&candidate) = 0;

virtual std::optional<network::CommittedCandidateReceipt> get_candidate(
network::CandidateHash const &candidate_hash) const = 0;
const network::CandidateHash &candidate_hash) const = 0;

virtual std::optional<std::reference_wrapper<StatementInfo const>>
get_validity_votes(network::CandidateHash const &candidate_hash) const = 0;
virtual std::optional<std::reference_wrapper<const StatementInfo>>
get_validity_votes(const network::CandidateHash &candidate_hash) const = 0;
};
} // namespace kagome::parachain

Expand Down
34 changes: 22 additions & 12 deletions core/parachain/backing/store_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@ namespace kagome::parachain {
void BackingStoreImpl::remove(const BlockHash &relay_parent) {
backed_candidates_.erase(relay_parent);
if (auto it = candidates_.find(relay_parent); it != candidates_.end()) {
for (auto const &candidate : it->second) {
for (const auto &candidate : it->second) {
statements_.erase(candidate);
}
candidates_.erase(it);
}
}

bool BackingStoreImpl::is_in_group(
std::unordered_map<ParachainId, std::vector<ValidatorIndex>> const
const std::unordered_map<ParachainId, std::vector<ValidatorIndex>>
&groups,
GroupIndex group,
ValidatorIndex authority) {
if (auto it = groups.find(group); it != groups.end()) {
for (auto const a : it->second) {
for (const auto a : it->second) {
if (a == authority) {
return true;
}
Expand All @@ -39,7 +39,7 @@ namespace kagome::parachain {
}

std::optional<BackingStore::ImportResult> BackingStoreImpl::put(
std::unordered_map<ParachainId, std::vector<ValidatorIndex>> const
const std::unordered_map<ParachainId, std::vector<ValidatorIndex>>
&groups,
Statement statement) {
auto candidate_hash =
Expand All @@ -51,31 +51,34 @@ namespace kagome::parachain {
return std::nullopt;
}
s = &s_it->second;
s->second.emplace(statement.payload.ix,
ValidityVoteValid{std::move(statement)});
} else if (auto seconded{boost::get<network::CommittedCandidateReceipt>(
&statement.payload.payload.candidate_state)}) {
auto const group = seconded->descriptor.para_id;
const auto group = seconded->descriptor.para_id;
if (!is_in_group(groups, group, statement.payload.ix)) {
return std::nullopt;
}

s = &statements_[candidate_hash];
s->first = seconded->descriptor.para_id;
candidates_[seconded->descriptor.relay_parent].emplace(candidate_hash);
s->second.emplace(statement.payload.ix,
ValidityVoteIssued{std::move(statement)});
} else {
return std::nullopt;
}

s->second.emplace(statement.payload.ix, std::move(statement));
return BackingStore::ImportResult{
.candidate = candidate_hash,
.group_id = s->first,
.validity_votes = s->second.size(),
};
}

std::optional<std::reference_wrapper<BackingStore::StatementInfo const>>
std::optional<std::reference_wrapper<const BackingStore::StatementInfo>>
BackingStoreImpl::get_validity_votes(
network::CandidateHash const &candidate_hash) const {
const network::CandidateHash &candidate_hash) const {
if (auto it = statements_.find(candidate_hash); it != statements_.end()) {
return {{it->second}};
}
Expand All @@ -89,11 +92,18 @@ namespace kagome::parachain {

std::optional<network::CommittedCandidateReceipt>
BackingStoreImpl::get_candidate(
network::CandidateHash const &candidate_hash) const {
const network::CandidateHash &candidate_hash) const {
if (auto it = statements_.find(candidate_hash); it != statements_.end()) {
for (auto &[_, statement] : it->second.second) {
if (auto seconded = boost::get<network::CommittedCandidateReceipt>(
&statement.payload.payload.candidate_state)) {
for (auto &[_, validity_vote] : it->second.second) {
const auto &statement = visit_in_place(
validity_vote,
[](const auto &val) -> std::reference_wrapper<Statement> {
return {(Statement &)val};
});

if (auto seconded =
boost::get<const network::CommittedCandidateReceipt>(
&statement.get().payload.payload.candidate_state)) {
return *seconded;
}
}
Expand Down
159 changes: 101 additions & 58 deletions core/parachain/validator/impl/parachain_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,19 @@ namespace kagome::parachain {
if (auto const value = if_type<
const primitives::events::RemoveAfterFinalizationParams>(
event)) {
for (auto const &lost : value->get()) {
SL_TRACE(self->logger_,
"Remove from storages.(relay parent={})",
lost);

self->backing_store_->remove(lost);
self->av_store_->remove(lost);
self->bitfield_store_->remove(lost);
}
self->our_current_state_.active_leaves.exclusiveAccess(
[&](auto &active_leaves) {
for (auto const &lost : value->get()) {
SL_TRACE(self->logger_,
"Remove from storages.(relay parent={})",
lost);

self->backing_store_->remove(lost);
self->av_store_->remove(lost);
self->bitfield_store_->remove(lost);
active_leaves.erase(lost);
}
});
}
}
});
Expand All @@ -220,40 +224,46 @@ namespace kagome::parachain {
peer_view_->getMyViewObservable(), false);
my_view_sub_->subscribe(my_view_sub_->generateSubscriptionSetId(),
network::PeerView::EventType::kViewUpdated);
my_view_sub_->setCallback([wptr{weak_from_this()}](
auto /*set_id*/,
auto && /*internal_obj*/,
auto /*event_type*/,
const network::ExView &event) {
if (auto self = wptr.lock()) {
/// clear caches
BOOST_ASSERT(
self->this_context_->get_executor().running_in_this_thread());
for (auto const &lost : event.lost) {
SL_TRACE(
self->logger_, "Removed backing task.(relay parent={})", lost);

self->our_current_state_.state_by_relay_parent.erase(lost);
self->pending_candidates.exclusiveAccess(
[&](auto &container) { container.erase(lost); });
}

if (auto r = self->canProcessParachains(); r.has_error()) {
return;
}
my_view_sub_->setCallback(
[wptr{weak_from_this()}](auto /*set_id*/,
auto && /*internal_obj*/,
auto /*event_type*/,
const network::ExView &event) {
if (auto self = wptr.lock()) {
/// clear caches
BOOST_ASSERT(
self->this_context_->get_executor().running_in_this_thread());
auto const &relay_parent =
primitives::calculateBlockHash(event.new_head, *self->hasher_)
.value();

self->our_current_state_.active_leaves.exclusiveAccess(
[&](auto &active_leaves) {
for (auto const &lost : event.lost) {
SL_TRACE(self->logger_,
"Removed backing task.(relay parent={})",
lost);

self->our_current_state_.state_by_relay_parent.erase(lost);
self->pending_candidates.exclusiveAccess(
[&](auto &container) { container.erase(lost); });
active_leaves.erase(lost);
}
active_leaves.insert(relay_parent);
});
if (auto r = self->canProcessParachains(); r.has_error()) {
return;
}

auto const &relay_parent =
primitives::calculateBlockHash(event.new_head, *self->hasher_)
.value();
self->createBackingTask(relay_parent);
SL_TRACE(self->logger_,
"Update my view.(new head={}, finalized={}, leaves={})",
relay_parent,
event.view.finalized_number_,
event.view.heads_.size());
self->broadcastView(event.view);
}
});
self->createBackingTask(relay_parent);
SL_TRACE(self->logger_,
"Update my view.(new head={}, finalized={}, leaves={})",
relay_parent,
event.view.finalized_number_,
event.view.heads_.size());
self->broadcastView(event.view);
}
});
return true;
}

Expand Down Expand Up @@ -870,23 +880,24 @@ namespace kagome::parachain {
validity_votes_out;
validity_votes_out.reserve(validity_votes.size());

for (auto &[validator_index, statement] : validity_votes) {
if (is_type<network::CommittedCandidateReceipt>(
statement.payload.payload.candidate_state)) {
validity_votes_out.emplace_back(
validator_index,
network::ValidityAttestation{
for (auto &[validator_index, validity_vote] : validity_votes) {
auto validity_attestation = visit_in_place(
validity_vote,
[](const BackingStore::ValidityVoteIssued &val) {
return network::ValidityAttestation{
network::ValidityAttestation::Implicit{},
statement.signature,
});
} else {
validity_votes_out.emplace_back(
validator_index,
network::ValidityAttestation{
((BackingStore::Statement &)val).signature,
};
},
[](const BackingStore::ValidityVoteValid &val) {
return network::ValidityAttestation{
network::ValidityAttestation::Explicit{},
statement.signature,
});
}
((BackingStore::Statement &)val).signature,
};
});

validity_votes_out.emplace_back(validator_index,
std::move(validity_attestation));
}

return AttestedCandidate{
Expand Down Expand Up @@ -1385,8 +1396,24 @@ namespace kagome::parachain {
const primitives::BlockHash &relay_parent,
size_t n_validators) {
TicToc _measure{"Parachain validation", logger_};

const auto candidate_hash{candidateHashFrom(candidate)};

/// checks if we still need to execute parachain task
auto need_to_process = our_current_state_.active_leaves.sharedAccess(
[&](const auto &active_leaves) {
return active_leaves.count(relay_parent) != 0ull;
});

if (!need_to_process) {
SL_TRACE(logger_,
"Candidate validation skipped because of extruded relay parent. "
"(relay_parent={}, parachain_id={}, candidate_hash={})",
relay_parent,
candidate.descriptor.para_id,
candidate_hash);
return Error::VALIDATION_FAILED;
}

auto validation_result = validateCandidate(candidate, pov, relay_parent);
if (!validation_result) {
logger_->warn(
Expand All @@ -1399,6 +1426,22 @@ namespace kagome::parachain {
return Error::VALIDATION_FAILED;
}

need_to_process = our_current_state_.active_leaves.sharedAccess(
[&](const auto &active_leaves) {
return active_leaves.count(relay_parent) != 0ull;
});

if (!need_to_process) {
SL_TRACE(logger_,
"Candidate validation skipped before erasure-coding because of "
"extruded relay parent. "
"(relay_parent={}, parachain_id={}, candidate_hash={})",
relay_parent,
candidate.descriptor.para_id,
candidate_hash);
return Error::VALIDATION_FAILED;
}

auto &[comms, data] = validation_result.value();
runtime::AvailableData available_data{
.pov = std::move(pov),
Expand Down
4 changes: 3 additions & 1 deletion core/parachain/validator/parachain_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,10 @@ namespace kagome::parachain {
libp2p::peer::PeerId,
std::deque<std::pair<RelayHash, network::SignedStatement>>>
seconded_statements;
/// Added as independent member to prevent extra locks for
/// `state_by_relay_parent` which is used in internal thread only
SafeObject<std::unordered_set<RelayHash>> active_leaves;
} our_current_state_;

SafeObject<std::unordered_map<RelayHash, network::CollationEvent>>
pending_candidates;
std::shared_ptr<WorkersContext> this_context_;
Expand Down