Skip to content
1 change: 1 addition & 0 deletions include/envoy/singleton/manager.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <memory>
#include <string>

#include "envoy/common/pure.h"
#include "envoy/singleton/instance.h"
Expand Down
50 changes: 39 additions & 11 deletions source/common/network/address_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ InstanceConstSharedPtr peerAddressFromFd(int fd) {
if (rc != 0) {
throw EnvoyException(fmt::format("getpeername failed for '{}': {}", fd, strerror(errno)));
}
#ifdef __APPLE__
if (ss_len == sizeof(sockaddr) && ss.ss_family == AF_UNIX) {
#else
if (ss_len == sizeof(sa_family_t) && ss.ss_family == AF_UNIX) {
#endif
// For Unix domain sockets, can't find out the peer name, but it should match our own
// name for the socket (i.e. the path should match, barring any namespace or other
// mechanisms to hide things, of which there are many).
Expand All @@ -77,14 +81,42 @@ InstanceConstSharedPtr peerAddressFromFd(int fd) {
return addressFromSockAddr(ss, ss_len);
}

int InstanceBase::flagsFromSocketType(SocketType type) const {
// Creates a non-blocking socket for this address and returns its file descriptor.
//
// The socket type is SOCK_STREAM when socketType is SocketType::Stream and SOCK_DGRAM otherwise.
// The domain is derived from this instance: AF_INET or AF_INET6 for Type::Ip (by IP version) and
// AF_UNIX for Type::Pipe. A failure of ::socket() (or of fcntl() on Apple platforms) trips a
// RELEASE_ASSERT rather than being reported to the caller.
int InstanceBase::socketFromSocketType(SocketType socketType) const {
// SOCK_NONBLOCK is only passed to ::socket() on non-Apple platforms; on Apple the descriptor is
// switched to non-blocking mode with fcntl() after creation (see below).
#if defined(__APPLE__)
int flags = 0;
#else
int flags = SOCK_NONBLOCK;
// NOTE(review): the `if (type == SocketType::Stream) {` line below and the later `return flags;`
// look like leftovers from the old flagsFromSocketType() body -- confirm against the real file.
if (type == SocketType::Stream) {
#endif

// Select the socket type requested by the caller.
if (socketType == SocketType::Stream) {
flags |= SOCK_STREAM;
} else {
flags |= SOCK_DGRAM;
}
return flags;

// Map this address's type (and IP version) to the socket domain.
int domain;
if (type() == Type::Ip) {
IpVersion version = ip()->version();
if (version == IpVersion::v6) {
domain = AF_INET6;
} else {
ASSERT(version == IpVersion::v4);
domain = AF_INET;
}
} else {
ASSERT(type() == Type::Pipe);
domain = AF_UNIX;
}

// Protocol 0 lets the OS pick the default protocol for the domain/type pair.
int fd = ::socket(domain, flags, 0);
RELEASE_ASSERT(fd != -1);

#ifdef __APPLE__
// Cannot set SOCK_NONBLOCK as a ::socket flag.
RELEASE_ASSERT(fcntl(fd, F_SETFL, O_NONBLOCK) != -1);
#endif

return fd;
}

Ipv4Instance::Ipv4Instance(const sockaddr_in* address) : InstanceBase(Type::Ip) {
Expand Down Expand Up @@ -129,9 +161,7 @@ int Ipv4Instance::connect(int fd) const {
sizeof(ip_.ipv4_.address_));
}

int Ipv4Instance::socket(SocketType type) const {
return ::socket(AF_INET, flagsFromSocketType(type), 0);
}
// Opens a socket of the requested type for this IPv4 address by delegating to the shared
// InstanceBase helper, and hands back the resulting file descriptor.
int Ipv4Instance::socket(SocketType type) const {
  const int fd = socketFromSocketType(type);
  return fd;
}

std::array<uint8_t, 16> Ipv6Instance::Ipv6Helper::address() const {
std::array<uint8_t, 16> result;
Expand Down Expand Up @@ -186,8 +216,8 @@ int Ipv6Instance::connect(int fd) const {
}

int Ipv6Instance::socket(SocketType type) const {
const int fd = ::socket(AF_INET6, flagsFromSocketType(type), 0);
RELEASE_ASSERT(fd != -1);
const int fd = socketFromSocketType(type);

// Setting IPV6_V6ONLY restricts the IPv6 socket to IPv6 connections only.
const int v6only = 1;
RELEASE_ASSERT(::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &v6only, sizeof(v6only)) != -1);
Expand Down Expand Up @@ -217,9 +247,7 @@ int PipeInstance::connect(int fd) const {
return ::connect(fd, reinterpret_cast<const sockaddr*>(&address_), sizeof(address_));
}

int PipeInstance::socket(SocketType type) const {
return ::socket(AF_UNIX, flagsFromSocketType(type), 0);
}
// Opens a socket of the requested type for this pipe address by delegating to the shared
// InstanceBase helper, and hands back the resulting file descriptor.
int PipeInstance::socket(SocketType type) const {
  const int fd = socketFromSocketType(type);
  return fd;
}

} // namespace Address
} // namespace Network
Expand Down
2 changes: 1 addition & 1 deletion source/common/network/address_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class InstanceBase : public Instance {

protected:
InstanceBase(Type type) : type_(type) {}
int flagsFromSocketType(SocketType type) const;
int socketFromSocketType(SocketType type) const;

std::string friendly_name_;

Expand Down
24 changes: 20 additions & 4 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ std::atomic<uint64_t> ConnectionImpl::next_global_id_;
ConnectionImpl::ConnectionImpl(Event::DispatcherImpl& dispatcher, int fd,
Address::InstanceConstSharedPtr remote_address,
Address::InstanceConstSharedPtr local_address,
bool using_original_dst)
bool using_original_dst, bool connected)
: filter_manager_(*this, *this), remote_address_(remote_address), local_address_(local_address),
read_buffer_(dispatcher.getBufferFactory().create()),
write_buffer_(Buffer::InstancePtr{dispatcher.getBufferFactory().create()},
Expand All @@ -70,6 +70,10 @@ ConnectionImpl::ConnectionImpl(Event::DispatcherImpl& dispatcher, int fd,
// condition and just crash.
RELEASE_ASSERT(fd_ != -1);

if (!connected) {
state_ |= InternalState::Connecting;
}

// We never ask for both early close and read at the same time. If we are reading, we want to
// consume all available data.
file_event_ = dispatcher_.createFileEvent(
Expand Down Expand Up @@ -179,6 +183,14 @@ void ConnectionImpl::noDelay(bool enable) {
// Set NODELAY
int new_value = enable;
rc = setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &new_value, sizeof(new_value));
#ifdef __APPLE__
if (-1 == rc && errno == EINVAL) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When does this happen? Is this needed with the connected fixes?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. This still happens and I think it's basically the same reason.

We call ConnectionImpl::connect() and then immediately ConnectionImpl::noDelay(true), but the socket isn't connected yet, so OS X returns an error.

I added some code to read the value of TCP_NODELAY and it does get set despite the error.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the way to get rid of this #ifdef is to only set TCP_NODELAY after the socket is connected. That means either changing all the call sites to invoke this from a callback or else tracking the desired setting and applying in onWriteReady.

Copy link
Copy Markdown
Member

@mattklein123 mattklein123 Aug 10, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like an OS X bug to me, but I digress. Hmm. In this case I would be in favor of just putting in more of a comment on why this is done/breaks and maybe a TODO?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also file an issue on this and reference it here. I'm imagining some point in the future where we're looking at performance on OS X, and the TCP_NODELAY fail ends up being due to some reason that does not actually set the option. We will then have non-deterministic performance characteristics. This is only slightly hypothetical - we've recently been chasing down a TCP proxy overhead issue that relates somewhat to TCP_NODELAY on envoy-users.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #1446.

// Sometimes occurs when the connection is not yet fully formed. Empirically, TCP_NODELAY is
// enabled despite this result. See #1446.
return;
}
#endif

RELEASE_ASSERT(0 == rc);
UNREFERENCED_PARAMETER(rc);
}
Expand Down Expand Up @@ -273,6 +285,9 @@ void ConnectionImpl::write(Buffer::Instance& data) {
// to SSL_write(). That code will have to change if we ever copy here.
write_buffer_.move(data);

// Activating a write event before the socket is connected has the side-effect of tricking
// doWriteReady into thinking the socket is connected. On OS X, the underlying write may fail
// with a connection error if a call to write(2) occurs before the connection is completed.
if (!(state_ & InternalState::Connecting)) {
file_event_->activate(Event::FileReadyType::Write);
}
Expand Down Expand Up @@ -476,15 +491,16 @@ void ConnectionImpl::doConnect() {
int rc = remote_address_->connect(fd_);
if (rc == 0) {
// write will become ready.
state_ |= InternalState::Connecting;
ASSERT(state_ & InternalState::Connecting);
} else {
ASSERT(rc == -1);
if (errno == EINPROGRESS) {
state_ |= InternalState::Connecting;
ASSERT(state_ & InternalState::Connecting);
ENVOY_CONN_LOG(debug, "connection in progress", *this);
} else {
// read/write will become ready.
state_ |= InternalState::ImmediateConnectionError;
state_ &= ~InternalState::Connecting;
ENVOY_CONN_LOG(debug, "immediate connection error: {}", *this, errno);
}
}
Expand Down Expand Up @@ -518,7 +534,7 @@ void ConnectionImpl::updateWriteBufferStats(uint64_t num_written, uint64_t new_s
ClientConnectionImpl::ClientConnectionImpl(Event::DispatcherImpl& dispatcher,
Address::InstanceConstSharedPtr address)
: ConnectionImpl(dispatcher, address->socket(Address::SocketType::Stream), address,
getNullLocalAddress(*address), false) {}
getNullLocalAddress(*address), false, false) {}

} // namespace Network
} // namespace Envoy
3 changes: 2 additions & 1 deletion source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class ConnectionImpl : public virtual Connection,
public:
ConnectionImpl(Event::DispatcherImpl& dispatcher, int fd,
Address::InstanceConstSharedPtr remote_address,
Address::InstanceConstSharedPtr local_address, bool using_original_dst);
Address::InstanceConstSharedPtr local_address, bool using_original_dst,
bool connected);

~ConnectionImpl();

Expand Down
8 changes: 4 additions & 4 deletions source/common/network/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,17 @@ void ListenerImpl::newConnection(int fd, Address::InstanceConstSharedPtr remote_
Address::InstanceConstSharedPtr local_address,
bool using_original_dst) {
ConnectionPtr new_connection(
new ConnectionImpl(dispatcher_, fd, remote_address, local_address, using_original_dst));
new ConnectionImpl(dispatcher_, fd, remote_address, local_address, using_original_dst, true));
new_connection->setBufferLimits(options_.per_connection_buffer_limit_bytes_);
cb_.onNewConnection(std::move(new_connection));
}

// Wraps fd in a server-side Ssl::ConnectionImpl (InitialState::Server, using ssl_ctx_), applies
// the listener's per-connection buffer limit, and passes ownership of the connection to the
// listener callbacks via onNewConnection().
void SslListenerImpl::newConnection(int fd, Address::InstanceConstSharedPtr remote_address,
Address::InstanceConstSharedPtr local_address,
bool using_original_dst) {
// NOTE(review): two declarations of new_connection follow; the first (without the extra `true`
// argument) looks like the pre-change version of this statement -- confirm against the real file.
ConnectionPtr new_connection(new Ssl::ConnectionImpl(dispatcher_, fd, remote_address,
local_address, using_original_dst, ssl_ctx_,
Ssl::ConnectionImpl::InitialState::Server));
// The `true` after using_original_dst is the constructor's `connected` argument.
ConnectionPtr new_connection(
new Ssl::ConnectionImpl(dispatcher_, fd, remote_address, local_address, using_original_dst,
true, ssl_ctx_, Ssl::ConnectionImpl::InitialState::Server));
new_connection->setBufferLimits(options_.per_connection_buffer_limit_bytes_);
cb_.onNewConnection(std::move(new_connection));
}
Expand Down
11 changes: 11 additions & 0 deletions source/common/network/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

#include <arpa/inet.h>
#include <ifaddrs.h>

#if defined(__linux__)
#include <linux/netfilter_ipv4.h>
#endif

#include <netinet/ip.h>
#include <sys/socket.h>

Expand Down Expand Up @@ -291,6 +295,7 @@ Address::InstanceConstSharedPtr Utility::getAddressWithPort(const Address::Insta
}

Address::InstanceConstSharedPtr Utility::getOriginalDst(int fd) {
#ifdef SOL_IP
sockaddr_storage orig_addr;
socklen_t addr_len = sizeof(sockaddr_storage);
int status = getsockopt(fd, SOL_IP, SO_ORIGINAL_DST, &orig_addr, &addr_len);
Expand All @@ -303,6 +308,12 @@ Address::InstanceConstSharedPtr Utility::getOriginalDst(int fd) {
} else {
return nullptr;
}
#else
// TODO(zuercher): determine if connection redirection is possible under OS X (cf. pfctl and
// divert), and whether it's possible to learn the original destination address.
UNREFERENCED_PARAMETER(fd);
return nullptr;
#endif
}

void Utility::parsePortRangeList(const std::string& string, std::list<PortRange>& list) {
Expand Down
8 changes: 5 additions & 3 deletions source/common/ssl/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ getNullLocalAddress(const Network::Address::Instance& address) {
ConnectionImpl::ConnectionImpl(Event::DispatcherImpl& dispatcher, int fd,
Network::Address::InstanceConstSharedPtr remote_address,
Network::Address::InstanceConstSharedPtr local_address,
bool using_original_dst, Context& ctx, InitialState state)
: Network::ConnectionImpl(dispatcher, fd, remote_address, local_address, using_original_dst),
bool using_original_dst, bool connected, Context& ctx,
InitialState state)
: Network::ConnectionImpl(dispatcher, fd, remote_address, local_address, using_original_dst,
connected),
ctx_(dynamic_cast<Ssl::ContextImpl&>(ctx)), ssl_(ctx_.newSsl()) {
BIO* bio = BIO_new_socket(fd, 0);
SSL_set_bio(ssl_.get(), bio, bio);
Expand Down Expand Up @@ -294,7 +296,7 @@ std::string ConnectionImpl::getUriSanFromCertificate(X509* cert) {
ClientConnectionImpl::ClientConnectionImpl(Event::DispatcherImpl& dispatcher, Context& ctx,
Network::Address::InstanceConstSharedPtr address)
: ConnectionImpl(dispatcher, address->socket(Network::Address::SocketType::Stream), address,
getNullLocalAddress(*address), false, ctx, InitialState::Client) {}
getNullLocalAddress(*address), false, false, ctx, InitialState::Client) {}

// Starts the outbound connection attempt by delegating to doConnect().
void ClientConnectionImpl::connect() {
  doConnect();
}

Expand Down
2 changes: 1 addition & 1 deletion source/common/ssl/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class ConnectionImpl : public Network::ConnectionImpl, public Connection {
ConnectionImpl(Event::DispatcherImpl& dispatcher, int fd,
Network::Address::InstanceConstSharedPtr remote_address,
Network::Address::InstanceConstSharedPtr local_address, bool using_original_dst,
Context& ctx, InitialState state);
bool connected, Context& ctx, InitialState state);
~ConnectionImpl();

// Network::Connection
Expand Down
22 changes: 14 additions & 8 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ TEST_P(ConnectionImplDeathTest, BadFd) {
Event::DispatcherImpl dispatcher;
EXPECT_DEATH(ConnectionImpl(dispatcher, -1,
Network::Test::getCanonicalLoopbackAddress(GetParam()),
Network::Test::getCanonicalLoopbackAddress(GetParam()), false),
Network::Test::getCanonicalLoopbackAddress(GetParam()), false, false),
".*assert failure: fd_ != -1.*");
}

Expand Down Expand Up @@ -96,8 +96,9 @@ class ConnectionImplTest : public testing::TestWithParam<Address::IpVersion> {
server_connection_->addConnectionCallbacks(server_callbacks_);
server_connection_->addReadFilter(read_filter_);
}));
EXPECT_CALL(client_callbacks_, onEvent(ConnectionEvent::Connected));
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
EXPECT_CALL(client_callbacks_, onEvent(ConnectionEvent::Connected))
.WillOnce(Invoke([&](Network::ConnectionEvent) -> void { dispatcher_->exit(); }));
dispatcher_->run(Event::Dispatcher::RunType::Block);
}

void disconnect(bool wait_for_remote_close) {
Expand Down Expand Up @@ -373,14 +374,16 @@ TEST_P(ConnectionImplTest, WriteWithWatermarks) {
EXPECT_CALL(*client_write_buffer_, move(_))
.WillRepeatedly(DoAll(AddBufferToStringWithoutDraining(&data_written),
Invoke(client_write_buffer_, &MockBuffer::baseMove)));
EXPECT_CALL(*client_write_buffer_, write(_))
.WillOnce(Invoke(client_write_buffer_, &MockBuffer::failWrite));
EXPECT_CALL(*client_write_buffer_, write(_)).WillOnce(Invoke([&](int fd) -> int {
dispatcher_->exit();
return client_write_buffer_->failWrite(fd);
}));
// The write() call on the connection will buffer enough data to bring the connection above the
// high watermark and as the data will not flush it should not return below the watermark.
EXPECT_CALL(client_callbacks_, onAboveWriteBufferHighWatermark());
EXPECT_CALL(client_callbacks_, onBelowWriteBufferLowWatermark()).Times(0);
client_connection_->write(second_buffer_to_write);
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
dispatcher_->run(Event::Dispatcher::RunType::Block);

// Clean up the connection. The close() (called via disconnect) will attempt to flush. The
// call to write() will succeed, bringing the connection back under the low watermark.
Expand Down Expand Up @@ -477,6 +480,7 @@ class ReadBufferLimitTest : public ConnectionImplTest {
.per_connection_buffer_limit_bytes_ = read_buffer_limit});

client_connection_ = dispatcher_->createClientConnection(socket_.localAddress());
client_connection_->addConnectionCallbacks(client_callbacks_);
client_connection_->connect();

read_filter_.reset(new NiceMock<MockReadFilter>());
Expand All @@ -502,8 +506,10 @@ class ReadBufferLimitTest : public ConnectionImplTest {
return FilterStatus::StopIteration;
}));

client_connection_->addConnectionCallbacks(client_callbacks_);
EXPECT_CALL(client_callbacks_, onEvent(ConnectionEvent::Connected));
EXPECT_CALL(client_callbacks_, onEvent(ConnectionEvent::Connected))
.WillOnce(Invoke([&](Network::ConnectionEvent) -> void { dispatcher_->exit(); }));
dispatcher_->run(Event::Dispatcher::RunType::Block);

EXPECT_CALL(client_callbacks_, onEvent(ConnectionEvent::RemoteClose))
.WillOnce(Invoke([&](Network::ConnectionEvent) -> void {
EXPECT_EQ(buffer_size, filter_seen);
Expand Down
1 change: 0 additions & 1 deletion test/common/network/proxy_protocol_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class ProxyProtocolTest : public testing::TestWithParam<Address::IpVersion> {
server_connection_ = std::move(conn);
server_connection_->addConnectionCallbacks(server_callbacks_);
server_connection_->addReadFilter(read_filter_);
printf("argh!\n");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

del

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops nevermind. You did del. :)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did. It snuck in via a different PR of mine. 😬

}));
EXPECT_CALL(connection_callbacks_, onEvent(ConnectionEvent::Connected))
.WillOnce(Invoke([&](Network::ConnectionEvent) -> void { dispatcher_.exit(); }));
Expand Down
3 changes: 3 additions & 0 deletions test/common/network/utility_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

#include "envoy/common/exception.h"

#include "common/common/thread.h"
#include "common/network/address_impl.h"
#include "common/network/utility.h"

#include "test/test_common/environment.h"
#include "test/test_common/network_utility.h"
#include "test/test_common/utility.h"

#include "gtest/gtest.h"

Expand Down
7 changes: 5 additions & 2 deletions test/common/ssl/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -451,9 +451,9 @@ class SslReadBufferLimitTest : public SslCertsTest,

client_connection_ =
dispatcher_->createSslClientConnection(*client_ctx_, socket_.localAddress());
client_connection_->addConnectionCallbacks(client_callbacks_);
client_connection_->connect();
read_filter_.reset(new Network::MockReadFilter());
client_connection_->addConnectionCallbacks(client_callbacks_);
}

void readBufferLimitTest(uint32_t read_buffer_limit, uint32_t expected_chunk_size,
Expand Down Expand Up @@ -534,7 +534,8 @@ class SslReadBufferLimitTest : public SslCertsTest,

initialize(read_buffer_limit);

EXPECT_CALL(client_callbacks_, onEvent(Network::ConnectionEvent::Connected));
EXPECT_CALL(client_callbacks_, onEvent(Network::ConnectionEvent::Connected))
.WillOnce(Invoke([&](Network::ConnectionEvent) -> void { dispatcher_->exit(); }));

EXPECT_CALL(listener_callbacks_, onNewConnection_(_))
.WillOnce(Invoke([&](Network::ConnectionPtr& conn) -> void {
Expand All @@ -545,6 +546,8 @@ class SslReadBufferLimitTest : public SslCertsTest,
EXPECT_EQ(read_buffer_limit, server_connection_->bufferLimit());
}));

dispatcher_->run(Event::Dispatcher::RunType::Block);

EXPECT_CALL(*read_filter_, onNewConnection());
EXPECT_CALL(*read_filter_, onData(_)).Times(testing::AnyNumber());

Expand Down