From 8e966f82e0c2fff0ac5725d79612c24a2415286b Mon Sep 17 00:00:00 2001 From: Dom Del Nano Date: Wed, 21 Aug 2024 21:45:50 +0000 Subject: [PATCH 1/8] Populate client side traces local address via tcp connect kprobes Signed-off-by: Dom Del Nano --- .../socket_tracer/bcc_bpf/socket_trace.c | 80 +++++++-- .../socket_tracer/bcc_bpf_intf/socket_trace.h | 1 + .../socket_tracer/socket_trace_bpf_test.cc | 156 ++++++++++++++++++ .../socket_tracer/socket_trace_connector.cc | 4 + .../socket_tracer/testing/protocol_checkers.h | 18 ++ 5 files changed, 247 insertions(+), 12 deletions(-) diff --git a/src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c b/src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c index f03c05479ac..dbb7104cc09 100644 --- a/src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c +++ b/src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c @@ -81,6 +81,11 @@ BPF_HASH(active_accept_args_map, uint64_t, struct accept_args_t); // Key is {tgid, pid}. BPF_HASH(active_connect_args_map, uint64_t, struct connect_args_t); +// Map from thread to its sock* struct. This facilitates capturing +// the local address of a tcp socket during connect() syscalls. +// Key is {tgid, pid}. +BPF_HASH(tcp_connect_args_map, uint64_t, struct sock *); + // Map from thread to its ongoing write() syscall's input argument. // Tracks write() call from entry -> exit. // Key is {tgid, pid}. @@ -346,12 +351,8 @@ static __inline void update_traffic_class(struct conn_info_t* conn_info, ***********************************************************/ static __inline void read_sockaddr_kernel(struct conn_info_t* conn_info, - const struct socket* socket) { - // Use BPF_PROBE_READ_KERNEL_VAR since BCC cannot insert them as expected. 
- struct sock* sk = NULL; - BPF_PROBE_READ_KERNEL_VAR(sk, &socket->sk); - - struct sock_common* sk_common = &sk->__sk_common; + const struct sock* sk) { + const struct sock_common* sk_common = &sk->__sk_common; uint16_t family = -1; uint16_t lport = -1; uint16_t rport = -1; @@ -377,12 +378,12 @@ static __inline void read_sockaddr_kernel(struct conn_info_t* conn_info, } static __inline void submit_new_conn(struct pt_regs* ctx, uint32_t tgid, int32_t fd, - const struct sockaddr* addr, const struct socket* socket, + const struct sockaddr* addr, const struct sock* sock, enum endpoint_role_t role, enum source_function_t source_fn) { struct conn_info_t conn_info = {}; init_conn_info(tgid, fd, &conn_info); - if (socket != NULL) { - read_sockaddr_kernel(&conn_info, socket); + if (sock != NULL) { + read_sockaddr_kernel(&conn_info, sock); } else if (addr != NULL) { conn_info.raddr = *((union sockaddr_t*)addr); } @@ -585,6 +586,57 @@ int conn_cleanup_uprobe(struct pt_regs* ctx) { return 0; } +// These probes are used to capture the *sock struct during client side tracing +// of connect() syscalls. This is necessary to capture the socket's local address, +// which is not accessible via the connect() and later syscalls. +// +// This function requires that the function being probed receives a struct sock* as its +// first argument and that the active_connect_args_map is populated when this probe fires. +// This means the function being probed must be part of the connect() syscall path or similar +// syscall path. +// +// Using the struct sock* for capturing a socket's local address only works for TCP sockets. +// The equivalent UDP functions (udp_v4_connect, udp_v6_connect and upd_sendmsg) always receive a sock +// struct with a 0.0.0.0 or ::1 local address. This is deemed acceptable since our local address +// population for server side tracing relies on accept/accept4, which only applies for TCP. 
+// +// TODO(ddelnano): The current implementation works for mid stream TCP connections despite +// my intuition that tcp_v4_connect and tcp_v6_connect should not be called mid stream. +// If mid stream connections have a missing local address in the future, we should probe +// tcp_sendmsg in addition to the current probes. This will require discerning between the +// UDP and TCP case in process_implicit_conn. +// +// int tcp_v4_connect(struct sock *sk, struct sockaddr *uaddr, int addr_len); +// static int tcp_v6_connect(struct sock *sk, struct sockaddr *uaddr, int addr_len); +int probe_entry_populate_active_connect_sock(struct pt_regs* ctx) { + uint64_t id = bpf_get_current_pid_tgid(); + + const struct connect_args_t* connect_args = active_connect_args_map.lookup(&id); + if (connect_args == NULL) { + return 0; + } + struct sock* sk = (struct sock *)PT_REGS_PARM1(ctx); + tcp_connect_args_map.update(&id, &sk); + + return 0; +} + +int probe_ret_populate_active_connect_sock(struct pt_regs* ctx) { + uint64_t id = bpf_get_current_pid_tgid(); + + struct sock** sk = tcp_connect_args_map.lookup(&id); + if (sk == NULL) { + return 0; + } + struct connect_args_t* connect_args = active_connect_args_map.lookup(&id); + if (connect_args != NULL) { + connect_args->connect_sock = *sk; + } + + tcp_connect_args_map.delete(&id); + return 0; +} + /*********************************************************** * BPF syscall processing functions ***********************************************************/ @@ -629,7 +681,7 @@ static __inline void process_syscall_connect(struct pt_regs* ctx, uint64_t id, return; } - submit_new_conn(ctx, tgid, args->fd, args->addr, /*socket*/ NULL, kRoleClient, kSyscallConnect); + submit_new_conn(ctx, tgid, args->fd, args->addr, args->connect_sock, kRoleClient, kSyscallConnect); } static __inline void process_syscall_accept(struct pt_regs* ctx, uint64_t id, @@ -645,7 +697,11 @@ static __inline void process_syscall_accept(struct pt_regs* ctx, uint64_t id, 
return; } - submit_new_conn(ctx, tgid, ret_fd, args->addr, args->sock_alloc_socket, kRoleServer, + const struct sock* sk = NULL; + if (args->sock_alloc_socket != NULL) { + BPF_PROBE_READ_KERNEL_VAR(sk, &args->sock_alloc_socket->sk); + } + submit_new_conn(ctx, tgid, ret_fd, args->addr, sk, kRoleServer, kSyscallAccept); } @@ -690,7 +746,7 @@ static __inline void process_implicit_conn(struct pt_regs* ctx, uint64_t id, return; } - submit_new_conn(ctx, tgid, args->fd, args->addr, /*socket*/ NULL, kRoleUnknown, source_fn); + submit_new_conn(ctx, tgid, args->fd, args->addr, /*sock*/ NULL, kRoleUnknown, source_fn); } static __inline bool should_send_data(uint32_t tgid, uint64_t conn_disabled_tsid, diff --git a/src/stirling/source_connectors/socket_tracer/bcc_bpf_intf/socket_trace.h b/src/stirling/source_connectors/socket_tracer/bcc_bpf_intf/socket_trace.h index a66e30d7e62..c8339fd412d 100644 --- a/src/stirling/source_connectors/socket_tracer/bcc_bpf_intf/socket_trace.h +++ b/src/stirling/source_connectors/socket_tracer/bcc_bpf_intf/socket_trace.h @@ -263,6 +263,7 @@ struct socket_control_event_t { struct connect_args_t { const struct sockaddr* addr; + const struct sock* connect_sock; int32_t fd; }; diff --git a/src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc b/src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc index fde841643a4..a57e9c5dc77 100644 --- a/src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc +++ b/src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc @@ -41,12 +41,15 @@ #include "src/stirling/source_connectors/socket_tracer/testing/client_server_system.h" #include "src/stirling/source_connectors/socket_tracer/testing/socket_trace_bpf_test_fixture.h" #include "src/stirling/testing/common.h" +#include "src/stirling/source_connectors/socket_tracer/testing/protocol_checkers.h" namespace px { namespace stirling { using ::px::stirling::testing::FindRecordsMatchingPID; using 
::px::stirling::testing::RecordBatchSizeIs; +using ::px::stirling::testing::GetLocalAddrs; +using ::px::stirling::testing::GetLocalPorts; using ::px::system::TCPSocket; using ::px::system::UDPSocket; using ::px::system::UnixSocket; @@ -747,6 +750,159 @@ TEST_F(NullRemoteAddrTest, IPv6Accept4WithNullRemoteAddr) { EXPECT_EQ(records[kHTTPRemotePortIdx]->Get(0), port); } +using LocalAddrTest = testing::SocketTraceBPFTestFixture; + +TEST_F(LocalAddrTest, IPv4ConnectPopulatesLocalAddr) { + StartTransferDataThread(); + + TCPSocket client; + TCPSocket server; + + std::atomic server_ready = true; + + std::thread server_thread([&server, &server_ready]() { + server.BindAndListen(); + server_ready = true; + auto conn = server.Accept(/* populate_remote_addr */ true); + + std::string data; + + conn->Read(&data); + conn->Write(kHTTPRespMsg1); + }); + + // Wait for server thread to start listening. + while (!server_ready) { + } + // After server_ready, server.Accept() needs to enter the accepting state, before the client + // connection can succeed below. We don't have a simple and robust way to signal that from inside + // the server thread, so we just use sleep to avoid the race condition. + std::this_thread::sleep_for(std::chrono::seconds(1)); + + std::thread client_thread([&client, &server]() { + client.Connect(server); + + std::string data; + + client.Write(kHTTPReqMsg1); + client.Read(&data); + }); + + server_thread.join(); + client_thread.join(); + + // Get the remote port seen by server from client's local port. + struct sockaddr_in client_sockaddr = {}; + socklen_t client_sockaddr_len = sizeof(client_sockaddr); + struct sockaddr* client_sockaddr_ptr = reinterpret_cast(&client_sockaddr); + ASSERT_EQ(getsockname(client.sockfd(), client_sockaddr_ptr, &client_sockaddr_len), 0); + + // Close after getting the sockaddr from fd, otherwise getsockname() wont work. 
+ client.Close(); + server.Close(); + + StopTransferDataThread(); + + std::vector tablets = ConsumeRecords(kHTTPTableNum); + ASSERT_NOT_EMPTY_AND_GET_RECORDS(const types::ColumnWrapperRecordBatch& record_batch, tablets); + + std::vector indices = testing::FindRecordIdxMatchesPID(record_batch, kHTTPUPIDIdx, getpid()); + ColumnWrapperRecordBatch records = testing::SelectRecordBatchRows(record_batch, indices); + + ASSERT_THAT(records, RecordBatchSizeIs(2)); + + // Make sure that the socket info resolution works. + ASSERT_OK_AND_ASSIGN(std::string remote_addr, IPv4AddrToString(client_sockaddr.sin_addr)); + EXPECT_THAT(GetLocalAddrs(records, kHTTPLocalAddrIdx, indices), Contains("127.0.0.1").Times(2)); + EXPECT_EQ(remote_addr, "127.0.0.1"); + + bool found_port = false; + uint16_t port = ntohs(client_sockaddr.sin_port); + for (auto lport : GetLocalPorts(records, kHTTPLocalPortIdx, indices)) { + // TODO(ddelnano): Determine why the local_addr column is storing the port + // in network byte order. + LOG(INFO) << "Local port: " << lport << " and pre ntohs " << client_sockaddr.sin_port << " and ntohs " << port; + if (lport == client_sockaddr.sin_port) { + found_port = true; + break; + } + } + EXPECT_TRUE(found_port); +} + +TEST_F(LocalAddrTest, IPv6ConnectPopulatesLocalAddr) { + StartTransferDataThread(); + + TCPSocket client(AF_INET6); + TCPSocket server(AF_INET6); + + std::atomic server_ready = false; + + std::thread server_thread([&server, &server_ready]() { + server.BindAndListen(); + server_ready = true; + auto conn = server.Accept(/* populate_remote_addr */ false); + + std::string data; + + conn->Read(&data); + conn->Write(kHTTPRespMsg1); + }); + + while (!server_ready) { + } + + std::thread client_thread([&client, &server]() { + client.Connect(server); + + std::string data; + + client.Write(kHTTPReqMsg1); + client.Read(&data); + }); + + server_thread.join(); + client_thread.join(); + + // Get the remote port seen by server from client's local port. 
+ struct sockaddr_in6 client_sockaddr = {}; + socklen_t client_sockaddr_len = sizeof(client_sockaddr); + struct sockaddr* client_sockaddr_ptr = reinterpret_cast(&client_sockaddr); + ASSERT_EQ(getsockname(client.sockfd(), client_sockaddr_ptr, &client_sockaddr_len), 0); + + // Close after getting the sockaddr from fd, otherwise getsockname() wont work. + client.Close(); + server.Close(); + + StopTransferDataThread(); + + std::vector tablets = ConsumeRecords(kHTTPTableNum); + ASSERT_NOT_EMPTY_AND_GET_RECORDS(const types::ColumnWrapperRecordBatch& record_batch, tablets); + + std::vector indices = testing::FindRecordIdxMatchesPID(record_batch, kHTTPUPIDIdx, getpid()); + ColumnWrapperRecordBatch records = testing::SelectRecordBatchRows(record_batch, indices); + + ASSERT_THAT(records, RecordBatchSizeIs(2)); + + // Make sure that the socket info resolution works. + ASSERT_OK_AND_ASSIGN(std::string remote_addr, IPv6AddrToString(client_sockaddr.sin6_addr)); + EXPECT_THAT(GetLocalAddrs(records, kHTTPLocalAddrIdx, indices), Contains("::1").Times(2)); + EXPECT_EQ(remote_addr, "::1"); + + bool found_port = false; + uint16_t port = ntohs(client_sockaddr.sin6_port); + for (auto lport : GetLocalPorts(records, kHTTPLocalPortIdx, indices)) { + // TODO(ddelnano): Determine why the local_addr column is storing the port + // in network byte order. + LOG(INFO) << "Local port: " << lport << " and pre ntohs " << client_sockaddr.sin6_port << " and ntohs " << port; + if (lport == client_sockaddr.sin6_port) { + found_port = true; + break; + } + } + EXPECT_TRUE(found_port); +} + // Run a UDP-based client-server system. 
class UDPSocketTraceBPFTest : public SocketTraceBPFTest { protected: diff --git a/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc b/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc index 76c20b5a1a5..4321e091aac 100644 --- a/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc +++ b/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc @@ -342,6 +342,10 @@ const auto kProbeSpecs = MakeArray({ {"close", ProbeType::kReturn, "syscall__probe_ret_close"}, {"mmap", ProbeType::kEntry, "syscall__probe_entry_mmap"}, {"sock_alloc", ProbeType::kReturn, "probe_ret_sock_alloc", /*is_syscall*/ false}, + {"tcp_v4_connect", ProbeType::kEntry, "probe_entry_populate_active_connect_sock", /*is_syscall*/ false}, + {"tcp_v4_connect", ProbeType::kReturn, "probe_ret_populate_active_connect_sock", /*is_syscall*/ false}, + {"tcp_v6_connect", ProbeType::kEntry, "probe_entry_populate_active_connect_sock", /*is_syscall*/ false}, + {"tcp_v6_connect", ProbeType::kReturn, "probe_ret_populate_active_connect_sock", /*is_syscall*/ false}, {"security_socket_sendmsg", ProbeType::kEntry, "probe_entry_socket_sendmsg", /*is_syscall*/ false, /* is_optional */ false, std::make_shared(bpf_tools::KProbeSpec{ diff --git a/src/stirling/source_connectors/socket_tracer/testing/protocol_checkers.h b/src/stirling/source_connectors/socket_tracer/testing/protocol_checkers.h index 0cb66f59a14..20dae3a56ba 100644 --- a/src/stirling/source_connectors/socket_tracer/testing/protocol_checkers.h +++ b/src/stirling/source_connectors/socket_tracer/testing/protocol_checkers.h @@ -135,6 +135,24 @@ inline std::vector GetEncrypted(const types::ColumnWrapperRecordBatch& rb, return encrypted; } +inline std::vector GetLocalAddrs(const types::ColumnWrapperRecordBatch& rb, + const int local_addr_idx, const std::vector& indices) { + std::vector laddrs; + for (size_t idx : indices) { + laddrs.push_back(rb[local_addr_idx]->Get(idx)); + } + return laddrs; +} 
+ +inline std::vector GetLocalPorts(const types::ColumnWrapperRecordBatch& rb, + const int local_port_idx, const std::vector& indices) { + std::vector ports; + for (size_t idx : indices) { + ports.push_back(rb[local_port_idx]->Get(idx).val); + } + return ports; +} + inline std::vector GetRemotePorts(const types::ColumnWrapperRecordBatch& rb, const std::vector& indices) { std::vector addrs; From 3bd701359591dc9f1e344e7eff00c2cd50aecdcf Mon Sep 17 00:00:00 2001 From: Dom Del Nano Date: Fri, 23 Aug 2024 18:39:06 +0000 Subject: [PATCH 2/8] Fix linting issues Signed-off-by: Dom Del Nano --- src/stirling/binaries/udp_client.cc | 42 +++++++++++++++++++ .../socket_tracer/bcc_bpf/socket_trace.c | 19 ++++----- .../socket_tracer/socket_trace_bpf_test.cc | 16 ++++--- .../socket_tracer/socket_trace_connector.cc | 12 ++++-- .../socket_tracer/testing/protocol_checkers.h | 6 ++- 5 files changed, 73 insertions(+), 22 deletions(-) create mode 100644 src/stirling/binaries/udp_client.cc diff --git a/src/stirling/binaries/udp_client.cc b/src/stirling/binaries/udp_client.cc new file mode 100644 index 00000000000..6d0a4986212 --- /dev/null +++ b/src/stirling/binaries/udp_client.cc @@ -0,0 +1,42 @@ +/* + * Copyright 2018- The Pixie Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "src/common/base/base.h" +#include "src/common/base/env.h" +#include "src/common/system/udp_socket.h" + +using px::system::UDPSocket; + +int main(int argc, char** argv) { + px::EnvironmentGuard env_guard(&argc, argv); + + if (argc < 3) { + LOG(FATAL) << absl::Substitute("Expected server address and port to be provided, instead received $0", *argv); + } + std::string_view msg = "Hello, World!"; + UDPSocket client; + + sockaddr_in server_addr; + int status = inet_pton(AF_INET, argv[1], &server_addr.sin_addr); + if (status != 1) { + LOG(FATAL) << absl::Substitute("Failed to parse server address $0", argv[1]); + } + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(atoi(argv[2])); + client.SendTo(msg, server_addr, 0); +} diff --git a/src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c b/src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c index dbb7104cc09..9e091a8fb1f 100644 --- a/src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c +++ b/src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c @@ -84,7 +84,7 @@ BPF_HASH(active_connect_args_map, uint64_t, struct connect_args_t); // Map from thread to its sock* struct. This facilitates capturing // the local address of a tcp socket during connect() syscalls. // Key is {tgid, pid}. -BPF_HASH(tcp_connect_args_map, uint64_t, struct sock *); +BPF_HASH(tcp_connect_args_map, uint64_t, struct sock*); // Map from thread to its ongoing write() syscall's input argument. // Tracks write() call from entry -> exit. 
@@ -350,8 +350,7 @@ static __inline void update_traffic_class(struct conn_info_t* conn_info, * Perf submit functions ***********************************************************/ -static __inline void read_sockaddr_kernel(struct conn_info_t* conn_info, - const struct sock* sk) { +static __inline void read_sockaddr_kernel(struct conn_info_t* conn_info, const struct sock* sk) { const struct sock_common* sk_common = &sk->__sk_common; uint16_t family = -1; uint16_t lport = -1; @@ -596,9 +595,9 @@ int conn_cleanup_uprobe(struct pt_regs* ctx) { // syscall path. // // Using the struct sock* for capturing a socket's local address only works for TCP sockets. -// The equivalent UDP functions (udp_v4_connect, udp_v6_connect and upd_sendmsg) always receive a sock -// struct with a 0.0.0.0 or ::1 local address. This is deemed acceptable since our local address -// population for server side tracing relies on accept/accept4, which only applies for TCP. +// The equivalent UDP functions (udp_v4_connect, udp_v6_connect and udp_sendmsg) always receive a +// sock struct with a 0.0.0.0 or ::1 local address. This is deemed acceptable since our local +// address population for server side tracing relies on accept/accept4, which only applies for TCP. // // TODO(ddelnano): The current implementation works for mid stream TCP connections despite // my intuition that tcp_v4_connect and tcp_v6_connect should not be called mid stream. 
@@ -615,7 +614,7 @@ int probe_entry_populate_active_connect_sock(struct pt_regs* ctx) { if (connect_args == NULL) { return 0; } - struct sock* sk = (struct sock *)PT_REGS_PARM1(ctx); + struct sock* sk = (struct sock*)PT_REGS_PARM1(ctx); tcp_connect_args_map.update(&id, &sk); return 0; @@ -681,7 +680,8 @@ static __inline void process_syscall_connect(struct pt_regs* ctx, uint64_t id, return; } - submit_new_conn(ctx, tgid, args->fd, args->addr, args->connect_sock, kRoleClient, kSyscallConnect); + submit_new_conn(ctx, tgid, args->fd, args->addr, args->connect_sock, kRoleClient, + kSyscallConnect); } static __inline void process_syscall_accept(struct pt_regs* ctx, uint64_t id, @@ -701,8 +701,7 @@ static __inline void process_syscall_accept(struct pt_regs* ctx, uint64_t id, if (args->sock_alloc_socket != NULL) { BPF_PROBE_READ_KERNEL_VAR(sk, &args->sock_alloc_socket->sk); } - submit_new_conn(ctx, tgid, ret_fd, args->addr, sk, kRoleServer, - kSyscallAccept); + submit_new_conn(ctx, tgid, ret_fd, args->addr, sk, kRoleServer, kSyscallAccept); } // TODO(oazizi): This is badly broken (but better than before). 
diff --git a/src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc b/src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc index a57e9c5dc77..24ade3e6552 100644 --- a/src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc +++ b/src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc @@ -39,17 +39,17 @@ #include "src/stirling/source_connectors/socket_tracer/bcc_bpf_intf/socket_trace.hpp" #include "src/stirling/source_connectors/socket_tracer/socket_trace_connector.h" #include "src/stirling/source_connectors/socket_tracer/testing/client_server_system.h" +#include "src/stirling/source_connectors/socket_tracer/testing/protocol_checkers.h" #include "src/stirling/source_connectors/socket_tracer/testing/socket_trace_bpf_test_fixture.h" #include "src/stirling/testing/common.h" -#include "src/stirling/source_connectors/socket_tracer/testing/protocol_checkers.h" namespace px { namespace stirling { using ::px::stirling::testing::FindRecordsMatchingPID; -using ::px::stirling::testing::RecordBatchSizeIs; using ::px::stirling::testing::GetLocalAddrs; using ::px::stirling::testing::GetLocalPorts; +using ::px::stirling::testing::RecordBatchSizeIs; using ::px::system::TCPSocket; using ::px::system::UDPSocket; using ::px::system::UnixSocket; @@ -806,7 +806,8 @@ TEST_F(LocalAddrTest, IPv4ConnectPopulatesLocalAddr) { std::vector tablets = ConsumeRecords(kHTTPTableNum); ASSERT_NOT_EMPTY_AND_GET_RECORDS(const types::ColumnWrapperRecordBatch& record_batch, tablets); - std::vector indices = testing::FindRecordIdxMatchesPID(record_batch, kHTTPUPIDIdx, getpid()); + std::vector indices = + testing::FindRecordIdxMatchesPID(record_batch, kHTTPUPIDIdx, getpid()); ColumnWrapperRecordBatch records = testing::SelectRecordBatchRows(record_batch, indices); ASSERT_THAT(records, RecordBatchSizeIs(2)); @@ -821,7 +822,8 @@ TEST_F(LocalAddrTest, IPv4ConnectPopulatesLocalAddr) { for (auto lport : GetLocalPorts(records, kHTTPLocalPortIdx, 
indices)) { // TODO(ddelnano): Determine why the local_addr column is storing the port // in network byte order. - LOG(INFO) << "Local port: " << lport << " and pre ntohs " << client_sockaddr.sin_port << " and ntohs " << port; + LOG(INFO) << "Local port: " << lport << " and pre ntohs " << client_sockaddr.sin_port + << " and ntohs " << port; if (lport == client_sockaddr.sin_port) { found_port = true; break; @@ -879,7 +881,8 @@ TEST_F(LocalAddrTest, IPv6ConnectPopulatesLocalAddr) { std::vector tablets = ConsumeRecords(kHTTPTableNum); ASSERT_NOT_EMPTY_AND_GET_RECORDS(const types::ColumnWrapperRecordBatch& record_batch, tablets); - std::vector indices = testing::FindRecordIdxMatchesPID(record_batch, kHTTPUPIDIdx, getpid()); + std::vector indices = + testing::FindRecordIdxMatchesPID(record_batch, kHTTPUPIDIdx, getpid()); ColumnWrapperRecordBatch records = testing::SelectRecordBatchRows(record_batch, indices); ASSERT_THAT(records, RecordBatchSizeIs(2)); @@ -894,7 +897,8 @@ TEST_F(LocalAddrTest, IPv6ConnectPopulatesLocalAddr) { for (auto lport : GetLocalPorts(records, kHTTPLocalPortIdx, indices)) { // TODO(ddelnano): Determine why the local_addr column is storing the port // in network byte order. 
- LOG(INFO) << "Local port: " << lport << " and pre ntohs " << client_sockaddr.sin6_port << " and ntohs " << port; + LOG(INFO) << "Local port: " << lport << " and pre ntohs " << client_sockaddr.sin6_port + << " and ntohs " << port; if (lport == client_sockaddr.sin6_port) { found_port = true; break; diff --git a/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc b/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc index 4321e091aac..755456f7117 100644 --- a/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc +++ b/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc @@ -342,10 +342,14 @@ const auto kProbeSpecs = MakeArray({ {"close", ProbeType::kReturn, "syscall__probe_ret_close"}, {"mmap", ProbeType::kEntry, "syscall__probe_entry_mmap"}, {"sock_alloc", ProbeType::kReturn, "probe_ret_sock_alloc", /*is_syscall*/ false}, - {"tcp_v4_connect", ProbeType::kEntry, "probe_entry_populate_active_connect_sock", /*is_syscall*/ false}, - {"tcp_v4_connect", ProbeType::kReturn, "probe_ret_populate_active_connect_sock", /*is_syscall*/ false}, - {"tcp_v6_connect", ProbeType::kEntry, "probe_entry_populate_active_connect_sock", /*is_syscall*/ false}, - {"tcp_v6_connect", ProbeType::kReturn, "probe_ret_populate_active_connect_sock", /*is_syscall*/ false}, + {"tcp_v4_connect", ProbeType::kEntry, "probe_entry_populate_active_connect_sock", + /*is_syscall*/ false}, + {"tcp_v4_connect", ProbeType::kReturn, "probe_ret_populate_active_connect_sock", + /*is_syscall*/ false}, + {"tcp_v6_connect", ProbeType::kEntry, "probe_entry_populate_active_connect_sock", + /*is_syscall*/ false}, + {"tcp_v6_connect", ProbeType::kReturn, "probe_ret_populate_active_connect_sock", + /*is_syscall*/ false}, {"security_socket_sendmsg", ProbeType::kEntry, "probe_entry_socket_sendmsg", /*is_syscall*/ false, /* is_optional */ false, std::make_shared(bpf_tools::KProbeSpec{ diff --git 
a/src/stirling/source_connectors/socket_tracer/testing/protocol_checkers.h b/src/stirling/source_connectors/socket_tracer/testing/protocol_checkers.h index 20dae3a56ba..207eb68e89b 100644 --- a/src/stirling/source_connectors/socket_tracer/testing/protocol_checkers.h +++ b/src/stirling/source_connectors/socket_tracer/testing/protocol_checkers.h @@ -136,7 +136,8 @@ inline std::vector GetEncrypted(const types::ColumnWrapperRecordBatch& rb, } inline std::vector GetLocalAddrs(const types::ColumnWrapperRecordBatch& rb, - const int local_addr_idx, const std::vector& indices) { + const int local_addr_idx, + const std::vector& indices) { std::vector laddrs; for (size_t idx : indices) { laddrs.push_back(rb[local_addr_idx]->Get(idx)); @@ -145,7 +146,8 @@ inline std::vector GetLocalAddrs(const types::ColumnWrapperRecordBa } inline std::vector GetLocalPorts(const types::ColumnWrapperRecordBatch& rb, - const int local_port_idx, const std::vector& indices) { + const int local_port_idx, + const std::vector& indices) { std::vector ports; for (size_t idx : indices) { ports.push_back(rb[local_port_idx]->Get(idx).val); From a01138e792bbc5b74cba18ab8573f84614de6807 Mon Sep 17 00:00:00 2001 From: Dom Del Nano Date: Fri, 23 Aug 2024 18:39:42 +0000 Subject: [PATCH 3/8] Remove unrelated file Signed-off-by: Dom Del Nano --- src/stirling/binaries/udp_client.cc | 42 ----------------------------- 1 file changed, 42 deletions(-) delete mode 100644 src/stirling/binaries/udp_client.cc diff --git a/src/stirling/binaries/udp_client.cc b/src/stirling/binaries/udp_client.cc deleted file mode 100644 index 6d0a4986212..00000000000 --- a/src/stirling/binaries/udp_client.cc +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright 2018- The Pixie Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -#include "src/common/base/base.h" -#include "src/common/base/env.h" -#include "src/common/system/udp_socket.h" - -using px::system::UDPSocket; - -int main(int argc, char** argv) { - px::EnvironmentGuard env_guard(&argc, argv); - - if (argc < 3) { - LOG(FATAL) << absl::Substitute("Expected server address and port to be provided, instead received $0", *argv); - } - std::string_view msg = "Hello, World!"; - UDPSocket client; - - sockaddr_in server_addr; - int status = inet_pton(AF_INET, argv[1], &server_addr.sin_addr); - if (status != 1) { - LOG(FATAL) << absl::Substitute("Failed to parse server address $0", argv[1]); - } - server_addr.sin_family = AF_INET; - server_addr.sin_port = htons(atoi(argv[2])); - client.SendTo(msg, server_addr, 0); -} From f3548effe505c58d77ff679dc0a1a0869fcbe696 Mon Sep 17 00:00:00 2001 From: Dom Del Nano Date: Fri, 23 Aug 2024 21:56:39 +0000 Subject: [PATCH 4/8] Add probes on tcp_sendmsg to handle mid stream connections Signed-off-by: Dom Del Nano --- .../socket_tracer/bcc_bpf/socket_trace.c | 9 ++------- .../socket_tracer/socket_trace_connector.cc | 4 ++++ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c b/src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c index 9e091a8fb1f..19534cf2d61 100644 --- a/src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c +++ b/src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c @@ -599,14 +599,9 @@ int 
conn_cleanup_uprobe(struct pt_regs* ctx) { // sock struct with a 0.0.0.0 or ::1 local address. This is deemed acceptable since our local // address population for server side tracing relies on accept/accept4, which only applies for TCP. // -// TODO(ddelnano): The current implementation works for mid stream TCP connections despite -// my intuition that tcp_v4_connect and tcp_v6_connect should not be called mid stream. -// If mid stream connections have a missing local address in the future, we should probe -// tcp_sendmsg in addition to the current probes. This will require discerning between the -// UDP and TCP case in process_implicit_conn. -// // int tcp_v4_connect(struct sock *sk, struct sockaddr *uaddr, int addr_len); // static int tcp_v6_connect(struct sock *sk, struct sockaddr *uaddr, int addr_len); +// int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size); int probe_entry_populate_active_connect_sock(struct pt_regs* ctx) { uint64_t id = bpf_get_current_pid_tgid(); @@ -745,7 +740,7 @@ static __inline void process_implicit_conn(struct pt_regs* ctx, uint64_t id, return; } - submit_new_conn(ctx, tgid, args->fd, args->addr, /*sock*/ NULL, kRoleUnknown, source_fn); + submit_new_conn(ctx, tgid, args->fd, args->addr, args->connect_sock, kRoleUnknown, source_fn); } static __inline bool should_send_data(uint32_t tgid, uint64_t conn_disabled_tsid, diff --git a/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc b/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc index 755456f7117..6011d133e0b 100644 --- a/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc +++ b/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc @@ -350,6 +350,10 @@ const auto kProbeSpecs = MakeArray({ /*is_syscall*/ false}, {"tcp_v6_connect", ProbeType::kReturn, "probe_ret_populate_active_connect_sock", /*is_syscall*/ false}, + {"tcp_sendmsg", ProbeType::kEntry, "probe_entry_populate_active_connect_sock", + 
/*is_syscall*/ false}, + {"tcp_sendmsg", ProbeType::kReturn, "probe_ret_populate_active_connect_sock", + /*is_syscall*/ false}, {"security_socket_sendmsg", ProbeType::kEntry, "probe_entry_socket_sendmsg", /*is_syscall*/ false, /* is_optional */ false, std::make_shared(bpf_tools::KProbeSpec{ From 7a5cd84a9068808bab1ed71c7ad817bce8e845fb Mon Sep 17 00:00:00 2001 From: Dom Del Nano Date: Fri, 23 Aug 2024 21:57:12 +0000 Subject: [PATCH 5/8] #ci:bpf-build-all-kernels Signed-off-by: Dom Del Nano From 796013539743e82510ddf329d2133d38936c1a5b Mon Sep 17 00:00:00 2001 From: Dom Del Nano Date: Tue, 27 Aug 2024 16:04:39 +0000 Subject: [PATCH 6/8] Reduce iovec bpf loop limit from 42 to 41 to fix BPF instruction count for 4.14 kernels Signed-off-by: Dom Del Nano --- .../source_connectors/socket_tracer/socket_trace_connector.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc b/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc index 6011d133e0b..c4c273b96ba 100644 --- a/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc +++ b/src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc @@ -176,7 +176,7 @@ DEFINE_bool( stirling_debug_tls_sources, gflags::BoolFromEnv("PX_DEBUG_TLS_SOURCES", false), "If true, stirling will add additional prometheus metrics regarding the traced tls sources"); -DEFINE_uint32(stirling_bpf_loop_limit, 42, +DEFINE_uint32(stirling_bpf_loop_limit, 41, "The maximum number of iovecs to capture for syscalls. 
" "Set conservatively for older kernels by default to keep the instruction count below " "BPF's limit for version 4 kernels (4096 per probe)."); From 636d15fb11918968b47258e3f3e6c3a323722768 Mon Sep 17 00:00:00 2001 From: Dom Del Nano Date: Wed, 4 Sep 2024 05:04:46 +0000 Subject: [PATCH 7/8] Properly convert kernel data structure that uses host byte order for local port (skc_num field) Signed-off-by: Dom Del Nano --- .../source_connectors/socket_tracer/bcc_bpf/socket_trace.c | 3 +++ .../socket_tracer/socket_trace_bpf_test.cc | 6 +----- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c b/src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c index 19534cf2d61..68a23dda923 100644 --- a/src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c +++ b/src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c @@ -358,6 +358,9 @@ static __inline void read_sockaddr_kernel(struct conn_info_t* conn_info, const s BPF_PROBE_READ_KERNEL_VAR(family, &sk_common->skc_family); BPF_PROBE_READ_KERNEL_VAR(lport, &sk_common->skc_num); + // skc_num is stored in host byte order. The rest of our user space processing + // assumes network byte order so convert it here. 
+ lport = htons(lport); BPF_PROBE_READ_KERNEL_VAR(rport, &sk_common->skc_dport); conn_info->laddr.sa.sa_family = family; diff --git a/src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc b/src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc index 24ade3e6552..b08381318f1 100644 --- a/src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc +++ b/src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc @@ -820,11 +820,7 @@ TEST_F(LocalAddrTest, IPv4ConnectPopulatesLocalAddr) { bool found_port = false; uint16_t port = ntohs(client_sockaddr.sin_port); for (auto lport : GetLocalPorts(records, kHTTPLocalPortIdx, indices)) { - // TODO(ddelnano): Determine why the local_addr column is storing the port - // in network byte order. - LOG(INFO) << "Local port: " << lport << " and pre ntohs " << client_sockaddr.sin_port - << " and ntohs " << port; - if (lport == client_sockaddr.sin_port) { + if (lport == port) { found_port = true; break; } From 945b211b73c4596cad062b14bfc0bfef44a4e152 Mon Sep 17 00:00:00 2001 From: Dom Del Nano Date: Wed, 4 Sep 2024 06:09:08 +0000 Subject: [PATCH 8/8] Ensure ipv6 test is cleaned up after endianness fix was discovered Signed-off-by: Dom Del Nano --- .../socket_tracer/socket_trace_bpf_test.cc | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc b/src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc index b08381318f1..8de89aa4b22 100644 --- a/src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc +++ b/src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc @@ -891,11 +891,7 @@ TEST_F(LocalAddrTest, IPv6ConnectPopulatesLocalAddr) { bool found_port = false; uint16_t port = ntohs(client_sockaddr.sin6_port); for (auto lport : GetLocalPorts(records, kHTTPLocalPortIdx, indices)) { - // TODO(ddelnano): Determine why the local_addr column is storing the port - //
in network byte order. - LOG(INFO) << "Local port: " << lport << " and pre ntohs " << client_sockaddr.sin6_port - << " and ntohs " << port; - if (lport == client_sockaddr.sin6_port) { + if (lport == port) { found_port = true; break; }