Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions build/config/patterns.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,30 @@ local ipdatabaselib = import './ip_database.libsonnet';
local processlib = import './process.libsonnet';

{
dns: {
// writes results from the Team Cymru Malware Hash Registry to '!metadata team_cymru_mhr'
// this pattern will cause significant data latency in a data pipeline and should be used in combination with a caching deployment pattern
// https://www.team-cymru.com/mhr
query_team_cymru_mhr(input): [{
processors: [
// creates the MHR query domain by concatenating the input with the MHR service domain
processlib.copy(input=input, output='!metadata query_team_cymru_mhr.-1'),
processlib.insert(output='!metadata query_team_cymru_mhr.-1', value='hash.cymru.com'),
processlib.concat(input='!metadata query_team_cymru_mhr', output='!metadata query_team_cymru_mhr', separator='.'),
// performs MHR query and parses returned value `["epoch" "hits"]` into JSON `{"team_cymru":{"epoch":"", "hits":""}}`
processlib.dns(input='!metadata query_team_cymru_mhr', output='!metadata response_team_cymru_mhr', _function='query_txt'),
processlib.split(input='!metadata response_team_cymru_mhr.0', output='!metadata response_team_cymru_mhr', separator=' '),
processlib.copy(input='!metadata response_team_cymru_mhr.0', output='!metadata team_cymru_mhr.epoch'),
processlib.copy(input='!metadata response_team_cymru_mhr.1', output='!metadata team_cymru_mhr.hits'),
// converts JSON values from strings into integers
processlib.convert(input='!metadata team_cymru_mhr.epoch', output='!metadata team_cymru_mhr.epoch', type='int'),
processlib.convert(input='!metadata team_cymru_mhr.hits', output='!metadata team_cymru_mhr.hits', type='int'),
// delete remaining keys
processlib.delete(input='!metadata query_team_cymru_mhr'),
processlib.delete(input='!metadata response_team_cymru_mhr'),
],
}],
},
drop: {
// drops randomly selected data. this can be useful for integration tests.
random_data: [{
Expand Down
3 changes: 2 additions & 1 deletion build/config/process.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@
output,
_function,
timeout=1000,
error_on_failure=false,
condition_operator='',
condition_inspectors=[]): {
type: 'dns',
settings: {
options: { 'function': _function, timeout: timeout },
options: { 'function': _function, timeout: timeout, error_on_failure: error_on_failure },
condition: { operator: condition_operator, inspectors: condition_inspectors },
input_key: input,
output_key: output,
Expand Down
23 changes: 15 additions & 8 deletions process/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,16 @@ DNSOptions contains custom options for the DNS processor.
Amount of time to wait (in milliseconds) for a response.

Defaults to 1000 milliseconds (1 second).

ErrorOnFailure (optional):
If set to true, then errors from the DNS request will cause the processor to fail.

Defaults to false.
*/
type DNSOptions struct {
Function string `json:"function"`
Timeout int `json:"timeout"`
Function string `json:"function"`
Timeout int `json:"timeout"`
ErrorOnFailure bool `json:"error_on_failure"`
}

// Close closes resources opened by the DNS processor.
Expand All @@ -86,6 +92,7 @@ func (p DNS) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]confi
}

// Apply processes encapsulated data with the DNS processor.
//nolint: gocognit
func (p DNS) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error) {
// error early if required options are missing
if p.Options.Function == "" {
Expand All @@ -110,7 +117,7 @@ func (p DNS) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule,
switch p.Options.Function {
case "forward_lookup":
addrs, err := dnsResolver.LookupHost(resolverCtx, res)
if err != nil {
if err != nil && p.Options.ErrorOnFailure {
return capsule, fmt.Errorf("process dns: %v", err)
}

Expand All @@ -121,7 +128,7 @@ func (p DNS) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule,
return capsule, nil
case "reverse_lookup":
names, err := dnsResolver.LookupAddr(resolverCtx, res)
if err != nil {
if err != nil && p.Options.ErrorOnFailure {
return capsule, fmt.Errorf("process dns: %v", err)
}

Expand All @@ -132,7 +139,7 @@ func (p DNS) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule,
return capsule, nil
case "query_txt":
records, err := dnsResolver.LookupTXT(resolverCtx, res)
if err != nil {
if err != nil && p.Options.ErrorOnFailure {
return capsule, fmt.Errorf("process dns: %v", err)
}

Expand All @@ -153,7 +160,7 @@ func (p DNS) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule,
switch p.Options.Function {
case "forward_lookup":
addrs, err := dnsResolver.LookupHost(resolverCtx, res)
if err != nil {
if err != nil && p.Options.ErrorOnFailure {
return capsule, fmt.Errorf("process dns: %v", err)
}

Expand All @@ -163,7 +170,7 @@ func (p DNS) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule,
return capsule, nil
case "reverse_lookup":
names, err := dnsResolver.LookupAddr(resolverCtx, res)
if err != nil {
if err != nil && p.Options.ErrorOnFailure {
return capsule, fmt.Errorf("process dns: %v", err)
}

Expand All @@ -172,7 +179,7 @@ func (p DNS) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule,
return capsule, nil
case "query_txt":
records, err := dnsResolver.LookupTXT(resolverCtx, res)
if err != nil {
if err != nil && p.Options.ErrorOnFailure {
return capsule, fmt.Errorf("process dns: %v", err)
}

Expand Down