Skip to content
Draft

neper #1793

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
FROM docker.io/rust:1.90.0-slim-bookworm AS cacher
ARG SCCACHE_BUCKET
ARG SCCACHE_REGION
ARG AWS_ACCESS_KEY_ID

Check warning on line 14 in Dockerfile

View workflow job for this annotation

GitHub Actions / Build amd64

Sensitive data should not be used in the ARG or ENV commands

SecretsUsedInArgOrEnv: Do not use ARG or ENV instructions for sensitive data (ARG "AWS_ACCESS_KEY_ID") More info: https://docs.docker.com/go/dockerfile/rule/secrets-used-in-arg-or-env/

Check warning on line 14 in Dockerfile

View workflow job for this annotation

GitHub Actions / Build arm64

Sensitive data should not be used in the ARG or ENV commands

SecretsUsedInArgOrEnv: Do not use ARG or ENV instructions for sensitive data (ARG "AWS_ACCESS_KEY_ID") More info: https://docs.docker.com/go/dockerfile/rule/secrets-used-in-arg-or-env/
ARG AWS_SECRET_ACCESS_KEY

Check warning on line 15 in Dockerfile

View workflow job for this annotation

GitHub Actions / Build amd64

Sensitive data should not be used in the ARG or ENV commands

SecretsUsedInArgOrEnv: Do not use ARG or ENV instructions for sensitive data (ARG "AWS_SECRET_ACCESS_KEY") More info: https://docs.docker.com/go/dockerfile/rule/secrets-used-in-arg-or-env/

Check warning on line 15 in Dockerfile

View workflow job for this annotation

GitHub Actions / Build arm64

Sensitive data should not be used in the ARG or ENV commands

SecretsUsedInArgOrEnv: Do not use ARG or ENV instructions for sensitive data (ARG "AWS_SECRET_ACCESS_KEY") More info: https://docs.docker.com/go/dockerfile/rule/secrets-used-in-arg-or-env/
ARG AWS_SESSION_TOKEN

Check warning on line 16 in Dockerfile

View workflow job for this annotation

GitHub Actions / Build amd64

Sensitive data should not be used in the ARG or ENV commands

SecretsUsedInArgOrEnv: Do not use ARG or ENV instructions for sensitive data (ARG "AWS_SESSION_TOKEN") More info: https://docs.docker.com/go/dockerfile/rule/secrets-used-in-arg-or-env/

Check warning on line 16 in Dockerfile

View workflow job for this annotation

GitHub Actions / Build arm64

Sensitive data should not be used in the ARG or ENV commands

SecretsUsedInArgOrEnv: Do not use ARG or ENV instructions for sensitive data (ARG "AWS_SESSION_TOKEN") More info: https://docs.docker.com/go/dockerfile/rule/secrets-used-in-arg-or-env/
ENV CARGO_INCREMENTAL=0
WORKDIR /app
RUN apt-get update && apt-get install -y \
Expand Down Expand Up @@ -77,13 +77,29 @@
if [ -n "${SCCACHE_BUCKET:-}" ]; then export RUSTC_WRAPPER=sccache; fi && \
cargo build --release --locked --bin lading --features logrotate_fs

# Stage 3: Runtime
# Stage 3: Build neper binaries
FROM docker.io/debian:bookworm-20241202-slim AS neper-builder
RUN apt-get update && apt-get install -y \
git \
build-essential \
libsctp-dev \
&& rm -rf /var/lib/apt/lists/*
RUN git clone https://github.com/google/neper.git /tmp/neper \
&& cd /tmp/neper \
&& make \
&& cp tcp_rr tcp_crr tcp_stream /usr/local/bin/ \
&& rm -rf /tmp/neper

# Stage 4: Runtime
FROM docker.io/debian:bookworm-20241202-slim
RUN apt-get update && apt-get install -y \
libfuse3-dev=3.14.0-4 \
fuse3=3.14.0-4 \
&& rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/target/release/lading /usr/bin/lading
COPY --from=neper-builder /usr/local/bin/tcp_rr /usr/local/bin/tcp_rr
COPY --from=neper-builder /usr/local/bin/tcp_crr /usr/local/bin/tcp_crr
COPY --from=neper-builder /usr/local/bin/tcp_stream /usr/local/bin/tcp_stream

# Smoke test
RUN ["/usr/bin/lading", "--help"]
Expand Down
2 changes: 1 addition & 1 deletion lading/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ k8s-openapi = { version = "0.26.0", default-features = false, features = [
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
metrics-util = { workspace = true }
nix = { version = "0.30", features = ["fs", "signal"] }
nix = { version = "0.30", features = ["fs", "resource", "signal"] }
num_cpus = { version = "1.17" }
num-traits = { version = "0.2", default-features = false }
once_cell = { workspace = true }
Expand Down
7 changes: 6 additions & 1 deletion lading/src/bin/lading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,8 @@ async fn inner_main(

let mut tsrv_joinset = task::JoinSet::new();
let mut osrv_joinset = task::JoinSet::new();

let target_present = config.target.is_some();
//
// OBSERVER
//
Expand All @@ -605,6 +607,7 @@ async fn inner_main(
let obs_rcv = tgt_snd.subscribe();
let observer_server = observer::Server::new(config.observer, shutdown_watcher.clone())?;
let sample_period = Duration::from_millis(config.sample_period_milliseconds);
info!("starting observer {:#?}", sample_period);
osrv_joinset.spawn(observer_server.run(obs_rcv, sample_period));

//
Expand All @@ -625,7 +628,9 @@ async fn inner_main(
async move {
info!("waiting for target startup");
target_running_watcher.recv().await;
info!("target is running, now sleeping for warmup");
if target_present {
info!("target is running, now sleeping for warmup");
}
sleep(warmup_duration).await;
experiment_started_broadcast.signal();
info!("warmup completed, collecting samples");
Expand Down
7 changes: 7 additions & 0 deletions lading/src/bin/payloadtool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,13 @@ fn check_generator(config: &generator::Config, args: &Args) -> Result<Option<Fin
let conf = lading_payload::Config::TraceAgent(g.variant);
generate_and_check(&conf, g.seed, total_bytes, g.maximum_block_size, args)
}
generator::Inner::Neper(_) => {
if args.fingerprint {
warn!("Neper not supported for fingerprinting");
return Ok(None);
}
unimplemented!("Neper not supported")
}
}
}

Expand Down
10 changes: 10 additions & 0 deletions lading/src/blackhole.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod common;
pub mod datadog;
pub mod datadog_stateful_logs;
pub mod http;
pub mod neper;
pub mod otlp;
pub mod splunk_hec;
pub mod sqs;
Expand Down Expand Up @@ -52,6 +53,9 @@ pub enum Error {
/// See [`crate::blackhole::otlp::Error`] for details.
#[error(transparent)]
Otlp(otlp::Error),
/// See [`crate::blackhole::neper::Error`] for details.
#[error(transparent)]
Neper(neper::Error),
}

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -101,6 +105,8 @@ pub enum Inner {
Sqs(sqs::Config),
/// See [`crate::blackhole::otlp::Config`] for details.
Otlp(otlp::Config),
/// See [`crate::blackhole::neper::Config`] for details.
Neper(neper::Config),
}

#[derive(Debug)]
Expand Down Expand Up @@ -129,6 +135,8 @@ pub enum Server {
Sqs(sqs::Sqs),
/// See [`crate::blackhole::otlp::Otlp`] for details.
Otlp(otlp::Otlp),
/// See [`crate::blackhole::neper::Neper`] for details.
Neper(neper::Neper),
}

impl Server {
Expand Down Expand Up @@ -169,6 +177,7 @@ impl Server {
Inner::Otlp(conf) => {
Self::Otlp(otlp::Otlp::new(&config.general, &conf, &shutdown).map_err(Error::Otlp)?)
}
Inner::Neper(conf) => Self::Neper(neper::Neper::new(config.general, &conf, shutdown)),
};
Ok(server)
}
Expand Down Expand Up @@ -196,6 +205,7 @@ impl Server {
Server::Sqs(inner) => inner.run().await.map_err(Error::Sqs),
Server::SplunkHec(inner) => inner.run().await.map_err(Error::SplunkHec),
Server::Otlp(inner) => inner.run().await.map_err(Error::Otlp),
Server::Neper(inner) => inner.run().await.map_err(Error::Neper),
}
}
}
247 changes: 247 additions & 0 deletions lading/src/blackhole/neper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
//! The neper network performance blackhole.
//!
//! This blackhole spawns a [neper](https://github.com/google/neper) server
//! process that listens for incoming connections from a neper client. When
//! the server exits (after the client disconnects), its stdout is parsed for
//! the throughput value and emitted as a gauge metric.
//!
//! ## Metrics
//!
//! `neper_throughput`: Throughput value reported by the neper server

use std::{io, path::PathBuf, process::Stdio};

use metrics::gauge;
use nix::sys::resource::{Resource, getrlimit, setrlimit};
use serde::{Deserialize, Serialize};
use tokio::process::Command;
use tracing::{info, warn};

use super::General;
use crate::generator::neper::Workload;

/// Directory where neper binaries are installed.
const NEPER_BIN_DIR: &str = "/usr/local/bin";

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
/// Configuration for the neper blackhole.
///
/// Deserialized from the lading configuration file; unknown keys are
/// rejected by `deny_unknown_fields` so typos fail loudly at load time.
pub struct Config {
    /// The workload to serve. Selects which neper binary is spawned.
    pub workload: Workload,
    /// Optional control port override (`-C`).
    pub control_port: Option<u16>,
    /// Optional data port override (`-P`).
    pub data_port: Option<u16>,
    /// Extra CLI arguments forwarded verbatim to the neper binary.
    /// Defaults to empty when omitted from the configuration.
    #[serde(default)]
    pub extra_args: Vec<String>,
}

#[derive(thiserror::Error, Debug)]
/// Errors produced by [`Neper`].
pub enum Error {
    /// IO error while spawning, waiting on, or killing the neper child
    /// process.
    #[error(transparent)]
    Io(#[from] io::Error),
    /// Failed to set resource limits.
    #[error("failed to set rlimit: {0}")]
    Rlimit(#[from] nix::errno::Errno),
    /// Neper process exited with a non-zero status.
    #[error("neper server exited with {status}: {stderr}")]
    NeperFailed {
        /// Exit status
        status: std::process::ExitStatus,
        /// Captured stderr
        stderr: String,
    },
    /// Neper server was still running when shutdown arrived and the grace
    /// period expired.
    #[error("neper server did not exit within the grace period after shutdown")]
    ShutdownTimeout,
}

#[derive(Debug)]
/// The neper blackhole.
///
/// Spawns a neper server binary and keeps it running until shutdown.
pub struct Neper {
    // Cloned user configuration: which neper binary to run and with what
    // flags.
    config: Config,
    // Watcher that resolves when lading begins shutdown.
    shutdown: lading_signal::Watcher,
    // Labels attached to every metric this blackhole emits.
    metric_labels: Vec<(String, String)>,
}

impl Neper {
    /// Create a new [`Neper`] blackhole instance.
    ///
    /// Stores a clone of `config` and builds the metric label set, including
    /// the optional `id` from the general blackhole configuration.
    #[must_use]
    pub fn new(general: General, config: &Config, shutdown: lading_signal::Watcher) -> Self {
        let mut metric_labels = vec![
            ("component".to_string(), "blackhole".to_string()),
            ("component_name".to_string(), "neper".to_string()),
        ];
        if let Some(id) = general.id {
            metric_labels.push(("id".to_string(), id));
        }

        Self {
            config: config.clone(),
            shutdown,
            metric_labels,
        }
    }

    /// Run the neper server until the child exits.
    ///
    /// The blackhole always waits for the neper server to finish so that its
    /// stdout can be parsed for the throughput metric. If a shutdown signal
    /// arrives while the server is still running, a short grace period is
    /// given; if the server does not exit in time it is killed and an error
    /// is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if spawning neper fails, it exits with a non-zero
    /// status, or it is still running when the shutdown grace period expires.
    pub async fn run(self) -> Result<(), Error> {
        // Raise the open file descriptor limit to the hard limit. Neper opens
        // many sockets and can easily exceed the default soft limit.
        let (_, hard) = getrlimit(Resource::RLIMIT_NOFILE)?;
        setrlimit(Resource::RLIMIT_NOFILE, hard, hard)?;
        info!(nofile_limit = hard, "raised RLIMIT_NOFILE to hard limit");

        let binary = PathBuf::from(NEPER_BIN_DIR).join(self.config.workload.binary_name());

        let mut cmd = Command::new(&binary);
        if let Some(port) = self.config.control_port {
            cmd.arg("-C").arg(port.to_string());
        }
        if let Some(port) = self.config.data_port {
            cmd.arg("-P").arg(port.to_string());
        }
        cmd.args(&self.config.extra_args);

        cmd.stdout(Stdio::piped());
        cmd.stderr(Stdio::piped());
        cmd.kill_on_drop(true);

        info!(?binary, "spawning neper server");
        let mut child = cmd.spawn()?;

        // Drain stdout and stderr concurrently with waiting on the child.
        // Reading the pipes only after `wait()` returns would deadlock
        // whenever the child writes more than the OS pipe buffer holds: the
        // child blocks on write and `wait()` never returns. (See the warning
        // on `std::process::Child::wait` about piped stdio.)
        let stdout_task = tokio::spawn(drain_pipe(child.stdout.take()));
        let stderr_task = tokio::spawn(drain_pipe(child.stderr.take()));

        // Always wait for the child. If shutdown arrives first, give the
        // server a short grace period to finish on its own before killing it.
        let mut shutdown_wait = std::pin::pin!(self.shutdown.recv());
        let exited_cleanly = tokio::select! {
            result = child.wait() => {
                let status = result?;
                if !status.success() {
                    let stderr = stderr_task.await.unwrap_or_default();
                    warn!(%status, "neper server exited unexpectedly");
                    return Err(Error::NeperFailed { status, stderr });
                }
                true
            }
            () = &mut shutdown_wait => {
                info!("shutdown signal received, waiting briefly for neper server to exit");
                match tokio::time::timeout(
                    tokio::time::Duration::from_secs(5),
                    child.wait(),
                ).await {
                    Ok(Ok(status)) if status.success() => true,
                    Ok(Ok(status)) => {
                        let stderr = stderr_task.await.unwrap_or_default();
                        warn!(%status, "neper server exited with error during grace period");
                        return Err(Error::NeperFailed { status, stderr });
                    }
                    Ok(Err(e)) => return Err(Error::Io(e)),
                    Err(_) => {
                        warn!("grace period expired, killing neper server");
                        let _ = child.kill().await;
                        false
                    }
                }
            }
        };

        if exited_cleanly {
            // The drain task completes once the child closes its end of the
            // pipe, so this await resolves promptly after a clean exit.
            let stdout = stdout_task.await.unwrap_or_default();
            if let Some(value) = parse_throughput(&stdout, self.config.workload) {
                info!(throughput = value, "neper server run complete");
                gauge!("neper_throughput", &self.metric_labels).set(value);
            } else {
                warn!("could not parse neper throughput from server output");
            }
            Ok(())
        } else {
            Err(Error::ShutdownTimeout)
        }
    }
}

/// Read an optional child pipe to end-of-stream, returning the captured
/// UTF-8 text. A missing pipe or a read failure both yield an empty string:
/// output capture is best-effort diagnostics, never fatal.
async fn drain_pipe<R>(pipe: Option<R>) -> String
where
    R: tokio::io::AsyncRead + Unpin,
{
    let mut buf = String::new();
    if let Some(mut p) = pipe {
        use tokio::io::AsyncReadExt;
        let _ = p.read_to_string(&mut buf).await;
    }
    buf
}

/// Parse the throughput value from neper's stdout output.
///
/// Neper prints key=value pairs, one per line. We look for the line matching
/// the workload's throughput key.
fn parse_throughput(output: &str, workload: Workload) -> Option<f64> {
let key = workload.throughput_key();
for line in output.lines() {
let line = line.trim();
if let Some((k, v)) = line.split_once('=')
&& k.trim() == key
{
return v.trim().parse::<f64>().ok();
}
}
None
}

#[cfg(test)]
mod tests {
    use super::*;

    // Happy path: the throughput key is found among other key=value lines.
    #[test]
    fn parse_throughput_tcp_rr() {
        let output = "num_flows=1\nnum_threads=1\nthroughput=12345.67\n";
        let value = parse_throughput(output, Workload::TcpRr).unwrap();
        assert!((value - 12345.67).abs() < f64::EPSILON);
    }

    // Lines after the matching key are ignored.
    #[test]
    fn parse_throughput_tcp_stream() {
        let output = "throughput=98765.43\nlatency=0.5\n";
        let value = parse_throughput(output, Workload::TcpStream).unwrap();
        assert!((value - 98765.43).abs() < f64::EPSILON);
    }

    // Absent key yields None rather than any default value.
    #[test]
    fn parse_throughput_missing_key() {
        let output = "latency=0.5\n";
        assert!(parse_throughput(output, Workload::TcpRr).is_none());
    }
}
Loading
Loading