From 353da0eed91dc9af1df78cdfad3b939b4bd0c14a Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Thu, 22 Jun 2017 13:02:45 -0700 Subject: [PATCH 01/11] FIFO Compaction with TTL. - Added a new TableProperty, creation_time, to keep track of when the SST file is created. - Creation_time: - On Flush: Set to the time of flush. - On Compaction: Set to the max creation_time of all the files involved in the compaction. - Added a new TTL option to FIFO compaction options. --- db/builder.cc | 11 +++-- db/builder.h | 8 ++-- db/compaction.cc | 13 ++++++ db/compaction.h | 2 + db/compaction_job.cc | 7 ++- db/compaction_picker.cc | 73 ++++++++++++++++++++++++------ db/flush_job.cc | 8 +++- db/flush_job.h | 2 +- include/rocksdb/advanced_options.h | 10 +++- include/rocksdb/listener.h | 4 ++ include/rocksdb/table_properties.h | 4 ++ options/options.cc | 2 + table/block_based_table_builder.cc | 11 +++-- table/block_based_table_builder.h | 3 +- table/block_based_table_factory.cc | 3 +- table/meta_blocks.cc | 3 ++ table/table_builder.h | 7 ++- table/table_properties.cc | 3 ++ tools/db_bench_tool.cc | 3 +- 19 files changed, 139 insertions(+), 38 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 0c0bbb236b60..83a425d3a986 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -47,9 +47,9 @@ TableBuilder* NewTableBuilder( int_tbl_prop_collector_factories, uint32_t column_family_id, const std::string& column_family_name, WritableFileWriter* file, const CompressionType compression_type, - const CompressionOptions& compression_opts, - int level, - const std::string* compression_dict, const bool skip_filters) { + const CompressionOptions& compression_opts, int level, + const std::string* compression_dict, const bool skip_filters, + const uint64_t creation_time) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); @@ -57,7 +57,7 @@ TableBuilder* NewTableBuilder( TableBuilderOptions(ioptions, internal_comparator, 
int_tbl_prop_collector_factories, compression_type, compression_opts, compression_dict, skip_filters, - column_family_name, level), + column_family_name, level, creation_time), column_family_id, file); } @@ -76,7 +76,8 @@ Status BuildTable( const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, TableFileCreationReason reason, EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, - TableProperties* table_properties, int level) { + TableProperties* table_properties, int level, + const uint64_t creation_time) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); diff --git a/db/builder.h b/db/builder.h index b438aad8f9ec..1f8102df1d9d 100644 --- a/db/builder.h +++ b/db/builder.h @@ -50,10 +50,9 @@ TableBuilder* NewTableBuilder( int_tbl_prop_collector_factories, uint32_t column_family_id, const std::string& column_family_name, WritableFileWriter* file, const CompressionType compression_type, - const CompressionOptions& compression_opts, - int level, + const CompressionOptions& compression_opts, int level, const std::string* compression_dict = nullptr, - const bool skip_filters = false); + const bool skip_filters = false, const uint64_t creation_time = 0); // Build a Table file from the contents of *iter. The generated file // will be named according to number specified in meta. 
On success, the rest of @@ -79,6 +78,7 @@ extern Status BuildTable( InternalStats* internal_stats, TableFileCreationReason reason, EventLogger* event_logger = nullptr, int job_id = 0, const Env::IOPriority io_priority = Env::IO_HIGH, - TableProperties* table_properties = nullptr, int level = -1); + TableProperties* table_properties = nullptr, int level = -1, + const uint64_t creation_time = 0); } // namespace rocksdb diff --git a/db/compaction.cc b/db/compaction.cc index 5c382d163246..0d5253814f0e 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -461,4 +461,17 @@ bool Compaction::ShouldFormSubcompactions() const { } } +uint64_t Compaction::MaxInputFileCreationTime() const { + uint64_t max_creation_time = 0; + if (cfd_->ioptions()->compaction_style == kCompactionStyleFIFO) { + for (const auto& file : inputs_[0].files) { + uint64_t creation_time = + file->fd.table_reader->GetTableProperties()->creation_time; + max_creation_time = + creation_time > max_creation_time ? creation_time : max_creation_time; + } + } + return max_creation_time; +} + } // namespace rocksdb diff --git a/db/compaction.h b/db/compaction.h index 457c2cd075dd..0167b16f4c47 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -243,6 +243,8 @@ class Compaction { uint64_t max_compaction_bytes() const { return max_compaction_bytes_; } + uint64_t MaxInputFileCreationTime() const; + private: // mark (or clear) all files that are being compacted void MarkFilesBeingCompacted(bool mark_as_compacted); diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 41765f19420d..4778bc2b46cf 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -1025,7 +1025,6 @@ Status CompactionJob::FinishCompactionOutputFile( uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber(); assert(output_number != 0); - TableProperties table_properties; // Check for iterator errors Status s = input_status; auto meta = &sub_compact->current_output()->meta; @@ -1263,14 +1262,14 @@ Status 
CompactionJob::OpenCompactionOutputFile( // data is going to be found bool skip_filters = cfd->ioptions()->optimize_filters_for_hits && bottommost_level_; + sub_compact->builder.reset(NewTableBuilder( *cfd->ioptions(), cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(), sub_compact->compaction->output_compression(), cfd->ioptions()->compression_opts, - sub_compact->compaction->output_level(), - &sub_compact->compression_dict, - skip_filters)); + sub_compact->compaction->output_level(), &sub_compact->compression_dict, + skip_filters, sub_compact->compaction->MaxInputFileCreationTime())); LogFlush(db_options_.info_log); return s; } diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index c5d2d94c0292..e7897d1a576f 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1457,27 +1457,74 @@ Compaction* FIFOCompactionPicker::PickCompaction( return nullptr; } + auto reason = CompactionReason::kFIFOTtl; std::vector inputs; inputs.emplace_back(); inputs[0].level = 0; - // delete old files (FIFO) - for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { - auto f = *ritr; - total_size -= f->compensated_file_size; - inputs[0].files.push_back(f); - char tmp_fsize[16]; - AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); - ROCKS_LOG_BUFFER(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64 - " with size %s for deletion", - cf_name.c_str(), f->fd.GetNumber(), tmp_fsize); - if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size) { - break; + + // Delete files based on TTL + const bool ttl_enabled = ioptions_.compaction_options_fifo.ttl > 0; + if (ttl_enabled) { + int64_t _current_time; + auto status = ioptions_.env->GetCurrentTime(&_current_time); + const uint64_t current_time = static_cast(_current_time); + if (status.ok()) { + for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); + ++ritr) { + auto f = 
*ritr; + auto props = f->fd.table_reader->GetTableProperties(); + auto creation_time = props->creation_time; + if (creation_time == 0) { + // Newer files that we might encounter later might have a + // creation_time embedded in them. But we don't want to proceed to + // them before dropping all the old files with the default '0' + // timestamp. Hence, stop. + break; + } + if (creation_time < + (current_time - ioptions_.compaction_options_fifo.ttl)) { + inputs[0].files.push_back(f); + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with creation time %" PRIu64 " for deletion", + cf_name.c_str(), f->fd.GetNumber(), creation_time); + } + } + } else { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: Couldn't get current time: %s. " + "Falling back to size-based table file deletions. ", + cf_name.c_str(), status.ToString().c_str()); } } + + // If TTL-based-deletion is not enabled, or enabled but the table files have + // a creation time of 0 (old files), then we fall back to deleting oldest + // files based on size. 
+ const bool should_proceed_to_size_based_deletion = inputs[0].files.empty(); + if (should_proceed_to_size_based_deletion) { + reason = CompactionReason::kFIFOMaxSize; + for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { + auto f = *ritr; + total_size -= f->compensated_file_size; + inputs[0].files.push_back(f); + char tmp_fsize[16]; + AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with size %s for deletion", + cf_name.c_str(), f->fd.GetNumber(), tmp_fsize); + if (total_size <= + ioptions_.compaction_options_fifo.max_table_files_size) { + break; + } + } + } + Compaction* c = new Compaction( vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0), - /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize); + /* is deletion compaction */ true, reason); RegisterCompaction(c); return c; } diff --git a/db/flush_job.cc b/db/flush_job.cc index 5c4645eeb423..adeb7051df22 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -243,6 +243,7 @@ Status FlushJob::WriteLevel0Table() { ThreadStatus::STAGE_FLUSH_WRITE_L0); db_mutex_->AssertHeld(); const uint64_t start_micros = db_options_.env->NowMicros(); + Status s; { db_mutex_->Unlock(); @@ -298,6 +299,11 @@ Status FlushJob::WriteLevel0Table() { &output_compression_); EnvOptions optimized_env_options = db_options_.env->OptimizeForCompactionTableWrite(env_options_, db_options_); + + int64_t _current_time; + db_options_.env->GetCurrentTime(&_current_time); + const uint64_t current_time = static_cast(_current_time); + s = BuildTable( dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_, optimized_env_options, cfd_->table_cache(), iter.get(), @@ -308,7 +314,7 @@ Status FlushJob::WriteLevel0Table() { cfd_->ioptions()->compression_opts, mutable_cf_options_.paranoid_file_checks, 
cfd_->internal_stats(), TableFileCreationReason::kFlush, event_logger_, job_context_->job_id, - Env::IO_HIGH, &table_properties_, 0 /* level */); + Env::IO_HIGH, &table_properties_, 0 /* level */, current_time); LogFlush(db_options_.info_log); } ROCKS_LOG_INFO(db_options_.info_log, diff --git a/db/flush_job.h b/db/flush_job.h index b06654d77311..6a685c09f89a 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -70,7 +70,7 @@ class FlushJob { ~FlushJob(); // Require db_mutex held. - // Once PickMemTable() is called, either Run() or Cancel() has to be call. + // Once PickMemTable() is called, either Run() or Cancel() has to be called. void PickMemTable(); Status Run(FileMetaData* file_meta = nullptr); void Cancel(); diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 1e271483628b..07f4c8a3cd1d 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -62,6 +62,13 @@ struct CompactionOptionsFIFO { // Default: 1GB uint64_t max_table_files_size; + // Drop files older than TTL. TTL based deletion will take precedence over + // size based deletion if ttl > 0. + // delete if sst_file_creation_time < (current_time - ttl) + // unit: seconds. Ex: 1 day = 1 * 24 * 60 * 60 + // Default: 0 (disabled) + uint64_t ttl = 0; + // If true, try to do compaction to compact smaller files into larger ones. 
// Minimum files to compact follows options.level0_file_num_compaction_trigger // and compaction won't trigger if average compact bytes per del file is @@ -71,9 +78,10 @@ struct CompactionOptionsFIFO { bool allow_compaction = false; CompactionOptionsFIFO() : max_table_files_size(1 * 1024 * 1024 * 1024) {} - CompactionOptionsFIFO(uint64_t _max_table_files_size, + CompactionOptionsFIFO(uint64_t _max_table_files_size, uint64_t _ttl, bool _allow_compaction) : max_table_files_size(_max_table_files_size), + ttl(_ttl), allow_compaction(_allow_compaction) {} }; diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 17fee59844df..065fc9fd0af4 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -27,6 +27,8 @@ enum class TableFileCreationReason { kFlush, kCompaction, kRecovery, + kBulkLoading, + kUnknown, }; struct TableFileCreationBriefInfo { @@ -71,6 +73,8 @@ enum class CompactionReason { kFIFOMaxSize, // [FIFO] reduce number of files. kFIFOReduceNumFiles, + // [FIFO] files with creation time < (current_time - interval) + kFIFOTtl, // Manual compaction kManualCompaction, // DB::SuggestCompactRange() marked files for compaction diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index 6559b1f3add8..08360d1794ab 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -48,6 +48,7 @@ struct TablePropertiesNames { static const std::string kPrefixExtractorName; static const std::string kPropertyCollectors; static const std::string kCompression; + static const std::string kCreationTime; }; extern const std::string kPropertiesBlock; @@ -158,6 +159,9 @@ struct TableProperties { // by column_family_name. uint64_t column_family_id = rocksdb::TablePropertiesCollectorFactory::Context::kUnknownColumnFamily; + // The time when the SST file was created. + // Since SST files are immutable, this is equivalent to last modified time. 
+ uint64_t creation_time = 0; // Name of the column family with which this SST file is associated. // If column family is unknown, `column_family_name` will be an empty string. diff --git a/options/options.cc b/options/options.cc index 4aaedefda731..3f9fb3027e0e 100644 --- a/options/options.cc +++ b/options/options.cc @@ -367,6 +367,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, "Options.compaction_options_fifo.allow_compaction: %d", compaction_options_fifo.allow_compaction); + ROCKS_LOG_HEADER(log, "Options.compaction_options_fifo.ttl: %" PRIu64, + compaction_options_fifo.ttl); std::string collector_names; for (const auto& collector_factory : table_properties_collector_factories) { collector_names.append(collector_factory->Name()); diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 910a70fb251c..88258994c335 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -269,6 +269,7 @@ struct BlockBasedTableBuilder::Rep { std::unique_ptr flush_block_policy; uint32_t column_family_id; const std::string& column_family_name; + uint64_t creation_time = 0; std::vector> table_properties_collectors; @@ -281,7 +282,7 @@ struct BlockBasedTableBuilder::Rep { const CompressionType _compression_type, const CompressionOptions& _compression_opts, const std::string* _compression_dict, const bool skip_filters, - const std::string& _column_family_name) + const std::string& _column_family_name, const uint64_t _creation_time) : ioptions(_ioptions), table_options(table_opt), internal_comparator(icomparator), @@ -297,7 +298,8 @@ struct BlockBasedTableBuilder::Rep { table_options.flush_block_policy_factory->NewFlushBlockPolicy( table_options, data_block)), column_family_id(_column_family_id), - column_family_name(_column_family_name) { + column_family_name(_column_family_name), + creation_time(_creation_time) { if (table_options.index_type == 
BlockBasedTableOptions::kTwoLevelIndexSearch) { p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( @@ -336,7 +338,7 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( const CompressionType compression_type, const CompressionOptions& compression_opts, const std::string* compression_dict, const bool skip_filters, - const std::string& column_family_name) { + const std::string& column_family_name, const uint64_t creation_time) { BlockBasedTableOptions sanitized_table_options(table_options); if (sanitized_table_options.format_version == 0 && sanitized_table_options.checksum != kCRC32c) { @@ -352,7 +354,7 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( rep_ = new Rep(ioptions, sanitized_table_options, internal_comparator, int_tbl_prop_collector_factories, column_family_id, file, compression_type, compression_opts, compression_dict, - skip_filters, column_family_name); + skip_filters, column_family_name, creation_time); if (rep_->filter_builder != nullptr) { rep_->filter_builder->StartBlock(0); @@ -728,6 +730,7 @@ Status BlockBasedTableBuilder::Finish() { r->props.top_level_index_size = r->p_index_builder_->EstimateTopLevelIndexSize(r->offset); } + r->props.creation_time = r->creation_time; // Add basic properties property_block_builder.AddTableProperty(r->props); diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index 6f7f494c62d3..3b351443acd3 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -17,6 +17,7 @@ #include #include "rocksdb/flush_block_policy.h" +#include "rocksdb/listener.h" #include "rocksdb/options.h" #include "rocksdb/status.h" #include "table/table_builder.h" @@ -48,7 +49,7 @@ class BlockBasedTableBuilder : public TableBuilder { const CompressionType compression_type, const CompressionOptions& compression_opts, const std::string* compression_dict, const bool skip_filters, - const std::string& column_family_name); + const std::string& column_family_name, const uint64_t 
creation_time = 0); // REQUIRES: Either Finish() or Abandon() has been called. ~BlockBasedTableBuilder(); diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 8db76ea38edd..8745027a6d1b 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -72,7 +72,8 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder( table_builder_options.compression_opts, table_builder_options.compression_dict, table_builder_options.skip_filters, - table_builder_options.column_family_name); + table_builder_options.column_family_name, + table_builder_options.creation_time); return table_builder; } diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index 6af536fbcd49..229b7a7cfaa8 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -77,6 +77,7 @@ void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) { Add(TablePropertiesNames::kFormatVersion, props.format_version); Add(TablePropertiesNames::kFixedKeyLen, props.fixed_key_len); Add(TablePropertiesNames::kColumnFamilyId, props.column_family_id); + Add(TablePropertiesNames::kCreationTime, props.creation_time); if (!props.filter_policy_name.empty()) { Add(TablePropertiesNames::kFilterPolicy, props.filter_policy_name); @@ -208,6 +209,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, &new_table_properties->fixed_key_len}, {TablePropertiesNames::kColumnFamilyId, &new_table_properties->column_family_id}, + {TablePropertiesNames::kCreationTime, + &new_table_properties->creation_time}, }; std::string last_key; diff --git a/table/table_builder.h b/table/table_builder.h index cacb2a65dd87..4e413b41110c 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -56,7 +56,8 @@ struct TableBuilderOptions { CompressionType _compression_type, const CompressionOptions& _compression_opts, const std::string* _compression_dict, bool _skip_filters, - const std::string& _column_family_name, int _level) + const 
std::string& _column_family_name, int _level, + const uint64_t _creation_time = 0) : ioptions(_ioptions), internal_comparator(_internal_comparator), int_tbl_prop_collector_factories(_int_tbl_prop_collector_factories), @@ -65,7 +66,8 @@ struct TableBuilderOptions { compression_dict(_compression_dict), skip_filters(_skip_filters), column_family_name(_column_family_name), - level(_level) {} + level(_level), + creation_time(_creation_time) {} const ImmutableCFOptions& ioptions; const InternalKeyComparator& internal_comparator; const std::vector>* @@ -77,6 +79,7 @@ struct TableBuilderOptions { bool skip_filters; // only used by BlockBasedTableBuilder const std::string& column_family_name; int level; // what level this table/file is on, -1 for "not set, don't know" + const uint64_t creation_time; }; // TableBuilder provides the interface used to build a Table diff --git a/table/table_properties.cc b/table/table_properties.cc index b03928e8868a..f3373ba539dc 100644 --- a/table/table_properties.cc +++ b/table/table_properties.cc @@ -139,6 +139,8 @@ std::string TableProperties::ToString( compression_name.empty() ? 
std::string("N/A") : compression_name, prop_delim, kv_delim); + AppendProperty(result, "creation time", creation_time, prop_delim, kv_delim); + return result; } @@ -190,6 +192,7 @@ const std::string TablePropertiesNames::kPrefixExtractorName = const std::string TablePropertiesNames::kPropertyCollectors = "rocksdb.property.collectors"; const std::string TablePropertiesNames::kCompression = "rocksdb.compression"; +const std::string TablePropertiesNames::kCreationTime = "rocksdb.creation.time"; extern const std::string kPropertiesBlock = "rocksdb.properties"; // Old property block name for backward compatibility diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 1ecae9a49d62..497c28a082d9 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -638,6 +638,7 @@ DEFINE_uint64(fifo_compaction_max_table_files_size_mb, 0, "The limit of total table file sizes to trigger FIFO compaction"); DEFINE_bool(fifo_compaction_allow_compaction, true, "Allow compaction in FIFO compaction."); +DEFINE_uint64(fifo_compaction_ttl, 0, "TTL for the SST Files in seconds."); #endif // ROCKSDB_LITE DEFINE_bool(report_bg_io_stats, false, @@ -2864,7 +2865,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { #ifndef ROCKSDB_LITE options.compaction_options_fifo = CompactionOptionsFIFO( FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024, - FLAGS_fifo_compaction_allow_compaction); + FLAGS_fifo_compaction_ttl, FLAGS_fifo_compaction_allow_compaction); #endif // ROCKSDB_LITE if (FLAGS_prefix_size != 0) { options.prefix_extractor.reset( From 2d87ed4b8e54537a0d165e6b14f3c46770444dbf Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Thu, 22 Jun 2017 13:49:34 -0700 Subject: [PATCH 02/11] Remove accidental additions to TableFileCreationReason enum. 
--- include/rocksdb/listener.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 065fc9fd0af4..40d318e0941c 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -27,8 +27,6 @@ enum class TableFileCreationReason { kFlush, kCompaction, kRecovery, - kBulkLoading, - kUnknown, }; struct TableFileCreationBriefInfo { From 5d8f1844978f97ad9e773a483aa5338612c82565 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Sun, 25 Jun 2017 12:58:29 -0700 Subject: [PATCH 03/11] Cleaned up FIFO compaction with TTL logic, and added tests. - Added unit tests. - Split PickCompaction into PickTTLCompaction and PickSizeCompaction. - Fixed a bug in BuildTable where time wasn't being passed to NewTableBuilder. --- db/builder.cc | 3 +- db/compaction_picker.cc | 159 +++++++++++++++++++------------- db/compaction_picker.h | 13 +++ db/db_test.cc | 200 +++++++++++++++++++++++++++++++++++++++- 4 files changed, 309 insertions(+), 66 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 83a425d3a986..6d34c9efe962 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -126,7 +126,8 @@ Status BuildTable( builder = NewTableBuilder( ioptions, internal_comparator, int_tbl_prop_collector_factories, column_family_id, column_family_name, file_writer.get(), compression, - compression_opts, level); + compression_opts, level, nullptr /* compression_dict */, + false /* skip_filters */, creation_time); } MergeHelper merge(env, internal_comparator.user_comparator(), diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index e7897d1a576f..a0c177b5ad64 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1405,17 +1405,79 @@ bool FIFOCompactionPicker::NeedsCompaction( return vstorage->CompactionScore(kLevel0) >= 1; } -Compaction* FIFOCompactionPicker::PickCompaction( +uint64_t FIFOCompactionPicker::GetTotalFilesSize( + const std::vector& files) { + uint64_t total_size = 0; + for (const auto& f : files) 
{ + total_size += f->fd.file_size; + } + return total_size; +} + +Compaction* FIFOCompactionPicker::PickTTLCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, LogBuffer* log_buffer) { - assert(vstorage->num_levels() == 1); + assert(ioptions_.compaction_options_fifo.ttl > 0); + const int kLevel0 = 0; const std::vector& level_files = vstorage->LevelFiles(kLevel0); - uint64_t total_size = 0; - for (const auto& file : level_files) { - total_size += file->fd.file_size; + uint64_t total_size = GetTotalFilesSize(level_files); + + int64_t _current_time; + auto status = ioptions_.env->GetCurrentTime(&_current_time); + if (!status.ok()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: Couldn't get current time: %s. " + "Not doing compactions based on TTL. ", + cf_name.c_str(), status.ToString().c_str()); + return nullptr; + } + const uint64_t current_time = static_cast(_current_time); + + std::vector inputs; + inputs.emplace_back(); + inputs[0].level = 0; + + for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { + auto f = *ritr; + auto props = f->fd.table_reader->GetTableProperties(); + auto creation_time = props->creation_time; + if (creation_time == 0) { + continue; + } else if (creation_time < + (current_time - ioptions_.compaction_options_fifo.ttl)) { + total_size -= f->compensated_file_size; + inputs[0].files.push_back(f); + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with creation time %" PRIu64 " for deletion", + cf_name.c_str(), f->fd.GetNumber(), creation_time); + } } + // Return a nullptr and proceed to size-based FIFO compaction if: + // 1. there are no files older than ttl OR + // 2. there are a few files older than ttl, but deleting them will not bring + // the total size to be less than max_table_files_size threshold. 
+ if (inputs[0].files.empty() || + total_size > ioptions_.compaction_options_fifo.max_table_files_size) { + return nullptr; + } + + Compaction* c = new Compaction( + vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, + kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0), + /* is deletion compaction */ true, CompactionReason::kFIFOTtl); + return c; +} + +Compaction* FIFOCompactionPicker::PickSizeCompaction( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer) { + const int kLevel0 = 0; + const std::vector& level_files = vstorage->LevelFiles(kLevel0); + uint64_t total_size = GetTotalFilesSize(level_files); + if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size || level_files.size() == 0) { // total size not exceeded @@ -1435,7 +1497,6 @@ Compaction* FIFOCompactionPicker::PickCompaction( /* is manual */ false, vstorage->CompactionScore(0), /* is deletion compaction */ false, CompactionReason::kFIFOReduceNumFiles); - RegisterCompaction(c); return c; } } @@ -1457,74 +1518,44 @@ Compaction* FIFOCompactionPicker::PickCompaction( return nullptr; } - auto reason = CompactionReason::kFIFOTtl; std::vector inputs; inputs.emplace_back(); inputs[0].level = 0; - // Delete files based on TTL - const bool ttl_enabled = ioptions_.compaction_options_fifo.ttl > 0; - if (ttl_enabled) { - int64_t _current_time; - auto status = ioptions_.env->GetCurrentTime(&_current_time); - const uint64_t current_time = static_cast(_current_time); - if (status.ok()) { - for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); - ++ritr) { - auto f = *ritr; - auto props = f->fd.table_reader->GetTableProperties(); - auto creation_time = props->creation_time; - if (creation_time == 0) { - // Newer files that we might encounter later might have a - // creation_time embedded in them. 
But we don't want to proceed to - // them before dropping all the old files with the default '0' - // timestamp. Hence, stop. - break; - } - if (creation_time < - (current_time - ioptions_.compaction_options_fifo.ttl)) { - inputs[0].files.push_back(f); - ROCKS_LOG_BUFFER(log_buffer, - "[%s] FIFO compaction: picking file %" PRIu64 - " with creation time %" PRIu64 " for deletion", - cf_name.c_str(), f->fd.GetNumber(), creation_time); - } - } - } else { - ROCKS_LOG_BUFFER(log_buffer, - "[%s] FIFO compaction: Couldn't get current time: %s. " - "Falling back to size-based table file deletions. ", - cf_name.c_str(), status.ToString().c_str()); - } - } - - // If TTL-based-deletion is not enabled, or enabled but the table files have - // a creation time of 0 (old files), then we fall back to deleting oldest - // files based on size. - const bool should_proceed_to_size_based_deletion = inputs[0].files.empty(); - if (should_proceed_to_size_based_deletion) { - reason = CompactionReason::kFIFOMaxSize; - for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { - auto f = *ritr; - total_size -= f->compensated_file_size; - inputs[0].files.push_back(f); - char tmp_fsize[16]; - AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); - ROCKS_LOG_BUFFER(log_buffer, - "[%s] FIFO compaction: picking file %" PRIu64 - " with size %s for deletion", - cf_name.c_str(), f->fd.GetNumber(), tmp_fsize); - if (total_size <= - ioptions_.compaction_options_fifo.max_table_files_size) { - break; - } + for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { + auto f = *ritr; + total_size -= f->compensated_file_size; + inputs[0].files.push_back(f); + char tmp_fsize[16]; + AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with size %s for deletion", + cf_name.c_str(), f->fd.GetNumber(), tmp_fsize); + if (total_size <= 
ioptions_.compaction_options_fifo.max_table_files_size) { + break; } } Compaction* c = new Compaction( vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0), - /* is deletion compaction */ true, reason); + /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize); + return c; +} + +Compaction* FIFOCompactionPicker::PickCompaction( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer) { + assert(vstorage->num_levels() == 1); + + Compaction* c = nullptr; + if (ioptions_.compaction_options_fifo.ttl > 0) { + c = PickTTLCompaction(cf_name, mutable_cf_options, vstorage, log_buffer); + } + if (c == nullptr) { + c = PickSizeCompaction(cf_name, mutable_cf_options, vstorage, log_buffer); + } RegisterCompaction(c); return c; } diff --git a/db/compaction_picker.h b/db/compaction_picker.h index ae829cb4961b..c289fcb37a04 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -244,6 +244,19 @@ class FIFOCompactionPicker : public CompactionPicker { virtual bool NeedsCompaction( const VersionStorageInfo* vstorage) const override; + + private: + Compaction* PickTTLCompaction(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* version, + LogBuffer* log_buffer); + + Compaction* PickSizeCompaction(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* version, + LogBuffer* log_buffer); + + uint64_t GetTotalFilesSize(const std::vector& files); }; class NullCompactionPicker : public CompactionPicker { diff --git a/db/db_test.cc b/db/db_test.cc index 52ee7306842a..e88bfeb3b7bd 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2792,7 +2792,7 @@ TEST_F(DBTest, FIFOCompactionTestWithCompaction) { ASSERT_EQ(NumTableFilesAtLevel(0), 10); for (int i = 0; i < 60; i++) { - // Generate and flush a file about 10KB. 
+ // Generate and flush a file about 20KB. for (int j = 0; j < 20; j++) { ASSERT_OK(Put(ToString(i * 20 + j + 2000), RandomString(&rnd, 980))); } @@ -2807,6 +2807,204 @@ TEST_F(DBTest, FIFOCompactionTestWithCompaction) { ASSERT_LE(SizeAtLevel(0), options.compaction_options_fifo.max_table_files_size); } + +TEST_F(DBTest, FIFOCompactionTestWithTTL) { + Options options; + options.compaction_style = kCompactionStyleFIFO; + options.write_buffer_size = 10 << 10; // 10KB + options.arena_block_size = 4096; + options.compression = kNoCompression; + options.create_if_missing = true; + + // Test to make sure that all files with expired ttl are deleted on next + // manual compaction. + { + options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB + options.compaction_options_fifo.allow_compaction = false; + options.compaction_options_fifo.ttl = 600; // seconds + options = CurrentOptions(options); + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 10; i++) { + // Generate and flush a file about 10KB. + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + + // sleep for 5 seconds + env_->SleepForMicroseconds(5 * 1000 * 1000); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + + // change ttl to 1 sec. So all files should be deleted on next compaction. + options.compaction_options_fifo.ttl = 1; + Reopen(options); + + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + } + + // Test to make sure that all files with expired ttl are deleted on compaction + // that is triggerred by size going beyond max_table_files_size threshold. 
+ { + options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB + options.compaction_options_fifo.allow_compaction = false; + options.compaction_options_fifo.ttl = 5; // seconds + options = CurrentOptions(options); + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 10; i++) { + // Generate and flush a file about 10KB. + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + + env_->SleepForMicroseconds(6 * 1000 * 1000); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + + // Create 10 more files. The old 10 files are dropped. + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + + // Only the new 10 files remain. + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + ASSERT_LE(SizeAtLevel(0), + options.compaction_options_fifo.max_table_files_size); + } + + // Test that shows the fall back to size-based FIFO compaction if TTL-based + // deletion doesn't move the total size to be less than max_table_files_size. + { + options.write_buffer_size = 110 << 10; // 10KB + options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB + options.compaction_options_fifo.allow_compaction = false; + options.compaction_options_fifo.ttl = 5; // seconds + options = CurrentOptions(options); + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 3; i++) { + // Generate and flush a file about 10KB. 
+ for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 3); + + env_->SleepForMicroseconds(6 * 1000 * 1000); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 3); + + for (int i = 0; i < 5; i++) { + for (int j = 0; j < 140; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + // Size limit is still guaranteed. + ASSERT_LE(SizeAtLevel(0), + options.compaction_options_fifo.max_table_files_size); + } + + // Test with TTL + Intra-L0 compactions. + { + options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB + options.compaction_options_fifo.allow_compaction = true; + options.compaction_options_fifo.ttl = 5; // seconds + options.level0_file_num_compaction_trigger = 6; + options = CurrentOptions(options); + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 10; i++) { + // Generate and flush a file about 10KB. + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + // With Intra-L0 compaction, out of 10 files, 6 files will be compacted to 1 + // (due to level0_file_num_compaction_trigger = 6). + // So total files = 1 + remaining 4 = 5. + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 5); + + // Sleep for a little over ttl time. + env_->SleepForMicroseconds(6 * 1000 * 1000); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 5); + + // Create 10 more files. The old 5 files are dropped as their ttl expired. 
+ for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(NumTableFilesAtLevel(0), 5); + ASSERT_LE(SizeAtLevel(0), + options.compaction_options_fifo.max_table_files_size); + } + + // Test with large TTL + Intra-L0 compactions. + // Files dropped based on size, as ttl doesn't kick in. + { + options.write_buffer_size = 20 << 10; // 20K + options.compaction_options_fifo.max_table_files_size = 1500 << 10; // 1.5MB + options.compaction_options_fifo.allow_compaction = true; + options.compaction_options_fifo.ttl = 60 * 60; // 1 hour + options.level0_file_num_compaction_trigger = 6; + options = CurrentOptions(options); + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 60; i++) { + // Generate and flush a file about 20KB. + for (int j = 0; j < 20; j++) { + ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + // It should be compacted to 10 files. + ASSERT_EQ(NumTableFilesAtLevel(0), 10); + + for (int i = 0; i < 60; i++) { + // Generate and flush a file about 20KB. + for (int j = 0; j < 20; j++) { + ASSERT_OK(Put(ToString(i * 20 + j + 2000), RandomString(&rnd, 980))); + } + Flush(); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + // It should be compacted to no more than 20 files. + ASSERT_GT(NumTableFilesAtLevel(0), 10); + ASSERT_LT(NumTableFilesAtLevel(0), 18); + // Size limit is still guaranteed. + ASSERT_LE(SizeAtLevel(0), + options.compaction_options_fifo.max_table_files_size); + } +} #endif // ROCKSDB_LITE #ifndef ROCKSDB_LITE From 3a90501f687efcf1d92356d900665b048d648be0 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Sun, 25 Jun 2017 13:02:15 -0700 Subject: [PATCH 04/11] Added creation_time to the tables built during repair and recovery. 
Also handled the return status of GetCurrentTime, so that a garbage value is not used when return status is not ok. time is set to 0 instead. --- db/compaction_job.cc | 13 ++++++++++++- db/db_impl_open.cc | 18 +++++++++++++++++- db/flush_job.cc | 6 ++++-- db/repair.cc | 12 +++++++++++- 4 files changed, 44 insertions(+), 5 deletions(-) diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 4778bc2b46cf..af83532a1e8a 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -1263,13 +1263,24 @@ Status CompactionJob::OpenCompactionOutputFile( bool skip_filters = cfd->ioptions()->optimize_filters_for_hits && bottommost_level_; + uint64_t output_file_creation_time = + sub_compact->compaction->MaxInputFileCreationTime(); + if (output_file_creation_time == 0) { + int64_t _current_time; + auto status = db_options_.env->GetCurrentTime(&_current_time); + if (!status.ok()) { + _current_time = 0; + } + output_file_creation_time = static_cast(_current_time); + } + sub_compact->builder.reset(NewTableBuilder( *cfd->ioptions(), cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(), sub_compact->compaction->output_compression(), cfd->ioptions()->compression_opts, sub_compact->compaction->output_level(), &sub_compact->compression_dict, - skip_filters, sub_compact->compaction->MaxInputFileCreationTime())); + skip_filters, output_file_creation_time)); LogFlush(db_options_.info_log); return s; } diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 995b329bfa62..4a81ff4b9f6a 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -18,6 +18,7 @@ #include "db/builder.h" #include "options/options_helper.h" #include "rocksdb/wal_filter.h" +#include "table/block_based_table_factory.h" #include "util/rate_limiter.h" #include "util/sst_file_manager_impl.h" #include "util/sync_point.h" @@ -164,6 +165,12 @@ static Status ValidateOptions( "universal and level compaction styles. 
"); } } + if (cfd.options.compaction_options_fifo.ttl > 0 && + cfd.options.table_factory->Name() != BlockBasedTableFactory().Name()) { + return Status::NotSupported( + "FIFO Compaction with TTL is only supported in " + "Block-Based Table format. "); + } } if (db_options.db_paths.size() > 4) { @@ -832,6 +839,14 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, *cfd->GetLatestMutableCFOptions(); bool paranoid_file_checks = cfd->GetLatestMutableCFOptions()->paranoid_file_checks; + + int64_t _current_time; + s = env_->GetCurrentTime(&_current_time); + if (!s.ok()) { + _current_time = 0; + } + const uint64_t current_time = static_cast(_current_time); + { mutex_.Unlock(); @@ -851,7 +866,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), cfd->ioptions()->compression_opts, paranoid_file_checks, cfd->internal_stats(), TableFileCreationReason::kRecovery, - &event_logger_, job_id); + &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */, + -1 /* level */, current_time); LogFlush(immutable_db_options_.info_log); ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] [WriteLevel0TableForRecovery]" diff --git a/db/flush_job.cc b/db/flush_job.cc index adeb7051df22..d93e2c64e819 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -243,7 +243,6 @@ Status FlushJob::WriteLevel0Table() { ThreadStatus::STAGE_FLUSH_WRITE_L0); db_mutex_->AssertHeld(); const uint64_t start_micros = db_options_.env->NowMicros(); - Status s; { db_mutex_->Unlock(); @@ -301,7 +300,10 @@ Status FlushJob::WriteLevel0Table() { db_options_.env->OptimizeForCompactionTableWrite(env_options_, db_options_); int64_t _current_time; - db_options_.env->GetCurrentTime(&_current_time); + auto status = db_options_.env->GetCurrentTime(&_current_time); + if (!status.ok()) { + _current_time = 0; + } const uint64_t current_time = static_cast(_current_time); s = BuildTable( diff --git 
a/db/repair.cc b/db/repair.cc index 1f9e344e130f..da6e0f958d02 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -382,6 +382,14 @@ class Repairer { ScopedArenaIterator iter(mem->NewIterator(ro, &arena)); EnvOptions optimized_env_options = env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_); + + int64_t _current_time; + status = env_->GetCurrentTime(&_current_time); + if (!status.ok()) { + _current_time = 0; + } + const uint64_t current_time = static_cast(_current_time); + status = BuildTable( dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), optimized_env_options, table_cache_, iter.get(), @@ -389,7 +397,9 @@ class Repairer { &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, kNoCompression, CompressionOptions(), false, - nullptr /* internal_stats */, TableFileCreationReason::kRecovery); + nullptr /* internal_stats */, TableFileCreationReason::kRecovery, + nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH, + nullptr /* table_properties */, -1 /* level */, current_time); ROCKS_LOG_INFO(db_options_.info_log, "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log, counter, meta.fd.GetNumber(), From 20514a969f223a5a5d0f70ac30382c9a345314a8 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Sun, 25 Jun 2017 13:05:51 -0700 Subject: [PATCH 05/11] Add ttl param to the end in CompactionOptionsFIFO constructor. with a default value of 0. 
--- include/rocksdb/advanced_options.h | 4 ++-- tools/db_bench_tool.cc | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 07f4c8a3cd1d..701bcb320a91 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -78,8 +78,8 @@ struct CompactionOptionsFIFO { bool allow_compaction = false; CompactionOptionsFIFO() : max_table_files_size(1 * 1024 * 1024 * 1024) {} - CompactionOptionsFIFO(uint64_t _max_table_files_size, uint64_t _ttl, - bool _allow_compaction) + CompactionOptionsFIFO(uint64_t _max_table_files_size, bool _allow_compaction, + uint64_t _ttl = 0) : max_table_files_size(_max_table_files_size), ttl(_ttl), allow_compaction(_allow_compaction) {} diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 497c28a082d9..6193e603fd49 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -2865,7 +2865,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { #ifndef ROCKSDB_LITE options.compaction_options_fifo = CompactionOptionsFIFO( FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024, - FLAGS_fifo_compaction_ttl, FLAGS_fifo_compaction_allow_compaction); + FLAGS_fifo_compaction_allow_compaction, FLAGS_fifo_compaction_ttl); #endif // ROCKSDB_LITE if (FLAGS_prefix_size != 0) { options.prefix_extractor.reset( From ee95191856e14488f91979767ba0c2e856999b8f Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Mon, 26 Jun 2017 00:39:38 -0700 Subject: [PATCH 06/11] Add to HISTORY.md and update score based on TTL. 
--- HISTORY.md | 1 + db/compaction.cc | 9 +++++---- db/compaction_picker.cc | 25 +++++++++++++------------ db/db_test.cc | 11 ++++++----- db/version_set.cc | 30 ++++++++++++++++++++++++++++++ 5 files changed, 55 insertions(+), 21 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index cabf93b73e29..d13fa01f98e2 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ ### New Features * Measure estimated number of reads per file. The information can be accessed through DB::GetColumnFamilyMetaData or "rocksdb.sstables" DB property. * RateLimiter support for throttling background reads, or throttling the sum of background reads and writes. This can give more predictable I/O usage when compaction reads more data than it writes, e.g., due to lots of deletions. +* [Experimental] FIFO compaction with TTL support. It can be enabled by setting CompactionOptionsFIFO.ttl > 0. ## 5.6.0 (06/06/2017) ### Public API Change diff --git a/db/compaction.cc b/db/compaction.cc index 0d5253814f0e..e7dfb529cffb 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -465,10 +465,11 @@ uint64_t Compaction::MaxInputFileCreationTime() const { uint64_t max_creation_time = 0; if (cfd_->ioptions()->compaction_style == kCompactionStyleFIFO) { for (const auto& file : inputs_[0].files) { - uint64_t creation_time = - file->fd.table_reader->GetTableProperties()->creation_time; - max_creation_time = - creation_time > max_creation_time ? 
creation_time : max_creation_time; + if (file->fd.table_reader != nullptr) { + uint64_t creation_time = + file->fd.table_reader->GetTableProperties()->creation_time; + max_creation_time = std::max(max_creation_time, creation_time); + } } } return max_creation_time; diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index a0c177b5ad64..aac7db967f10 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1440,18 +1440,19 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction( for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { auto f = *ritr; - auto props = f->fd.table_reader->GetTableProperties(); - auto creation_time = props->creation_time; - if (creation_time == 0) { - continue; - } else if (creation_time < - (current_time - ioptions_.compaction_options_fifo.ttl)) { - total_size -= f->compensated_file_size; - inputs[0].files.push_back(f); - ROCKS_LOG_BUFFER(log_buffer, - "[%s] FIFO compaction: picking file %" PRIu64 - " with creation time %" PRIu64 " for deletion", - cf_name.c_str(), f->fd.GetNumber(), creation_time); + if (f->fd.table_reader != nullptr) { + auto creation_time = + f->fd.table_reader->GetTableProperties()->creation_time; + if (creation_time > 0 && + creation_time < + (current_time - ioptions_.compaction_options_fifo.ttl)) { + total_size -= f->compensated_file_size; + inputs[0].files.push_back(f); + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with creation time %" PRIu64 " for deletion", + cf_name.c_str(), f->fd.GetNumber(), creation_time); + } } } diff --git a/db/db_test.cc b/db/db_test.cc index e88bfeb3b7bd..2dac10f984b5 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2849,8 +2849,8 @@ TEST_F(DBTest, FIFOCompactionTestWithTTL) { ASSERT_EQ(NumTableFilesAtLevel(0), 0); } - // Test to make sure that all files with expired ttl are deleted on compaction - // that is triggerred by size going beyond max_table_files_size threshold. 
+ // Test to make sure that all files with expired ttl are deleted on next + // automatic compaction. { options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB options.compaction_options_fifo.allow_compaction = false; @@ -2873,16 +2873,17 @@ TEST_F(DBTest, FIFOCompactionTestWithTTL) { ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_EQ(NumTableFilesAtLevel(0), 10); - // Create 10 more files. The old 10 files are dropped. - for (int i = 0; i < 10; i++) { + // Create 1 more file to trigger TTL compaction. The old files are dropped. + for (int i = 0; i < 1; i++) { for (int j = 0; j < 10; j++) { ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980))); } Flush(); } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); // Only the new 10 files remain. - ASSERT_EQ(NumTableFilesAtLevel(0), 10); + ASSERT_EQ(NumTableFilesAtLevel(0), 1); ASSERT_LE(SizeAtLevel(0), options.compaction_options_fifo.max_table_files_size); } diff --git a/db/version_set.cc b/db/version_set.cc index 6c220b5ef82f..008e22c127be 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1318,6 +1318,31 @@ void VersionStorageInfo::EstimateCompactionBytesNeeded( } } +namespace { +uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions, + const std::vector& files) { + uint32_t ttl_expired_files_count = 0; + + int64_t _current_time; + auto status = ioptions.env->GetCurrentTime(&_current_time); + if (status.ok()) { + const uint64_t current_time = static_cast(_current_time); + for (auto f : files) { + if (!f->being_compacted && f->fd.table_reader != nullptr) { + auto creation_time = + f->fd.table_reader->GetTableProperties()->creation_time; + if (creation_time > 0 && + creation_time < + (current_time - ioptions.compaction_options_fifo.ttl)) { + ttl_expired_files_count++; + } + } + } + } + return ttl_expired_files_count; +} +} // anonymous namespace + void VersionStorageInfo::ComputeCompactionScore( const ImmutableCFOptions& immutable_cf_options, const MutableCFOptions& 
mutable_cf_options) { @@ -1364,6 +1389,11 @@ void VersionStorageInfo::ComputeCompactionScore( mutable_cf_options.level0_file_num_compaction_trigger, score); } + if (immutable_cf_options.compaction_options_fifo.ttl > 0) { + score = std::max(static_cast(GetExpiredTtlFilesCount( + immutable_cf_options, files_[level])), + score); + } } else { score = static_cast(num_sorted_runs) / From e6fc920860db84005e25fa4f848768718a6aa5b2 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Mon, 26 Jun 2017 11:48:30 -0700 Subject: [PATCH 07/11] Return MaxInputFileCreationTime irrespective of compaction. Also moved GetTotalFilesSize into anonymous namespace from FIFOCompactionPicker class. --- db/compaction.cc | 12 +++++------- db/compaction_picker.cc | 4 +++- db/compaction_picker.h | 2 -- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/db/compaction.cc b/db/compaction.cc index e7dfb529cffb..e4690fc4b5aa 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -463,13 +463,11 @@ bool Compaction::ShouldFormSubcompactions() const { uint64_t Compaction::MaxInputFileCreationTime() const { uint64_t max_creation_time = 0; - if (cfd_->ioptions()->compaction_style == kCompactionStyleFIFO) { - for (const auto& file : inputs_[0].files) { - if (file->fd.table_reader != nullptr) { - uint64_t creation_time = - file->fd.table_reader->GetTableProperties()->creation_time; - max_creation_time = std::max(max_creation_time, creation_time); - } + for (const auto& file : inputs_[0].files) { + if (file->fd.table_reader != nullptr) { + uint64_t creation_time = + file->fd.table_reader->GetTableProperties()->creation_time; + max_creation_time = std::max(max_creation_time, creation_time); } } return max_creation_time; diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index aac7db967f10..04038f83b47f 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1405,7 +1405,8 @@ bool FIFOCompactionPicker::NeedsCompaction( return vstorage->CompactionScore(kLevel0) >= 1; }
-uint64_t FIFOCompactionPicker::GetTotalFilesSize( +namespace { +uint64_t GetTotalFilesSize( const std::vector& files) { uint64_t total_size = 0; for (const auto& f : files) { @@ -1413,6 +1414,7 @@ uint64_t FIFOCompactionPicker::GetTotalFilesSize( } return total_size; } +} // anonymous namespace Compaction* FIFOCompactionPicker::PickTTLCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, diff --git a/db/compaction_picker.h b/db/compaction_picker.h index c289fcb37a04..eb5f06819b63 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -255,8 +255,6 @@ class FIFOCompactionPicker : public CompactionPicker { const MutableCFOptions& mutable_cf_options, VersionStorageInfo* version, LogBuffer* log_buffer); - - uint64_t GetTotalFilesSize(const std::vector& files); }; class NullCompactionPicker : public CompactionPicker { From 7c096ef84275ec0626f993b3a83fe6a353338678 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Mon, 26 Jun 2017 14:47:17 -0700 Subject: [PATCH 08/11] Move log message to the right place. Since there is a potential chance that we could discard the files picked in the main loop in PickTTLCompaction, move the log message to the right place, to be right before creating the Compaction object. 
--- db/compaction_picker.cc | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 04038f83b47f..17f223353e97 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1450,10 +1450,6 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction( (current_time - ioptions_.compaction_options_fifo.ttl)) { total_size -= f->compensated_file_size; inputs[0].files.push_back(f); - ROCKS_LOG_BUFFER(log_buffer, - "[%s] FIFO compaction: picking file %" PRIu64 - " with creation time %" PRIu64 " for deletion", - cf_name.c_str(), f->fd.GetNumber(), creation_time); } } } @@ -1467,6 +1463,14 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction( return nullptr; } + for (const auto& f : inputs[0].files) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with creation time %" PRIu64 " for deletion", + cf_name.c_str(), f->fd.GetNumber(), + f->fd.table_reader->GetTableProperties()->creation_time); + } + Compaction* c = new Compaction( vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0), From a87140a1c50b776f51c05cac8101b69e3a91876a Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Mon, 26 Jun 2017 16:30:35 -0700 Subject: [PATCH 09/11] Support FIFO-Compaction-with-TTL only with unlimited open files. Support FIFO-compaction-with-TTL only when max_open_files=-1, as the creation_time embedded in every file's table properties need to be consulted to figure out the files that need to be deleted. table_reader embedded in a FileDescriptor could potentially get deleted if max_open_files is not set to -1. We could initialize the table_reader again to get the table properties again, but it would involve a performance penalty due to reading new blocks from disk. We could potentially support it in the future, but may be not in the first version. 
--- db/db_impl_open.cc | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 4a81ff4b9f6a..9641d492dbc7 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -165,11 +165,18 @@ static Status ValidateOptions( "universal and level compaction styles. "); } } - if (cfd.options.compaction_options_fifo.ttl > 0 && - cfd.options.table_factory->Name() != BlockBasedTableFactory().Name()) { - return Status::NotSupported( - "FIFO Compaction with TTL is only supported in " - "Block-Based Table format. "); + if (cfd.options.compaction_options_fifo.ttl > 0) { + if (db_options.max_open_files != -1) { + return Status::NotSupported( + "FIFO Compaction with TTL is only supported when files are always " + "kept open (set max_open_files = -1). "); + } + if (cfd.options.table_factory->Name() != + BlockBasedTableFactory().Name()) { + return Status::NotSupported( + "FIFO Compaction with TTL is only supported in " + "Block-Based Table format. "); + } } } From 05bf810a78a387495b868beb1d483217fd270776 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Mon, 26 Jun 2017 23:35:20 -0700 Subject: [PATCH 10/11] Add tests to check the compatibility of FIFO-with-TTL with other options. 1. It is only supported with max_open_files = -1. 2. It is only supported with Block based table format. --- db/db_test.cc | 42 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/db/db_test.cc b/db/db_test.cc index 2dac10f984b5..2a3546defd08 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2808,7 +2808,47 @@ TEST_F(DBTest, FIFOCompactionTestWithCompaction) { options.compaction_options_fifo.max_table_files_size); } -TEST_F(DBTest, FIFOCompactionTestWithTTL) { +// Check that FIFO-with-TTL is not supported with max_open_files != -1. 
+TEST_F(DBTest, FIFOCompactionWithTTLAndMaxOpenFilesTest) { + Options options; + options.compaction_style = kCompactionStyleFIFO; + options.create_if_missing = true; + options.compaction_options_fifo.ttl = 600; // seconds + + // Check that it is not supported with max_open_files != -1. + options.max_open_files = 100; + options = CurrentOptions(options); + ASSERT_TRUE(TryReopen(options).IsNotSupported()); + + options.max_open_files = -1; + ASSERT_OK(TryReopen(options)); +} + +// Check that FIFO-with-TTL is supported only with BlockBasedTableFactory. +TEST_F(DBTest, FIFOCompactionWithTTLAndVariousTableFormatsTest) { + Options options; + options.compaction_style = kCompactionStyleFIFO; + options.create_if_missing = true; + options.compaction_options_fifo.ttl = 600; // seconds + + options = CurrentOptions(options); + options.table_factory.reset(NewBlockBasedTableFactory()); + ASSERT_OK(TryReopen(options)); + + Destroy(options); + options.table_factory.reset(NewPlainTableFactory()); + ASSERT_TRUE(TryReopen(options).IsNotSupported()); + + Destroy(options); + options.table_factory.reset(NewCuckooTableFactory()); + ASSERT_TRUE(TryReopen(options).IsNotSupported()); + + Destroy(options); + options.table_factory.reset(NewAdaptiveTableFactory()); + ASSERT_TRUE(TryReopen(options).IsNotSupported()); +} + +TEST_F(DBTest, FIFOCompactionWithTTLTest) { Options options; options.compaction_style = kCompactionStyleFIFO; options.write_buffer_size = 10 << 10; // 10KB From 12a5edac7042b3b800cbc5fa47767032489b1f06 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Tue, 27 Jun 2017 16:27:31 -0700 Subject: [PATCH 11/11] Pick only contiguous files for deletion based on TTL. Addressing review comments: 1. Pick only contiguous files for deletion in FIFO-with-TTL. 2. Add a check to make sure that GetTableProperties() does not return a null pointer, before accessing fields further. 
--- db/compaction.cc | 3 ++- db/compaction_picker.cc | 12 +++++++----- db/version_set.cc | 3 ++- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/db/compaction.cc b/db/compaction.cc index e4690fc4b5aa..bb2384a35987 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -464,7 +464,8 @@ bool Compaction::ShouldFormSubcompactions() const { uint64_t Compaction::MaxInputFileCreationTime() const { uint64_t max_creation_time = 0; for (const auto& file : inputs_[0].files) { - if (file->fd.table_reader != nullptr) { + if (file->fd.table_reader != nullptr && + file->fd.table_reader->GetTableProperties() != nullptr) { uint64_t creation_time = file->fd.table_reader->GetTableProperties()->creation_time; max_creation_time = std::max(max_creation_time, creation_time); diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 17f223353e97..fc6a8a8da865 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1442,15 +1442,17 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction( for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { auto f = *ritr; - if (f->fd.table_reader != nullptr) { + if (f->fd.table_reader != nullptr && + f->fd.table_reader->GetTableProperties() != nullptr) { auto creation_time = f->fd.table_reader->GetTableProperties()->creation_time; - if (creation_time > 0 && - creation_time < + if (creation_time == 0 || + creation_time >= (current_time - ioptions_.compaction_options_fifo.ttl)) { - total_size -= f->compensated_file_size; - inputs[0].files.push_back(f); + break; } + total_size -= f->compensated_file_size; + inputs[0].files.push_back(f); } } diff --git a/db/version_set.cc b/db/version_set.cc index 008e22c127be..9251cbd6d141 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1328,7 +1328,8 @@ uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions, if (status.ok()) { const uint64_t current_time = static_cast(_current_time); for (auto f : files) { - if (!f->being_compacted && 
f->fd.table_reader != nullptr) { + if (!f->being_compacted && f->fd.table_reader != nullptr && + f->fd.table_reader->GetTableProperties() != nullptr) { auto creation_time = f->fd.table_reader->GetTableProperties()->creation_time; if (creation_time > 0 &&