diff --git a/configure b/configure index 9624080159b..c2fae1ee65d 100755 --- a/configure +++ b/configure @@ -21032,7 +21032,6 @@ $as_echo "#define HAVE_GCC__ATOMIC_INT64_CAS 1" >>confdefs.h fi - # Check for x86 cpuid instruction { $as_echo "$as_me:${as_lineno-$LINENO}: checking for __get_cpuid" >&5 $as_echo_n "checking for __get_cpuid... " >&6; } diff --git a/contrib/interconnect/README.md b/contrib/interconnect/README.md index 3975cb7b832..fd9615d89c9 100644 --- a/contrib/interconnect/README.md +++ b/contrib/interconnect/README.md @@ -199,6 +199,75 @@ SendChunk/RecvTupleChunk() { ``` +# interconnect test && bench +In the path `contrib/interconnect/test`, the test of the interconnect and the benchmark are implemented, also need to be compiled separately. +For proxy type interconnect, user need to compile `cbdb` with `--enable-ic-proxy` to make the test take effect. + +compile test and benchmark + +``` +cd contrib/interconnect/test +mkdir build && cd build +cmake .. +make -j +``` + +Notice that: for now, only `single client + single server` is supported in testing and benchmarking. + +## bench result + +test env + +- system: CentOS Linux release 7.5.1804 +- machine: qingcloud e2, x86, 8 cpu, 16G memory +- mtu: 1500 +- buffer size: 200 +- time: 100s + + +tcp result: + +``` ++----------------+------------+ +| Total time(s) | 100.000 | +| Loop times | 48045111 | +| LPS(l/ms) | 480.451 | +| Recv mbs | 65430 | +| TPS(mb/s) | 654.301 | +| Recv counts | 336315777 | +| Items ops/ms | 3363.157 | ++----------------+------------+ +``` + +proxy result: + +``` ++----------------+------------+ +| Total time(s) | 100.118 | +| Loop times | 11447670 | +| LPS(l/ms) | 114.342 | +| Recv mbs | 15589 | +| TPS(mb/s) | 155.716 | +| Recv counts | 80133690 | +| Items ops/ms | 800.393 | ++----------------+------------+ +``` + +udpifc result: + +``` ++----------------+------------+ +| Total time(s) | 100.079 | +| Loop times | 369104 | +| LPS(l/ms) | 3.688 | +| Recv mbs | 502 | +| TPS(mb/s) | 5.023 | +| Recv counts | 2583728 | +| Items ops/ms | 25.817 | ++----------------+------------+ +``` + +Notice that: Lower TPS does not mean the protocol is slower, might means that the cpu time taken by the protocol is low. For the udpifc, it satisfies the highest tps required by `cbdb`. at the same time it occupies a lower cpu than other types of interconnect. diff --git a/contrib/interconnect/test/CMakeLists.txt b/contrib/interconnect/test/CMakeLists.txt new file mode 100644 index 00000000000..78580579b7b --- /dev/null +++ b/contrib/interconnect/test/CMakeLists.txt @@ -0,0 +1,90 @@ +project(interconnect_ext) + +cmake_minimum_required (VERSION 3.11.0) +set(CMAKE_CXX_STANDARD 14) + +find_program( + PG_CONFIG pg_config + HINTS ${PG_PATH} + PATH_SUFFIXES bin + DOC "The path to the pg_config of the CBDB version to compile against") + +if(NOT PG_CONFIG) + message(FATAL_ERROR "Unable to find 'pg_config'") +endif() + +function(GET_PG_CONFIG var) + set(_temp) + + # Only call pg_config if the variable didn't already have a value. + if(NOT ${var}) + execute_process( + COMMAND ${PG_CONFIG} ${ARGN} + OUTPUT_VARIABLE _temp + OUTPUT_STRIP_TRAILING_WHITESPACE) + endif() + + set(${var} + ${_temp} + PARENT_SCOPE) +endfunction() + +# Get CBDB configuration from pg_config +get_pg_config(PG_INCLUDEDIR --includedir) + +set(TOP_DIR ../../..) +set(UNIT_TEST_DIR ${TOP_DIR}/src/test/unit/cmockery/) +set(UNIT_TEST_MOCK_DIR ${TOP_DIR}/src/test/unit/mock/) +set(IC_MODULE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../) +set(CBDB_INCLUDE_DIR ${PG_INCLUDEDIR}/postgresql/server) + +add_definitions(-DENABLE_IC_PROXY) + +set(interconnect_src + ../ic_common.c + ../ic_modules.c + ../tcp/ic_tcp.c + ../udp/ic_udpifc.c + ../proxy/ic_proxy_main.c + ../proxy/ic_proxy_client.c + ../proxy/ic_proxy_peer.c + ../proxy/ic_proxy_router.c + ../proxy/ic_proxy_backend.c + ../proxy/ic_proxy_addr.c + ../proxy/ic_proxy_key.c + ../proxy/ic_proxy_packet.c + ../proxy/ic_proxy_pkt_cache.c + ../proxy/ic_proxy_iobuf.c) + +set(interconnect_ext_test_src + ${UNIT_TEST_DIR}/cmockery.c + ic_test_env.c + elog_mock.c + ic_interface_test.c) + +set(ic_bench_src + ${UNIT_TEST_DIR}/cmockery.c + ic_test_env.c + elog_mock.c + ic_bench.c) + +SET(CMAKE_BUILD_TYPE "Debug") +SET(CMAKE_CXX_FLAGS_DEBUG "$ENV{CXXFLAGS} -O0 -Wall -g -ggdb") + +add_compile_definitions(ENABLE_IC_PROXY) +link_directories($ENV{GPHOME}/lib) +add_executable(interconnect_ext_test ${interconnect_src} ${interconnect_ext_test_src}) +target_include_directories(interconnect_ext_test PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} "${CBDB_INCLUDE_DIR}" "${IC_MODULE_DIR}" "${UNIT_TEST_DIR}") + +target_link_libraries(interconnect_ext_test PUBLIC + pthread + uv + postgres) + +link_directories($ENV{GPHOME}/lib) +add_executable(ic_bench ${interconnect_src} ${ic_bench_src}) +target_include_directories(ic_bench PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} "${CBDB_INCLUDE_DIR}" "${IC_MODULE_DIR}" "${UNIT_TEST_DIR}") +target_link_libraries(ic_bench PUBLIC + pthread + uv + postgres) diff --git a/contrib/interconnect/test/elog_mock.c b/contrib/interconnect/test/elog_mock.c new file mode 100644 index 00000000000..16fac824140 --- /dev/null +++ b/contrib/interconnect/test/elog_mock.c @@ -0,0 +1,1024 @@ +/* + * Generated Mocking Source + */ +#include +#include +#include +#include "cmockery.h" + + +#include "postgres.h" + +#include +#include +#include +#include +#include +#ifdef HAVE_SYSLOG +#include +#endif +#ifdef HAVE_EXECINFO_H +#include +#endif + +#ifdef HAVE_EXECINFO_H +#include +#endif + +#include "access/transam.h" +#include "access/xact.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "libpq/pqsignal.h" +#include "mb/pg_wchar.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/postmaster.h" +#include "postmaster/syslogger.h" +#include "storage/ipc.h" +#include "storage/proc.h" +#include "tcop/tcopprot.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/ps_status.h" + +#include "cdb/cdbvars.h" +#include "cdb/cdbtm.h" +#include "utils/ps_status.h" +#include "cdb/cdbselect.h" +#include "pgtime.h" + +#include "miscadmin.h" + + +#define _DARWIN_C_SOURCE 1 +#include + + +#undef _ +#define _(x) err_gettext(x) + + + +ErrorContextCallback *error_context_stack = NULL; + +sigjmp_buf *PG_exception_stack = NULL; + +extern bool redirection_done; + + +emit_log_hook_type emit_log_hook = NULL; + + +int Log_error_verbosity = PGERROR_VERBOSE; +char *Log_line_prefix = NULL; +int Log_destination = LOG_DESTINATION_STDERR; +char *Log_destination_string = NULL; +bool syslog_sequence_numbers = true; +bool syslog_split_messages = true; + +#ifdef HAVE_SYSLOG + + +#ifndef PG_SYSLOG_LIMIT +#define PG_SYSLOG_LIMIT 900 +#endif + +static bool openlog_done = false; +static char *syslog_ident = NULL; +static int syslog_facility = LOG_LOCAL0; + +static void write_syslog(int level, const char *line); +#endif + +#ifdef WIN32 +extern char *event_source; + +static void write_eventlog(int level, const char *line, int len); +#endif + + +#define ERRORDATA_STACK_SIZE 10 + +#define CMD_BUFFER_SIZE 1024 +#define SYMBOL_SIZE 512 +#define ADDRESS_SIZE 20 +#define STACK_DEPTH_MAX 100 + + +#if defined(__i386) +#define ASMFP asm volatile ("movl %%ebp, %0" : "=g" (ulp)); +#define GET_PTR_FROM_VALUE(value) ((uint32)value) +#define GET_FRAME_POINTER(x) do { uint64 ulp; ASMFP; x = ulp; } while (0) +#elif defined(__x86_64__) +#define ASMFP asm volatile ("movq %%rbp, %0" : "=g" (ulp)); +#define GET_PTR_FROM_VALUE(value) (value) +#define GET_FRAME_POINTER(x) do { uint64 ulp; ASMFP; x = ulp; } while (0) +#else +#define ASMFP +#define GET_PTR_FROM_VALUE(value) (value) +#define GET_FRAME_POINTER(x) +#endif + + +static ErrorData errordata[ERRORDATA_STACK_SIZE]; + +static int errordata_stack_depth = -1; + +static int recursion_depth = 0; + + +static struct timeval saved_timeval; +static bool saved_timeval_set = false; + +#define FORMATTED_TS_LEN 128 +static char formatted_start_time[FORMATTED_TS_LEN]; +static char formatted_log_time[FORMATTED_TS_LEN]; + + + +#define CHECK_STACK_DEPTH() \ + do { \ + if (errordata_stack_depth < 0) \ + { \ + errordata_stack_depth = -1; \ + ereport(ERROR, (errmsg_internal("errstart was not called"))); \ + } \ + } while (0) + + +static void cdb_tidy_message(ErrorData *edata); +static const char *err_gettext(const char *str) pg_attribute_format_arg(1); +static pg_noinline void set_backtrace(ErrorData *edata, int num_skip); +static void set_errdata_field(MemoryContextData *cxt, char **ptr, const char *str); +static void write_console(const char *line, int len); +static void setup_formatted_log_time(void); +static void setup_formatted_start_time(void); +static const char *process_log_prefix_padding(const char *p, int *padding); +static void log_line_prefix(StringInfo buf, ErrorData *edata); +static void write_csvlog(ErrorData *edata); +static void send_message_to_server_log(ErrorData *edata); +static void write_pipe_chunks(char *data, int len, int dest); +static void send_message_to_frontend(ErrorData *edata); +static const char *error_severity(int elevel); +static void append_with_tabs(StringInfo buf, const char *str); +static bool is_log_level_output(int elevel, int log_min_level); +static void write_pipe_chunks(char *data, int len, int dest); +static void write_csvlog(ErrorData *edata); +static void elog_debug_linger(ErrorData *edata); + +inline void +ignore_returned_result(long long int result) +{ + mock(); +} + +static void setup_formatted_log_time(void); +static void setup_formatted_start_time(void); + + + +inline bool +is_log_level_output(int elevel, int log_min_level) +{ + return (bool) mock(); +} + + +inline bool +should_output_to_server(int elevel) +{ + return (bool) mock(); +} + + +inline bool +should_output_to_client(int elevel) +{ + return (bool) mock(); +} + + +bool +message_level_is_interesting(int elevel) +{ + check_expected(elevel); + return (bool) mock(); +} + + +bool +in_error_recursion_trouble(void) +{ + return (bool) mock(); +} + + +inline const char * +err_gettext(const char * str) +{ + return (const char *) mock(); +} + +pg_attribute_cold +bool +errstart_cold(int elevel, const char * domain) +{ + if (elevel >= 21) { + assert_true(false); + } + return true; +} + + +bool +errstart(int elevel, const char * domain) +{ + if (elevel >= 21) { + assert_true(false); + } + return true; +} + + +static bool +matches_backtrace_functions(const char * funcname) +{ + return (bool) mock(); +} + + +void +errfinish(const char * filename, int lineno, const char * funcname) +{ +} + + +ErrorData * +errfinish_and_return(const char * filename, int lineno, const char * funcname) +{ + check_expected(filename); + check_expected(lineno); + check_expected(funcname); + optional_assignment(filename); + optional_assignment(funcname); + return (ErrorData *) mock(); +} + + +void +errcode(int sqlerrcode) +{ + check_expected(sqlerrcode); + mock(); +} + + +void +errcode_for_file_access(void) +{ + mock(); +} + + +void +errcode_for_socket_access(void) +{ + mock(); +} + + +void +errcode_to_sqlstate(int errcode, char outbuf[6]) +{ + check_expected(errcode); + check_expected(outbuf); + mock(); +} + + +int +sqlstate_to_errcode(const char * sqlstate) +{ + check_expected(sqlstate); + optional_assignment(sqlstate); + return (int) mock(); +} + +#define EVALUATE_MESSAGE(domain, targetfield, appendval, translateit) \ + { \ + StringInfoData buf; \ + \ + if ((translateit) && !in_error_recursion_trouble()) \ + fmt = dgettext((domain), fmt); \ + initStringInfo(&buf); \ + if ((appendval) && edata->targetfield) { \ + appendStringInfoString(&buf, edata->targetfield); \ + appendStringInfoChar(&buf, '\n'); \ + } \ + \ + for (;;) \ + { \ + va_list args; \ + int needed; \ + errno = edata->saved_errno; \ + va_start(args, fmt); \ + needed = appendStringInfoVA(&buf, fmt, args); \ + va_end(args); \ + if (needed == 0) \ + break; \ + enlargeStringInfo(&buf, needed); \ + } \ + \ + if (edata->targetfield) \ + pfree(edata->targetfield); \ + edata->targetfield = pstrdup(buf.data); \ + pfree(buf.data); \ + } + + +#define EVALUATE_MESSAGE_PLURAL(domain, targetfield, appendval) \ + { \ + const char *fmt; \ + StringInfoData buf; \ + \ + if (!in_error_recursion_trouble()) \ + fmt = dngettext((domain), fmt_singular, fmt_plural, n); \ + else \ + fmt = (n == 1 ? fmt_singular : fmt_plural); \ + initStringInfo(&buf); \ + if ((appendval) && edata->targetfield) { \ + appendStringInfoString(&buf, edata->targetfield); \ + appendStringInfoChar(&buf, '\n'); \ + } \ + \ + for (;;) \ + { \ + va_list args; \ + int needed; \ + errno = edata->saved_errno; \ + va_start(args, n); \ + needed = appendStringInfoVA(&buf, fmt, args); \ + va_end(args); \ + if (needed == 0) \ + break; \ + enlargeStringInfo(&buf, needed); \ + } \ + \ + if (edata->targetfield) \ + pfree(edata->targetfield); \ + edata->targetfield = pstrdup(buf.data); \ + pfree(buf.data); \ + } + + + + +void +errmsg(const char * fmt, ...) {} + + +int +errbacktrace(void) +{ + return (int) mock(); +} + + +static void +set_backtrace(ErrorData * edata, int num_skip) +{ + mock(); +} + + +void +errmsg_internal(const char * fmt, ...) +{ +} + + +void +errmsg_plural(const char * fmt_singular, const char * fmt_plural, unsigned long n, ...) +{ + check_expected(fmt_singular); + check_expected(fmt_plural); + check_expected(n); + optional_assignment(fmt_singular); + optional_assignment(fmt_plural); + mock(); +} + + +void +errdetail(const char * fmt, ...) +{ + check_expected(fmt); + optional_assignment(fmt); + mock(); +} + + +void +errdetail_internal(const char * fmt, ...) +{ + check_expected(fmt); + optional_assignment(fmt); + mock(); +} + + +void +errdetail_log(const char * fmt, ...) +{ + check_expected(fmt); + optional_assignment(fmt); + mock(); +} + + +void +errdetail_log_plural(const char * fmt_singular, const char * fmt_plural, unsigned long n, ...) +{ + check_expected(fmt_singular); + check_expected(fmt_plural); + check_expected(n); + optional_assignment(fmt_singular); + optional_assignment(fmt_plural); + mock(); +} + + +void +errdetail_plural(const char * fmt_singular, const char * fmt_plural, unsigned long n, ...) +{ + check_expected(fmt_singular); + check_expected(fmt_plural); + check_expected(n); + optional_assignment(fmt_singular); + optional_assignment(fmt_plural); + mock(); +} + + +void +errhint(const char * fmt, ...) +{ + check_expected(fmt); + optional_assignment(fmt); + mock(); +} + + +int +errhint_plural(const char * fmt_singular, const char * fmt_plural, unsigned long n, ...) +{ + check_expected(fmt_singular); + check_expected(fmt_plural); + check_expected(n); + optional_assignment(fmt_singular); + optional_assignment(fmt_plural); + return (int) mock(); +} + + +int +errcontext_msg(const char * fmt, ...) +{ + check_expected(fmt); + optional_assignment(fmt); + return (int) mock(); +} + + +int +set_errcontext_domain(const char * domain) +{ + check_expected(domain); + optional_assignment(domain); + return (int) mock(); +} + + +void +errhidestmt(bool hide_stmt) +{ + check_expected(hide_stmt); + mock(); +} + + +void +errhidecontext(bool hide_ctx) +{ + check_expected(hide_ctx); + mock(); +} + + +int +errposition(int cursorpos) +{ + check_expected(cursorpos); + return (int) mock(); +} + + +int +errprintstack(bool printstack) +{ + check_expected(printstack); + return (int) mock(); +} + + +void +internalerrposition(int cursorpos) +{ + check_expected(cursorpos); + mock(); +} + + +void +internalerrquery(const char * query) +{ + check_expected(query); + optional_assignment(query); + mock(); +} + + +void +err_generic_string(int field, const char * str) +{ + check_expected(field); + check_expected(str); + optional_assignment(str); + mock(); +} + + +static void +set_errdata_field(MemoryContextData * cxt, char ** ptr, const char * str) +{ + mock(); +} + + +int +geterrcode(void) +{ + return (int) mock(); +} + + +int +geterrposition(void) +{ + return (int) mock(); +} + + +int +getinternalerrposition(void) +{ + return (int) mock(); +} + + +int +errFatalReturn(bool fatalReturn) +{ + check_expected(fatalReturn); + return (int) mock(); +} + +static int save_format_errnumber; +static const char *save_format_domain; + + +void +pre_format_elog_string(int errnumber, const char * domain) +{ + check_expected(errnumber); + check_expected(domain); + optional_assignment(domain); + mock(); +} + + +char * +format_elog_string(const char * fmt, ...) +{ + check_expected(fmt); + optional_assignment(fmt); + return (char *) mock(); +} + + +void +EmitErrorReport(void) +{ + mock(); +} + + +ErrorData * +CopyErrorData(void) +{ + return (ErrorData *) mock(); +} + + +void +FreeErrorData(ErrorData * edata) +{ + check_expected(edata); + optional_assignment(edata); + mock(); +} + + +void +FlushErrorState(void) +{ + mock(); +} + + +void +ThrowErrorData(ErrorData * edata) +{ + check_expected(edata); + optional_assignment(edata); + mock(); +} + +void +pg_re_throw(void) // __attribute__ ((noreturn)) +{ + +} + + +void +elog_exception_statement(const char* statement) +{ + check_expected(statement); + optional_assignment(statement); + mock(); +} + + +bool +elog_demote(int downgrade_to_elevel) +{ + check_expected(downgrade_to_elevel); + return (bool) mock(); +} + + +bool +elog_dismiss(int downgrade_to_elevel) +{ + check_expected(downgrade_to_elevel); + return (bool) mock(); +} + + +int +elog_geterrcode(void) +{ + return (int) mock(); +} + + +int +elog_getelevel(void) +{ + return (int) mock(); +} + + +char* +elog_message(void) +{ + return (char*) mock(); +} + + +char * +GetErrorContextStack(void) +{ + return (char *) mock(); +} + + +void +DebugFileOpen(void) +{ + mock(); +} + +#ifdef HAVE_SYSLOG + + + +void +set_syslog_parameters(const char * ident, int facility) +{ + check_expected(ident); + check_expected(facility); + optional_assignment(ident); + mock(); +} + + +static void +write_syslog(int level, const char * line) +{ + mock(); +} + +#endif + +#ifdef WIN32 + + +static int +GetACPEncoding(void) +{ + return (int) mock(); +} + + +static void +write_eventlog(int level, const char * line, int len) +{ + mock(); +} + +#endif + + + + + +static void +cdb_strip_trailing_whitespace(char ** buf) +{ + mock(); +} + + +void +cdb_tidy_message(ErrorData * edata) +{ + check_expected(edata); + optional_assignment(edata); + mock(); +} + + +static void +write_console(const char * line, int len) +{ + mock(); +} + + +static void +setup_formatted_log_time(void) +{ + mock(); +} + + +static void +setup_formatted_start_time(void) +{ + mock(); +} + + +static const char * +process_log_prefix_padding(const char * p, int * ppadding) +{ + return (const char *) mock(); +} + + +static void +log_line_prefix(StringInfo buf, ErrorData * edata) +{ + mock(); +} + + +inline void +appendCSVLiteral(StringInfo buf, const char * data) +{ + mock(); +} + + +static void +write_csvlog(ErrorData * edata) +{ + mock(); +} + + +char * +unpack_sql_state(int sql_state) +{ + check_expected(sql_state); + return (char *) mock(); +} + +#define WRITE_PIPE_CHUNK_TIMEOUT 1000 + + + +inline void +gp_write_pipe_chunk(const char * buffer, int len) +{ + mock(); +} + + +inline void +append_string_to_pipe_chunk(PipeProtoChunk * buffer, const char* input) +{ + mock(); +} + + +static void +append_stacktrace(PipeProtoChunk * buffer, StringInfo append, void *const * stackarray, int stacksize, bool amsyslogger) +{ + mock(); +} + + +inline void +write_syslogger_file_string(const char * str, bool amsyslogger, bool append_comma) +{ + mock(); +} + + +static void +write_syslogger_in_csv(ErrorData * edata, bool amsyslogger) +{ + mock(); +} + + +void +write_message_to_server_log(int elevel, int sqlerrcode, const char * message, const char * detail, const char * hint, const char * query_text, int cursorpos, int internalpos, const char * internalquery, const char * context, const char * funcname, bool show_funcname, const char * filename, int lineno, int stacktracesize, bool omit_location, void* const * stacktracearray, bool printstack) +{ + check_expected(elevel); + check_expected(sqlerrcode); + check_expected(message); + check_expected(detail); + check_expected(hint); + check_expected(query_text); + check_expected(cursorpos); + check_expected(internalpos); + check_expected(internalquery); + check_expected(context); + check_expected(funcname); + check_expected(show_funcname); + check_expected(filename); + check_expected(lineno); + check_expected(stacktracesize); + check_expected(omit_location); + check_expected(stacktracearray); + check_expected(printstack); + optional_assignment(message); + optional_assignment(detail); + optional_assignment(hint); + optional_assignment(query_text); + optional_assignment(internalquery); + optional_assignment(context); + optional_assignment(funcname); + optional_assignment(filename); + optional_assignment(stacktracearray); + mock(); +} + + +static void +send_message_to_server_log(ErrorData * edata) +{ + mock(); +} + + +static void +write_pipe_chunks(char * data, int len, int dest) +{ + mock(); +} + + +static void +err_sendstring(StringInfo buf, const char * str) +{ + mock(); +} + + +static void +send_message_to_frontend(ErrorData * edata) +{ + mock(); +} + + +static const char * +error_severity(int elevel) +{ + return (const char *) mock(); +} + + +static void +append_with_tabs(StringInfo buf, const char * str) +{ + mock(); +} + + +void +write_stderr(const char * fmt, ...) +{ + check_expected(fmt); + optional_assignment(fmt); + mock(); +} + + +int +trace_recovery(int trace_level) +{ + check_expected(trace_level); + return (int) mock(); +} + + +void +elog_debug_linger(ErrorData * edata) +{ + check_expected(edata); + optional_assignment(edata); + mock(); +} + + +void +debug_backtrace(void) +{ + mock(); +} + + +uint32 +gp_backtrace(void ** stackAddresses, uint32 maxStackDepth) +{ + check_expected(stackAddresses); + check_expected(maxStackDepth); + optional_assignment(stackAddresses); + return (uint32) mock(); +} + + +char * +gp_stacktrace(void ** stackAddresses, uint32 stackDepth) +{ + check_expected(stackAddresses); + check_expected(stackDepth); + optional_assignment(stackAddresses); + return (char *) mock(); +} + + +const char * +SegvBusIllName(int signal) +{ + check_expected(signal); + return (const char *) mock(); +} + + +void +StandardHandlerForSigillSigsegvSigbus_OnMainThread(char * processName, int signal_args) +{ + check_expected(processName); + check_expected(signal_args); + optional_assignment(processName); + mock(); +} diff --git a/contrib/interconnect/test/ic_bench.c b/contrib/interconnect/test/ic_bench.c new file mode 100644 index 00000000000..763c6d743b2 --- /dev/null +++ b/contrib/interconnect/test/ic_bench.c @@ -0,0 +1,704 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "postgres.h" +#include "ic_modules.h" +#include "ic_internal.h" +#include "postmaster/postmaster.h" +#include "utils/memutils.h" +#include "utils/syscache.h" +#include "storage/latch.h" +#include "ic_test_env.h" + +#define MTU_LESS_LEN (sizeof(struct icpkthdr) + TUPLE_CHUNK_HEADER_SIZE) + 1 + +const char *progname = NULL; +static MemoryContext testMemoryContext = NULL; + +volatile bool interrupt_flag = false; +bool am_client_side = false; +pid_t server_side_pid = -1; +pid_t client_side_pid = -1; + +pid_t server_ic_proxy_pid = -1; +pid_t client_ic_proxy_pid = -1; + +static struct option long_options[] = { + {"help", no_argument, NULL, '?'}, + {"type", required_argument, NULL, 't'}, + {"interval", required_argument, NULL, 'i'}, + {"verify", optional_argument, NULL, 'v'}, + {"mtu", required_argument, NULL, 'm'}, + {"direct", required_argument, NULL, 'd'}, + {NULL, 0, NULL, 0} +}; + +struct bench_options +{ + int ic_type; + int interval; + bool should_verify; + /* in fact, mtu is currently only valid for the client side */ + int mtu; + int bsize; + bool direct_buffer; + + MotionIPCLayer *ipc_layer; +}; + +static inline void +usage() +{ + printf("%s - Interconntect benchmark \n\n", progname); + printf("Usage:\n %s [OPTION]...\n\n", progname); + printf("Options:\n"); + + printf(" -t, --type Specify the interconnection type to run benchmark\n" + " The value range is [0-2]:\n" + " 0 - tcp\n" + " 1 - udpifc\n" + " 2 - proxy\n" + " default tcp(0)\n"); + printf(" -i, --interval The duration of the benchmark. default is \"60s\"\n"); + printf(" -v, --verify Verify the result in recv side. default is \"false\"\n"); + printf(" -m, --mtu The MTU setting. default is \"1500\"\n"); + printf(" -b, --bsize The each buffer send size. default is \"200\"\n"); + printf(" -d, --direct Use direct buffer in sender. default is \"false\"\n"); +} + +static void +init_memory_context() +{ + if (NULL == TopMemoryContext) + { + assert(NULL == testMemoryContext); + MemoryContextInit(); + + testMemoryContext = AllocSetContextCreate(TopMemoryContext, + "Test Context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + MemoryContextSwitchTo(testMemoryContext); + } +} + +static void +destroy_memory_context() +{ + MemoryContextReset(testMemoryContext); + testMemoryContext = NULL; + TopMemoryContext = NULL; + CurrentMotionIPCLayer = NULL; +} + +static EState * +client_side_setup(const struct bench_options *options, int stc[2], int cts[2]) +{ + int32 listen_port = 0; + int32 server_listen_port = 0; + pid_t c_pid = 0; + int8 already_setup = 1; + EState *estate; + + am_client_side = true; + client_side_global_var_init(options->ipc_layer, &client_ic_proxy_pid); + Gp_max_packet_size = options->mtu; + + CurrentMotionIPCLayer->InitMotionLayerIPC(); + + listen_port = CurrentMotionIPCLayer->GetListenPort(); + if (listen_port == 0) + { + printf("failed to init motion layer ipc."); + return NULL; + } + + write_data_to_pipe(cts, listen_port, int32); + read_data_from_pipe(stc, &server_listen_port, int32); + + c_pid = getpid(); + write_data_to_pipe(cts, c_pid, pid_t); + read_data_from_pipe(stc, &server_side_pid, pid_t); + + estate = prepare_estate( /* local_slice */ 1, server_listen_port, + listen_port, + server_side_pid, + c_pid); + + CurrentMotionIPCLayer->SetupInterconnect(estate); + if (!estate->es_interconnect_is_setup || !estate->interconnect_context) + { + cleanup_estate(estate); + printf("failed to setup motion layer ipc."); + return NULL; + } + + write_data_to_pipe(cts, already_setup, int8); + return estate; +} + +EState * +server_side_setup(const struct bench_options *options, + int stc[2], + int cts[2]) +{ + int32 listen_port = 0; + int32 client_listen_port = 0; + int8 already_setup = 0; + pid_t c_pid = 0; + EState *estate; + + am_client_side = false; + server_side_global_var_init(options->ipc_layer, &server_ic_proxy_pid); + Gp_max_packet_size = options->mtu; + + CurrentMotionIPCLayer->InitMotionLayerIPC(); + + listen_port = CurrentMotionIPCLayer->GetListenPort(); + if (listen_port == 0) + { + printf("failed to init motion layer ipc."); + return NULL; + } + + read_data_from_pipe(cts, &client_listen_port, int32); + write_data_to_pipe(stc, listen_port, int32); + + c_pid = getpid(); + read_data_from_pipe(cts, &client_side_pid, pid_t); + write_data_to_pipe(stc, c_pid, pid_t); + + estate = prepare_estate( /* local_slice */ 0, + listen_port, + client_listen_port, + c_pid, + client_side_pid); + + CurrentMotionIPCLayer->SetupInterconnect(estate); + if (!estate->es_interconnect_is_setup || !estate->interconnect_context) + { + cleanup_estate(estate); + printf("failed to setup motion layer ipc."); + return NULL; + } + + /* waiting for client setup */ + read_data_from_pipe(cts, &already_setup, int8); + if (already_setup != 1) + { + printf("failed to recv client setup signal"); + cleanup_estate(estate); + return NULL; + } + + return estate; +} + +void +sig_handler(int sig_num) +{ + switch (sig_num) + { + case SIGALRM: + case SIGUSR1: + { + interrupt_flag = true; + break; + } + default: + { + /* do nothing */ + break; + } + } +} + +void +sig_stop() +{ + interrupt_flag = true; + if (am_client_side) + { + assert(server_side_pid != -1); + kill(server_side_pid, SIGUSR1); + } + else + { + assert(client_side_pid != -1); + kill(client_side_pid, SIGUSR1); + } +} + +static TupleChunkListData * +build_chunk_tuple_slot(EState *estate, size_t size) +{ + TupleChunkListData *tc_list; + char tc_list_raw_buffer[size]; + + generate_seq_buffer(tc_list_raw_buffer, size); + tc_list = prepare_chunk_list_raw_data(tc_list_raw_buffer, size); + + return tc_list; +} + +static bool +measure_chunk_tuple_list(char *verify_buffer, int verify_buff_len, TupleChunkListItem tc_item, + uint64 *total_recv_size, uint64 *total_recv_chunk_item_counts) +{ + + TupleChunkListItem p_curr = tc_item; + TupleChunkListItem p_last; + + if (!p_curr) + { + printf("recv got empty TupleChunkListItem.\n"); + return false; + } + + while (p_curr) + { + if (verify_buffer && verify_buff_len != 0 && !verify_chunk_list_raw_data(p_curr, verify_buffer, verify_buff_len)) + { + printf("recv TupleChunkListItem not matched.\n"); + return false; + } + + *total_recv_size = *total_recv_size + p_curr->chunk_length; + (*total_recv_chunk_item_counts)++; + p_last = p_curr; + p_curr = p_curr->p_next; + pfree(p_last); + } + + return true; +} + +static void +print_direct_mode_summary(const struct bench_options *options, + const uint64 direct_hit, + const uint64 non_direct_hit) +{ + char pbuff[1024 * 100]; + int n = 0; + + setbuf(stdout, NULL); + n = sprintf(pbuff, "+----------------+------------+\n"); + n += sprintf(pbuff + n, "| %-14s | %10ld |\n", "Direct hits", direct_hit); + n += sprintf(pbuff + n, "| %-14s | %10ld |\n", "Ndirect hits", non_direct_hit); + /* non-direct hits/direct hits */ + n += sprintf(pbuff + n, "| %-14s | %10.4f |\n", "n/d hits rate", direct_hit == 0 + ? 0 : (double) non_direct_hit / direct_hit); + /* buffer size/mtu */ + n += sprintf(pbuff + n, "| %-14s | %10.4f |\n", "b/m ratio", (double) options->bsize / options->mtu); + sprintf(pbuff + n, "+----------------+------------+\n"); + + printf("%s", pbuff); +} + + +void +client_loop(const struct bench_options *options, int stc[2], int cts[2]) +{ + EState *estate; + struct itimerval timeout_val; + bool has_error = false; + TupleChunkListData *tc_list_raw_buffer; + struct directTransportBuffer direct_buffer; + int8 already_stop = 0; + + uint64 direct_hit = 0; + uint64 non_direct_hit = 0; + + init_memory_context(); + estate = client_side_setup(options, stc, cts); + if (!estate) + { + /* can not use signal to notify server side now */ + printf("client side setup failed.\n"); + return; + } + + signal(SIGALRM, sig_handler); + signal(SIGUSR1, sig_handler); + + tc_list_raw_buffer = build_chunk_tuple_slot(estate, options->bsize); + + timeout_val.it_value.tv_sec = options->interval; + timeout_val.it_value.tv_usec = 0; + timeout_val.it_interval.tv_sec = 0; + timeout_val.it_interval.tv_usec = 0; + setitimer(ITIMER_REAL, &timeout_val, NULL); + + while (true) + { + if (interrupt_flag) + { + kill(server_side_pid, SIGALRM); + int n = 0; + + if (fcntl(stc[0], F_SETFL, fcntl(stc[0], F_GETFL) | O_NONBLOCK) != 0) + { + printf("client side exit failed.\n"); + return; + } + + /* waiting for server side stuck in recv or break. */ + sleep(0.1); + while ((n = read(stc[0], &already_stop, sizeof(int8))) < 0) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + CurrentMotionIPCLayer->SendTupleChunkToAMS(estate->interconnect_context, 1, 0, tc_list_raw_buffer->p_first); + continue; + } + else + { + printf("client side read signal failed failed.\n"); + } + } + break; + } + + if (options->direct_buffer) + { + CurrentMotionIPCLayer->GetTransportDirectBuffer(estate->interconnect_context, 1, 0, &direct_buffer); + if (direct_buffer.prilen < tc_list_raw_buffer->p_first->chunk_length) + { + non_direct_hit++; + CurrentMotionIPCLayer->SendTupleChunkToAMS(estate->interconnect_context, 1, 0, tc_list_raw_buffer->p_first); + } + else + { + direct_hit++; + memcpy(direct_buffer.pri, tc_list_raw_buffer->p_first->chunk_data, tc_list_raw_buffer->p_first->chunk_length); + CurrentMotionIPCLayer->PutTransportDirectBuffer(estate->interconnect_context, 1, 0, tc_list_raw_buffer->p_first->chunk_length); + } + } + else + { + CurrentMotionIPCLayer->SendTupleChunkToAMS(estate->interconnect_context, 1, 0, tc_list_raw_buffer->p_first); + } + } + + if (options->direct_buffer) + { + print_direct_mode_summary(options, direct_hit, non_direct_hit); + } + + + CurrentMotionIPCLayer->TeardownInterconnect(estate->interconnect_context, &has_error); + assert(!has_error); + + CurrentMotionIPCLayer->CleanUpMotionLayerIPC(); + + shutdown_ic_proxy_if_need(client_ic_proxy_pid); + + cleanup_estate(estate); + destroy_memory_context(); +} + +static void +print_summary(const double elapsed_time, const uint32 loop_times, + const uint64 total_recv_size, const uint64 total_recv_chunk_item_counts) +{ + char pbuff[1024 * 100]; + int n = 0; + + setbuf(stdout, NULL); + n = sprintf(pbuff, "+----------------+------------+\n"); + n += sprintf(pbuff + n, "| %-14s | %10.3f |\n", "Total time(s)", elapsed_time / 1000); + n += sprintf(pbuff + n, "| %-14s | %10ld |\n", "Loop times", loop_times); + n += sprintf(pbuff + n, "| %-14s | %10.3f |\n", "LPS(l/ms)", (double) (loop_times / elapsed_time)); + n += sprintf(pbuff + n, "| %-14s | %10ld |\n", "Recv mbs", total_recv_size / 1024 / 1024); + n += sprintf(pbuff + n, "| %-14s | %10.3f |\n", "TPS(mb/s)", (double) (total_recv_size / (elapsed_time / 1000) / 1024 / 1024)); + n += sprintf(pbuff + n, "| %-14s | %10ld |\n", "Recv counts", total_recv_chunk_item_counts); + n += sprintf(pbuff + n, "| %-14s | %10.3f |\n", "Items ops/ms", (double) (total_recv_chunk_item_counts / (elapsed_time))); + sprintf(pbuff + n, "+----------------+------------+\n"); + printf("%s", pbuff); +} + +void +server_loop(const struct bench_options *options, int stc[2], int cts[2]) +{ + EState *estate; + struct timeval start_time, + end_time; + double elapsed_time; + TupleChunkListItem tc_item; + bool has_error = false; + uint32 loop_times = 0; + uint64 total_recv_size = 0; + uint64 total_recv_chunk_item_counts = 0; + char *tc_item_raw_verify_buff; + int tc_item_raw_verify_buff_len = 0; + int8 already_stop = 1; + + init_memory_context(); + estate = server_side_setup(options, stc, cts); + if (!estate) + { + /* can not use signal to notify client side now */ + printf("server side setup failed.\n"); + return; + } + + signal(SIGALRM, sig_handler); + signal(SIGUSR1, sig_handler); + + if (options->should_verify) + { + tc_item_raw_verify_buff_len = options->bsize; + tc_item_raw_verify_buff = palloc(tc_item_raw_verify_buff_len); + generate_seq_buffer(tc_item_raw_verify_buff, tc_item_raw_verify_buff_len); + } + + gettimeofday(&start_time, NULL); + while (true) + { + if (interrupt_flag) + { + write_data_to_pipe(stc, already_stop, int8); + break; + } + + tc_item = CurrentMotionIPCLayer->RecvTupleChunkFrom(estate->interconnect_context, 1, 0); + if (!measure_chunk_tuple_list(tc_item_raw_verify_buff, + tc_item_raw_verify_buff_len, + tc_item, + &total_recv_size, + &total_recv_chunk_item_counts)) + { + sig_stop(); + } + + if (CurrentMotionIPCLayer->ic_type == INTERCONNECT_TYPE_UDPIFC) + { + CurrentMotionIPCLayer->DirectPutRxBuffer(estate->interconnect_context, 1, 0); + } + + loop_times++; + } + + gettimeofday(&end_time, NULL); + elapsed_time = (double) (end_time.tv_sec - start_time.tv_sec) * 1000 + + (double) (end_time.tv_usec - start_time.tv_usec) / 1000; + + print_summary(elapsed_time, loop_times, total_recv_size, total_recv_chunk_item_counts); + + if (options->should_verify) + { + pfree(tc_item_raw_verify_buff); + } + + CurrentMotionIPCLayer->TeardownInterconnect(estate->interconnect_context, &has_error); + assert(!has_error); + + CurrentMotionIPCLayer->CleanUpMotionLayerIPC(); + shutdown_ic_proxy_if_need(server_ic_proxy_pid); + + cleanup_estate(estate); + destroy_memory_context(); +} + + +int +main(int argc, char *argv[]) +{ + int stc[2], + cts[2]; + pid_t f_pid; + int c; + + struct bench_options options = { + .ic_type = 0, + .interval = 60, + .should_verify = false, + /* the default MTU is 8192 in GUC + * but in a production environment, DBA will generally set it to 1500 + */ + .mtu = 1500, + .bsize = TUPLE_CHUNK_RAW_BUFFER_LEN, + .ipc_layer = NULL, + .direct_buffer = false + }; + + progname = get_progname(argv[0]); + if (argc > 1) + { + if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0 + || strcmp(argv[1], "-h") == 0) + { + usage(); + exit(0); + } + } + + optind = 1; + while (optind < argc) + { + while ((c = getopt_long(argc, argv, "t:i:vm:b:d", + long_options, NULL)) != -1) + { + switch (c) + { + case 't': + { + char *type_c; + + type_c = strdup(optarg); + options.ic_type = atoi(type_c); + free(type_c); + break; + } + case 'i': + { + char *interval_c; + + interval_c = strdup(optarg); + options.interval = atoi(interval_c); + free(interval_c); + break; + } + case 'v': + { + options.should_verify = true; + break; + } + case 'm': + { + char *mtu_c; + + mtu_c = strdup(optarg); + options.mtu = atoi(mtu_c); + free(mtu_c); + break; + } + case 'b': + { + char *buffer_size_c; + + buffer_size_c = strdup(optarg); + options.bsize = atoi(buffer_size_c); + free(buffer_size_c); + break; + } + case 'd': + { + options.direct_buffer = true; + break; + } + default: + { + /* do nothing */ + break; + } + } + } + } + + switch (options.ic_type) + { + case INTERCONNECT_TYPE_TCP: + { + options.ipc_layer = &tcp_ipc_layer; + break; + } + case INTERCONNECT_TYPE_UDPIFC: + { + options.ipc_layer = &udpifc_ipc_layer; + break; + } + case INTERCONNECT_TYPE_PROXY: + { + options.ipc_layer = &proxy_ipc_layer; + break; + } + default: + { + printf("invalid of args -t/--type %d\n", options.ic_type); + usage(); + return -1; + } + } + + if (options.interval <= 1) + { + printf("invalid of args -i/--interval %d\n", options.interval); + usage(); + return -1; + } + + if (options.mtu < MTU_LESS_LEN) + { + printf("invalid of args -m/--mtu %d, should not less than \n", options.mtu, MTU_LESS_LEN); + usage(); + return -1; + } + + if (options.ic_type == INTERCONNECT_TYPE_PROXY && options.mtu != 1500) + { + printf("invalid of args -m/--mtu %d, proxy not allow setting mtu. \n", options.mtu); + usage(); + return -1; + } + + if (options.bsize <= 0 || options.bsize >= (options.mtu - MTU_LESS_LEN)) + { + printf("invalid of args -b/--bsize %d, should not bigger than (mtu - header).\n", options.bsize); + usage(); + return -1; + } + + if (pipe(stc) < 0) + { + printf("pipe created failed. errno: %d\n", errno); + return -1; + } + + if (pipe(cts) < 0) + { + printf("pipe created failed. errno: %d\n", errno); + close(stc[0]); + close(stc[1]); + return -1; + } + + f_pid = fork(); + + if (f_pid < 0) + { + printf("fork failed. errno: %d\n", errno); + close(stc[0]); + close(stc[1]); + close(cts[0]); + close(cts[1]); + return -1; + } + + if (f_pid == 0) + { + client_loop(&options, stc, cts); + close(stc[0]); + close(stc[1]); + close(cts[0]); + close(cts[1]); + } + else + { + server_loop(&options, stc, cts); + } + + wait(NULL); + close(stc[0]); + close(stc[1]); + close(cts[0]); + close(cts[1]); + return 0; +} diff --git a/contrib/interconnect/test/ic_interface_test.c b/contrib/interconnect/test/ic_interface_test.c new file mode 100644 index 00000000000..b1da34e84a1 --- /dev/null +++ b/contrib/interconnect/test/ic_interface_test.c @@ -0,0 +1,345 @@ + +#include +#include + +#include "postgres.h" +#include "cmockery.h" +#include "ic_modules.h" +#include "postmaster/postmaster.h" +#include "utils/memutils.h" +#include "utils/syscache.h" +#include "storage/latch.h" +#include "ic_test_env.h" + +const char *progname = NULL; +static MemoryContext testMemoryContext = NULL; +pid_t server_ic_proxy_pid = -1; +pid_t client_ic_proxy_pid = -1; + +static void +verify_chunk_tuple_list_serialized_tuple(TupleChunkListItem tc_item) +{ + HeapTupleTableSlot *heap_tuple_slot = NULL; + + char tc_item_verify_buff[COLUMN_TEXT_LEN]; + + heap_tuple_slot = get_tuple_slot_from_chunk_list_serialized_tuple(tc_item); + assert_true(heap_tuple_slot); + + assert_int_equal(VARSIZE_ANY_EXHDR(heap_tuple_slot->base.tts_values[0]), COLUMN_TEXT_LEN); + + generate_seq_buffer(tc_item_verify_buff, COLUMN_TEXT_LEN); + /* 1b string header */ + assert_string_equal(tc_item_verify_buff, DatumGetPointer(heap_tuple_slot->base.tts_values[0]) + 1); + assert_int_equal(DatumGetPointer(heap_tuple_slot->base.tts_values[1]), COLUMN_INT4_VALUE); + + cleanup_heap_tuple_slot(heap_tuple_slot); +} + +static void +verify_chunk_tuple_list(TupleChunkListItem tc_item) +{ + char tc_item_raw_verify_buff[TUPLE_CHUNK_RAW_BUFFER_LEN]; + + assert_true(tc_item); + assert_true(tc_item->p_next); + assert_false(tc_item->p_next->p_next); + + verify_chunk_tuple_list_serialized_tuple(tc_item); + generate_seq_buffer(tc_item_raw_verify_buff, TUPLE_CHUNK_RAW_BUFFER_LEN); + assert_true(verify_chunk_list_raw_data(tc_item->p_next, tc_item_raw_verify_buff, TUPLE_CHUNK_RAW_BUFFER_LEN)); +} + + +static TupleChunkListData * +build_chunk_tuple_slot(EState *estate, bool direct) +{ + TupleChunkListData *tc_list; + TupleTableSlot *slot; + int sent = 0; + TupleChunkListItem item; + int32 raw_buffer_len = 0; + + if (direct) + { + SerTupInfo tup_info; + struct directTransportBuffer direct_buffer; + + tc_list = (TupleChunkListData *) palloc0(sizeof(TupleChunkListData)); + + slot = prepare_tuple_slot(); + tup_info.tupdesc = slot->tts_tupleDescriptor; + + CurrentMotionIPCLayer->GetTransportDirectBuffer(estate->interconnect_context, 1, 0, &direct_buffer); + assert_int_not_equal(direct_buffer.pri, 0); + assert_int_not_equal(direct_buffer.prilen, 0); + + sent = SerializeTuple(slot, &tup_info, &direct_buffer, tc_list, 0); + assert_int_not_equal(sent, 0); + + CurrentMotionIPCLayer->PutTransportDirectBuffer(estate->interconnect_context, 1, 0, sent); + cleanup_tuple_slot(slot); + } + else + { + /* Don't use InitSerTupInfo */ + /* cause can't call SerializeTuple with system cache */ + /* + * just build tc_list without call SerializeTuple when `direct` is + * false + */ + + char tc_list_raw_buffer[TUPLE_CHUNK_RAW_BUFFER_LEN]; + + generate_seq_buffer(tc_list_raw_buffer, TUPLE_CHUNK_RAW_BUFFER_LEN); + tc_list = prepare_chunk_list_raw_data(tc_list_raw_buffer, TUPLE_CHUNK_RAW_BUFFER_LEN); + } + + return tc_list; +} + +static EState * +client_side_setup(MotionIPCLayer * motion_ipc_layer, int stc[2], int cts[2]) +{ + int32 listen_port = 0; + int32 server_listen_port = 0; + pid_t c_pid = 0; + pid_t server_pid = 0; + int8 already_setup = 1; + EState *estate; + + client_side_global_var_init(motion_ipc_layer, &client_ic_proxy_pid); + + CurrentMotionIPCLayer->InitMotionLayerIPC(); + + listen_port = CurrentMotionIPCLayer->GetListenPort(); + assert_int_not_equal(listen_port, 0); + + write_data_to_pipe(cts, listen_port, int32); + read_data_from_pipe(stc, &server_listen_port, int32); + + c_pid = getpid(); + write_data_to_pipe(cts, c_pid, pid_t); + read_data_from_pipe(stc, &server_pid, pid_t); + + estate = prepare_estate( /* local_slice */ 1, server_listen_port, + listen_port, + server_pid, + c_pid); + + CurrentMotionIPCLayer->SetupInterconnect(estate); + assert_true(estate->es_interconnect_is_setup); + assert_true(estate->interconnect_context != NULL); + + write_data_to_pipe(cts, already_setup, int8); + return estate; +} + +static void +forked_client(void **state, int stc[2], int cts[2], MotionIPCLayer * motion_ipc_layer) +{ + EState *estate; + TupleChunkListData *tc_list1; + TupleChunkListData *tc_list2; + bool has_error = false; + + assert_false(CurrentMotionIPCLayer); + + estate = client_side_setup(motion_ipc_layer, stc, cts); + assert_true(estate); + assert_true(CurrentMotionIPCLayer); + + tc_list1 = build_chunk_tuple_slot(estate, true); + tc_list2 = build_chunk_tuple_slot(estate, false); + + CurrentMotionIPCLayer->SendTupleChunkToAMS(estate->interconnect_context, 1, 0, tc_list1->p_first); + CurrentMotionIPCLayer->SendEOS(estate->interconnect_context, 1, tc_list2->p_first); + + CurrentMotionIPCLayer->TeardownInterconnect(estate->interconnect_context, &has_error); + assert_false(has_error); + + CurrentMotionIPCLayer->CleanUpMotionLayerIPC(); + + shutdown_ic_proxy_if_need(client_ic_proxy_pid); + + cleanup_chunk_list_raw_data(tc_list1); + cleanup_chunk_list_raw_data(tc_list2); + cleanup_estate(estate); +} + + +static EState * +server_side_setup(MotionIPCLayer * motion_ipc_layer, int stc[2], int cts[2]) +{ + int32 listen_port = 0; + int32 client_listen_port = 0; + pid_t c_pid = 0; + pid_t client_pid = 0; + int8 already_setup = 1; + EState *estate; + + server_side_global_var_init(motion_ipc_layer, &server_ic_proxy_pid); + + CurrentMotionIPCLayer->InitMotionLayerIPC(); + + listen_port = CurrentMotionIPCLayer->GetListenPort(); + assert_int_not_equal(listen_port, 0); + + read_data_from_pipe(cts, &client_listen_port, int32); + write_data_to_pipe(stc, listen_port, int32); + + c_pid = getpid(); + read_data_from_pipe(cts, &client_pid, pid_t); + write_data_to_pipe(stc, c_pid, pid_t); + + estate = prepare_estate( /* local_slice */ 0, listen_port, + client_listen_port, + c_pid, + client_pid); + + CurrentMotionIPCLayer->SetupInterconnect(estate); + assert_true(estate->es_interconnect_is_setup); + assert_true(estate->interconnect_context != NULL); + + /* waiting for client setup */ + read_data_from_pipe(cts, &already_setup, int8); + assert_int_equal(1, already_setup); + + return estate; +} + +static void +main_server(void **state, int stc[2], int cts[2], MotionIPCLayer * motion_ipc_layer) +{ + int16 src_route = 0; + TupleChunkListItem tc_item; + EState *estate; + bool has_error = false; + + assert_false(CurrentMotionIPCLayer); + + estate = server_side_setup(motion_ipc_layer, stc, cts); + assert_true(estate); + assert_true(CurrentMotionIPCLayer); + + tc_item = CurrentMotionIPCLayer->RecvTupleChunkFromAny(estate->interconnect_context, 1, &src_route); + verify_chunk_tuple_list(tc_item); + + CurrentMotionIPCLayer->TeardownInterconnect(estate->interconnect_context, &has_error); + assert_false(has_error); + + CurrentMotionIPCLayer->CleanUpMotionLayerIPC(); + + shutdown_ic_proxy_if_need(server_ic_proxy_pid); + + pfree(tc_item); + cleanup_estate(estate); +} + +static void +test_ic_interface(void **state, MotionIPCLayer * motion_ipc_layer) +{ + pid_t f_pid; + int stc[2], + cts[2]; + + assert_false(CurrentMotionIPCLayer); + + assert_int_equal(pipe(stc), 0); + assert_int_equal(pipe(cts), 0); + + f_pid = fork(); + + assert_true(f_pid >= 0); + if (f_pid == 0) + { + forked_client(state, stc, cts, motion_ipc_layer); + + close(stc[0]); + close(stc[1]); + close(cts[0]); + close(cts[1]); + MemoryContextReset(testMemoryContext); + testMemoryContext = NULL; + + /* + * client side should not return assert can be catch in forked process + */ + exit(0); + } + + main_server(state, stc, cts, motion_ipc_layer); + wait(NULL); + + close(stc[0]); + close(stc[1]); + close(cts[0]); + close(cts[1]); +} + +static void +test_ic_interface_tcp(void **state) +{ + test_ic_interface(state, &tcp_ipc_layer); +} + +static void +test_ic_interface_udpifc(void **state) +{ + test_ic_interface(state, &udpifc_ipc_layer); +} + +static void +test_ic_interface_proxy(void **state) +{ + if (!proxy_ipc_layer.IcProxyServiceMain) + { + printf("enable-ic-proxy not set. skiped.\n"); + return; + } + test_ic_interface(state, &proxy_ipc_layer); +} + +static void +setup_data_structures(void **state) +{ + if (NULL == TopMemoryContext) + { + assert_true(NULL == testMemoryContext); + MemoryContextInit(); + + testMemoryContext = AllocSetContextCreate(TopMemoryContext, + "Test Context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + MemoryContextSwitchTo(testMemoryContext); + } +} + +static void +teardown_data_structures(void **state) +{ + MemoryContextReset(testMemoryContext); + testMemoryContext = NULL; + TopMemoryContext = NULL; + CurrentMotionIPCLayer = NULL; +} + +int +main(int argc, char *argv[]) +{ + progname = get_progname(argv[0]); + + cmockery_parse_arguments(argc, argv); + + const UnitTest tests[] = { + unit_test_setup_teardown(test_ic_interface_tcp, setup_data_structures, teardown_data_structures), + unit_test_setup_teardown(test_ic_interface_udpifc, setup_data_structures, teardown_data_structures), + unit_test_setup_teardown(test_ic_interface_proxy, setup_data_structures, teardown_data_structures) + }; + + return run_tests(tests); + return 0; +} diff --git a/contrib/interconnect/test/ic_test_env.c b/contrib/interconnect/test/ic_test_env.c new file mode 100644 index 00000000000..5333a143de5 --- /dev/null +++ b/contrib/interconnect/test/ic_test_env.c @@ -0,0 +1,409 @@ + +#include +#include +#include + +#include "postgres.h" +#include "ic_test_env.h" +#include "ic_modules.h" +#include "postmaster/postmaster.h" +#include "utils/memutils.h" +#include "storage/latch.h" + + +void +generate_seq_buffer(char *buffer, const size_t buffer_len) +{ + for (size_t i = 0; i < buffer_len; i++) + { + buffer[i] = i; + } +} + +TupleDesc +prepare_tuple_desc() +{ + TupleDesc tuple_desc; + + tuple_desc = palloc0(sizeof(TupleDescData) + sizeof(FormData_pg_attribute) * 2); + tuple_desc->natts = COLUMN_NUMS; + tuple_desc->attrs[0].attlen = -1; + tuple_desc->attrs[0].attbyval = false; + tuple_desc->attrs[0].attalign = TYPALIGN_CHAR; + tuple_desc->attrs[1].attlen = 4; + tuple_desc->attrs[1].attbyval = true; + tuple_desc->attrs[1].attalign = TYPALIGN_INT; + + return tuple_desc; +} + +TupleTableSlot * +prepare_tuple_slot() +{ + TupleDesc tuple_desc; + TupleTableSlot *slot; + text *column1_text; + Size basesz = 0, + allocsz = 0; + TupleTableSlotOps *current_slot_ops = (TupleTableSlotOps *) &TTSOpsVirtual; + + tuple_desc = prepare_tuple_desc(); + + basesz = current_slot_ops->base_slot_size; + + allocsz = MAXALIGN(basesz) + MAXALIGN(tuple_desc->natts * sizeof(Datum)) + + MAXALIGN(tuple_desc->natts * sizeof(bool)); + + slot = palloc0(allocsz); + + *((const TupleTableSlotOps **) &slot->tts_ops) = current_slot_ops; + slot->type = T_TupleTableSlot; + slot->tts_flags = TTS_FLAG_EMPTY | TTS_FLAG_FIXED; + slot->tts_tupleDescriptor = tuple_desc; + slot->tts_mcxt = CurrentMemoryContext; + slot->tts_nvalid = 0; + + slot->tts_values = (Datum *) (((char *) (slot)) + MAXALIGN(basesz)); + slot->tts_isnull = (bool *) ( + (((char *) (slot)) + MAXALIGN(basesz) + + MAXALIGN(tuple_desc->natts * sizeof(Datum)))); + slot->tts_isnull[0] = false; + slot->tts_isnull[1] = false; + slot->tts_ops->init(slot); + + column1_text = (text *) (palloc0(COLUMN_TEXT_LEN + VARHDRSZ)); + SET_VARSIZE(column1_text, COLUMN_TEXT_LEN + VARHDRSZ); + generate_seq_buffer(((char *) VARDATA(column1_text)), COLUMN_TEXT_LEN); + + slot->tts_values[0] = PointerGetDatum(column1_text); + slot->tts_values[1] = Int32GetDatum(COLUMN_INT4_VALUE); + + return slot; +} + +HeapTupleTableSlot * +get_tuple_slot_from_chunk_list_serialized_tuple(TupleChunkListItem tc_item) +{ + char *tc_item_data = NULL; + int tc_item_len = 0; + char *tc_item_body_pos = NULL; + int tc_item_body_len = 0; + char *tc_item_minimal_tuple_body = NULL; + unsigned int tc_item_minimal_tuple_len = 0; + MinimalTuple tc_item_minimal_tuple = NULL; + HeapTuple tc_item_heap_tuple = NULL; + Size tc_item_heap_tuple_desc_basesz = 0, + tc_item_heap_tuple_desc_allocsz = 0; + HeapTupleTableSlot *tc_item_heap_tuple_slot = NULL; + + tc_item_data = (char *) GetChunkDataPtr(tc_item) + TUPLE_CHUNK_HEADER_SIZE; + tc_item_len = tc_item->chunk_length - TUPLE_CHUNK_HEADER_SIZE; + + tc_item_body_pos = tc_item_data; + + memcpy(&tc_item_body_len, tc_item_body_pos, sizeof(tc_item_body_len)); + tc_item_body_pos += sizeof(tc_item_body_len); + + tc_item_minimal_tuple_len = tc_item_body_len + MINIMAL_TUPLE_DATA_OFFSET; + + tc_item_minimal_tuple = palloc0(tc_item_minimal_tuple_len); + tc_item_minimal_tuple->t_len = tc_item_minimal_tuple_len; + + tc_item_minimal_tuple_body = (char *) tc_item_minimal_tuple + MINIMAL_TUPLE_DATA_OFFSET; + memcpy(tc_item_minimal_tuple_body, tc_item_body_pos, tc_item_body_len); + + tc_item_heap_tuple = heap_tuple_from_minimal_tuple(tc_item_minimal_tuple); + TupleDesc heap_tuple_desc = prepare_tuple_desc(); + + tc_item_heap_tuple_desc_basesz = TTSOpsHeapTuple.base_slot_size; + + tc_item_heap_tuple_desc_allocsz = MAXALIGN(tc_item_heap_tuple_desc_basesz) + MAXALIGN(heap_tuple_desc->natts * sizeof(Datum)) + + MAXALIGN(heap_tuple_desc->natts * sizeof(bool)); + + tc_item_heap_tuple_slot = palloc0(sizeof(tc_item_heap_tuple_desc_allocsz)); + tc_item_heap_tuple_slot->tuple = tc_item_heap_tuple; + tc_item_heap_tuple_slot->off = 0; + tc_item_heap_tuple_slot->base.tts_tupleDescriptor = heap_tuple_desc; + tc_item_heap_tuple_slot->base.tts_isnull = (bool *) ( + (((char *) (&tc_item_heap_tuple_slot->base)) + MAXALIGN(tc_item_heap_tuple_desc_basesz) + + MAXALIGN(heap_tuple_desc->natts * sizeof(Datum)))); + tc_item_heap_tuple_slot->base.tts_values = (Datum *) (((char *) (&tc_item_heap_tuple_slot->base)) + MAXALIGN(tc_item_heap_tuple_desc_basesz)); + tc_item_heap_tuple_slot->base.tts_flags = TTS_FLAG_FIXED; + TTSOpsHeapTuple.getsomeattrs(&tc_item_heap_tuple_slot->base, heap_tuple_desc->natts); + + pfree(tc_item_minimal_tuple); + return tc_item_heap_tuple_slot; +} + +void +cleanup_tuple_desc(TupleDesc desc) +{ + pfree(desc); +} + +void +cleanup_tuple_slot(TupleTableSlot *slot) +{ + cleanup_tuple_desc(slot->tts_tupleDescriptor); + pfree(DatumGetPointer(slot->tts_values[0])); + pfree(slot); +} + +void +cleanup_heap_tuple_slot(HeapTupleTableSlot *slot) +{ + cleanup_tuple_desc(slot->base.tts_tupleDescriptor); + pfree(slot); +} + +static void +init_exec_slices(ExecSlice * parent_exec_slice, + ExecSlice * child_exec_slice, + int32 parent_listen_port, + int32 child_listen_port, + pid_t parent_pid, + pid_t child_pid) +{ + + CdbProcess *parent_process; + CdbProcess *child_process; + + memset(parent_exec_slice, 0, sizeof(ExecSlice)); + memset(child_exec_slice, 0, sizeof(ExecSlice)); + + parent_process = (CdbProcess *) palloc0(sizeof(CdbProcess)); + child_process = (CdbProcess *) palloc0(sizeof(CdbProcess)); + + parent_process->type = T_CdbProcess; + parent_process->listenerAddr = "127.0.1.1"; + parent_process->listenerPort = parent_listen_port; + + parent_process->dbid = 1; + parent_process->pid = parent_pid; + parent_process->contentid = -1; + + child_process->type = T_CdbProcess; + child_process->listenerAddr = "127.0.1.1"; + child_process->listenerPort = child_listen_port; + + child_process->dbid = 2; + child_process->pid = child_pid; + child_process->contentid = 0; + + parent_exec_slice->sliceIndex = 0; + parent_exec_slice->rootIndex = 0; + parent_exec_slice->planNumSegments = 1; + parent_exec_slice->gangType = GANGTYPE_UNALLOCATED; + parent_exec_slice->segments = 0; + parent_exec_slice->primaryGang = 0; + + parent_exec_slice->primaryProcesses = lappend(parent_exec_slice->primaryProcesses, parent_process); + parent_exec_slice->processesMap = 0; + + parent_exec_slice->children = lappend_int(parent_exec_slice->children, 1); + parent_exec_slice->parentIndex = -1; + + child_exec_slice->sliceIndex = 1; + child_exec_slice->rootIndex = 0; + child_exec_slice->planNumSegments = 1; + child_exec_slice->gangType = GANGTYPE_PRIMARY_READER; + child_exec_slice->segments = 0; + child_exec_slice->primaryGang = 0; + + child_exec_slice->primaryProcesses = lappend(child_exec_slice->primaryProcesses, child_process); + child_exec_slice->processesMap = 0; + + child_exec_slice->children = NIL; + child_exec_slice->parentIndex = 0; +} + + +extern EState * +prepare_estate(const int local_slice, + const int32 parent_listen_port, + const int32 child_listen_port, + const pid_t parent_pid, + const pid_t child_pid) +{ + EState *estate; + SliceTable *slice_table; + + estate = (EState *) palloc0(sizeof(EState)); + slice_table = (SliceTable *) palloc0(sizeof(SliceTable)); + slice_table->type = T_SliceTable; + slice_table->localSlice = local_slice; + slice_table->numSlices = 2; + slice_table->slices = (ExecSlice *) palloc(sizeof(ExecSlice) * slice_table->numSlices); + slice_table->ic_instance_id = 1; + + estate->es_sliceTable = slice_table; + + init_exec_slices(&estate->es_sliceTable->slices[0], + &estate->es_sliceTable->slices[1], + parent_listen_port, + child_listen_port, + parent_pid, + child_pid); + + return estate; +} + +extern void +cleanup_estate(EState *estate) +{ + list_free(estate->es_sliceTable->slices[0].primaryProcesses); + list_free(estate->es_sliceTable->slices[1].primaryProcesses); + pfree(estate->es_sliceTable->slices); + pfree(estate->es_sliceTable); + pfree(estate); +} + + +TupleChunkListData * +prepare_chunk_list_raw_data(const char *raw_buffer, const size_t raw_buffer_len) +{ + TupleChunkListData *tc_list; + TupleChunkListItem item; + + tc_list = (TupleChunkListData *) palloc0(sizeof(TupleChunkListData)); + + tc_list->p_first = NULL; + tc_list->p_last = NULL; + tc_list->num_chunks = 0; + tc_list->serialized_data_length = 0; + tc_list->max_chunk_length = Gp_max_tuple_chunk_size; + + item = (TupleChunkListItem) + palloc(sizeof(TupleChunkListItemData) + Gp_max_tuple_chunk_size); + MemSetAligned(item, 0, sizeof(TupleChunkListItemData) + 4); + + SetChunkType(item->chunk_data, TC_WHOLE); + item->chunk_length = TUPLE_CHUNK_HEADER_SIZE; + appendChunkToTCList(tc_list, item); + + memcpy(item->chunk_data, &raw_buffer_len, 4); + memcpy(item->chunk_data + 4, raw_buffer, raw_buffer_len); + item->chunk_length = raw_buffer_len + 4; + + return tc_list; +} + +void +cleanup_chunk_list_raw_data(TupleChunkListData * tc_list) +{ + /* only set zero/one item in test */ + if (tc_list->p_first) + pfree(tc_list->p_first); + pfree(tc_list); +} + +bool +verify_chunk_list_raw_data(const TupleChunkListItem tc_item_raw, + const char *verify_buffer, const size_t verify_buffer_len) +{ + char *tc_item_raw_data = NULL; + int tc_item_raw_len = 0; + + tc_item_raw_data = (char *) GetChunkDataPtr(tc_item_raw) + TUPLE_CHUNK_HEADER_SIZE; + tc_item_raw_len = tc_item_raw->chunk_length - TUPLE_CHUNK_HEADER_SIZE; + + return tc_item_raw_len == verify_buffer_len && strncmp(tc_item_raw_data, verify_buffer, verify_buffer_len) == 0; +} + +void +client_side_global_var_init(MotionIPCLayer * motion_ipc_layer, pid_t *ic_proxy_pid) +{ + Gp_max_packet_size = 1500; + + GpIdentity.segindex = 0; + GpIdentity.dbid = 2; + + MyProcPid = getpid(); + + CurrentMotionIPCLayer = motion_ipc_layer; + interconnect_address = "127.0.1.1"; + + Gp_max_tuple_chunk_size = CurrentMotionIPCLayer->GetMaxTupleChunkSize(); + + /* udpifc */ + ic_htab_size = 2; + + Gp_interconnect_queue_depth = 800; + Gp_interconnect_snd_queue_depth = 600; + Gp_interconnect_timer_period = 1; + Gp_interconnect_timer_checking_period = 2; + InitializeLatchSupport(); + if (pipe(postmaster_alive_fds) != 0 || fcntl(postmaster_alive_fds[POSTMASTER_FD_WATCH], F_SETFL, O_NONBLOCK) != 0) + { + ereport(ERROR, "fail to init postmaster_alive_fds"); + } + + /* proxy */ + if (motion_ipc_layer->ic_type == INTERCONNECT_TYPE_PROXY) + { + assert(motion_ipc_layer->IcProxyServiceMain); + PostmasterPid = getpid(); + + gp_interconnect_proxy_addresses = "1:-1:127.0.1.1:7111,2:0:127.0.1.1:7112"; + *ic_proxy_pid = fork(); + assert(*ic_proxy_pid != -1); + if (*ic_proxy_pid == 0) + { + motion_ipc_layer->IcProxyServiceMain(); + } + } +} + +void +server_side_global_var_init(MotionIPCLayer * motion_ipc_layer, pid_t *ic_proxy_pid) +{ + Gp_max_packet_size = 1500; + + GpIdentity.segindex = -1; + GpIdentity.dbid = 1; + + MyProcPid = getpid(); + + CurrentMotionIPCLayer = motion_ipc_layer; + interconnect_address = "127.0.1.1"; + + Gp_max_tuple_chunk_size = CurrentMotionIPCLayer->GetMaxTupleChunkSize(); + + /* udpifc */ + ic_htab_size = 2; + + Gp_interconnect_queue_depth = 800; + Gp_interconnect_snd_queue_depth = 600; + Gp_interconnect_timer_period = 1; + Gp_interconnect_timer_checking_period = 2; + InitializeLatchSupport(); + if (pipe(postmaster_alive_fds) != 0 || fcntl(postmaster_alive_fds[POSTMASTER_FD_WATCH], F_SETFL, O_NONBLOCK) != 0) + { + ereport(ERROR, "fail to init postmaster_alive_fds"); + } + + /* proxy */ + if (motion_ipc_layer->ic_type == INTERCONNECT_TYPE_PROXY) + { + assert(motion_ipc_layer->IcProxyServiceMain); + PostmasterPid = getpid(); + + gp_interconnect_proxy_addresses = "1:-1:127.0.1.1:7111,2:0:127.0.1.1:7112"; + *ic_proxy_pid = fork(); + assert(*ic_proxy_pid != -1); + if (*ic_proxy_pid == 0) + { + motion_ipc_layer->IcProxyServiceMain(); + } + } +} + +void +shutdown_ic_proxy_if_need(pid_t ic_proxy_pid) +{ + if (ic_proxy_pid != -1) + { + /* fixme: uv_signal_start setting is not right */ + kill(ic_proxy_pid, SIGKILL); + } +} diff --git a/contrib/interconnect/test/ic_test_env.h b/contrib/interconnect/test/ic_test_env.h new file mode 100644 index 00000000000..7f828d49cbc --- /dev/null +++ b/contrib/interconnect/test/ic_test_env.h @@ -0,0 +1,55 @@ + +#include + +#include "postgres.h" +#include "ic_modules.h" +#include "postmaster/postmaster.h" + + +#ifndef IC_TEST_ENV_H +#define IC_TEST_ENV_H + +#define write_data_to_pipe(fd_pipe, any, type) \ + do { \ + size_t write_size = 0; \ + write_size = write(fd_pipe[1], (void *)&any, sizeof(type)); \ + } while(0); + +#define read_data_from_pipe(fd_pipe, any_ptr, type) \ + do { \ + size_t read_size = 0; \ + read_size = read(fd_pipe[0], any_ptr, sizeof(type)); \ + } while(0); + + +#define COLUMN_NUMS 2 +#define COLUMN_TEXT_LEN 100 +#define COLUMN_INT4_VALUE 0x123 +#define TUPLE_CHUNK_RAW_BUFFER_LEN 200 + +extern void generate_seq_buffer(char *buffer, const size_t buffer_len); +extern TupleDesc prepare_tuple_desc(); +extern TupleTableSlot *prepare_tuple_slot(); +extern HeapTupleTableSlot *get_tuple_slot_from_chunk_list_serialized_tuple(TupleChunkListItem tc_item); +extern void cleanup_tuple_desc(TupleDesc desc); +extern void cleanup_tuple_slot(TupleTableSlot *slot); +extern void cleanup_heap_tuple_slot(HeapTupleTableSlot *slot); + +extern TupleChunkListData * prepare_chunk_list_raw_data(const char *raw_buffer, const size_t raw_buffer_len); +extern void cleanup_chunk_list_raw_data(TupleChunkListData * tc_list); +extern bool verify_chunk_list_raw_data(const TupleChunkListItem tc_item_raw, + const char *verify_buffer, const size_t verify_buffer_len); + + +extern EState *prepare_estate(const int local_slice, + const int32 parent_listen_port, + const int32 child_listen_port, + const pid_t parent_pid, + const pid_t child_pid); +extern void cleanup_estate(EState *estate); + +extern void client_side_global_var_init(MotionIPCLayer * motion_ipc_layer, pid_t *ic_proxy_pid); +extern void server_side_global_var_init(MotionIPCLayer * motion_ipc_layer, pid_t *ic_proxy_pid); +extern void shutdown_ic_proxy_if_need(pid_t ic_proxy_pid); + +#endif // IC_TEST_ENV_H diff --git a/src/backend/cdb/motion/tupleremap.c b/src/backend/cdb/motion/tupleremap.c index 168e2ee1b08..7fe542c1eb1 100644 --- a/src/backend/cdb/motion/tupleremap.c +++ b/src/backend/cdb/motion/tupleremap.c @@ -265,12 +265,12 @@ TRHandleTypeLists(TupleRemapper *remapper, List *typelist) * typmods. */ Datum -TRRemapDatum(TupleRemapper *remapper, Oid typeid, Datum value) +TRRemapDatum(TupleRemapper *remapper, Oid type_id, Datum value) { TupleRemapInfo *remapinfo; bool changed; - remapinfo = BuildTupleRemapInfo(typeid, remapper->mycontext); + remapinfo = BuildTupleRemapInfo(type_id, remapper->mycontext); if (!remapinfo) return value; diff --git a/src/backend/cdb/motion/tupser.c b/src/backend/cdb/motion/tupser.c index ea141c0bfb2..df1ab4f2f2e 100644 --- a/src/backend/cdb/motion/tupser.c +++ b/src/backend/cdb/motion/tupser.c @@ -426,7 +426,12 @@ SerializeTuple(TupleTableSlot *slot, SerTupInfo *pSerInfo, struct directTranspor { Datum *values; - slot_getallattrs(slot); + /* it is same logic if we remove the `if case` + * because caller won't pass virtual tuple here except test. + */ + if (!TTS_IS_VIRTUAL(slot)) { + slot_getallattrs(slot); + } values = slot->tts_values; for (int i = 0; i < natts; i++) diff --git a/src/include/cdb/tupleremap.h b/src/include/cdb/tupleremap.h index 201273fde95..65427aec92c 100644 --- a/src/include/cdb/tupleremap.h +++ b/src/include/cdb/tupleremap.h @@ -22,6 +22,6 @@ extern TupleRemapper *CreateTupleRemapper(void); extern void DestroyTupleRemapper(TupleRemapper *remapper); extern MinimalTuple TRCheckAndRemap(TupleRemapper *remapper, TupleDesc tupledesc, MinimalTuple tuple); extern void TRHandleTypeLists(TupleRemapper *remapper, List *typelist); -extern Datum TRRemapDatum(TupleRemapper *remapper, Oid typeid, Datum value); +extern Datum TRRemapDatum(TupleRemapper *remapper, Oid type_id, Datum value); #endif /* TUPLEREMAP_H */