Skip to content

Commit 50675cc

Browse files
authored
Add fleet reserved variable validation (#1134)
Adds validation for explicit declarations of the use_apm and data_stream.dataset variables, which Fleet already injects automatically if they are not present. For use_apm, it checks that the variable is of type bool and that it is used only on otelcol inputs of streams whose data stream type is traces or that set dynamic_signal_types. For data_stream.dataset, it checks that the variable is of type text.
1 parent bf511a4 commit 50675cc

30 files changed

Lines changed: 805 additions & 0 deletions

File tree

Lines changed: 348 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package semantic
6+
7+
import (
8+
"fmt"
9+
"io/fs"
10+
"path"
11+
"slices"
12+
"strings"
13+
14+
"gopkg.in/yaml.v3"
15+
16+
"github.com/elastic/package-spec/v3/code/go/internal/fspath"
17+
"github.com/elastic/package-spec/v3/code/go/pkg/specerrors"
18+
)
19+
20+
// Names of the reserved variables handled by this file, plus the data stream
// type that the use_apm check compares against.
const (
	useAPMVarName        = "use_apm"
	datasetVarName       = "data_stream.dataset"
	tracesDataStreamType = "traces"
)
25+
26+
// varScope names the place in a package manifest at which a variable is
// declared. Its string value is used verbatim in validation messages.
type varScope string

const (
	// scopeRoot is the package level (top-level vars).
	scopeRoot varScope = "root"
	// scopePolicyTemplate is policy_templates[].vars.
	scopePolicyTemplate varScope = "policy template"
	// scopeInput is policy_templates[].inputs[].vars.
	scopeInput varScope = "input"
	// scopeStream is a data stream stream entry (the valid scope for reserved vars).
	scopeStream varScope = "stream"
)
35+
36+
// reservedVarRule defines the constraints for a single Fleet-reserved variable.
37+
// Adding a new reserved variable is a matter of adding an entry to reservedVarRules.
38+
type reservedVarRule struct {
39+
// allowedScopes lists the scopes where this variable may be declared.
40+
allowedScopes []varScope
41+
// validate checks type and eligibility constraints at an allowed scope.
42+
// May be nil if only scope enforcement is needed.
43+
validate func(v varDef, ctx varValidationContext) specerrors.ValidationErrors
44+
}
45+
46+
// isAllowedAt reports whether scope is in the rule's allowedScopes.
47+
func (r reservedVarRule) isAllowedAt(scope varScope) bool {
48+
return slices.Contains(r.allowedScopes, scope)
49+
}
50+
51+
// scopeViolationMsg returns a human-readable description of where the variable
52+
// must be declared, derived from allowedScopes (e.g. "stream level" or
53+
// "root or stream level").
54+
func (r reservedVarRule) scopeViolationMsg() string {
55+
names := make([]string, len(r.allowedScopes))
56+
for i, s := range r.allowedScopes {
57+
names[i] = string(s) + " level"
58+
}
59+
return strings.Join(names, " or ")
60+
}
61+
62+
var reservedVarRules = map[string]reservedVarRule{
63+
useAPMVarName: {allowedScopes: []varScope{scopeStream}, validate: validateUseAPMVar},
64+
datasetVarName: {allowedScopes: []varScope{scopeStream}, validate: validateDatasetVar},
65+
}
66+
67+
// varDef holds the two fields of a manifest variable declaration that the
// reserved variable validation reads: its name and its declared type.
type varDef struct {
	// Name is decoded from the "name" key.
	Name string `yaml:"name"`
	// Type is decoded from the "type" key.
	Type string `yaml:"type"`
}
71+
72+
// varValidationContext carries all available context at the point a variable is
73+
// validated. filePath, contextStr, manifest, and scope are always set.
74+
// policyTemplate is set at scopePolicyTemplate, scopeInput, and scopeStream
75+
// for input packages. stream is set at scopeStream only.
76+
type varValidationContext struct {
77+
manifest packageManifest
78+
scope varScope
79+
filePath string
80+
contextStr string
81+
policyTemplate *policyTemplate
82+
stream *normalizedStream
83+
}
84+
85+
// normalizedStream carries the stream-specific fields consulted by content
// validation. It gives a single representation for streams that come from
// an input package policy template and for streams that come from an
// integration data stream manifest, mirroring Fleet's own stream
// normalization approach.
type normalizedStream struct {
	// inputType is the stream's input type.
	inputType string
	// dataStreamType is the type of the data stream the stream belongs to.
	dataStreamType string
	// dynamicSignalTypes mirrors the "dynamic_signal_types" manifest setting.
	dynamicSignalTypes bool
}
94+
95+
// policyInput parses the type and vars from an integration policy template input entry.
96+
type policyInput struct {
97+
Type string `yaml:"type"`
98+
Vars []varDef `yaml:"vars"`
99+
}
100+
101+
// policyTemplate parses fields from a package policy template. Input (singular)
102+
// is used by input packages; Inputs (plural) is used by integration packages.
103+
type policyTemplate struct {
104+
Name string `yaml:"name"`
105+
Input string `yaml:"input"`
106+
Type string `yaml:"type"`
107+
DynamicSignalTypes bool `yaml:"dynamic_signal_types"`
108+
Vars []varDef `yaml:"vars"`
109+
Inputs []policyInput `yaml:"inputs"`
110+
}
111+
112+
// packageManifest captures all fields needed for reserved variable validation
113+
// across both input and integration packages.
114+
type packageManifest struct {
115+
Type string `yaml:"type"`
116+
Vars []varDef `yaml:"vars"`
117+
PolicyTemplates []policyTemplate `yaml:"policy_templates"`
118+
}
119+
120+
// streamEntry parses the fields needed from a data stream manifest stream entry.
121+
type streamEntry struct {
122+
Input string `yaml:"input"`
123+
DynamicSignalTypes bool `yaml:"dynamic_signal_types"`
124+
Vars []varDef `yaml:"vars"`
125+
}
126+
127+
// ValidateFleetReservedVars validates that Fleet-reserved variables, when
128+
// explicitly defined in package manifests, conform to Fleet's expectations.
129+
func ValidateFleetReservedVars(fsys fspath.FS) specerrors.ValidationErrors {
130+
manifestPath := "manifest.yml"
131+
data, err := fs.ReadFile(fsys, manifestPath)
132+
if err != nil {
133+
return specerrors.ValidationErrors{
134+
specerrors.NewStructuredErrorf("file \"%s\" is invalid: failed to read manifest: %w", fsys.Path(manifestPath), err)}
135+
}
136+
137+
var manifest packageManifest
138+
if err := yaml.Unmarshal(data, &manifest); err != nil {
139+
return specerrors.ValidationErrors{
140+
specerrors.NewStructuredErrorf("file \"%s\" is invalid: failed to parse manifest: %w", fsys.Path(manifestPath), err)}
141+
}
142+
143+
switch manifest.Type {
144+
case inputPackageType:
145+
return validateInputReservedVars(fsys, manifest)
146+
case integrationPackageType:
147+
return validateIntegrationReservedVars(fsys, manifest)
148+
}
149+
150+
return nil
151+
}
152+
153+
// reservedVarChecker accumulates validation errors while walking the var
154+
// declarations in a package manifest. Its check method is the single call
155+
// site for all reserved variable validation, regardless of scope.
156+
type reservedVarChecker struct {
157+
errs specerrors.ValidationErrors
158+
}
159+
160+
// check validates every var in vars using the provided context.
161+
func (c *reservedVarChecker) check(vars []varDef, ctx varValidationContext) {
162+
for _, v := range vars {
163+
c.errs = append(c.errs, validateReservedVar(v, ctx)...)
164+
}
165+
}
166+
167+
func validateInputReservedVars(fsys fspath.FS, manifest packageManifest) specerrors.ValidationErrors {
168+
filePath := fsys.Path("manifest.yml")
169+
c := &reservedVarChecker{}
170+
171+
c.check(manifest.Vars, varValidationContext{
172+
manifest: manifest,
173+
scope: scopeRoot,
174+
filePath: filePath,
175+
contextStr: "package root vars",
176+
})
177+
178+
// Policy template vars are stream context for input packages: Fleet synthesizes
179+
// a data stream from each policy template, placing these vars into streams[0].vars.
180+
for _, pt := range manifest.PolicyTemplates {
181+
stream := normalizeInputStream(pt)
182+
c.check(pt.Vars, varValidationContext{
183+
manifest: manifest,
184+
scope: scopeStream,
185+
filePath: filePath,
186+
contextStr: fmt.Sprintf("policy template %q", pt.Name),
187+
policyTemplate: &pt,
188+
stream: &stream,
189+
})
190+
}
191+
192+
return c.errs
193+
}
194+
195+
func validateIntegrationReservedVars(fsys fspath.FS, manifest packageManifest) specerrors.ValidationErrors {
196+
filePath := fsys.Path("manifest.yml")
197+
c := &reservedVarChecker{}
198+
199+
c.check(manifest.Vars, varValidationContext{
200+
manifest: manifest,
201+
scope: scopeRoot,
202+
filePath: filePath,
203+
contextStr: "package root vars",
204+
})
205+
206+
for _, pt := range manifest.PolicyTemplates {
207+
c.check(pt.Vars, varValidationContext{
208+
manifest: manifest,
209+
scope: scopePolicyTemplate,
210+
filePath: filePath,
211+
contextStr: fmt.Sprintf("policy template %q vars", pt.Name),
212+
policyTemplate: &pt,
213+
})
214+
for _, input := range pt.Inputs {
215+
c.check(input.Vars, varValidationContext{
216+
manifest: manifest,
217+
scope: scopeInput,
218+
filePath: filePath,
219+
contextStr: fmt.Sprintf("policy template %q input %q vars", pt.Name, input.Type),
220+
policyTemplate: &pt,
221+
})
222+
}
223+
}
224+
225+
dataStreams, err := listDataStreams(fsys)
226+
if err != nil {
227+
return append(c.errs, specerrors.NewStructuredErrorf("failed to list data streams: %w", err))
228+
}
229+
230+
for _, ds := range dataStreams {
231+
manifestPath := path.Join(dataStreamDir, ds, "manifest.yml")
232+
data, err := fs.ReadFile(fsys, manifestPath)
233+
if err != nil {
234+
c.errs = append(c.errs, specerrors.NewStructuredErrorf(
235+
"file \"%s\" is invalid: failed to read manifest: %w", fsys.Path(manifestPath), err))
236+
continue
237+
}
238+
239+
var dsManifest struct {
240+
Type string `yaml:"type"`
241+
Streams []streamEntry `yaml:"streams"`
242+
}
243+
if err := yaml.Unmarshal(data, &dsManifest); err != nil {
244+
c.errs = append(c.errs, specerrors.NewStructuredErrorf(
245+
"file \"%s\" is invalid: failed to parse manifest: %w", fsys.Path(manifestPath), err))
246+
continue
247+
}
248+
249+
dsFilePath := fsys.Path(manifestPath)
250+
for _, entry := range dsManifest.Streams {
251+
stream := normalizeIntegrationStream(entry, dsManifest.Type)
252+
c.check(entry.Vars, varValidationContext{
253+
manifest: manifest,
254+
scope: scopeStream,
255+
filePath: dsFilePath,
256+
contextStr: fmt.Sprintf("stream with input type %q", entry.Input),
257+
stream: &stream,
258+
})
259+
}
260+
}
261+
262+
return c.errs
263+
}
264+
265+
func normalizeInputStream(pt policyTemplate) normalizedStream {
266+
return normalizedStream{
267+
inputType: pt.Input,
268+
dataStreamType: pt.Type,
269+
dynamicSignalTypes: pt.DynamicSignalTypes,
270+
}
271+
}
272+
273+
func normalizeIntegrationStream(entry streamEntry, dsType string) normalizedStream {
274+
return normalizedStream{
275+
inputType: entry.Input,
276+
dataStreamType: dsType,
277+
dynamicSignalTypes: entry.DynamicSignalTypes,
278+
}
279+
}
280+
281+
// validateReservedVar is the single entry point for all reserved variable
282+
// validation. It checks scope constraints via the rule's allowedScopes and,
283+
// when the scope is valid, runs per-variable content validation.
284+
func validateReservedVar(v varDef, ctx varValidationContext) specerrors.ValidationErrors {
285+
rule, ok := reservedVarRules[v.Name]
286+
if !ok {
287+
return nil
288+
}
289+
290+
var errs specerrors.ValidationErrors
291+
292+
if !rule.isAllowedAt(ctx.scope) {
293+
errs = append(errs, specerrors.NewStructuredErrorf(
294+
"file \"%s\" is invalid: %s: variable \"%s\" must only be declared at %s",
295+
ctx.filePath, ctx.contextStr, v.Name, rule.scopeViolationMsg()))
296+
}
297+
298+
if rule.isAllowedAt(ctx.scope) && rule.validate != nil {
299+
errs = append(errs, rule.validate(v, ctx)...)
300+
}
301+
302+
return errs
303+
}
304+
305+
// validateUseAPMVar enforces that use_apm is:
306+
// - defined on an otelcol input
307+
// - of type "bool"
308+
// - only present on "traces" data streams or when "dynamic_signal_types" is true
309+
func validateUseAPMVar(v varDef, ctx varValidationContext) specerrors.ValidationErrors {
310+
stream := ctx.stream
311+
var errs specerrors.ValidationErrors
312+
313+
if stream.inputType != otelcolInputType {
314+
errs = append(errs, newReservedVarError(ctx.filePath, ctx.contextStr, useAPMVarName,
315+
fmt.Sprintf("%q input", otelcolInputType), fmt.Sprintf("%q", stream.inputType)))
316+
}
317+
if v.Type != "bool" {
318+
errs = append(errs, newReservedVarError(ctx.filePath, ctx.contextStr, useAPMVarName,
319+
`type "bool"`, fmt.Sprintf("%q", v.Type)))
320+
}
321+
if stream.dataStreamType != tracesDataStreamType && !stream.dynamicSignalTypes {
322+
errs = append(errs, newReservedVarError(ctx.filePath, ctx.contextStr, useAPMVarName,
323+
fmt.Sprintf("%q data stream type or \"dynamic_signal_types: true\"", tracesDataStreamType),
324+
fmt.Sprintf("%q data stream type", stream.dataStreamType)))
325+
}
326+
327+
return errs
328+
}
329+
330+
// validateDatasetVar enforces that data_stream.dataset is of type "text".
331+
func validateDatasetVar(v varDef, ctx varValidationContext) specerrors.ValidationErrors {
332+
if v.Type != "text" {
333+
return specerrors.ValidationErrors{
334+
newReservedVarError(ctx.filePath, ctx.contextStr, datasetVarName,
335+
`type "text"`, fmt.Sprintf("%q", v.Type)),
336+
}
337+
}
338+
return nil
339+
}
340+
341+
// newReservedVarError constructs a validation error for a Fleet-reserved
342+
// variable that doesn't satisfy its constraint. expected describes the
343+
// requirement (e.g. `type "bool"`) and got is the actual value found.
344+
func newReservedVarError(filePath, contextStr, varName, expected, got string) *specerrors.StructuredError {
345+
return specerrors.NewStructuredErrorf(
346+
"file \"%s\" is invalid: %s: variable \"%s\" must be %s, got %s",
347+
filePath, contextStr, varName, expected, got)
348+
}

0 commit comments

Comments
 (0)