Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package semantic

import (
"errors"
"fmt"
"io/fs"
"path"
"slices"
"strings"

"gopkg.in/yaml.v3"
Expand All @@ -22,8 +24,9 @@ const (
)

type policyTemplateInput struct {
Type string `yaml:"type"`
TemplatePath string `yaml:"template_path"` // optional for integration packages
Type string `yaml:"type"`
TemplatePath string `yaml:"template_path"`
TemplatePaths []string `yaml:"template_paths"`
}

type integrationPolicyTemplate struct {
Expand All @@ -37,15 +40,27 @@ type integrationPackageManifest struct { // package manifest
}

type stream struct {
Input string `yaml:"input"`
TemplatePath string `yaml:"template_path"`
Input string `yaml:"input"`
TemplatePath string `yaml:"template_path"`
TemplatePaths []string `yaml:"template_paths"`
}

type dataStreamManifest struct {
Streams []stream `yaml:"streams"`
}

// ValidateIntegrationPolicyTemplates validates the template_path fields at the policy template level for integration type packages
// dataStreamManifestReadError records which data_stream/<name>/manifest.yml failed to read or parse.
type dataStreamManifestReadError struct {
relPath string
err error
}

func (e *dataStreamManifestReadError) Error() string { return e.err.Error() }
func (e *dataStreamManifestReadError) Unwrap() error { return e.err }

// ValidateIntegrationPolicyTemplates validates agent input and stream template files for
// integration packages, following Fleet/EPM resolution (template_paths before template_path;
// stream default stream.yml.hbs when neither is set on a stream).
func ValidateIntegrationPolicyTemplates(fsys fspath.FS) specerrors.ValidationErrors {
var errs specerrors.ValidationErrors

Expand All @@ -63,53 +78,117 @@ func ValidateIntegrationPolicyTemplates(fsys fspath.FS) specerrors.ValidationErr
specerrors.NewStructuredErrorf("file \"%s\" is invalid: %w", fsys.Path(manifestPath), errFailedToParseManifest)}
}

// only validate integration type packages
if manifest.Type != packageTypeIntegration {
return nil
}

// read at once all data stream manifests
dataStreamsManifestMap, err := readDataStreamsManifests(fsys)
if err != nil {
var dsReadErr *dataStreamManifestReadError
if errors.As(err, &dsReadErr) {
return specerrors.ValidationErrors{
specerrors.NewStructuredErrorf("file \"%s\" is invalid: %w", fsys.Path(dsReadErr.relPath), dsReadErr.err)}
}
return specerrors.ValidationErrors{
specerrors.NewStructuredErrorf("file \"%s\" is invalid: %w", fsys.Path(manifestPath), err)}
specerrors.NewStructuredErrorf("invalid data stream manifests: %w", err)}
}

errs = append(errs, validateAllDataStreamStreamTemplates(fsys, dataStreamsManifestMap)...)

for _, policyTemplate := range manifest.PolicyTemplates {
err = validateIntegrationPackagePolicyTemplate(fsys, policyTemplate, dataStreamsManifestMap)
if err != nil {
if err := validateIntegrationPolicyTemplateInputs(fsys, policyTemplate); err != nil {
errs = append(errs, specerrors.NewStructuredErrorf(
"file \"%s\" is invalid: policy template \"%s\" references input template_path: %w",
"file \"%s\" is invalid: policy template \"%s\": %w",
fsys.Path(manifestPath), policyTemplate.Name, err))
}
}

return errs
}

// validateIntegrationPackagePolicyTemplate validates the template_path fields at the policy template level for integration type packages
func validateIntegrationPackagePolicyTemplate(fsys fspath.FS, policyTemplate integrationPolicyTemplate, dsManifestMap map[string]dataStreamManifest) error {
// validateIntegrationPolicyTemplateInputs validates policy template inputs[] template files
// under agent/input when template_paths or template_path is set (Fleet: template_paths first).
func validateIntegrationPolicyTemplateInputs(fsys fspath.FS, policyTemplate integrationPolicyTemplate) error {
for _, input := range policyTemplate.Inputs {
if input.TemplatePath != "" {
// validate the provided template_path file exists
err := validateAgentInputTemplatePath(fsys, input.TemplatePath)
if err != nil {
return fmt.Errorf("error validating input \"%s\": %w", input.Type, err)
if len(input.TemplatePaths) > 0 {
for _, tp := range input.TemplatePaths {
if err := validateAgentInputTemplatePath(fsys, tp); err != nil {
return fmt.Errorf("failed validation for policy input %q: %w", input.Type, err)
}
}
continue
}
if input.TemplatePath != "" {
if err := validateAgentInputTemplatePath(fsys, input.TemplatePath); err != nil {
return fmt.Errorf("failed validation for policy input %q: %w", input.Type, err)
}
}
}
return nil
}

// validateAllDataStreamStreamTemplates validates every stream row in every data stream manifest.
func validateAllDataStreamStreamTemplates(fsys fspath.FS, dsMap map[string]dataStreamManifest) specerrors.ValidationErrors {
var errs specerrors.ValidationErrors

dsDirs := make([]string, 0, len(dsMap))
for d := range dsMap {
dsDirs = append(dsDirs, d)
}
slices.Sort(dsDirs)

for _, dsDir := range dsDirs {
dsManifestPath := path.Join(dsDir, "manifest.yml")
manifest := dsMap[dsDir]
for _, s := range manifest.Streams {
if err := validateSingleDataStreamStreamTemplates(fsys, dsDir, s); err != nil {
errs = append(errs, specerrors.NewStructuredErrorf(
"file \"%s\" is invalid: data stream \"%s\" stream input %q: %w",
fsys.Path(dsManifestPath), dsDir, s.Input, err))
}
Comment on lines +145 to +148
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of the index of the stream, I think it could be added the stream title. According to the spec, that field is mandatory:

Suggested change
errs = append(errs, specerrors.NewStructuredErrorf(
"file \"%s\" is invalid: data stream \"%s\" stream %d (input %q): %w",
fsys.Path(dsManifestPath), dsDir, i+1, streamInputLabel(s.Input), err))
}
errs = append(errs, specerrors.NewStructuredErrorf(
"file \"%s\" is invalid: data stream \"%s\" stream %q (input %q): %w",
fsys.Path(dsManifestPath), dsDir, s.Title, streamInputLabel(s.Input), err))
}

IIUC for that it should be added title to the struct:

type stream struct {
	Input         string   `yaml:"input"`
	Title         string   `yaml:"title"`
	TemplatePath  string   `yaml:"template_path"`
	TemplatePaths []string `yaml:"template_paths"`
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i've used the streams input key instead. thnks

}
}
return errs
}

err := validateInputWithStreams(fsys, input.Type, dsManifestMap)
if err != nil {
return fmt.Errorf("error validating input from streams \"%s\": %w", input.Type, err)
// validateSingleDataStreamStreamTemplates checks stream template files under dsDir/agent/stream
// using Fleet parseAndVerifyStreams / compile precedence (template_paths first, else template_path
// or default stream.yml.hbs).
func validateSingleDataStreamStreamTemplates(fsys fspath.FS, dsDir string, s stream) error {
dir := path.Join(dsDir, "agent", "stream")

if len(s.TemplatePaths) > 0 {
for _, tp := range s.TemplatePaths {
if err := validateStreamTemplateFile(fsys, dir, tp); err != nil {
return err
}
}
return nil
}

tp := s.TemplatePath
if tp == "" {
tp = defaultStreamTemplatePath
}
return validateStreamTemplateFile(fsys, dir, tp)
}

func validateStreamTemplateFile(fsys fspath.FS, dir, templatePath string) error {
foundFile, err := findPathAtDirectory(fsys, dir, templatePath)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
return errTemplateNotFound
}
return err
}
if foundFile == "" {
return errTemplateNotFound
}
return nil
}

// readDataStreamsManifests reads all data stream manifests and returns a map of data stream directory to its manifest relevant content
func readDataStreamsManifests(fsys fspath.FS) (map[string]dataStreamManifest, error) {
// map of data stream directory to its manifest
dsManifestMap := make(map[string]dataStreamManifest, 0)

dsManifests, err := fs.Glob(fsys, "data_stream/*/manifest.yml")
Expand All @@ -119,12 +198,12 @@ func readDataStreamsManifests(fsys fspath.FS) (map[string]dataStreamManifest, er
for _, file := range dsManifests {
data, err := fs.ReadFile(fsys, file)
if err != nil {
return nil, err
return nil, &dataStreamManifestReadError{relPath: file, err: err}
}
var m dataStreamManifest
err = yaml.Unmarshal(data, &m)
if err != nil {
return nil, err
return nil, &dataStreamManifestReadError{relPath: file, err: err}
}

dsDir := path.Dir(file)
Expand All @@ -134,57 +213,21 @@ func readDataStreamsManifests(fsys fspath.FS) (map[string]dataStreamManifest, er
return dsManifestMap, nil
}

// validateInputWithStreams validates that for the given input type, the streams of each dataset related to it have valid template_path files
// an input is related to a data_stream if any of its streams has the same input type as input
func validateInputWithStreams(fsys fspath.FS, input string, dsMap map[string]dataStreamManifest) error {
for dsDir, manifest := range dsMap {
for _, stream := range manifest.Streams {
// only consider streams that match the input type of the policy template
if stream.Input != input {
continue
}
// if template_path is not set at the stream level, default to "stream.yml.hbs"
if stream.TemplatePath == "" {
stream.TemplatePath = defaultStreamTemplatePath
}

dir := path.Join(dsDir, "agent", "stream")
foundFile, err := findPathAtDirectory(fsys, dir, stream.TemplatePath)
if err != nil {
return err
}
if foundFile == "" {
return errTemplateNotFound
}

return nil
}
}

return nil
}

// findPathAtDirectory looks for a file matching the templatePath in the given directory (dir)
// It checks for exact matches, files ending with the templatePath, or templatePath + ".link"
func findPathAtDirectory(fsys fspath.FS, dir, templatePath string) (string, error) {
// Check for exact match, files ending with stream.TemplatePath, or stream.TemplatePath + ".link"
entries, err := fs.ReadDir(fsys, dir)
if err != nil {
return "", err
}

// Filter matches to ensure they match our criteria:
// 1. Exact name match
// 2. Ends with stream.TemplatePath
// 3. Equals stream.TemplatePath + ".link"
var foundFile string
for _, entry := range entries {
name := entry.Name()
if name == templatePath || name == templatePath+".link" {
foundFile = path.Join(dir, name)
break
}
// fallback to check for suffix match, in case the path is prefixed
if strings.HasSuffix(name, templatePath) || strings.HasSuffix(name, templatePath+".link") {
foundFile = path.Join(dir, name)
break
Expand Down
Loading