diff --git a/build/config/condition.libsonnet b/build/config/condition.libsonnet index 57009058..e5967038 100644 --- a/build/config/condition.libsonnet +++ b/build/config/condition.libsonnet @@ -13,6 +13,10 @@ }, }, ip: { + valid(key, negate=false): { + type: 'ip', + settings: { key: key, type: 'valid', negate: negate }, + }, loopback(key, negate=false): { type: 'ip', settings: { key: key, type: 'loopback', negate: negate }, diff --git a/build/config/ip_database.libsonnet b/build/config/ip_database.libsonnet new file mode 100644 index 00000000..e45b6851 --- /dev/null +++ b/build/config/ip_database.libsonnet @@ -0,0 +1,14 @@ +{ + ip2location(database): { + type: 'ip2location', + settings: { database: database }, + }, + maxmind_asn(database, language='en'): { + type: 'maxmind_asn', + settings: { database: database, language: language }, + }, + maxmind_city(database, language='en'): { + type: 'maxmind_city', + settings: { database: database, language: language }, + }, +} diff --git a/build/config/patterns.libsonnet b/build/config/patterns.libsonnet index 235b3cd9..5538a77d 100644 --- a/build/config/patterns.libsonnet +++ b/build/config/patterns.libsonnet @@ -1,6 +1,7 @@ // functions in this file contain pre-configured conditions and processors that represent commonly used patterns across many data pipelines. local conditionlib = import './condition.libsonnet'; +local ipdatabaselib = import './ip_database.libsonnet'; local processlib = import './process.libsonnet'; { @@ -56,4 +57,21 @@ local processlib = import './process.libsonnet'; }, ], }, + ip_database: { + // performs lookup for any valid, public IP address in any IP enrichment database + lookup_public_address(input, output, db_options): [{ + local conditions = [ + conditionlib.ip.valid(input), + conditionlib.ip.loopback(input, negate=true), + conditionlib.ip.multicast(input, negate=true), + conditionlib.ip.multicast_link_local(input, negate=true), + conditionlib.ip.private(input, negate=true), + conditionlib.ip.unicast_link_local(input, negate=true), + conditionlib.ip.unspecified(input, negate=true), + ], + processors: [ + processlib.ip_database(input=input, output=output, database_options=db_options, condition_operator='and', condition_inspectors=conditions), + ], + }], + }, } diff --git a/build/config/process.libsonnet b/build/config/process.libsonnet index 2ec2c897..ddb00782 100644 --- a/build/config/process.libsonnet +++ b/build/config/process.libsonnet @@ -100,6 +100,20 @@ input_key: input, }, }, + dns(input, + output, + _function, + timeout=1000, + condition_operator='', + condition_inspectors=[]): { + type: 'dns', + settings: { + options: { 'function': _function, timeout: timeout }, + condition: { operator: condition_operator, inspectors: condition_inspectors }, + input_key: input, + output_key: output, + }, + }, domain(input, output, _function, @@ -216,6 +230,19 @@ output_key: output, }, }, + ip_database(input, + output, + database_options, + condition_operator='', + condition_inspectors=[]): { + type: 'ip_database', + settings: { + options: { 'function': database_options.type, database_options: database_options }, + condition: { operator: condition_operator, inspectors: condition_inspectors }, + input_key: input, + output_key: output, + }, + }, lambda(input, output, _function, diff --git a/condition/ip.go b/condition/ip.go index f73e925a..4238dd7b 100644 --- a/condition/ip.go +++ b/condition/ip.go @@ -18,8 +18,10 @@ IP evaluates IP addresses by their type and usage. This inspector uses the stand The inspector has these settings: Type: - IP address type used during inspection - must be one of: + IP address type used during inspection. + + Must be one of: + valid: valid address of any type loopback: valid loopback address multicast: valid multicast address multicast_link_local: valid link local multicast address @@ -28,10 +30,11 @@ The inspector has these settings: unicast_link_local: valid link local unicast address unspecified: valid "unspecified" address (e.g., 0.0.0.0, ::) Key (optional): - JSON key-value to retrieve for inspection + JSON key-value to retrieve for inspection. Negate (optional): - if set to true, then the inspection is negated (i.e., true becomes false, false becomes true) - defaults to false + If set to true, then the inspection is negated (i.e., true becomes false, false becomes true). + + Defaults to false. The inspector supports these patterns: @@ -66,9 +69,10 @@ func (c IP) Inspect(ctx context.Context, capsule config.Capsule) (output bool, e } ip := net.ParseIP(check) - var matched bool switch s := c.Type; s { + case "valid": + matched = ip != nil case "loopback": matched = ip.IsLoopback() case "multicast": diff --git a/condition/ip_test.go b/condition/ip_test.go index e866c718..7af07de5 100644 --- a/condition/ip_test.go +++ b/condition/ip_test.go @@ -22,6 +22,22 @@ var ipTests = []struct { []byte(`{"ip_address":"192.168.1.2"}`), true, }, + { + "valid", + IP{ + Type: "valid", + }, + []byte("192.168.1.2"), + true, + }, + { + "invalid", + IP{ + Type: "valid", + }, + []byte("foo"), + false, + }, { "multicast", IP{ diff --git a/internal/ip/database/database.go b/internal/ip/database/database.go index cec35c5f..f2f21a4f 100644 --- a/internal/ip/database/database.go +++ b/internal/ip/database/database.go @@ -5,6 +5,7 @@ import ( "context" "fmt" + "github.com/brexhq/substation/config" "github.com/brexhq/substation/internal/errors" "github.com/brexhq/substation/internal/ip" ) @@ -15,21 +16,28 @@ const errInvalidFactoryInput = errors.Error("invalid factory input") // OpenCloser provides tools for opening, closing, and getting values from IP address enrichment databases. type OpenCloser interface { ip.Getter - Open(context.Context, string) error + Open(context.Context) error Close() error IsEnabled() bool } // Factory returns an OpenCloser. The returned OpenCloser must be opened before it can be used. -func Factory(db string) (OpenCloser, error) { - switch db { +// func Factory(db string) (OpenCloser, error) { +func Factory(cfg config.Config) (OpenCloser, error) { + switch t := cfg.Type; t { case "ip2location": - return &IP2Location{}, nil + var db IP2Location + _ = config.Decode(cfg.Settings, &db) + return &db, nil case "maxmind_asn": - return &MaxMindASN{}, nil + var db MaxMindASN + _ = config.Decode(cfg.Settings, &db) + return &db, nil case "maxmind_city": - return &MaxMindCity{}, nil + var db MaxMindCity + _ = config.Decode(cfg.Settings, &db) + return &db, nil default: - return nil, fmt.Errorf("database %s: %v", db, errInvalidFactoryInput) + return nil, fmt.Errorf("database %s: %v", t, errInvalidFactoryInput) } } diff --git a/internal/ip/database/example_test.go b/internal/ip/database/example_test.go index 3cf9993a..2511e70a 100644 --- a/internal/ip/database/example_test.go +++ b/internal/ip/database/example_test.go @@ -9,12 +9,12 @@ import ( ) func Example_iP2Location() { - // the location of the enrichment database can be either a path on local disk, an HTTP(S) URL, or an AWS S3 URL - location := "location://path/to/ip2location.bin" - // create IP2Location container, open database, and close database when function returns - ip2loc := database.IP2Location{} - if err := ip2loc.Open(context.TODO(), location); err != nil { + ip2loc := database.IP2Location{ + Database: "location://path/to/ip2location.bin", + } + + if err := ip2loc.Open(context.TODO()); err != nil { // handle error panic(err) } @@ -34,12 +34,12 @@ func Example_iP2Location() { } func Example_maxMindCity() { - // the location of the enrichment database can be either a path on local disk, an HTTP(S) URL, or an AWS S3 URL - location := "location://path/to/maxmind.mmdb" - // create MaxMind City container, open database, and close database when function returns - mm := database.MaxMindCity{} - if err := mm.Open(context.TODO(), location); err != nil { + mm := database.MaxMindCity{ + Database: "location://path/to/maxmind.mmdb", + } + + if err := mm.Open(context.TODO()); err != nil { // handle error panic(err) } diff --git a/internal/ip/database/ip2location.go b/internal/ip/database/ip2location.go index e6ba9be5..2092f5f7 100644 --- a/internal/ip/database/ip2location.go +++ b/internal/ip/database/ip2location.go @@ -11,7 +11,8 @@ import ( // IP2Location provides read access to an IP2Location binary database. type IP2Location struct { - db *ip2location.DB + Database string `json:"database"` + db *ip2location.DB } // IsEnabled returns true if the database is open and ready for use. @@ -20,8 +21,8 @@ func (d *IP2Location) IsEnabled() bool { } // Open retrieves the database and opens it for querying. The location of the database can be either a path on local disk, an HTTP(S) URL, or an AWS S3 URL. -func (d *IP2Location) Open(ctx context.Context, location string) error { - path, err := file.Get(ctx, location) +func (d *IP2Location) Open(ctx context.Context) error { + path, err := file.Get(ctx, d.Database) defer os.Remove(path) if err != nil { diff --git a/internal/ip/database/maxmind.go b/internal/ip/database/maxmind.go index abe7b714..e65810ba 100644 --- a/internal/ip/database/maxmind.go +++ b/internal/ip/database/maxmind.go @@ -11,19 +11,11 @@ import ( "github.com/oschwald/geoip2-golang" ) -// GetMaxMindLanguage configures the language that is used when reading values from MaxMind databases. The value is retrieved from the MAXMIND_LANGUAGE environment variable. If the environment variable is missing, then the default language is English. -func GetMaxMindLanguage() string { - lang, ok := os.LookupEnv("MAXMIND_LANGUAGE") - if !ok { - return "en" - } - return lang -} - // MaxMindASN provides read access to MaxMind ASN database. type MaxMindASN struct { + Database string `json:"database"` + Language string `json:"language"` db *geoip2.Reader - language string } // IsEnabled returns true if the database is open and ready for use. @@ -32,10 +24,13 @@ func (d *MaxMindASN) IsEnabled() bool { } // Open retrieves the database and opens it for querying. The location of the database can be either a path on local disk, an HTTP(S) URL, or an AWS S3 URL. MaxMind language support is provided by calling GetMaxMindLanguage to retrieve a user-configured language. -func (d *MaxMindASN) Open(ctx context.Context, location string) error { - d.language = GetMaxMindLanguage() +func (d *MaxMindASN) Open(ctx context.Context) error { + // language defaults to English + if d.Language == "" { + d.Language = "en" + } - path, err := file.Get(ctx, location) + path, err := file.Get(ctx, d.Database) defer os.Remove(path) if err != nil { @@ -84,8 +79,9 @@ func (d *MaxMindASN) Get(addr string) (*ip.EnrichmentRecord, error) { // MaxMindCity provides read access to a MaxMind City database. type MaxMindCity struct { + Database string `json:"database"` + Language string `json:"language"` db *geoip2.Reader - language string } // IsEnabled returns true if the database is open and ready for use. @@ -94,10 +90,13 @@ func (d *MaxMindCity) IsEnabled() bool { } // Open retrieves the database and opens it for querying. MaxMind language support is provided by calling GetMaxMindLanguage to retrieve a user-configured language. -func (d *MaxMindCity) Open(ctx context.Context, location string) error { - d.language = GetMaxMindLanguage() +func (d *MaxMindCity) Open(ctx context.Context) error { + // language defaults to English + if d.Language == "" { + d.Language = "en" + } - path, err := file.Get(ctx, location) + path, err := file.Get(ctx, d.Database) defer os.Remove(path) if err != nil { @@ -140,9 +139,9 @@ func (d *MaxMindCity) Get(addr string) (*ip.EnrichmentRecord, error) { Latitude: float32(resp.Location.Latitude), Longitude: float32(resp.Location.Longitude), }, - Continent: resp.Continent.Names[d.language], - Country: resp.Country.Names[d.language], - City: resp.City.Names[d.language], + Continent: resp.Continent.Names[d.Language], + Country: resp.Country.Names[d.Language], + City: resp.City.Names[d.Language], PostalCode: resp.Postal.Code, Accuracy: float32(resp.Location.AccuracyRadius), TimeZone: resp.Location.TimeZone, diff --git a/process/example_test.go b/process/example_test.go index c12e5b91..30deb50d 100644 --- a/process/example_test.go +++ b/process/example_test.go @@ -51,10 +51,6 @@ func Example_iPDatabase() { capsule := config.NewCapsule() capsule.SetData([]byte(`{"ip":"8.8.8.8"}`)) - // the location of the IP enrichment database must be provided by environment variable and can be either a path on local disk, an HTTP(S) URL, or an AWS S3 URL - // _ = os.Setenv("MAXMIND_ASN", "location://path/to/maxmind.mmdb") - // _ = os.Setenv("MAXMIND_CITY", "location://path/to/maxmind.mmdb") - // in native Substation applications configuration is handled by compiling Jsonnet and loading JSON into the application cfg := []config.Config{ { @@ -64,6 +60,14 @@ func Example_iPDatabase() { "output_key": "as", "options": map[string]interface{}{ "function": "maxmind_asn", + "database_options": map[string]interface{}{ + "type": "maxmind_asn", + "settings": map[string]interface{}{ + // the location of the IP enrichment database can be either a path on local disk, an HTTP(S) URL, or an AWS S3 URL + "database": "location://path/to/maxmind.mmdb", + "language": "en", + }, + }, }, }, }, @@ -74,6 +78,14 @@ func Example_iPDatabase() { "output_key": "geo", "options": map[string]interface{}{ "function": "maxmind_city", + "database_options": map[string]interface{}{ + "type": "maxmind_city", + "settings": map[string]interface{}{ + // the location of the IP enrichment database can be either a path on local disk, an HTTP(S) URL, or an AWS S3 URL + "database": "location://path/to/maxmind.mmdb", + "language": "en", + }, + }, }, }, }, diff --git a/process/ip_database.go b/process/ip_database.go index 49886f8b..25c79901 100644 --- a/process/ip_database.go +++ b/process/ip_database.go @@ -3,8 +3,6 @@ package process import ( "context" "fmt" - "os" - "strings" "github.com/brexhq/substation/condition" "github.com/brexhq/substation/config" @@ -14,7 +12,7 @@ import ( var ipDatabases = make(map[string]ipdb.OpenCloser) /* -IPDatabase processes data by querying IP addresses in enrichment databases, including geographic location (geo) and autonomous system (asn) databases. The processor supports multiple database providers by contextually retrieving and loading databases using environment variables and can be reused if multiple databases need to be queried. +IPDatabase processes data by querying IP addresses in enrichment databases, including geographic location (geo) and autonomous system (asn) databases. The processor supports multiple database providers and can be reused if multiple databases need to be queried. IP address information is abstracted from each enrichment database into a single record that contains these categories: @@ -62,15 +60,18 @@ IPDatabaseOptions contains custom options for the IPDatabase processor. Function: Selects the enrichment database queried by the processor. - The database is contextually retrieved using an environment variable and lazy loaded on first invocation. Each environment variable should contain the location of the database, which can be either a path on local disk, an HTTP(S) URL, or an AWS S3 URL. + The database is lazy loaded on first invocation and can be loaded from a path on local disk, an HTTP(S) URL, or an AWS S3 URL. Must be one of: - ip2location (IP2LOCATION) - maxmind_asn (MAXMIND_ASN) - maxmind_city (MAXMIND_CITY) + ip2location + maxmind_asn + maxmind_city + Options: + Configuration passed directly to the internal IP database package. Similar to processors, each database has its own config requirements. See internal/ip/database for more information. */ type IPDatabaseOptions struct { - Function string `json:"function"` + Function string `json:"function"` + DatabaseOptions config.Config `json:"database_options"` } // Close closes enrichment database resources opened by the IPDatabase processor. @@ -107,19 +108,18 @@ func (p IPDatabase) Apply(ctx context.Context, capsule config.Capsule) (config.C } // lazy load IP enrichment database - // location of the database is set by environment variable + // db must go into the map after opening to avoid race conditions if _, ok := ipDatabases[p.Options.Function]; !ok { - location := os.Getenv(strings.ToUpper(p.Options.Function)) - - db, err := ipdb.Factory(p.Options.Function) + db, err := ipdb.Factory(p.Options.DatabaseOptions) if err != nil { return capsule, fmt.Errorf("process ip_database: %v", err) } - ipDatabases[p.Options.Function] = db - if err := ipDatabases[p.Options.Function].Open(ctx, location); err != nil { + if err := db.Open(ctx); err != nil { return capsule, fmt.Errorf("process ip_database: %v", err) } + + ipDatabases[p.Options.Function] = db } res := capsule.Get(p.InputKey).String() diff --git a/process/process.go b/process/process.go index 65e71b88..10c0129c 100644 --- a/process/process.go +++ b/process/process.go @@ -174,7 +174,6 @@ func ApplicatorFactory(cfg config.Config) (Applicator, error) { case "split": var p Split _ = config.Decode(cfg.Settings, &p) - return p, nil case "time": var p Time