Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"

kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/processor/types"
utilstypes "github.com/rudderlabs/rudder-server/utils/types"
Expand Down Expand Up @@ -1729,21 +1727,15 @@ def transformEvent(event, metadata):
t.Cleanup(mockGateway.Close)

// Start shared rudder-transformer.
transformerPort, err := kithelper.GetFreePort()
require.NoError(t, err)
transformerURL := fmt.Sprintf("http://localhost:%d", transformerPort)
transformerContainer := startRudderTransformer(t, pool, transformerPort, configBackend.URL, mockGateway.URL)
transformerContainer, transformerURL := startRudderTransformer(t, pool, configBackend.URL, mockGateway.URL)
t.Cleanup(func() {
if err := pool.Purge(transformerContainer); err != nil {
t.Logf("Failed to purge rudder-transformer: %v", err)
}
})

// Start shared rudder-pytransformer.
pyTransformerPort, err := kithelper.GetFreePort()
require.NoError(t, err)
pyTransformerURL := fmt.Sprintf("http://localhost:%d", pyTransformerPort)
pyTransformerContainer := startRudderPytransformer(t, pool, pyTransformerPort, configBackend.URL)
pyTransformerContainer, pyTransformerURL := startRudderPytransformer(t, pool, configBackend.URL)
t.Cleanup(func() {
if err := pool.Purge(pyTransformerContainer); err != nil {
t.Logf("Failed to purge rudder-pytransformer: %v", err)
Expand All @@ -1765,12 +1757,8 @@ def transformEvent(event, metadata):
env := newBCTestEnv(t, transformerURL, pyTransformerURL)

if st.config.code != "" {
openFaasPort, err := kithelper.GetFreePort()
require.NoError(t, err)
openFaasURL := fmt.Sprintf("http://localhost:%d", openFaasPort)

t.Logf("Starting openfaas-flask-base for %s (versionID=%s)...", st.name, st.versionID)
container := startOpenFaasFlask(t, pool, openFaasPort, st.versionID, configBackend.URL)
container, openFaasURL := startOpenFaasFlask(t, pool, st.versionID, configBackend.URL)
t.Cleanup(func() {
if err := pool.Purge(container); err != nil {
t.Logf("Failed to purge openfaas-flask-base: %v", err)
Expand Down
20 changes: 3 additions & 17 deletions integration_test/pytransformer_contract/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pytransformer_contract

import (
"context"
"fmt"
"testing"

"github.com/ory/dockertest/v3"
Expand All @@ -11,7 +10,6 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/processor/types"
Expand Down Expand Up @@ -98,20 +96,8 @@ def transformEvent(event, metadata):
defer configBackend.Close()
t.Logf("Config backend at %s", configBackend.URL)

t.Log("Allocating free ports...")
openFaasPort, err := kithelper.GetFreePort()
require.NoError(t, err)
transformerPort, err := kithelper.GetFreePort()
require.NoError(t, err)
pyTransformerPort, err := kithelper.GetFreePort()
require.NoError(t, err)

openFaasURL := fmt.Sprintf("http://localhost:%d", openFaasPort)
transformerURL := fmt.Sprintf("http://localhost:%d", transformerPort)
pyTransformerURL := fmt.Sprintf("http://localhost:%d", pyTransformerPort)

t.Log("Starting openfaas-flask-base container...")
openFaasContainer := startOpenFaasFlask(t, pool, openFaasPort, versionID, configBackend.URL)
openFaasContainer, openFaasURL := startOpenFaasFlask(t, pool, versionID, configBackend.URL)
defer func() {
if err := pool.Purge(openFaasContainer); err != nil {
t.Logf("Failed to purge openfaas-flask-base container: %v", err)
Expand All @@ -125,15 +111,15 @@ def transformEvent(event, metadata):
t.Logf("Mock OpenFaaS gateway at %s", mockGateway.URL)

t.Log("Starting rudder-transformer container...")
transformerContainer := startRudderTransformer(t, pool, transformerPort, configBackend.URL, mockGateway.URL)
transformerContainer, transformerURL := startRudderTransformer(t, pool, configBackend.URL, mockGateway.URL)
defer func() {
if err := pool.Purge(transformerContainer); err != nil {
t.Logf("Failed to purge rudder-transformer container: %v", err)
}
}()

t.Log("Starting rudder-pytransformer container...")
pyTransformerContainer := startRudderPytransformer(t, pool, pyTransformerPort, configBackend.URL)
pyTransformerContainer, pyTransformerURL := startRudderPytransformer(t, pool, configBackend.URL)
defer func() {
if err := pool.Purge(pyTransformerContainer); err != nil {
t.Logf("Failed to purge rudder-pytransformer container: %v", err)
Expand Down
171 changes: 122 additions & 49 deletions integration_test/pytransformer_contract/bc_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"net/http"
"net/http/httptest"
"runtime"
"strconv"
"strings"
"sync"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
dockertesthelper "github.com/rudderlabs/rudder-go-kit/testhelper/docker"
miniodocker "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/registry"

Expand All @@ -32,6 +34,69 @@ import (
"github.com/rudderlabs/rudder-server/processor/usertransformer"
)

// containerConfig holds platform-specific Docker container configuration.
// On Linux, containers use host networking (sharing the host's network namespace).
// On macOS, containers use bridge networking with port bindings and host.docker.internal.
type containerConfig struct {
hostPort int // allocated host port (Linux only)
ExtraHosts []string
PortBindings map[docker.Port][]docker.PortBinding
hostConfigFn func(*docker.HostConfig)
}

// newContainerConfig returns the appropriate Docker configuration for the current platform.
// Default is host networking (Linux, CI, production). On macOS, Docker Desktop does not
// support host networking so we fall back to bridge networking with port bindings.
func newContainerConfig(t *testing.T, containerPort string) containerConfig {
t.Helper()
if runtime.GOOS == "darwin" {
return containerConfig{
ExtraHosts: []string{"host.docker.internal:host-gateway"},
PortBindings: map[docker.Port][]docker.PortBinding{
docker.Port(containerPort + "/tcp"): {{HostIP: "127.0.0.1", HostPort: "0"}},
},
hostConfigFn: func(hc *docker.HostConfig) {},
}
}
port, err := kithelper.GetFreePort()
require.NoError(t, err)
return containerConfig{
hostPort: port,
hostConfigFn: func(hc *docker.HostConfig) {
hc.NetworkMode = "host"
},
}
}

// portStr returns the port to pass as a container environment variable.
func (c containerConfig) portStr(containerPort string) string {
if runtime.GOOS == "darwin" {
return containerPort
}
return strconv.Itoa(c.hostPort)
}

// url returns the URL to reach the container from the host test process.
func (c containerConfig) url(container *dockertest.Resource, containerPort string) string {
if runtime.GOOS == "darwin" {
return fmt.Sprintf("http://%s:%s",
container.GetBoundIP(containerPort+"/tcp"),
container.GetPort(containerPort+"/tcp"),
)
}
return fmt.Sprintf("http://localhost:%d", c.hostPort)
}

// toContainerURL rewrites a host URL for use inside a Docker container.
// On macOS (bridge networking): replaces localhost/127.0.0.1 with host.docker.internal.
// Default (host networking): returns the URL as-is since containers share the host namespace.
func toContainerURL(url string) string {
if runtime.GOOS == "darwin" {
return dockertesthelper.ToInternalDockerHost(url)
}
return url
}

// bcTestEnv holds clients for both the old architecture (rudder-transformer + openfaas)
// and the new architecture (rudder-pytransformer) to compare their responses.
type bcTestEnv struct {
Expand Down Expand Up @@ -309,78 +374,89 @@ func newMockOpenFaaSGateway(t *testing.T, getTarget func() string) (*httptest.Se
// startOpenFaasFlask starts an openfaas-flask-base container with transformation code
// loaded at startup via --vid and --config-backend-url. Optional extra environment
// variables can be passed (e.g. "geolocation_url=http://...").
// Returns the container resource and the URL to reach it from the host.
func startOpenFaasFlask(
t *testing.T, pool *dockertest.Pool,
port int, versionID, configBackendURL string,
versionID, configBackendURL string,
extraEnv ...string,
) *dockertest.Resource {
) (*dockertest.Resource, string) {
t.Helper()
const containerPort = "8080"
cfg := newContainerConfig(t, containerPort)
env := []string{
fmt.Sprintf("fprocess=python index.py --vid %s --config-backend-url %s", versionID, configBackendURL),
fmt.Sprintf("port=%d", port),
fmt.Sprintf("fprocess=python index.py --vid %s --config-backend-url %s", versionID, toContainerURL(configBackendURL)),
fmt.Sprintf("port=%s", cfg.portStr(containerPort)),
}
for _, e := range extraEnv {
env = append(env, toContainerURL(e))
}
env = append(env, extraEnv...)
container, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "422074288268.dkr.ecr.us-east-1.amazonaws.com/rudderstack/openfaas-flask",
Tag: "latest",
Auth: registry.AuthConfiguration(),
Env: env,
}, func(hc *docker.HostConfig) {
hc.NetworkMode = "host"
})
Repository: "422074288268.dkr.ecr.us-east-1.amazonaws.com/rudderstack/openfaas-flask",
Tag: "latest",
Auth: registry.AuthConfiguration(),
Env: env,
ExtraHosts: cfg.ExtraHosts,
PortBindings: cfg.PortBindings,
}, cfg.hostConfigFn)
require.NoError(t, err, "failed to start openfaas-flask-base container")
return container
return container, cfg.url(container, containerPort)
}

// startRudderTransformer starts a rudder-transformer container configured to use
// the mock config backend and mock OpenFaaS gateway.
// Returns the container resource and the URL to reach it from the host.
func startRudderTransformer(
t *testing.T, pool *dockertest.Pool,
port int, configBackendURL, openfaasGatewayURL string,
) *dockertest.Resource {
configBackendURL, openfaasGatewayURL string,
) (*dockertest.Resource, string) {
t.Helper()
const containerPort = "9090"
cfg := newContainerConfig(t, containerPort)
container, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "rudderstack/rudder-transformer",
Tag: "latest",
Env: []string{
"CONFIG_BACKEND_URL=" + configBackendURL,
"OPENFAAS_GATEWAY_URL=" + openfaasGatewayURL,
"PORT=" + strconv.Itoa(port),
"CONFIG_BACKEND_URL=" + toContainerURL(configBackendURL),
"OPENFAAS_GATEWAY_URL=" + toContainerURL(openfaasGatewayURL),
"PORT=" + cfg.portStr(containerPort),
"NODE_OPTIONS=--no-node-snapshot",
},
}, func(hc *docker.HostConfig) {
hc.NetworkMode = "host"
})
ExtraHosts: cfg.ExtraHosts,
PortBindings: cfg.PortBindings,
}, cfg.hostConfigFn)
require.NoError(t, err, "failed to start rudder-transformer container")
return container
return container, cfg.url(container, containerPort)
}

// startRudderPytransformer starts a rudder-pytransformer container configured
// to use the mock config backend. Optional extra environment variables can be
// passed (e.g. "GEOLOCATION_URL=http://...").
// Returns the container resource and the URL to reach it from the host.
func startRudderPytransformer(
t *testing.T, pool *dockertest.Pool,
port int, configBackendURL string,
configBackendURL string,
extraEnv ...string,
) *dockertest.Resource {
) (*dockertest.Resource, string) {
t.Helper()
const containerPort = "8080"
cfg := newContainerConfig(t, containerPort)
env := []string{
"CONFIG_BACKEND_URL=" + configBackendURL,
"GUNICORN_WORKERS=1",
"GUNICORN_TIMEOUT=120",
"GUNICORN_BIND=0.0.0.0:" + strconv.Itoa(port),
"CONFIG_BACKEND_URL=" + toContainerURL(configBackendURL),
"UVICORN_PORT=" + cfg.portStr(containerPort),
}
for _, e := range extraEnv {
env = append(env, toContainerURL(e))
}
env = append(env, extraEnv...)
container, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "422074288268.dkr.ecr.us-east-1.amazonaws.com/rudderstack/rudder-pytransformer",
Tag: "latest",
Auth: registry.AuthConfiguration(),
Env: env,
}, func(hc *docker.HostConfig) {
hc.NetworkMode = "host"
})
Repository: "422074288268.dkr.ecr.us-east-1.amazonaws.com/rudderstack/rudder-pytransformer",
Tag: "latest",
Auth: registry.AuthConfiguration(),
Env: env,
ExtraHosts: cfg.ExtraHosts,
PortBindings: cfg.PortBindings,
}, cfg.hostConfigFn)
require.NoError(t, err, "failed to start rudder-pytransformer container")
return container
return container, cfg.url(container, containerPort)
}

// waitForHealthy polls a service's /health endpoint until it returns 200 OK.
Expand Down Expand Up @@ -458,8 +534,7 @@ func normalizeResponseErrors(r *types.Response) {
// startRudderGeolocation starts a MinIO container, uploads the test MMDB file,
// then starts a rudder-geolocation container configured to download the database
// from MinIO on startup. The container serves the /geoip/{ip} endpoint.
// Like other containers in this test suite, it uses host networking so it can
// reach MinIO at localhost and be reached by other host-networked containers.
// Returns the container resource and the URL to reach it from the host.
func startRudderGeolocation(t *testing.T, pool *dockertest.Pool) (*dockertest.Resource, string) {
t.Helper()

Expand All @@ -475,31 +550,29 @@ func startRudderGeolocation(t *testing.T, pool *dockertest.Pool) (*dockertest.Re
)
require.NoError(t, err, "failed to upload city_test.mmdb to MinIO")

geoPort, err := kithelper.GetFreePort()
require.NoError(t, err, "failed to get free port for rudder-geolocation")

const containerPort = "8080"
cfg := newContainerConfig(t, containerPort)
container, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "422074288268.dkr.ecr.us-east-1.amazonaws.com/rudderstack/rudder-geolocation",
Tag: "main",
Auth: registry.AuthConfiguration(),
Env: []string{
"PORT=" + strconv.Itoa(geoPort),
"PORT=" + cfg.portStr(containerPort),
"BUCKET=" + minioResource.BucketName,
"KEY=city_test.mmdb",
"OUTPUT_PATH=/tmp/city.mmdb",
"REGION=us-east-1",
"S3_ENDPOINT=" + "http://" + minioResource.Endpoint,
"S3_ENDPOINT=" + toContainerURL("http://"+minioResource.Endpoint),
"S3_FORCE_PATH_STYLE=true",
"AWS_ACCESS_KEY_ID=" + minioResource.AccessKeyID,
"AWS_SECRET_ACCESS_KEY=" + minioResource.AccessKeySecret,
},
}, func(hc *docker.HostConfig) {
hc.NetworkMode = "host"
})
ExtraHosts: cfg.ExtraHosts,
PortBindings: cfg.PortBindings,
}, cfg.hostConfigFn)
require.NoError(t, err, "failed to start rudder-geolocation container")

geoURL := fmt.Sprintf("http://localhost:%d", geoPort)
return container, geoURL
return container, cfg.url(container, containerPort)
}

// waitForGeolocation polls the rudder-geolocation service until it responds to
Expand Down
Loading
Loading