Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions scripts/my.cnf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ tianmu_groupby_parallel_rows_minimum=655360
# order by parallel switch
tianmu_orderby_speedup=1

# DIR
tianmu_hugefiledir='/tmp'

# MB, If the value is set to 0, the system does not use it
tianmu_hugefilesize=30720

# minimum available memory of the system, MB
# If the remaining memory is smaller than this value, huge memory is used
tianmu_os_least_mem=512

# here, at the end of [mysqld] group mtr will automatically disable
# all optional plugins.

Expand Down
90 changes: 76 additions & 14 deletions storage/tianmu/core/aggregation_algorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ low-level mechanisms
#include "core/mi_iterator.h"
#include "core/pack_guardian.h"
#include "core/transaction.h"
#include "mm/memory_statistics.h"
#include "system/fet.h"
#include "system/tianmu_system.h"

Expand Down Expand Up @@ -247,7 +248,8 @@ void AggregationAlgorithm::MultiDimensionalGroupByScan(GroupByWrapper &gbw, int6
unsigned int thd_cnt = 1;
if (tianmu_sysvar_groupby_parallel_degree > 1) {
if (static_cast<uint64_t>(mit.NumOfTuples()) > tianmu_sysvar_groupby_parallel_rows_minimum) {
unsigned int thd_limit = std::thread::hardware_concurrency() * 2;
unsigned int thd_limit = std::thread::hardware_concurrency();
thd_limit = thd_limit > 8 ? 8 : thd_limit; // limit no more 8
thd_cnt = tianmu_sysvar_groupby_parallel_degree > thd_limit ? thd_limit : tianmu_sysvar_groupby_parallel_degree;
TIANMU_LOG(LogCtl_Level::DEBUG,
"MultiDimensionalGroupByScan multi threads thd_cnt: %d thd_limit: %d NumOfTuples: %d "
Expand Down Expand Up @@ -289,13 +291,16 @@ void AggregationAlgorithm::MultiDimensionalGroupByScan(GroupByWrapper &gbw, int6
}

[[maybe_unused]] const char *thread_type = "multi";

[[maybe_unused]] uint64_t mem_used = 0;
#ifdef DEBUG_AGGREGA_COST
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
uint64_t mem_available = MemoryStatisticsOS::Instance()->GetMemInfo().mem_available;
uint64_t swap_used = MemoryStatisticsOS::Instance()->GetMemInfo().swap_used;
memory_statistics_record("AGGREGA", "START");
#endif

if (ag_worker.ThreadsUsed() > 1) {
ag_worker.DistributeAggreTaskAverage(mit);
ag_worker.DistributeAggreTaskAverage(mit, &mem_used);
} else {
thread_type = "sin";
while (mit.IsValid()) { // need muti thread
Expand All @@ -306,7 +311,7 @@ void AggregationAlgorithm::MultiDimensionalGroupByScan(GroupByWrapper &gbw, int6

// Grouping on a packrow
int64_t packrow_length = mit.GetPackSizeLeft();
AggregaGroupingResult grouping_result = AggregatePackrow(gbw, &mit, cur_tuple);
AggregaGroupingResult grouping_result = AggregatePackrow(gbw, &mit, cur_tuple, &mem_used);
if (sender) {
sender->SetAffectRows(gbw.NumOfGroups());
}
Expand All @@ -324,12 +329,17 @@ void AggregationAlgorithm::MultiDimensionalGroupByScan(GroupByWrapper &gbw, int6
}

#ifdef DEBUG_AGGREGA_COST
int64_t mem_available_chg = MemoryStatisticsOS::Instance()->GetMemInfo().mem_available - mem_available;
int64_t swap_used_chg = MemoryStatisticsOS::Instance()->GetMemInfo().swap_used - swap_used;
auto diff =
std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::high_resolution_clock::now() - start);
if (diff.count() > tianmu_sysvar_slow_query_record_interval) {
TIANMU_LOG(LogCtl_Level::INFO, "AggregatePackrow thread_type: %s spend: %f NumOfTuples: %d", thread_type,
diff.count(), mit.NumOfTuples());
TIANMU_LOG(LogCtl_Level::INFO,
"AggregatePackrow thread_type: %s spend: %f NumOfTuples: %d mem_available_chg: %ld swap_used_chg: "
"%ld collec_mem_used: %lu",
thread_type, diff.count(), mit.NumOfTuples(), mem_available_chg, swap_used_chg, mem_used);
}
memory_statistics_record("AGGREGA", "END");
#endif

gbw.ClearDistinctBuffers(); // reset buffers for a new contents
Expand Down Expand Up @@ -517,12 +527,20 @@ void AggregationAlgorithm::MultiDimensionalDistinctScan(GroupByWrapper &gbw, MII
}
}

AggregaGroupingResult AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit, int64_t cur_tuple) {
AggregaGroupingResult AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit, int64_t cur_tuple,
uint64_t *mem_used) {
int64_t packrow_length = mit->GetPackSizeLeft();
if (!gbw.AnyTuplesLeft(cur_tuple, cur_tuple + packrow_length - 1)) {
mit->NextPackrow();
return AggregaGroupingResult::AGR_NO_LEFT;
}

#ifdef DEBUG_AGGREGA_COST
const auto &mem_info = MemoryStatisticsOS::Instance()->GetMemInfo();
uint64_t mem_available = mem_info.mem_available;
uint64_t swap_used = mem_info.swap_used;
#endif

int64_t uniform_pos = common::NULL_VALUE_64;
bool skip_packrow = false;
bool packrow_done = false;
Expand Down Expand Up @@ -641,6 +659,21 @@ AggregaGroupingResult AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw
break;
}
gbw.CommitResets();

#ifdef DEBUG_AGGREGA_COST
{
if (mem_available) {
const auto mem_info = MemoryStatisticsOS::Instance()->GetMemInfo();
int64_t mem_available_chg = mem_info.mem_available - mem_available;
int64_t swap_used_chg = mem_info.swap_used - swap_used;

if (mem_used && (mem_available_chg < 0)) {
(*mem_used) -= mem_available_chg;
}
}
}
#endif

return AggregaGroupingResult::AGR_OK; // success
}

Expand Down Expand Up @@ -873,11 +906,12 @@ void AggregationAlgorithm::TaskFillOutput(GroupByWrapper *gbw, Transaction *ci,

void AggregationWorkerEnt::TaskAggrePacks(MIIterator *taskIterator, DimensionVector *dims [[maybe_unused]],
MIIterator *mit [[maybe_unused]], CTask *task [[maybe_unused]],
GroupByWrapper *gbw, Transaction *ci [[maybe_unused]]) {
GroupByWrapper *gbw, Transaction *ci [[maybe_unused]], uint64_t *mem_used) {
TIANMU_LOG(LogCtl_Level::DEBUG, "TaskAggrePacks task_id: %d start pack_start: %d pack_end: %d", task->dwTaskId,
task->dwStartPackno, task->dwEndPackno);
#ifdef DEBUG_AGGREGA_COST
std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();
uint64_t mem_available = MemoryStatisticsOS::Instance()->GetMemInfo().mem_available;
#endif

taskIterator->Rewind();
Expand All @@ -886,7 +920,7 @@ void AggregationWorkerEnt::TaskAggrePacks(MIIterator *taskIterator, DimensionVec
if ((task_pack_num >= task->dwStartPackno) && (task_pack_num <= task->dwEndPackno)) {
int cur_tuple = (*task->dwPack2cur)[task_pack_num];
MIInpackIterator mii(*taskIterator);
AggregaGroupingResult grouping_result = aa->AggregatePackrow(*gbw, &mii, cur_tuple);
AggregaGroupingResult grouping_result = aa->AggregatePackrow(*gbw, &mii, cur_tuple, mem_used);
if (grouping_result == AggregaGroupingResult::AGR_FINISH)
break;
if (grouping_result == AggregaGroupingResult::AGR_KILLED)
Expand All @@ -901,11 +935,13 @@ void AggregationWorkerEnt::TaskAggrePacks(MIIterator *taskIterator, DimensionVec
}

#ifdef DEBUG_AGGREGA_COST
int64_t mem_available_chg = MemoryStatisticsOS::Instance()->GetMemInfo().mem_available - mem_available;
auto diff =
std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::high_resolution_clock::now() - start);
if (diff.count() > tianmu_sysvar_slow_query_record_interval) {
TIANMU_LOG(LogCtl_Level::INFO, "TaskAggrePacks task_id: %d spend: %f pack_start: %d pack_end: %d", task->dwTaskId,
diff.count(), task->dwStartPackno, task->dwEndPackno);
TIANMU_LOG(LogCtl_Level::INFO,
"TaskAggrePacks task_id: %d spend: %f pack_start: %d pack_end: %d mem_available_chg: %ld",
task->dwTaskId, diff.count(), task->dwStartPackno, task->dwEndPackno, mem_available_chg);
}
#endif
}
Expand All @@ -924,7 +960,13 @@ void AggregationWorkerEnt::PrepShardingCopy(MIIterator *mit, GroupByWrapper *gb_
}

/*Average allocation task*/
void AggregationWorkerEnt::DistributeAggreTaskAverage(MIIterator &mit) {
void AggregationWorkerEnt::DistributeAggreTaskAverage(MIIterator &mit, uint64_t *mem_used) {
#ifdef DEBUG_AGGREGA_COST
const auto mem_info = MemoryStatisticsOS::Instance()->GetMemInfo();
uint64_t mem_available = mem_info.mem_available;
uint64_t swap_used = mem_info.swap_used;
#endif

Transaction *conn = current_txn_;
DimensionVector dims(mind->NumOfDimensions());
std::vector<CTask> vTask;
Expand Down Expand Up @@ -1015,18 +1057,38 @@ void AggregationWorkerEnt::DistributeAggreTaskAverage(MIIterator &mit) {

for (size_t i = 0; i < vTask.size(); ++i) {
GroupByWrapper *gbw = i == 0 ? gb_main : vGBW[i].get();
res1.insert(ha_tianmu_engine_->query_thread_pool.add_task(&AggregationWorkerEnt::TaskAggrePacks, this,
&taskIterator[i], &dims, &mit, &vTask[i], gbw, conn));
res1.insert(ha_tianmu_engine_->query_thread_pool.add_task(
&AggregationWorkerEnt::TaskAggrePacks, this, &taskIterator[i], &dims, &mit, &vTask[i], gbw, conn, mem_used));
}
res1.get_all_with_except();

#ifdef DEBUG_AGGREGA_COST
{
int64_t mem_available_chg = MemoryStatisticsOS::Instance()->GetMemInfo().mem_available - mem_available;
int64_t swap_used_chg = MemoryStatisticsOS::Instance()->GetMemInfo().swap_used - swap_used;
TIANMU_LOG(LogCtl_Level::INFO, "DistributeAggreTaskAverage TASK mem_available_chg: %ld swap_used_chg: %ld",
mem_available_chg, swap_used_chg);
}
memory_statistics_record("AGGREGA", "TASK");
#endif

for (size_t i = 0; i < vTask.size(); ++i) {
// Merge aggreation data together
if (i != 0) {
aa->MultiDimensionalDistinctScan(*(vGBW[i]), mit);
gb_main->Merge(*(vGBW[i]));
}
}

#ifdef DEBUG_AGGREGA_COST
{
int64_t mem_available_chg = MemoryStatisticsOS::Instance()->GetMemInfo().mem_available - mem_available;
int64_t swap_used_chg = MemoryStatisticsOS::Instance()->GetMemInfo().swap_used - swap_used;
TIANMU_LOG(LogCtl_Level::INFO, "DistributeAggreTaskAverage MERGE mem_available_chg: %ld swap_used_chg: %ld",
mem_available_chg, swap_used_chg);
}
memory_statistics_record("AGGREGA", "MERGE");
#endif
}
} // namespace core
} // namespace Tianmu
7 changes: 4 additions & 3 deletions storage/tianmu/core/aggregation_algorithm.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class AggregationAlgorithm {

// Return code for AggregatePackrow: 0 - success, 1 - stop aggregation
// (finished), 5 - pack already aggregated (skip)
AggregaGroupingResult AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit, int64_t cur_tuple);
AggregaGroupingResult AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit, int64_t cur_tuple,
uint64_t *mem_used = nullptr);

// No parallel for subquery/join/distinct cases
bool ParallelAllowed(GroupByWrapper &gbw) {
Expand Down Expand Up @@ -99,8 +100,8 @@ class AggregationWorkerEnt {
int ThreadsUsed() { return m_threads; }
void Barrier() {}
void TaskAggrePacks(MIIterator *taskIterator, DimensionVector *dims, MIIterator *mit, CTask *task,
GroupByWrapper *gbw, Transaction *ci);
void DistributeAggreTaskAverage(MIIterator &mit);
GroupByWrapper *gbw, Transaction *ci, uint64_t *mem_used = nullptr);
void DistributeAggreTaskAverage(MIIterator &mit, uint64_t *mem_used = nullptr);
void PrepShardingCopy(MIIterator *mit, GroupByWrapper *gb_sharding,
std::vector<std::unique_ptr<GroupByWrapper>> *vGBW);

Expand Down
18 changes: 17 additions & 1 deletion storage/tianmu/core/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "core/tools.h"
#include "core/transaction.h"
#include "mm/initializer.h"
#include "mm/memory_statistics.h"
#include "mysql/thread_pool_priv.h"
#include "mysqld_thd_manager.h"
#include "system/file_out.h"
Expand Down Expand Up @@ -222,7 +223,7 @@ int Engine::Init(uint engine_slot) {
size_t main_size = size_t(tianmu_sysvar_servermainheapsize) << 20;

std::string hugefiledir = tianmu_sysvar_hugefiledir;
int hugefilesize = 0; // unused
int hugefilesize = tianmu_sysvar_hugefilesize; // MB
if (hugefiledir.empty())
mm::MemoryManagerInitializer::Instance(0, main_size);
else
Expand Down Expand Up @@ -1558,6 +1559,21 @@ void Engine::LogStat() {
TIANMU_LOG(LogCtl_Level::INFO, msg.c_str());
}

{
const auto &&mem_info = MemoryStatisticsOS::Instance()->GetMemInfo();

uint64_t mem_available = mem_info.mem_available;
uint64_t swap_used = mem_info.swap_used;
int64_t mem_available_chg = mem_available - m_mem_available_;
int64_t swap_used_chg = swap_used - m_swap_used_;
m_mem_available_ = mem_available;
m_swap_used_ = swap_used;

TIANMU_LOG(LogCtl_Level::INFO, "mem_available_chg: %ld swap_used_chg: %ld", mem_available_chg, swap_used_chg);

memory_statistics_record("HEATBEAT", "UPDATE");
}

TIANMU_LOG(LogCtl_Level::DEBUG,
"Select: %lu/%lu, "
"Loaded: %lu/%lu(%lu/%lu), "
Expand Down
2 changes: 2 additions & 0 deletions storage/tianmu/core/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ class Engine final {
std::condition_variable cv_drop_;
std::mutex cv_drop_mtx_;
std::unique_ptr<TaskExecutor> task_executor;
uint64_t m_mem_available_ = 0;
uint64_t m_swap_used_ = 0;
};

class ResultSender {
Expand Down
6 changes: 6 additions & 0 deletions storage/tianmu/handler/ha_tianmu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2382,6 +2382,10 @@ static MYSQL_SYSVAR_UINT(insert_max_buffered, tianmu_sysvar_insert_max_buffered,
static MYSQL_SYSVAR_BOOL(compensation_start, tianmu_sysvar_compensation_start, PLUGIN_VAR_BOOL, "-", nullptr, nullptr,
FALSE);
static MYSQL_SYSVAR_STR(hugefiledir, tianmu_sysvar_hugefiledir, PLUGIN_VAR_READONLY, "-", nullptr, nullptr, "");
static MYSQL_SYSVAR_UINT(os_least_mem, tianmu_os_least_mem, PLUGIN_VAR_READONLY, "-", nullptr, nullptr, 1, 0,
UINT32_MAX, 0);
static MYSQL_SYSVAR_UINT(hugefilesize, tianmu_sysvar_hugefilesize, PLUGIN_VAR_READONLY, "-", nullptr, nullptr, 1, 0,
UINT32_MAX, 0);
static MYSQL_SYSVAR_UINT(cachinglevel, tianmu_sysvar_cachinglevel, PLUGIN_VAR_READONLY, "-", nullptr, nullptr, 1, 0,
512, 0);
static MYSQL_SYSVAR_STR(mm_policy, tianmu_sysvar_mm_policy, PLUGIN_VAR_READONLY, "-", nullptr, nullptr, "");
Expand Down Expand Up @@ -2565,6 +2569,8 @@ static struct st_mysql_sys_var *tianmu_showvars[] = {MYSQL_SYSVAR(bg_load_thread
MYSQL_SYSVAR(groupby_parallel_rows_minimum),
MYSQL_SYSVAR(slow_query_record_interval),
MYSQL_SYSVAR(hugefiledir),
MYSQL_SYSVAR(hugefilesize),
MYSQL_SYSVAR(os_least_mem),
MYSQL_SYSVAR(index_cache_size),
MYSQL_SYSVAR(index_search),
MYSQL_SYSVAR(enable_rowstore),
Expand Down
27 changes: 24 additions & 3 deletions storage/tianmu/mm/huge_heap_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,30 @@
*/

#include "huge_heap_policy.h"
#include "system/tianmu_system.h"
#include "util/log_ctl.h"

#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <cstring>
#include <unistd.h>

#include "system/tianmu_system.h"
#include <cstring>

namespace Tianmu {
namespace mm {

HugeHeap::HugeHeap(std::string hugedir, size_t size) : TCMHeap(0) {
{
char command[2048];
std::strcpy(command, "rm -rf ");
std::strcat(command, hugedir.c_str());
std::strcat(command, "/tianmuhuge.*");
::system(command);
}

heap_frame_ = nullptr;
// convert size from MB to B and make it a multiple of 2MB
size_ = 1_MB * (size & ~0x1);
Expand All @@ -40,13 +52,18 @@ HugeHeap::HugeHeap(std::string hugedir, size_t size) : TCMHeap(0) {
std::sprintf(pidtext, "%d", getpid());
std::strcat(huge_filename_, "/tianmuhuge.");
std::strcat(huge_filename_, pidtext);
fd_ = open(huge_filename_, O_CREAT | O_RDWR, 0700);
fd_ = open(huge_filename_, O_CREAT | O_RDWR, 0666);
if (fd_ < 0) {
heap_status_ = HEAP_STATUS::HEAP_OUT_OF_MEMORY;
tianmu_control_ << system::lock << "Memory Manager Error: Unable to create hugepage file: " << huge_filename_
<< system::unlock;
return;
}

lseek(fd_, size_ - 1, SEEK_SET);
write(fd_, " ", 1);
lseek(fd_, 0, SEEK_SET);

// MAP_SHARED to have mmap fail immediately if not enough pages
// MAP_PRIVATE does copy on write
// MAP_POPULATE to create page table entries and avoid future surprises
Expand All @@ -60,6 +77,10 @@ HugeHeap::HugeHeap(std::string hugedir, size_t size) : TCMHeap(0) {
}

tianmu_control_ << system::lock << "Huge Heap size (MB) " << (int)(size) << system::unlock;

TIANMU_LOG(LogCtl_Level::INFO, "HugeHeap huge_filename_: %s size: %ld size_: %ld kPageShift: %d", huge_filename_,
size, size_, kPageShift);

// size_ = size;
// manage the region as a normal 4k pagesize heap
m_heap.RegisterArea(heap_frame_, size_ >> kPageShift);
Expand Down
1 change: 1 addition & 0 deletions storage/tianmu/mm/memory_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum class BLOCK_TYPE : char {
BLOCK_COMPRESSED,
BLOCK_UNCOMPRESSED,
BLOCK_TEMPORARY,
BLOCK_HUGE,
BLOCK_FIXED // Used in rc_realloc when pointer != nullptr
};

Expand Down
Loading