Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 84 additions & 130 deletions filebeat/input/filestream/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,145 +49,98 @@ import (
)

func BenchmarkFilestream(b *testing.B) {
logp.TestingSetup(logp.ToDiscardOutput())
logger := logp.NewNopLogger()

b.Run("single file", func(b *testing.B) {
lineCount := 10000
filename := generateFile(b, b.TempDir(), lineCount)
b.ResetTimer()
cases := []struct {
name string
lineCount int
fileCount int
fingerprint bool
}{
{"1_file/inode", 10_000, 1, false},
{"1_file/fingerprint", 10_000, 1, true},
{"100_files/inode", 1000, 100, false},
{"100_files/fingerprint", 1000, 100, true},
{"1000_files/fingerprint", 20, 1000, true},
{"10000_files/fingerprint", 20, 10_000, true},
}

b.Run("inode throughput", func(b *testing.B) {
cfg := `
type: filestream
prospector.scanner.check_interval: 1s
prospector.scanner.fingerprint.enabled: false
paths:
- ` + filename + `
`
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("one-file-inode-benchmark-%d", i), cfg, lineCount)
for _, tc := range cases {
b.Run(tc.name, func(b *testing.B) {
dir := b.TempDir()
var ingestPath string
for i := 0; i < tc.fileCount; i++ {
ingestPath = generateFile(b, dir, tc.lineCount)
}
})

b.Run("fingerprint throughput", func(b *testing.B) {
cfg := `
type: filestream
prospector.scanner:
fingerprint.enabled: true
check_interval: 1s
file_identity.fingerprint: ~
paths:
- ` + filename + `
`
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("one-file-fp-benchmark-%d", i), cfg, lineCount)
if tc.fileCount > 1 {
ingestPath = filepath.Join(dir, "*")
}
})
})

b.Run("many files", func(b *testing.B) {
lineCount := 1000
fileCount := 100
dir := b.TempDir()

for i := 0; i < fileCount; i++ {
_ = generateFile(b, dir, lineCount)
}

ingestPath := filepath.Join(dir, "*")
expEvents := lineCount * fileCount
b.ResetTimer()
expEvents := tc.lineCount * tc.fileCount
cfg := filestreamBenchCfg(ingestPath, tc.fingerprint)
b.ResetTimer()

b.Run("inode throughput", func(b *testing.B) {
cfg := `
type: filestream
prospector.scanner.check_interval: 1s
prospector.scanner.fingerprint.enabled: false
paths:
- ` + ingestPath + `
`
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("many-files-inode-benchmark-%d", i), cfg, expEvents)
runFilestreamBenchmark(b, logger, fmt.Sprintf("%s-%d", tc.name, i), cfg, expEvents)
}
})
}

b.Run("fingerprint throughput", func(b *testing.B) {
cfg := `
type: filestream
prospector.scanner:
fingerprint.enabled: true
check_interval: 1s
file_identity.fingerprint: ~
paths:
- ` + ingestPath + `
`
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("many-files-fp-benchmark-%d", i), cfg, expEvents)
}
})
})

b.Run("line filter", func(b *testing.B) {
lineCount := 10000
b.Run("line_filter", func(b *testing.B) {
lineCount := 10_000
filename := generateFile(b, b.TempDir(), lineCount)
b.ResetTimer()

b.Run("no filter", func(b *testing.B) {
cfg := `
type: filestream
prospector.scanner.check_interval: 1s
prospector.scanner.fingerprint.enabled: false
paths:
- ` + filename + `
`
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("no-filter-%d", i), cfg, lineCount)
}
})

b.Run("with include_lines", func(b *testing.B) {
cfg := `
filterCases := []struct {
name string
includeLines string
excludeLines string
expEvents int
}{
{"none", "", "", lineCount},
{"include", "include_lines: ['^rather']", "", lineCount},
{"exclude", "", "exclude_lines: ['^NOMATCH']", lineCount},
{"include_and_exclude", "include_lines: ['^rather']", "exclude_lines: ['^NOMATCH']", lineCount},
{"drop_all", "include_lines: [' - 9999$']", "", 1},
}
for _, fc := range filterCases {
b.Run(fc.name, func(b *testing.B) {
cfg := fmt.Sprintf(`
type: filestream
prospector.scanner.check_interval: 1s
prospector.scanner.check_interval: 100ms
prospector.scanner.fingerprint.enabled: false
include_lines: ['^rather']
close.reader.on_eof: true
file_identity.native: ~
%s
%s
paths:
- ` + filename + `
`
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("include-lines-%d", i), cfg, lineCount)
}
})
- %s
`, fc.includeLines, fc.excludeLines, filename)
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, logger, fmt.Sprintf("filter-%s-%d", fc.name, i), cfg, fc.expEvents)
}
})
}
})
}

b.Run("with exclude_lines", func(b *testing.B) {
cfg := `
type: filestream
prospector.scanner.check_interval: 1s
func filestreamBenchCfg(path string, fingerprint bool) string {
identity := `
prospector.scanner.fingerprint.enabled: false
exclude_lines: ['^NOMATCH']
paths:
- ` + filename + `
`
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("exclude-lines-%d", i), cfg, lineCount)
}
})

b.Run("with include_and_exclude_lines", func(b *testing.B) {
cfg := `
file_identity.native: ~`
if fingerprint {
identity = `
prospector.scanner.fingerprint.enabled: true
file_identity.fingerprint: ~`
}
return fmt.Sprintf(`
type: filestream
prospector.scanner.check_interval: 1s
prospector.scanner.fingerprint.enabled: false
include_lines: ['^rather']
exclude_lines: ['^NOMATCH']
prospector.scanner.check_interval: 100ms
close.reader.on_eof: true%s
paths:
- ` + filename + `
`
for i := 0; i < b.N; i++ {
runFilestreamBenchmark(b, fmt.Sprintf("include-exclude-lines-%d", i), cfg, lineCount)
}
})
})
- %s
`, identity, path)
}

func TestTakeOverTags(t *testing.T) {
Expand All @@ -214,6 +167,7 @@ func TestTakeOverTags(t *testing.T) {
},
},
}
logger := logptest.NewTestingLogger(t, "")
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
filename := generateFile(t, t.TempDir(), 5)
Expand All @@ -225,7 +179,7 @@ prospector.scanner.fingerprint.enabled: false
take_over.enabled: %t
paths:
- %s`, testCase.takeOver, filename)
runner := createFilestreamTestRunner(context.Background(), t, testCase.name, cfg, 5, true)
runner := createFilestreamTestRunner(t, logger, testCase.name, cfg, 5, true)
events := runner(t)
for _, event := range events {
testCase.testFunc(t, event)
Expand Down Expand Up @@ -395,11 +349,11 @@ func TestOpenFile_GZIPNeverTruncated(t *testing.T) {
// `testID` must be unique for each test run
// `cfg` must be a valid YAML string containing valid filestream configuration
// `expEventCount` is an expected amount of produced events
func runFilestreamBenchmark(b *testing.B, testID string, cfg string, expEventCount int) {
func runFilestreamBenchmark(b *testing.B, logger *logp.Logger, testID string, cfg string, expEventCount int) {
b.Helper()
// we don't include initialization in the benchmark time
b.StopTimer()
runner := createFilestreamTestRunner(context.Background(), b, testID, cfg, int64(expEventCount), false)
runner := createFilestreamTestRunner(b, logger, testID, cfg, int64(expEventCount), false)
// this is where the benchmark actually starts
b.StartTimer()
_ = runner(b)
Expand All @@ -414,16 +368,15 @@ func runFilestreamBenchmark(b *testing.B, testID string, cfg string, expEventCou
// Events should not be collected in benchmarks due to high extra costs of using the channel.
//
// returns a runner function that returns produced events.
func createFilestreamTestRunner(ctx context.Context, b testing.TB, testID string, cfg string, eventLimit int64, collectEvents bool) func(t testing.TB) []beat.Event {
logger := logp.L()
func createFilestreamTestRunner(b testing.TB, logger *logp.Logger, testID string, cfg string, eventLimit int64, collectEvents bool) func(t testing.TB) []beat.Event {
c, err := conf.NewConfigWithYAML([]byte(cfg), cfg)
require.NoError(b, err)

p := Plugin(logger, createTestStore(b))
input, err := p.Manager.Create(c)
require.NoError(b, err)

ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(b.Context())
v2ctx := v2.Context{
ID: testID,
IDWithoutName: testID,
Expand All @@ -434,24 +387,21 @@ func createFilestreamTestRunner(ctx context.Context, b testing.TB, testID string
Logger: logger,
}

connector, events := newTestPipeline(eventLimit, collectEvents)
var out []beat.Event
if collectEvents {
out = make([]beat.Event, 0, eventLimit)
}
connector, events := newTestPipeline(eventLimit, collectEvents)
go func() {
// even if `collectEvents` is false we need to range the channel
// and wait until it's closed indicating that the input finished its job
defer cancel()
for event := range events {
out = append(out, event)
}
cancel()
}()

return func(t testing.TB) []beat.Event {
err := input.Run(v2ctx, connector)
require.NoError(b, err)

return out
}
}
Expand Down Expand Up @@ -492,7 +442,11 @@ func (s *testStore) CleanupInterval() time.Duration {
}

func newTestPipeline(eventLimit int64, collectEvents bool) (pc beat.PipelineConnector, out <-chan beat.Event) {
ch := make(chan beat.Event, eventLimit)
var chBuf int64
if collectEvents {
chBuf = eventLimit
}
ch := make(chan beat.Event, chBuf)
return &testPipeline{limit: eventLimit, out: ch, collect: collectEvents}, ch
}

Expand Down
13 changes: 9 additions & 4 deletions filebeat/input/filestream/internal/input-logfile/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,14 @@ type HarvesterStatus struct {
Size int64
}

func (hg *defaultHarvesterGroup) notifyObserver(srcID string, size int64) {
if hg.notifyChan != nil {
hg.notifyChan <- HarvesterStatus{srcID, size}
func (hg *defaultHarvesterGroup) notifyObserver(canceler inputv2.Canceler, srcID string, size int64) {
if hg.notifyChan == nil {
return
}

select {
case hg.notifyChan <- HarvesterStatus{srcID, size}:
case <-canceler.Done():
}
}

Expand Down Expand Up @@ -316,7 +321,7 @@ func startHarvester(
return
}

hg.notifyObserver(srcID, st.Offset)
hg.notifyObserver(canceler, srcID, st.Offset)
ctx.Logger.Debugf("Harvester '%s' closed with offset: %d", srcID, st.Offset)
}()

Expand Down
Loading