diff --git a/plugins/cisco_asa/.CHECKSUM b/plugins/cisco_asa/.CHECKSUM index 0ffcc2dc2d..fadac40637 100644 --- a/plugins/cisco_asa/.CHECKSUM +++ b/plugins/cisco_asa/.CHECKSUM @@ -1,12 +1,16 @@ { - "spec": "f42e20a913cfe638a9c2793035fa079e", - "manifest": "6ab898bbdce4d40bf420002e5162b9ac", - "setup": "eca16a8087075a1120ff61fa5f217c5e", + "spec": "64fb87c7d1f66aefd4e7f7614a19f9af", + "manifest": "e2419ec1df006ce82cf0909b1bb65bff", + "setup": "48492ed5afb320410fc6d17a1ca24f07", "schemas": [ { "identifier": "add_address_to_group/schema.py", "hash": "28866c00bc5f8c943458f0b6c5176647" }, + { + "identifier": "block_host/schema.py", + "hash": "74aeff1b3af9a4b757210a741b75bb29" + }, { "identifier": "check_if_address_object_in_group/schema.py", "hash": "6ab78079cdba4b3af229361568ba0a1a" @@ -19,6 +23,10 @@ "identifier": "delete_address_object/schema.py", "hash": "379b6330742e8b45c24a532b2d03253c" }, + { + "identifier": "get_blocked_hosts/schema.py", + "hash": "2a0caee8d3b1264280da82fceb3d0bba" + }, { "identifier": "remove_address_from_group/schema.py", "hash": "fa9a4bbca8b9b526d23c7f0e02727360" diff --git a/plugins/cisco_asa/bin/icon_cisco_asa b/plugins/cisco_asa/bin/icon_cisco_asa index 6773c9a579..d15cfb356e 100644 --- a/plugins/cisco_asa/bin/icon_cisco_asa +++ b/plugins/cisco_asa/bin/icon_cisco_asa @@ -6,7 +6,7 @@ from sys import argv Name = "Cisco Adaptive Security Appliance" Vendor = "rapid7" -Version = "1.4.2" +Version = "1.5.0" Description = "The Cisco ASA plugin allows you to automate the management of network objects" @@ -36,12 +36,16 @@ def main(): ) self.add_action(actions.AddAddressToGroup()) + self.add_action(actions.BlockHost()) + self.add_action(actions.CheckIfAddressObjectInGroup()) self.add_action(actions.CreateAddressObject()) self.add_action(actions.DeleteAddressObject()) + self.add_action(actions.GetBlockedHosts()) + self.add_action(actions.RemoveAddressFromGroup()) diff --git a/plugins/cisco_asa/help.md b/plugins/cisco_asa/help.md index 16125c1c08..3d626af5f0 100644 --- a/plugins/cisco_asa/help.md +++ b/plugins/cisco_asa/help.md @@ -6,12 +6,18 @@ * Determine if a host is blocked by checking if it's found in an address group applied to a firewall rule * Block and unblock hosts from the firewall through object management +* Block and unblock hosts with the shun command +* Check which hosts are blocked with the shun command # Requirements * Username and Password for an ASA account with the appropriate privilege level for the action * Cisco ASA server with the [REST API server enabled](https://www.cisco.com/c/en/us/td/docs/security/asa/api/qsg-asa-api.html) +# Supported Product Versions + +* 9.13(1) + # Documentation ## Setup @@ -52,6 +58,78 @@ Example input: ### Actions +#### Block Host + +This action is used to block hosts by IP address using the shun command. + +##### Input + +|Name|Type|Default|Required|Description|Enum|Example| +|----|----|-------|--------|-----------|----|-------| +|destination_ip|string|None|False|Destination IP address|None|198.51.100.100| +|destination_port|integer|None|False|Destination port|None|443| +|protocol|string|None|False|IP protocol, for example TCP or UDP|None|TCP| +|shun|boolean|True|True|True to block a host or false to unblock a host using the shun command|None|True| +|source_ip|string|None|True|Source IP address you want to block or unblock|None|198.51.100.100| +|source_port|integer|None|False|Source port|None|443| + +Example input: + +``` +{ + "destination_ip": "198.51.100.100", + "destination_port": 443, + "protocol": "TCP", + "shun": true, + "source_ip": "198.51.100.100", + "source_port": 443 +} +``` + +##### Output + +|Name|Type|Required|Description| +|----|----|--------|-----------| +|success|boolean|True|Whether the block or unblock action was successful| + +Example output: + +``` +{ + "success": true +} +``` + +#### Get Blocked Hosts + +This action is used to get blocked hosts. + +##### Input + +_This action does not contain any inputs._ + +##### Output + +|Name|Type|Required|Description| +|----|----|--------|-----------| +|hosts|[]hosts|True|List of hosts blocked with shun command| + +Example output: + +``` +{ + "hosts": [ + { + "source_ip": "10.1.1.27", + "dest_ip": "10.2.2.89", + "source_port": "444", + "dest_port": "555", + "protocol": "6" + } + ] +} +``` + #### Create Address Object This action is used to create Address Object by the Object IP address. @@ -254,7 +332,7 @@ _This plugin does not contain any triggers._ |----|----|--------|-----------| |Host|host|False|Host| |Kind|string|False|Kind| -|Name|string|False|Name| +|Object Name|string|False|The name of the object| |Object ID|string|False|Object ID| |Self Link|string|False|Self link| @@ -262,8 +340,18 @@ _This plugin does not contain any triggers._ |Name|Type|Required|Description| |----|----|--------|-----------| -|Kind|string|False|Kind| -|Value|string|False|Value| +|Kind|string|False|Kind is the type of object e.g. IPv4Address, IPv4FQDN, etc.| +|Value|string|False|The value of the object. This will be the actual IPv4, IPv6, FQDN, etc. address the object refers to.| + +#### hosts + +|Name|Type|Required|Description| +|----|----|--------|-----------| +|Destination IP|string|False|Destination IP address| +|Destination Port|string|False|Destination port| +|Protocol|string|False|Protocol| +|Source IP|string|False|Source IP address| +|Source Port|string|False|Source port| ## Troubleshooting @@ -271,6 +359,7 @@ _This plugin does not contain any troubleshooting information._ # Version History +* 1.5.0 - Add new actions Get Blocked Hosts and Block Host * 1.4.2 - Add `docs_url` in plugin spec | Update `source_url` in plugin spec * 1.4.1 - Fix None check in actions Add Address to Group and Create Address Object * 1.4.0 - Add new action Create Address Object diff --git a/plugins/cisco_asa/icon_cisco_asa/actions/__init__.py b/plugins/cisco_asa/icon_cisco_asa/actions/__init__.py index 913a9c83f3..1d927b7d47 100644 --- a/plugins/cisco_asa/icon_cisco_asa/actions/__init__.py +++ b/plugins/cisco_asa/icon_cisco_asa/actions/__init__.py @@ -1,6 +1,8 @@ # GENERATED BY KOMAND SDK - DO NOT EDIT from .add_address_to_group.action import AddAddressToGroup +from .block_host.action import BlockHost from .check_if_address_object_in_group.action import CheckIfAddressObjectInGroup from .create_address_object.action import CreateAddressObject from .delete_address_object.action import DeleteAddressObject +from .get_blocked_hosts.action import GetBlockedHosts from .remove_address_from_group.action import RemoveAddressFromGroup diff --git a/plugins/cisco_asa/icon_cisco_asa/actions/block_host/__init__.py b/plugins/cisco_asa/icon_cisco_asa/actions/block_host/__init__.py new file mode 100755 index 0000000000..f9de975d94 --- /dev/null +++ b/plugins/cisco_asa/icon_cisco_asa/actions/block_host/__init__.py @@ -0,0 +1,2 @@ +# GENERATED BY KOMAND SDK - DO NOT EDIT +from .action import BlockHost diff --git a/plugins/cisco_asa/icon_cisco_asa/actions/block_host/action.py b/plugins/cisco_asa/icon_cisco_asa/actions/block_host/action.py new file mode 100755 index 0000000000..080e97be40 --- /dev/null +++ b/plugins/cisco_asa/icon_cisco_asa/actions/block_host/action.py @@ -0,0 +1,23 @@ +import insightconnect_plugin_runtime +from .schema import BlockHostInput, BlockHostOutput, Input, Output, Component + +# Custom imports below + + +class BlockHost(insightconnect_plugin_runtime.Action): + def __init__(self): + super(self.__class__, self).__init__( + name="block_host", description=Component.DESCRIPTION, input=BlockHostInput(), output=BlockHostOutput() + ) + + def run(self, params={}): + return { + Output.SUCCESS: self.connection.cisco_asa_api.block_host( + params.get(Input.SHUN), + params.get(Input.SOURCE_IP), + params.get(Input.DESTINATION_IP), + params.get(Input.SOURCE_PORT), + params.get(Input.DESTINATION_PORT), + params.get(Input.PROTOCOL), + ) + } diff --git a/plugins/cisco_asa/icon_cisco_asa/actions/block_host/schema.py b/plugins/cisco_asa/icon_cisco_asa/actions/block_host/schema.py new file mode 100755 index 0000000000..c602d92a54 --- /dev/null +++ b/plugins/cisco_asa/icon_cisco_asa/actions/block_host/schema.py @@ -0,0 +1,98 @@ +# GENERATED BY KOMAND SDK - DO NOT EDIT +import insightconnect_plugin_runtime +import json + + +class Component: + DESCRIPTION = "Block hosts by IP address(IPv4) using the shun command" + + +class Input: + DESTINATION_IP = "destination_ip" + DESTINATION_PORT = "destination_port" + PROTOCOL = "protocol" + SHUN = "shun" + SOURCE_IP = "source_ip" + SOURCE_PORT = "source_port" + + +class Output: + SUCCESS = "success" + + +class BlockHostInput(insightconnect_plugin_runtime.Input): + schema = json.loads(""" + { + "type": "object", + "title": "Variables", + "properties": { + "destination_ip": { + "type": "string", + "title": "Destination IP", + "description": "Destination IP address", + "order": 3 + }, + "destination_port": { + "type": "integer", + "title": "Destination Port", + "description": "Destination port", + "order": 5 + }, + "protocol": { + "type": "string", + "title": "Protocol", + "description": "IP protocol, for example TCP or UDP", + "order": 6 + }, + "shun": { + "type": "boolean", + "title": "Shun", + "description": "True to block a host or false to unblock a host using the shun command", + "default": true, + "order": 1 + }, + "source_ip": { + "type": "string", + "title": "Source IP", + "description": "Source IP address you want to block or unblock", + "order": 2 + }, + "source_port": { + "type": "integer", + "title": "Source Port", + "description": "Source port", + "order": 4 + } + }, + "required": [ + "shun", + "source_ip" + ] +} + """) + + def __init__(self): + super(self.__class__, self).__init__(self.schema) + + +class BlockHostOutput(insightconnect_plugin_runtime.Output): + schema = json.loads(""" + { + "type": "object", + "title": "Variables", + "properties": { + "success": { + "type": "boolean", + "title": "Success", + "description": "Whether the block or unblock action was successful", + "order": 1 + } + }, + "required": [ + "success" + ] +} + """) + + def __init__(self): + super(self.__class__, self).__init__(self.schema) diff --git a/plugins/cisco_asa/icon_cisco_asa/actions/get_blocked_hosts/__init__.py b/plugins/cisco_asa/icon_cisco_asa/actions/get_blocked_hosts/__init__.py new file mode 100755 index 0000000000..db08d6b056 --- /dev/null +++ b/plugins/cisco_asa/icon_cisco_asa/actions/get_blocked_hosts/__init__.py @@ -0,0 +1,2 @@ +# GENERATED BY KOMAND SDK - DO NOT EDIT +from .action import GetBlockedHosts diff --git a/plugins/cisco_asa/icon_cisco_asa/actions/get_blocked_hosts/action.py b/plugins/cisco_asa/icon_cisco_asa/actions/get_blocked_hosts/action.py new file mode 100755 index 0000000000..196f58520e --- /dev/null +++ b/plugins/cisco_asa/icon_cisco_asa/actions/get_blocked_hosts/action.py @@ -0,0 +1,17 @@ +import insightconnect_plugin_runtime +from .schema import GetBlockedHostsInput, GetBlockedHostsOutput, Output, Component + +# Custom imports below + + +class GetBlockedHosts(insightconnect_plugin_runtime.Action): + def __init__(self): + super(self.__class__, self).__init__( + name="get_blocked_hosts", + description=Component.DESCRIPTION, + input=GetBlockedHostsInput(), + output=GetBlockedHostsOutput(), + ) + + def run(self, params={}): # pylint: disable=unused-argument + return {Output.HOSTS: self.connection.cisco_asa_api.get_blocked_hosts()} diff --git a/plugins/cisco_asa/icon_cisco_asa/actions/get_blocked_hosts/schema.py b/plugins/cisco_asa/icon_cisco_asa/actions/get_blocked_hosts/schema.py new file mode 100755 index 0000000000..d49580a255 --- /dev/null +++ b/plugins/cisco_asa/icon_cisco_asa/actions/get_blocked_hosts/schema.py @@ -0,0 +1,87 @@ +# GENERATED BY KOMAND SDK - DO NOT EDIT +import insightconnect_plugin_runtime +import json + + +class Component: + DESCRIPTION = "Get hosts blocked with shun command" + + +class Input: + pass + +class Output: + HOSTS = "hosts" + + +class GetBlockedHostsInput(insightconnect_plugin_runtime.Input): + schema = json.loads(""" + {} + """) + + def __init__(self): + super(self.__class__, self).__init__(self.schema) + + +class GetBlockedHostsOutput(insightconnect_plugin_runtime.Output): + schema = json.loads(""" + { + "type": "object", + "title": "Variables", + "properties": { + "hosts": { + "type": "array", + "title": "Hosts", + "description": "List of hosts blocked with shun command", + "items": { + "$ref": "#/definitions/hosts" + }, + "order": 1 + } + }, + "required": [ + "hosts" + ], + "definitions": { + "hosts": { + "type": "object", + "title": "hosts", + "properties": { + "dest_ip": { + "type": "string", + "title": "Destination IP", + "description": "Destination IP address", + "order": 2 + }, + "dest_port": { + "type": "string", + "title": "Destination Port", + "description": "Destination port", + "order": 4 + }, + "protocol": { + "type": "string", + "title": "Protocol", + "description": "Protocol", + "order": 5 + }, + "source_ip": { + "type": "string", + "title": "Source IP", + "description": "Source IP address", + "order": 1 + }, + "source_port": { + "type": "string", + "title": "Source Port", + "description": "Source port", + "order": 3 + } + } + } + } +} + """) + + def __init__(self): + super(self.__class__, self).__init__(self.schema) diff --git a/plugins/cisco_asa/icon_cisco_asa/util/api.py b/plugins/cisco_asa/icon_cisco_asa/util/api.py index 5297761691..8fae2e67ec 100644 --- a/plugins/cisco_asa/icon_cisco_asa/util/api.py +++ b/plugins/cisco_asa/icon_cisco_asa/util/api.py @@ -2,6 +2,7 @@ import requests from collections import OrderedDict from insightconnect_plugin_runtime.exceptions import PluginException +from typing import Optional class CiscoAsaAPI: @@ -74,6 +75,67 @@ def create_address_object(self, name: str, object_type: str, address: str) -> di json_data={"name": name, "host": {"kind": object_type, "value": address}}, ) + def cli(self, commands: list) -> dict: + return self._call_api("POST", "cli", json_data={"commands": commands}) + + def block_host( + self, + shun: str, + source_ip: str, + destination_ip: Optional[str], + source_port: Optional[int], + destination_port: Optional[int], + protocol: Optional[str], + ) -> bool: + if shun: + if not destination_ip: + destination_ip = "0.0.0.0" # nosec + if not source_port: + source_port = 0 + if not destination_port: + destination_port = 0 + if not protocol: + protocol = "0" + self.cli([f"shun {source_ip} {destination_ip} {source_port} {destination_port} {protocol}"]) + else: + self.cli([f"no shun {source_ip}"]) + return True + + def get_blocked_hosts(self) -> list: + response = self.cli(["show shun"]).get("response", []) + blocked_hosts = [] + + if not response or not isinstance(response, list): + return blocked_hosts + + hosts = response[0].split("\n") + for host in hosts: + split_host = host.split(" ") if host != "" else [] + if self._has_data_in_hosts(split_host): + blocked_hosts.append( + { + "source_ip": split_host[2], + "dest_ip": split_host[3], + "source_port": split_host[4], + "dest_port": split_host[5], + "protocol": split_host[6], + } + ) + return blocked_hosts + + @staticmethod + def _has_data_in_hosts(split_host: list) -> bool: + if ( + len(split_host) == 7 # pylint: disable=too-many-boolean-expressions + and split_host[2] + and split_host[3] + and split_host[4] + and split_host[5] + and split_host[6] + ): + return True + return False + def _call_api(self, method: str, path: str, json_data: dict = None, params: dict = None): response = {"text": ""} headers = OrderedDict([("Content-Type", "application/json"), ("User-Agent", self.user_agent)]) @@ -98,7 +160,7 @@ def _call_api(self, method: str, path: str, json_data: dict = None, params: dict if response.status_code >= 400: response_data = response.text raise PluginException(preset=PluginException.Preset.UNKNOWN, data=response_data) - if response.status_code == 201 or response.status_code == 204: + if response.status_code == 201 or response.status_code == 204: # pylint: disable=consider-using-in return {} if 200 <= response.status_code < 300: return response.json() diff --git a/plugins/cisco_asa/plugin.spec.yaml b/plugins/cisco_asa/plugin.spec.yaml index bcc12d997d..843186ba9d 100644 --- a/plugins/cisco_asa/plugin.spec.yaml +++ b/plugins/cisco_asa/plugin.spec.yaml @@ -4,7 +4,8 @@ products: [insightconnect] name: cisco_asa title: Cisco Adaptive Security Appliance description: The Cisco ASA plugin allows you to automate the management of network objects -version: 1.4.2 +version: 1.5.0 +supported_versions: ["9.13(1)"] vendor: rapid7 support: community status: [] @@ -59,6 +60,32 @@ types: type: string description: Self link required: false + hosts: + source_ip: + title: Source IP + description: Source IP address + type: string + required: false + dest_ip: + title: Destination IP + description: Destination IP address + type: string + required: false + source_port: + title: Source Port + description: Source port + type: string + required: false + dest_port: + title: Destination Port + description: Destination port + type: string + required: false + protocol: + title: Protocol + description: Protocol + type: string + required: false connection: credentials: @@ -224,3 +251,59 @@ actions: description: Returns true if object was created type: boolean required: true + get_blocked_hosts: + title: Get Blocked Hosts + description: Get hosts blocked with shun command + output: + hosts: + title: Hosts + description: List of hosts blocked with shun command + type: "[]hosts" + required: true + block_host: + title: Block Host + description: Block hosts by IP address(IPv4) using the shun command + input: + shun: + title: Shun + description: True to block a host or false to unblock a host using the shun command + type: boolean + required: true + default: true + example: true + source_ip: + title: Source IP + description: Source IP address you want to block or unblock + type: string + required: true + example: 198.51.100.100 + destination_ip: + title: Destination IP + description: Destination IP address + type: string + required: false + example: 198.51.100.100 + source_port: + title: Source Port + description: Source port + type: integer + required: false + example: 443 + destination_port: + title: Destination Port + description: Destination port + type: integer + required: false + example: 443 + protocol: + title: Protocol + description: IP protocol, for example TCP or UDP + type: string + required: false + example: TCP + output: + success: + title: Success + description: Whether the block or unblock action was successful + type: boolean + required: true diff --git a/plugins/cisco_asa/requirements.txt b/plugins/cisco_asa/requirements.txt index 519a7752a6..2e89dbae79 100644 --- a/plugins/cisco_asa/requirements.txt +++ b/plugins/cisco_asa/requirements.txt @@ -1,4 +1,5 @@ # List third-party dependencies here, separated by newlines. # All dependencies must be version-pinned, eg. requests==1.2.0 # See: https://pip.pypa.io/en/stable/user_guide/#requirements-files -validators==0.17.0 \ No newline at end of file +validators==0.17.0 +parameterized==0.8.1 diff --git a/plugins/cisco_asa/setup.py b/plugins/cisco_asa/setup.py index a9d29f648f..41cb2dc297 100644 --- a/plugins/cisco_asa/setup.py +++ b/plugins/cisco_asa/setup.py @@ -3,7 +3,7 @@ setup(name="cisco_asa-rapid7-plugin", - version="1.4.2", + version="1.5.0", description="The Cisco ASA plugin allows you to automate the management of network objects", author="rapid7", author_email="", diff --git a/plugins/cisco_asa/unit_test/__init__.py b/plugins/cisco_asa/unit_test/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/plugins/cisco_asa/unit_test/payloads/block_host.json.resp b/plugins/cisco_asa/unit_test/payloads/block_host.json.resp new file mode 100644 index 0000000000..64aa124e00 --- /dev/null +++ b/plugins/cisco_asa/unit_test/payloads/block_host.json.resp @@ -0,0 +1,3 @@ +{ + "response" : ["Shun 1.1.1.1 added in context: single_vf\nShun 1.1.1.1 successful\n"] +} diff --git a/plugins/cisco_asa/unit_test/payloads/block_host2.json.resp b/plugins/cisco_asa/unit_test/payloads/block_host2.json.resp new file mode 100644 index 0000000000..d32f7b16d1 --- /dev/null +++ b/plugins/cisco_asa/unit_test/payloads/block_host2.json.resp @@ -0,0 +1,3 @@ +{ + "response" : ["Shun 2.2.2.2 added in context: single_vf\nShun 2.2.2.2 successful\n"] +} diff --git a/plugins/cisco_asa/unit_test/payloads/blocked_hosts.json.resp b/plugins/cisco_asa/unit_test/payloads/blocked_hosts.json.resp new file mode 100644 index 0000000000..a3f4b3f29f --- /dev/null +++ b/plugins/cisco_asa/unit_test/payloads/blocked_hosts.json.resp @@ -0,0 +1,3 @@ +{ + "response" : ["shun (management) 1.1.1.1 2.2.2.2 444 555 6\nshun (management) 3.3.3.3 4.4.4.4 333 444 6\n"] +} diff --git a/plugins/cisco_asa/unit_test/payloads/unblock_host.json.resp b/plugins/cisco_asa/unit_test/payloads/unblock_host.json.resp new file mode 100644 index 0000000000..63fa551d82 --- /dev/null +++ b/plugins/cisco_asa/unit_test/payloads/unblock_host.json.resp @@ -0,0 +1,3 @@ +{ + "response" : [] +} diff --git a/plugins/cisco_asa/unit_test/test_block_host.py b/plugins/cisco_asa/unit_test/test_block_host.py new file mode 100644 index 0000000000..48993adf06 --- /dev/null +++ b/plugins/cisco_asa/unit_test/test_block_host.py @@ -0,0 +1,102 @@ +import sys +import os + +from unittest import TestCase +from icon_cisco_asa.actions.block_host import BlockHost +from icon_cisco_asa.actions.block_host.schema import Input, Output +from unit_test.util import Util +from unittest.mock import patch +from parameterized import parameterized +from insightconnect_plugin_runtime.exceptions import PluginException + +sys.path.append(os.path.abspath("../")) + + +@patch("requests.request", side_effect=Util.mocked_requests) +class TestGetBlockedHosts(TestCase): + @classmethod + def setUpClass(cls) -> None: + cls.action = Util.default_connector(BlockHost()) + + @parameterized.expand( + [ + ["block_host", True, "1.1.1.1", None, None, None, None, True], + ["block_host2", True, "2.2.2.2", "3.3.3.3", 333, 444, "tcp", True], + ["block_host_without_ports", True, "2.2.2.2", "3.3.3.3", None, None, "tcp", True], + ["block_host_without_dest_ip", True, "2.2.2.2", None, 333, 444, "tcp", True], + ["block_host_without_protocol", True, "2.2.2.2", "3.3.3.3", 333, 444, None, True], + ["block_host_empty_strings", True, "1.1.1.1", "", "", "", "", True], + ["unblock_host", False, "1.1.1.1", None, None, None, None, True], + ] + ) + def test_block_host(self, mock_post, name, shun, source_ip, dest_ip, source_port, dest_port, protocol, expected): + actual = self.action.run( + { + Input.SHUN: shun, + Input.SOURCE_IP: source_ip, + Input.DESTINATION_IP: dest_ip, + Input.SOURCE_PORT: source_port, + Input.DESTINATION_PORT: dest_port, + Input.PROTOCOL: protocol, + } + ) + expected = {Output.SUCCESS: expected} + self.assertEqual(actual, expected) + + @parameterized.expand( + [ + [ + "block_host_invalid_source_ip", + True, + "999.999.999.999", + "2.2.2.2", + 333, + 444, + "tcp", + "Something unexpected occurred.", + "Check the logs and if the issue persists please contact support.", + 'Response was: {"response": "Error: Invalid Hostname"}', + ], + [ + "block_host_invalid_dest_ip", + True, + "1.1.1.1", + "999.999.999.999", + 333, + 444, + "tcp", + "Something unexpected occurred.", + "Check the logs and if the issue persists please contact support.", + 'Response was: {"response": "Error: Invalid Hostname"}', + ], + [ + "unblock_host_invalid_ip", + False, + "999.999.999.999", + None, + None, + None, + None, + "Something unexpected occurred.", + "Check the logs and if the issue persists please contact support.", + 'Response was: {"response": "Error: Invalid Hostname"}', + ], + ] + ) + def test_block_host_bad( + self, mock_post, name, shun, source_ip, dest_ip, source_port, dest_port, protocol, cause, assistance, data + ): + with self.assertRaises(PluginException) as e: + self.action.run( + { + Input.SHUN: shun, + Input.SOURCE_IP: source_ip, + Input.DESTINATION_IP: dest_ip, + Input.SOURCE_PORT: source_port, + Input.DESTINATION_PORT: dest_port, + Input.PROTOCOL: protocol, + } + ) + self.assertEqual(e.exception.cause, cause) + self.assertEqual(e.exception.assistance, assistance) + self.assertEqual(e.exception.data, data) diff --git a/plugins/cisco_asa/unit_test/test_get_blocked_hosts.py b/plugins/cisco_asa/unit_test/test_get_blocked_hosts.py new file mode 100644 index 0000000000..b8fb2b5323 --- /dev/null +++ b/plugins/cisco_asa/unit_test/test_get_blocked_hosts.py @@ -0,0 +1,211 @@ +import sys +import os + +from unittest import TestCase +from icon_cisco_asa.actions.get_blocked_hosts import GetBlockedHosts +from icon_cisco_asa.actions.get_blocked_hosts.schema import Output +from unit_test.util import Util +from unittest.mock import patch + +sys.path.append(os.path.abspath("../")) + + +class TestGetBlockedHosts(TestCase): + @classmethod + def setUpClass(cls) -> None: + cls.action = Util.default_connector(GetBlockedHosts()) + + @patch("requests.request", side_effect=Util.mocked_requests) + def test_get_blocked_hosts(self, mock_post): + actual = self.action.run() + + expected = { + Output.HOSTS: [ + { + "source_ip": "1.1.1.1", + "dest_ip": "2.2.2.2", + "source_port": "444", + "dest_port": "555", + "protocol": "6", + }, + { + "source_ip": "3.3.3.3", + "dest_ip": "4.4.4.4", + "source_port": "333", + "dest_port": "444", + "protocol": "6", + }, + ] + } + + self.assertEqual(actual, expected) + + class Response: + def __init__(self, text, status_code): + self.status_code = status_code + self.text = text + + def json(self): + return self.text + + @patch("requests.request") + def test_get_blocked_hosts_single_host(self, mock_post): + text = {"response": ["shun (management) 1.1.1.1 2.2.2.2 444 555 6\n"]} + mock_post.return_value = TestGetBlockedHosts.Response(text, 200) + actual = self.action.run() + + expected = { + Output.HOSTS: [ + { + "source_ip": "1.1.1.1", + "dest_ip": "2.2.2.2", + "source_port": "444", + "dest_port": "555", + "protocol": "6", + } + ] + } + + self.assertEqual(actual, expected) + + @patch("requests.request") + def test_get_blocked_hosts_without_new_line(self, mock_post): + text = {"response": ["shun (management) 1.1.1.1 2.2.2.2 444 555 6"]} + mock_post.return_value = TestGetBlockedHosts.Response(text, 200) + actual = self.action.run() + + expected = { + Output.HOSTS: [ + { + "source_ip": "1.1.1.1", + "dest_ip": "2.2.2.2", + "source_port": "444", + "dest_port": "555", + "protocol": "6", + } + ] + } + + self.assertEqual(actual, expected) + + @patch("requests.request") + def test_get_blocked_hosts_incomplete_response(self, mock_post): + text = {"response": ["shun (management) 1.1.1.1 2.2.2.2\n"]} + mock_post.return_value = TestGetBlockedHosts.Response(text, 200) + actual = self.action.run() + + expected = {Output.HOSTS: []} + + self.assertEqual(actual, expected) + + @patch("requests.request") + def test_get_blocked_hosts_incomplete_response2(self, mock_post): + text = {"response": ["shun (management) 1.1.1.1 2.2.2.2\nshun (management) 3.3.3.3 4.4.4.4 333 444 6\n"]} + mock_post.return_value = TestGetBlockedHosts.Response(text, 200) + actual = self.action.run() + + expected = { + Output.HOSTS: [ + { + "source_ip": "3.3.3.3", + "dest_ip": "4.4.4.4", + "source_port": "333", + "dest_port": "444", + "protocol": "6", + } + ] + } + + self.assertEqual(actual, expected) + + @patch("requests.request") + def test_get_blocked_hosts_incomplete_response3(self, mock_post): + text = {"response": ["shun (management) 1.1.1.1 2.2.2.2 444 555 6\nshun (management) 3.3.3.3 4.4.4.4\n"]} + mock_post.return_value = TestGetBlockedHosts.Response(text, 200) + actual = self.action.run() + + expected = { + Output.HOSTS: [ + { + "source_ip": "1.1.1.1", + "dest_ip": "2.2.2.2", + "source_port": "444", + "dest_port": "555", + "protocol": "6", + } + ] + } + + self.assertEqual(actual, expected) + + @patch("requests.request") + def test_get_blocked_hosts_bad_response(self, mock_post): + text = { + "response": [ + "shun (management) 1.1.1.1 2.2.2.2 444 555 6\n\n \n \n\n \nshun (management) 3.3.3.3 " + "4.4.4.4 333 444 6\n" + ] + } + mock_post.return_value = TestGetBlockedHosts.Response(text, 200) + actual = self.action.run() + + expected = { + Output.HOSTS: [ + { + "source_ip": "1.1.1.1", + "dest_ip": "2.2.2.2", + "source_port": "444", + "dest_port": "555", + "protocol": "6", + }, + { + "source_ip": "3.3.3.3", + "dest_ip": "4.4.4.4", + "source_port": "333", + "dest_port": "444", + "protocol": "6", + }, + ] + } + + self.assertEqual(actual, expected) + + @patch("requests.request") + def test_get_blocked_hosts_no_blocked_hosts(self, mock_post): + text = {"response": [""]} + mock_post.return_value = TestGetBlockedHosts.Response(text, 200) + actual = self.action.run() + + expected = {Output.HOSTS: []} + + self.assertEqual(actual, expected) + + @patch("requests.request") + def test_get_blocked_hosts_empty_list(self, mock_post): + text = {"response": []} + mock_post.return_value = TestGetBlockedHosts.Response(text, 200) + actual = self.action.run() + + expected = {Output.HOSTS: []} + + self.assertEqual(actual, expected) + + @patch("requests.request") + def test_get_blocked_hosts_response_as_string(self, mock_post): + text = {"response": "test"} + mock_post.return_value = TestGetBlockedHosts.Response(text, 200) + actual = self.action.run() + + expected = {Output.HOSTS: []} + + self.assertEqual(actual, expected) + + @patch("requests.request") + def test_get_blocked_hosts_no_response(self, mock_post): + text = {"response": None} + mock_post.return_value = TestGetBlockedHosts.Response(text, 200) + actual = self.action.run() + + expected = {Output.HOSTS: []} + + self.assertEqual(actual, expected) diff --git a/plugins/cisco_asa/unit_test/util.py b/plugins/cisco_asa/unit_test/util.py new file mode 100644 index 0000000000..6cb50a0e64 --- /dev/null +++ b/plugins/cisco_asa/unit_test/util.py @@ -0,0 +1,69 @@ +import json +import logging +import os +from icon_cisco_asa.connection.connection import Connection +from icon_cisco_asa.connection.schema import Input + + +class Util: + @staticmethod + def default_connector(action): + default_connection = Connection() + default_connection.logger = logging.getLogger("connection logger") + params = { + Input.URL: "https://example.com", + Input.CREDENTIALS: {"password": "password", "username": "user"}, + Input.PORT: 443, + Input.SSL_VERIFY: True, + Input.USER_AGENT: "REST API Agent", + } + default_connection.connect(params) + action.connection = default_connection + action.logger = logging.getLogger("action logger") + return action + + @staticmethod + def read_file_to_string(filename): + with open(filename) as my_file: + return my_file.read() + + @staticmethod + def mocked_requests(*args, **kwargs): + class MockResponse: + def __init__(self, filename, status_code): + self.filename = filename + self.status_code = status_code + if self.filename == "invalid_hostname": + self.text = 'Response was: {"response": "Error: Invalid Hostname"}' + else: + self.text = "Error message" + + def json(self): + return json.loads( + Util.read_file_to_string( + os.path.join(os.path.dirname(os.path.realpath(__file__)), f"payloads/{self.filename}.json.resp") + ) + ) + + if kwargs.get("json") == {"commands": ["show shun"]}: + return MockResponse("blocked_hosts", 200) + if kwargs.get("json") == {"commands": ["shun 1.1.1.1 0.0.0.0 0 0 0"]}: + return MockResponse("block_host", 200) + if kwargs.get("json") == {"commands": ["shun 2.2.2.2 3.3.3.3 333 444 tcp"]}: + return MockResponse("block_host2", 200) + if kwargs.get("json") == {"commands": ["shun 2.2.2.2 3.3.3.3 0 0 tcp"]}: + return MockResponse("block_host2", 200) + if kwargs.get("json") == {"commands": ["shun 2.2.2.2 0.0.0.0 333 444 tcp"]}: + return MockResponse("block_host2", 200) + if kwargs.get("json") == {"commands": ["shun 2.2.2.2 3.3.3.3 333 444 0"]}: + return MockResponse("block_host2", 200) + if kwargs.get("json") == {"commands": ["shun 1.1.1.1 999.999.999.999 333 444 tcp"]}: + return MockResponse("invalid_hostname", 400) + if kwargs.get("json") == {"commands": ["shun 999.999.999.999 2.2.2.2 333 444 tcp"]}: + return MockResponse("invalid_hostname", 400) + if kwargs.get("json") == {"commands": ["no shun 1.1.1.1"]}: + return MockResponse("unblock_host", 200) + if kwargs.get("json") == {"commands": ["no shun 999.999.999.999"]}: + return MockResponse("invalid_hostname", 400) + + raise Exception("Not implemented")