Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions core/authorship/impl/proposer_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ namespace kagome::authorship {

ProposerImpl::ProposerImpl(
std::shared_ptr<BlockBuilderFactory> block_builder_factory,
std::shared_ptr<Clock> clock,
std::shared_ptr<transaction_pool::TransactionPool> transaction_pool,
std::shared_ptr<primitives::events::ExtrinsicSubscriptionEngine>
ext_sub_engine,
std::shared_ptr<subscription::ExtrinsicEventKeyRepository>
extrinsic_event_key_repo)
: block_builder_factory_{std::move(block_builder_factory)},
clock_{std::move(clock)},
transaction_pool_{std::move(transaction_pool)},
ext_sub_engine_{std::move(ext_sub_engine)},
extrinsic_event_key_repo_{std::move(extrinsic_event_key_repo)} {
Expand All @@ -39,6 +41,7 @@ namespace kagome::authorship {

outcome::result<primitives::Block> ProposerImpl::propose(
const primitives::BlockInfo &parent_block,
std::optional<Clock::TimePoint> deadline,
const primitives::InherentData &inherent_data,
const primitives::Digest &inherent_digest,
TrieChangesTrackerOpt changes_tracker) {
Expand Down Expand Up @@ -100,10 +103,14 @@ namespace kagome::authorship {
// number of transactions to be pushed to the block

size_t included_tx_count = 0;
std::vector<primitives::Transaction::Hash> included_hashes;
for (const auto &[hash, tx] : ready_txs) {
const auto &tx_ref = tx;
if (deadline && clock_->now() >= deadline) {
break;
}

scale::ScaleEncoderStream s(true);
s << tx_ref->ext;
s << tx->ext;
auto estimate_tx_size = s.size();

if (block_size + estimate_tx_size > block_size_limit) {
Expand All @@ -121,7 +128,7 @@ namespace kagome::authorship {
break;
}

SL_DEBUG(logger_, "Adding extrinsic: {}", tx_ref->ext.data);
SL_DEBUG(logger_, "Adding extrinsic: {}", tx->ext.data);
auto inserted_res = block_builder->pushExtrinsic(tx->ext);
if (not inserted_res) {
if (BlockBuilderError::EXHAUSTS_RESOURCES == inserted_res.error()) {
Expand All @@ -144,6 +151,7 @@ namespace kagome::authorship {
block_size += estimate_tx_size;
transaction_pushed = true;
++included_tx_count;
included_hashes.emplace_back(hash);
}
}
metric_tx_included_in_block_->set(included_tx_count);
Expand All @@ -156,7 +164,7 @@ namespace kagome::authorship {

OUTCOME_TRY(block, block_builder->bake());

for (const auto &[hash, tx] : ready_txs) {
for (const auto &hash : included_hashes) {
auto removed_res = transaction_pool_->removeOne(hash);
if (not removed_res) {
logger_->error(
Expand Down
3 changes: 3 additions & 0 deletions core/authorship/impl/proposer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ namespace kagome::authorship {

ProposerImpl(
std::shared_ptr<BlockBuilderFactory> block_builder_factory,
std::shared_ptr<Clock> clock,
std::shared_ptr<transaction_pool::TransactionPool> transaction_pool,
std::shared_ptr<primitives::events::ExtrinsicSubscriptionEngine>
ext_sub_engine,
Expand All @@ -38,12 +39,14 @@ namespace kagome::authorship {

outcome::result<primitives::Block> propose(
const primitives::BlockInfo &parent_block,
std::optional<Clock::TimePoint> deadline,
const primitives::InherentData &inherent_data,
const primitives::Digest &inherent_digest,
TrieChangesTrackerOpt changes_tracker) override;

private:
std::shared_ptr<BlockBuilderFactory> block_builder_factory_;
std::shared_ptr<Clock> clock_;
std::shared_ptr<transaction_pool::TransactionPool> transaction_pool_;
std::shared_ptr<primitives::events::ExtrinsicSubscriptionEngine>
ext_sub_engine_;
Expand Down
3 changes: 3 additions & 0 deletions core/authorship/proposer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ namespace kagome::authorship {
*/
class Proposer {
public:
using Clock = clock::SystemClock;

virtual ~Proposer() = default;

/**
Expand All @@ -31,6 +33,7 @@ namespace kagome::authorship {
*/
virtual outcome::result<primitives::Block> propose(
const primitives::BlockInfo &parent_block,
std::optional<Clock::TimePoint> deadline,
const primitives::InherentData &inherent_data,
const primitives::Digest &inherent_digest,
TrieChangesTrackerOpt changes_tracker) = 0;
Expand Down
69 changes: 36 additions & 33 deletions core/consensus/babe/impl/babe_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,10 @@ namespace kagome::consensus::babe {
* @param authority_key authority
* @return index of authority in list of authorities
*/
std::optional<uint64_t> getAuthorityIndex(
std::optional<primitives::AuthorityIndex> getAuthorityIndex(
const primitives::AuthorityList &authorities,
const primitives::BabeSessionKey &authority_key) {
uint64_t n = 0;
primitives::AuthorityIndex n = 0;
for (auto &authority : authorities) {
if (authority.id.id == authority_key) {
return n;
Expand Down Expand Up @@ -665,11 +665,12 @@ namespace kagome::consensus::babe {
bool rewind_slots; // NOLINT
auto slot = current_slot_;

clock::SystemClock::TimePoint now;
do {
// check that we are really in the middle of the slot, as expected; we
// can cooperate with a relatively little (kMaxLatency) latency, as our
// node will be able to retrieve
auto now = clock_->now();
now = clock_->now();

auto finish_time = babe_util_->slotFinishTime(current_slot_);

Expand All @@ -693,9 +694,9 @@ namespace kagome::consensus::babe {
}
} while (rewind_slots);

// Slot processing begins in 1/3 slot time before end
auto finish_time = babe_util_->slotFinishTime(current_slot_)
- babe_config_repo_->slotDuration() / 3;
// Slot processing begins in 1/3 slot time after start
auto finish_time = babe_util_->slotStartTime(current_slot_)
+ babe_config_repo_->slotDuration() / 3;

SL_VERBOSE(log_,
"Starting a slot {} in epoch {} (remains {:.2f} sec.)",
Expand All @@ -708,16 +709,16 @@ namespace kagome::consensus::babe {

// everything is OK: wait for the end of the slot
timer_->expiresAt(finish_time);
timer_->asyncWait([&](auto &&ec) {
timer_->asyncWait([this, now](auto &&ec) {
if (ec) {
log_->error("error happened while waiting on the timer: {}", ec);
return;
}
processSlot();
processSlot(now);
});
}

void BabeImpl::processSlot() {
void BabeImpl::processSlot(clock::SystemClock::TimePoint slot_timestamp) {
BOOST_ASSERT(keypair_ != nullptr);

best_block_ = block_tree_->bestLeaf();
Expand Down Expand Up @@ -763,7 +764,7 @@ namespace kagome::consensus::babe {
const auto &authority_index = authority_index_res.value();

if (lottery_->getEpoch() != current_epoch_) {
changeLotteryEpoch(current_epoch_, babe_config);
changeLotteryEpoch(current_epoch_, authority_index, babe_config);
}

auto slot_leadership = lottery_->getSlotLeadership(current_slot_);
Expand All @@ -777,8 +778,10 @@ namespace kagome::consensus::babe {
common::Buffer(vrf_result.output),
common::Buffer(vrf_result.proof));

processSlotLeadership(
SlotType::Primary, std::cref(vrf_result), authority_index);
processSlotLeadership(SlotType::Primary,
slot_timestamp,
std::cref(vrf_result),
authority_index);
} else if (babe_config.allowed_slots
== primitives::AllowedSlots::PrimaryAndSecondaryPlain
or babe_config.allowed_slots
Expand All @@ -800,16 +803,20 @@ namespace kagome::consensus::babe {
common::Buffer(vrf.output),
common::Buffer(vrf.proof));

processSlotLeadership(
SlotType::SecondaryVRF, std::cref(vrf), authority_index);
processSlotLeadership(SlotType::SecondaryVRF,
slot_timestamp,
std::cref(vrf),
authority_index);
} else { // plain secondary slots mode
SL_DEBUG(
log_,
"Babe author {} is block producer in secondary plain slot",
keypair_->public_key);

processSlotLeadership(
SlotType::SecondaryPlain, std::nullopt, authority_index);
processSlotLeadership(SlotType::SecondaryPlain,
slot_timestamp,
std::nullopt,
authority_index);
}
} else {
SL_TRACE(log_,
Expand Down Expand Up @@ -848,7 +855,7 @@ namespace kagome::consensus::babe {

// everything is OK: wait for the end of the slot
timer_->expiresAt(start_time);
timer_->asyncWait([&](auto &&ec) {
timer_->asyncWait([this](auto &&ec) {
if (ec) {
log_->error("error happened while waiting on the timer: {}", ec);
return;
Expand Down Expand Up @@ -912,6 +919,7 @@ namespace kagome::consensus::babe {

void BabeImpl::processSlotLeadership(
SlotType slot_type,
clock::SystemClock::TimePoint slot_timestamp,
std::optional<std::reference_wrapper<const crypto::VRFOutput>> output,
primitives::AuthorityIndex authority_index) {
BOOST_ASSERT(keypair_ != nullptr);
Expand All @@ -930,7 +938,7 @@ namespace kagome::consensus::babe {

primitives::InherentData inherent_data;
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
clock_->now().time_since_epoch())
slot_timestamp.time_since_epoch())
.count();

if (auto res = inherent_data.putData<uint64_t>(kTimestampId, now);
Expand Down Expand Up @@ -986,8 +994,13 @@ namespace kagome::consensus::babe {
std::make_shared<storage::changes_trie::StorageChangesTrackerImpl>();

// create new block
auto pre_seal_block_res = proposer_->propose(
best_block_, inherent_data, {babe_pre_digest}, changes_tracker);
auto pre_seal_block_res =
proposer_->propose(best_block_,
babe_util_->slotFinishTime(current_slot_)
- babe_config_repo_->slotDuration() / 3,
inherent_data,
{babe_pre_digest},
changes_tracker);
if (!pre_seal_block_res) {
SL_ERROR(log_, "Cannot propose a block: {}", pre_seal_block_res.error());
return;
Expand Down Expand Up @@ -1121,22 +1134,12 @@ namespace kagome::consensus::babe {

void BabeImpl::changeLotteryEpoch(
const EpochDescriptor &epoch,
primitives::AuthorityIndex authority_index,
const primitives::BabeConfiguration &babe_config) const {
BOOST_ASSERT(keypair_ != nullptr);

auto authority_index_res =
getAuthorityIndex(babe_config.authorities, keypair_->public_key);
if (not authority_index_res) {
SL_CRITICAL(log_,
"Block production failed: This node is not in the list of "
"authorities. (public key: {})",
keypair_->public_key);
return;
}

auto threshold = calculateThreshold(babe_config.leadership_rate,
babe_config.authorities,
authority_index_res.value());
auto threshold = calculateThreshold(
babe_config.leadership_rate, babe_config.authorities, authority_index);

lottery_->changeEpoch(epoch, babe_config.randomness, threshold, *keypair_);
}
Expand Down
4 changes: 3 additions & 1 deletion core/consensus/babe/impl/babe_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ namespace kagome::consensus::babe {
/**
* Process the current Babe slot
*/
void processSlot();
void processSlot(clock::SystemClock::TimePoint slot_timestamp);

/**
* Gather block and broadcast it
Expand All @@ -163,6 +163,7 @@ namespace kagome::consensus::babe {
*/
void processSlotLeadership(
SlotType slot_type,
clock::SystemClock::TimePoint slot_timestamp,
std::optional<std::reference_wrapper<const crypto::VRFOutput>> output,
primitives::AuthorityIndex authority_index);

Expand All @@ -173,6 +174,7 @@ namespace kagome::consensus::babe {

void changeLotteryEpoch(
const EpochDescriptor &epoch,
primitives::AuthorityIndex authority_index,
const primitives::BabeConfiguration &babe_config) const;

outcome::result<primitives::PreRuntime> babePreDigest(
Expand Down
Loading